-- An error occurred while loading the file. Please try again.
-- Isabella Almeida da Silva authored aa072899
local pbs = {}
local safer = require("safer")
local util = require("sga.util")
local exec = require("sga.exec")
-- Command-line prefixes of the PBS tools invoked by this driver. Each entry
-- keeps its trailing space because at least one caller appends its argument
-- directly to the prefix.
local cmds = {
   ["qstat"] = "qstat -fx ",       -- job status query, used by qstat()
   ["qsub"] = "qsub ",             -- job submission, used by pbs.execute_command
   ["nodeinfo"] = "pbsnodes -a ",  -- node listing, used by pbs.get_nodes*
}
--- Type of the SGA, returned to the server during registration.
-- NOTE(review): no function visible in this file reads this field;
-- presumably it is consumed by the code that loads the driver (confirm).
pbs.type = "cluster"
---
-- Execute a new command.
-- Creates the job's sandbox directories, writes the command into a shell
-- script under config.runtime_data_dir and submits that script with qsub.
-- On success the generated file names and the PBS job id are stored in
-- job.data (script_filename, out_filename, err_filename, pbsjid).
-- @param job The job object: job.data is a writable table for driver data.
-- @param cmd_string The command string
-- @return True if succeeded or nil and an error message
function pbs.execute_command(self, job, cmd_string)
   local prefix = self.config.runtime_data_dir.."/qsub_"..job.jid
   local script_filename = prefix..".script"
   local out_filename = prefix..".out"
   local err_filename = prefix..".err"

   for _, sandbox_path in ipairs(job.sandboxes) do
      local ok = lfs.mkdir(sandbox_path)
      if not ok then
         -- A failed mkdir is acceptable when the path already is a directory.
         local attr = lfs.attributes(sandbox_path)
         if not (attr and attr.mode == "directory") then
            return nil, "Failed creating job's sandbox "..sandbox_path
         end
      end
   end

   self.exec:write_file(script_filename, "#!/bin/sh\n"..cmd_string.."\n")
   local pbsjid, stderr = self.exec:run(("%s -N %s -V -o %s -e %s %s"):format(cmds.qsub, job.cmd_id, out_filename, err_filename, script_filename))

   -- The runner may hand back nil instead of an empty string on failure;
   -- both are treated as "nothing was submitted" so that neither the gsub
   -- below nor the error message concatenation can raise.
   if pbsjid and pbsjid ~= "" then
      pbsjid = pbsjid:gsub("\n", "")
      self.logger:debug("Submitted PBS job: "..pbsjid)
      job.data.script_filename = script_filename
      job.data.out_filename = out_filename
      job.data.err_filename = err_filename
      job.data.pbsjid = pbsjid
      return true
   end
   return nil, "Failed submitting job: "..tostring(stderr or "unknown error")
end
--
-- Deletes resources (files) created for a command: the submission script,
-- the stdout/stderr files recorded in job.data and every sandbox directory
-- listed in job.sandboxes. Failures to remove a sandbox are logged, not
-- returned.
-- @param job The job object
function pbs.cleanup_job(self, job)
   -- The file names are only recorded after a successful submission, so
   -- entries that were never set are skipped instead of being passed on.
   for _, key in ipairs({"script_filename", "out_filename", "err_filename"}) do
      local filename = job.data[key]
      if filename then
         self.exec:remove(filename)
      end
   end

   for _, sandbox_path in ipairs(job.sandboxes) do
      local attr = lfs.attributes(sandbox_path)
      if attr and attr.mode == "directory" then
         -- NOTE(review): the second result of util.removedir is assumed to
         -- be an error description -- confirm against sga.util.
         local ok, err = util.removedir(sandbox_path)
         if not ok then
            self.logger:error("Failed removing job's sandbox "..sandbox_path..(err and (": "..tostring(err)) or ""))
         end
      end
   end
end
--- Minimal recursive XML-to-table converter.
-- Understands only plain opening tags, closing tags and text; attributes,
-- self-closing tags and whitespace between tags are not handled specially.
-- An element holding text becomes a string (with the &lt;, &gt; and &amp;
-- entities decoded); any other element becomes a table indexed by the
-- names of its children.
-- @param input The XML text still to be parsed
-- @param current_tag Name of the element being parsed (nil at top level)
-- @return At top level: the table of parsed elements. Inside an element:
-- the element's value (string or table) plus the unparsed remainder.
local function mini_xml(input, current_tag)
   local out = {}
   while true do
      local close, tag = input:match("^<(/?)([^/>]+)>")
      if close == "/" then
         -- Skip "</tag>" (#tag + 3 characters). A closing tag other than
         -- ours belongs to a text child that has already been returned.
         input = input:sub(#tag + 4)
         if tag == current_tag then
            return out, input
         end
      elseif tag then
         -- Skip "<tag>" (#tag + 2 characters) and parse the element body.
         out[tag], input = mini_xml(input:sub(#tag + 3), tag)
      elseif current_tag then
         local value = input:match("^([^<]+)")
         if not value then
            -- Truncated input or markup this parser does not understand:
            -- nothing can be consumed, so stop here instead of spinning
            -- forever, and drop the unparseable remainder.
            return out, ""
         end
         -- "&amp;" is decoded last so that "&amp;lt;" yields "&lt;".
         local text = value:gsub("&lt;", "<"):gsub("&gt;", ">"):gsub("&amp;", "&")
         return text, input:sub(#value + 1)
      else
         return out
      end
   end
end
--- Queries PBS for a job and parses the answer.
-- Runs the qstat command for job.data.pbsjid, converts its output with
-- mini_xml and caches the resulting "Data" element in job.data.qstat_data.
-- @param job The job object (job.data.pbsjid must have been set on submission)
-- @return The parsed "Data" table, or nil and an error message
local function qstat(self, job)
   local pbsjid = job.data.pbsjid
   if not pbsjid then
      return nil, "Failed running job status command - job has no PBS id"
   end
   local jdata, stderr = self.exec:run(cmds.qstat.." "..pbsjid)
   if not jdata or jdata == "" then
      -- stderr may be nil; without the trailing `or ""` the concatenation
      -- itself would raise and hide the real failure.
      return nil, "Failed running job status command"..(stderr and (" - "..tostring(stderr)) or "")
   end
   local data = mini_xml(jdata)
   -- Callers index the result, so anything but a table is a parse failure.
   if not (data and type(data.Data) == "table") then
      return nil, "Failed parsing job status data"
   end
   job.data.qstat_data = data.Data
   return data.Data
end
-- Translation from the one-letter job_state codes found in the qstat data
-- to the job states this driver reports. Codes not listed map to nil.
local pbs_to_sga_state = {}
for code in ("CE"):gmatch(".") do
   pbs_to_sga_state[code] = "FINISHED"
end
pbs_to_sga_state["R"] = "RUNNING"
for code in ("QWTH"):gmatch(".") do
   pbs_to_sga_state[code] = "WAITING"
end
--- Converts a duration written as "H:MM:SS" into seconds.
-- The hours part may have any number of digits; the first such group found
-- anywhere in the text is used.
-- @param time The duration text (nil and non-string values are tolerated)
-- @return The duration in seconds, or 0 when no duration could be read
local function get_seconds(time)
   if type(time) ~= "string" then
      return 0
   end
   local h, m, s = time:match("(%d+):(%d%d):(%d%d)")
   if not h then
      return 0
   end
   -- Convert explicitly instead of relying on string arithmetic coercion.
   return tonumber(s) + tonumber(m) * 60 + tonumber(h) * 3600
end
--
-- Checks if a command has finished.
-- Refreshes the job's qstat data (also cached in job.data.qstat_data for
-- the status action) and maps the PBS job state to the driver's states.
-- @param job The job object
-- @return True plus command times if command has terminated, False if command is still running, or nil and a message
function pbs.is_command_done(self, job)
   local data, err = qstat(self, job)
   if not data then
      return nil, err
   end
   job.data.qstat_data = data

   local job_info = data.Job
   if type(job_info) ~= "table" then
      return nil, "Failed parsing job status data"
   end

   if pbs_to_sga_state[job_info.job_state] ~= "FINISHED" then
      return false
   end

   -- resources_used may be missing from the qstat data (e.g. nothing was
   -- accounted for the job); report zero instead of raising.
   local used = job_info.resources_used
   local walltime = type(used) == "table" and used.walltime
   local donetime = walltime and get_seconds(walltime) or 0
   return true, donetime, donetime, donetime -- FIXME detailed times
end
-- Multipliers, indexed by the lower-cased first letter of a memory unit,
-- that convert an amount into bytes.
local mem_fact = {b = 1, k = 1024, m = 1024^2, g = 1024^3, t = 1024^4}

--- Converts a memory amount such as "2048kb" into bytes.
-- Only the first letter of the unit is looked at (case-insensitive). An
-- amount without unit, or with an unknown unit, is taken as is.
-- @param minfo The memory text (nil is tolerated)
-- @return The amount in bytes, or 0 when no number could be read
local function get_mem(minfo)
   if minfo == nil then
      return 0
   end
   -- The unit is optional: "(%a+)" would reject a bare number and turn
   -- e.g. "512" into 0.
   local m, um = tostring(minfo):match("(%d+)(%a*)")
   m = m and tonumber(m) or 0
   um = (um and um ~= "") and um:sub(1, 1):lower() or "b"
   return m * (mem_fact[um] or 1)
end
--- Actions that can be requested for a job.
pbs.actions = {
   ---
   -- Gets a command current status.
   -- The PBS job is reported as a single process entry.
   -- @param job The job object
   -- @return True and a table { processes = {...} } with information for
   -- each command component (process), or nil and an error message
   status = function(self, job)
      -- This function reuses the data periodically fetched by pbs.is_command_done.
      local data = job.data.qstat_data
      if not data then
         local err
         data, err = qstat(self, job)
         if not data then
            return nil, err
         end
      end
      self.logger:debug(util.debug_table(data))

      local job_info = data.Job
      if type(job_info) ~= "table" then
         return nil, "Failed parsing job status data"
      end

      -- get_mem yields bytes while the fields below are named *_mb.
      local bytes_per_mb = 1024 * 1024
      local state = pbs_to_sga_state[job_info.job_state]
      local walltime, mem, vmem, cput
      if state == "FINISHED" then
         -- resources_used or any of its entries may be absent.
         local used = job_info.resources_used
         if type(used) ~= "table" then
            used = {}
         end
         walltime = used.walltime and get_seconds(used.walltime) or 0
         mem = used.mem and get_mem(used.mem) / bytes_per_mb or 0
         vmem = used.vmem and get_mem(used.vmem) / bytes_per_mb or 0
         cput = used.cput and get_seconds(used.cput) or 0
      else
         -- Read the clock once so that a missing start_time gives exactly 0.
         local now = self.exec:time()
         walltime = now - (tonumber(job_info.start_time) or now)
         mem, vmem, cput = 0, 0, 0
      end

      local job_id = job_info.Job_Id
      local processes = {
         {
            -- leading digits of the PBS job id
            pid = type(job_id) == "string" and job_id:match("^(%d+)") or nil,
            ppid = 0,
            exec_host = job_info.exec_host,
            string = job.cmd_string,
            state = state,
            processor_id = "",
            memory_ram_size_mb = mem,
            memory_swap_size_mb = 0,
            cpu_perc = 0,
            cpu_time_sec = cput,
            wall_time_sec = walltime,
            system_time_sec = 0,
            user_time_sec = 0,
            virtual_memory_size_mb = vmem,
            bytes_in_kb = 0,
            bytes_out_kb = 0,
            disk_bytes_read_kb = 0,
            disk_bytes_write_kb = 0,
         }
      }
      return true, { processes = processes }
   end,
}
--- Extracts the attributes of one node from the node listing text.
-- Every "name = value" line becomes an entry of the result. The value of
-- the "status" attribute is itself a comma-separated list of "key=value"
-- pairs; these are merged into the same table without replacing entries
-- that are already present.
-- @param ndata The text describing a single node
-- @return A table mapping attribute names to their (string) values
local function get_node_attributes(ndata)
   local attribs = {}
   for attr, val in ndata:gmatch("(%S+)%s+=%s+([^\n]+)") do
      if attr == "status" then
         for attr_st, val_st in val:gmatch("(%w+)=([^,]+)") do
            -- do not overwrite attribute defined in higher level
            attribs[attr_st] = attribs[attr_st] or val_st
         end
      else
         attribs[attr] = val
      end
   end
   return attribs
end
-- Returns part as a percentage of whole, or 0 when whole is not positive
-- (avoids the NaN/inf that a division by zero would store).
local function safe_percent(part, whole)
   if whole > 0 then
      return part * 100 / whole
   end
   return 0
end

--- Refreshes the entry of one node in self.nodes_data.
-- Parses the node's attributes out of ndata and fills both the static
-- fields (cpus, ram, swap) and the dynamic ones (memory usage, load).
-- Attributes that are missing from ndata count as 0 (1 for ncpus).
-- @param nname The node name used as key in self.nodes_data
-- @param ndata The text describing the node
local function update_node(self, nname, ndata)
   local data = self.nodes_data[nname] or {}
   local attribs = get_node_attributes(ndata)

   -- static information
   data.clock_mhz = 0 -- FIXME do not return fake information!
   local ncpus = attribs.ncpus and tonumber(attribs.ncpus) or 1
   data.num_of_cpus = ncpus
   local physmem = attribs.physmem and get_mem(attribs.physmem) or 0
   data.ram_mb = physmem / (1024 * 1024)
   local totmem = attribs.totmem and get_mem(attribs.totmem) or 0
   local swap = totmem - physmem
   data.swap_mb = swap / (1024 * 1024)

   -- dynamic information
   local availmem = attribs.availmem and get_mem(attribs.availmem) or 0
   -- never report a negative amount when totmem is unknown
   local totused = math.max(totmem - availmem, 0)
   local uram, uswap
   if totused <= physmem then
      uram = totused
      uswap = 0
   else
      uram = physmem
      uswap = totused - physmem
   end
   data.ram_used_perc = safe_percent(uram, physmem)
   data.swap_used_perc = safe_percent(uswap, swap)
   local loadave = attribs.loadave and tonumber(attribs.loadave) or 0
   data.load_avg_1min_perc = safe_percent(loadave, ncpus)
   data.load_avg_5min_perc = data.load_avg_1min_perc -- FIXME fake!
   data.load_avg_15min_perc = data.load_avg_1min_perc -- FIXME fake!
   -- FIXME: how to get number of jobs?
   data.number_of_jobs = 0

   self.nodes_data[nname] = data
end
---
-- Gets available execution nodes and their resources.
-- Runs the node listing command, splits its output per node and refreshes
-- self.nodes_data for every node found (domain part of the name removed).
-- @return A table indexed by node name, each entry contains the node's
-- resources ("static" data), or nil and an error message
function pbs.get_nodes(self)
   local ninfo = self.exec:run(cmds.nodeinfo)
   if not ninfo or ninfo == "" then
      return nil, "Failed reading data after "..cmds.nodeinfo
   end
   ninfo = "\n"..ninfo -- for uniform matching of node names
   -- A line made of a single word is a node name: mark it as <name>
   ninfo = ninfo:gsub("\n(%S+)\n", "<%1>")
   for nname, data in ninfo:gmatch("<(%S+)>([^<]+)") do
      nname = nname:gsub("%..*", "") -- remove domain, if exists
      update_node(self, nname, data)
   end
   -- for simplicity, also returns current status data
   return self.nodes_data
end
---
-- Get nodes current status.
-- Queries every node already known in self.nodes_data and refreshes its
-- entry; a node whose query fails is logged and keeps its previous data.
-- @return A table indexed by node name, each entry contains the node's
-- available resources ("monitoring" data)
function pbs.get_nodes_status(self)
   for nname, _ in pairs(self.nodes_data) do
      local cmd = cmds.nodeinfo..nname
      local ninfo = self.exec:run(cmd)
      -- Same failure test as pbs.get_nodes: an empty answer must not wipe
      -- the node's data with zeros.
      if not ninfo or ninfo == "" then
         self.logger:error("Failed reading data after "..cmd)
      else
         update_node(self, nname, ninfo)
      end
   end
   -- for simplicity, also returns nodes resources
   return self.nodes_data
end
--
-- Gets a local path attributes.
-- Delegates to util.check_local_path and hands back its results unchanged;
-- the self argument is not used.
-- @param path_name The path filename
-- @return A table with the file attributes corresponding to path_name, or nil and a message
function pbs.check_path(self, path_name)
-- FIXME either check path remotely or assume that
-- the configuration is correct.
return util.check_local_path(path_name)
end
---
-- Creates a driver instance.
-- Initializes the execution backend with exec.init(config) and wraps the
-- instance fields (config, logger, nodes_data, exec) with safer.table.
-- @param config The configuration table, also handed to exec.init
-- @param logger The logger stored in the instance
-- @return The new instance, or nil and the error reported by exec.init
function pbs.new(config, logger)
   local backend, init_err = exec.init(config)
   if init_err then
      return nil, init_err
   end

   local fields = {
      exec = backend,
      nodes_data = {},
      logger = logger,
      config = config,
   }
   local instance = safer.table(fields)
   return instance
end
return pbs