-- pbs.lua (9.80 KiB)
local pbs = {}
local safer = require("safer")
local util = require("sga.util")
local exec = require("sga.exec")
-- Command-line prefixes of the PBS tools used by this driver. Every entry
-- ends with a space so that arguments can be appended directly.
local cmds = {
   nodeinfo = "pbsnodes -a ",
   qsub = "qsub ",
   qstat = "qstat -fx ",
}
--- Type of the SGA, returned to the server during registration.
pbs.type = "cluster"
---
-- Execute a new command.
-- Creates the job's sandbox directories, writes the command into a shell
-- script under config.runtime_data_dir and submits that script with qsub.
-- On success the script/stdout/stderr file names and the PBS job id are
-- stored in job.data.
-- NOTE(review): `lfs` is not required in the visible part of this file;
-- presumably it is available as a global -- confirm.
-- @param job The job object: job.data is a writable table for driver data.
-- @param cmd_string The command string
-- @return True if succeeded or nil and an error message
function pbs.execute_command(self, job, cmd_string)
   local prefix = self.config.runtime_data_dir.."/qsub_"..job.jid
   local script_filename = prefix..".script"
   local out_filename = prefix..".out"
   local err_filename = prefix..".err"

   for _, sandbox_path in ipairs(job.sandboxes) do
      if not lfs.mkdir(sandbox_path) then
         -- A failed mkdir is acceptable as long as the path is a directory.
         local attr = lfs.attributes(sandbox_path)
         if not (attr and attr.mode == "directory") then
            return nil, "Failed creating job's sandbox "..sandbox_path
         end
      end
   end

   self.exec:write_file(script_filename, "#!/bin/sh\n"..cmd_string.."\n")
   local pbsjid, stderr = self.exec:run(("%s -N %s -V -o %s -e %s %s"):format(
      cmds.qsub, job.cmd_id, out_filename, err_filename, script_filename))

   -- The runner may hand back nil as well as an empty string on failure, and
   -- stderr may be missing too; neither case may raise here.
   if not pbsjid or pbsjid == "" then
      return nil, "Failed submitting job: "..tostring(stderr or "")
   end

   -- qsub prints the job id followed by a newline
   local jid = pbsjid:gsub("\n", "")
   self.logger:debug("Submitted PBS job: "..jid)
   job.data.script_filename = script_filename
   job.data.out_filename = out_filename
   job.data.err_filename = err_filename
   job.data.pbsjid = jid
   return true
end
---
-- Deletes resources (files) created for a command.
-- Removes the submission script and the stdout/stderr files recorded in
-- job.data, then removes every sandbox path that is still a directory.
-- @param job The job object
function pbs.cleanup_job(self, job)
   self.exec:remove(job.data.script_filename)
   self.exec:remove(job.data.out_filename)
   self.exec:remove(job.data.err_filename)
   for _, sandbox_path in ipairs(job.sandboxes) do
      local attr = lfs.attributes(sandbox_path)
      if attr and attr.mode == "directory" then
         if not util.removedir(sandbox_path) then
            self.logger:error("Failed removing job's sandbox "..sandbox_path)
         end
      end
   end
end

-- Minimal XML reader used for the output of the job status command.
-- Builds nested tables keyed by tag name; an element holding only text
-- becomes a string with the &lt;, &gt; and &amp; entities decoded.
-- Tags containing "/" (e.g. self-closing tags) are not recognized.
-- @param input The text still to be parsed
-- @param current_tag Name of the enclosing element (nil at top level)
-- @return The parsed table (or text) and, inside an element, the rest of
-- the input
local function mini_xml(input, current_tag)
   local out = {}
   while true do
      local close, tag = input:match("^<(/?)([^/>]+)>")
      if close == "/" then
         input = input:sub(#tag + 4) -- skip "</tag>"
         if tag == current_tag then
            return out, input
         end
      elseif tag then
         out[tag], input = mini_xml(input:sub(#tag + 3), tag)
      elseif current_tag then
         local value = input:match("^([^<]+)")
         if not value then
            -- Truncated input or markup this reader does not understand:
            -- nothing can be consumed, so stop instead of looping forever.
            return out, input
         end
         local text = value:gsub("&lt;", "<"):gsub("&gt;", ">"):gsub("&amp;", "&")
         return text, input:sub(#value + 1)
      else
         return out
      end
   end
end

-- Runs the job status command for a job and parses its output.
-- The parsed data is also cached in job.data.qstat_data.
-- @param job The job object (job.data.pbsjid must be set)
-- @return The contents of the <Data> element (guaranteed to hold a Job
-- table), or nil and an error message
local function qstat(self, job)
   local jdata, stderr = self.exec:run(cmds.qstat.." "..job.data.pbsjid)
   if not jdata or jdata == "" then
      -- stderr may be nil; the trailing `or ""` keeps the concatenation valid
      return nil, "Failed running job status command"..(stderr and " - "..tostring(stderr) or "")
   end
   local data = mini_xml(jdata)
   local root = data and data.Data
   if not (type(root) == "table" and type(root.Job) == "table") then
      return nil, "Failed parsing job status data"
   end
   job.data.qstat_data = root
   return root
end

-- Maps the PBS job_state code to the SGA state name.
local pbs_to_sga_state = {
   C = "FINISHED",
   E = "FINISHED",
   R = "RUNNING",
   Q = "WAITING",
   W = "WAITING",
   T = "WAITING",
   H = "WAITING",
}

-- Converts an "h:mm:ss" string into seconds.
-- @param time The time string (may be nil)
-- @return The number of seconds, or 0 when the value is missing or malformed
local function get_seconds(time)
   if type(time) ~= "string" then
      return 0
   end
   local h, m, s = time:match("(%d+):(%d%d):(%d%d)")
   if not h then
      return 0
   end
   return tonumber(s) + tonumber(m) * 60 + tonumber(h) * 3600
end

-- Returns the resources_used table of a job, or an empty table when the
-- status data does not carry one.
local function used_resources(data)
   local used = data.Job.resources_used
   if type(used) == "table" then
      return used
   end
   return {}
end

---
-- Checks if a command has finished.
-- @param job The job object
-- @return True plus command times if command has terminated, False if
-- command is still running, or nil and a message
function pbs.is_command_done(self, job)
   local data, err = qstat(self, job) -- also refreshes job.data.qstat_data
   if not data then
      return nil, err
   end
   if pbs_to_sga_state[data.Job.job_state] ~= "FINISHED" then
      return false
   end
   local donetime = get_seconds(used_resources(data).walltime)
   return true, donetime, donetime, donetime -- FIXME detailed times
end

local BYTES_PER_MB = 1024 * 1024
local mem_fact = {b = 1, k = 1024, m = 1024^2, g = 1024^3, t = 1024^4}

-- Converts a PBS memory string such as "2048kb" into bytes.
-- Only the first letter of the unit is considered; an unknown unit leaves
-- the number unscaled.
-- @param minfo The memory string (may be nil)
-- @return The amount in bytes, or 0 when the value is missing or malformed
local function get_mem(minfo)
   if type(minfo) ~= "string" then
      return 0
   end
   local m, um = minfo:match("(%d+)(%a+)")
   m = m and tonumber(m) or 0
   um = um and um:sub(1, 1):lower() or "b"
   local f = mem_fact[um]
   if f then
      return m * f
   end
   return m
end

pbs.actions = {
   -- Gets a command current status
   -- @param job The job object
   -- @return True and a table with information for each command component
   -- (process), or nil and a message
   status = function(self, job)
      -- This function reuses the data periodically fetched by
      -- pbs.is_command_done.
      local data = job.data.qstat_data
      if not data then
         local err
         data, err = qstat(self, job)
         if not data then
            return nil, err
         end
      end
      self.logger:debug(util.debug_table(data))

      local state = pbs_to_sga_state[data.Job.job_state]
      local walltime, mem, vmem, cput
      if state == "FINISHED" then
         local used = used_resources(data)
         walltime = get_seconds(used.walltime)
         -- get_mem yields bytes while the reported fields are in MB
         mem = get_mem(used.mem) / BYTES_PER_MB
         vmem = get_mem(used.vmem) / BYTES_PER_MB
         cput = get_seconds(used.cput)
      else
         local now = self.exec:time()
         walltime = now - (tonumber(data.Job.start_time) or now)
         mem, vmem, cput = 0, 0, 0
      end

      local job_id = data.Job.Job_Id
      local processes = {
         {
            pid = type(job_id) == "string" and job_id:match("^(%d+)") or nil,
            ppid = 0,
            exec_host = data.Job.exec_host,
            string = job.cmd_string,
            state = state,
            processor_id = "",
            memory_ram_size_mb = mem,
            memory_swap_size_mb = 0,
            cpu_perc = 0,
            cpu_time_sec = cput,
            wall_time_sec = walltime,
            system_time_sec = 0,
            user_time_sec = 0,
            virtual_memory_size_mb = vmem,
            bytes_in_kb = 0,
            bytes_out_kb = 0,
            disk_bytes_read_kb = 0,
            disk_bytes_write_kb = 0,
         }
      }
      return true, { processes = processes }
   end,
}

-- Parses the "name = value" lines of a node description.
-- The comma-separated "key=value" pairs inside the `status` attribute are
-- flattened into the same table.
-- @param ndata The text describing one node
-- @return A table mapping attribute names to their string values
local function get_node_attributes(ndata)
   local attribs = {}
   for attr, val in ndata:gmatch("(%S+)%s+=%s+([^\n]+)") do
      if attr == "status" then
         for attr_st, val_st in val:gmatch("(%w+)=([^,]+)") do
            -- do not overwrite attribute defined in higher level
            attribs[attr_st] = attribs[attr_st] or val_st
         end
      else
         attribs[attr] = val
      end
   end
   return attribs
end

-- Percentage of `part` relative to `whole`; 0 when `whole` is not positive,
-- so that missing data never produces NaN or infinite values.
local function percent(part, whole)
   if whole > 0 then
      return part * 100 / whole
   end
   return 0
end

-- Updates (or creates) the entry of a node in self.nodes_data.
-- @param nname The node name
-- @param ndata The text describing the node
local function update_node(self, nname, ndata)
   local data = self.nodes_data[nname] or {}
   local attribs = get_node_attributes(ndata)

   -- static information
   data.clock_mhz = 0 -- FIXME do not return fake information!
   local ncpus = attribs.ncpus and tonumber(attribs.ncpus) or 1
   data.num_of_cpus = ncpus
   local physmem = get_mem(attribs.physmem)
   data.ram_mb = physmem / BYTES_PER_MB
   local totmem = get_mem(attribs.totmem)
   -- totmem may be missing or smaller than physmem; never report negative swap
   local swap = math.max(totmem - physmem, 0)
   data.swap_mb = swap / BYTES_PER_MB

   -- dynamic information
   local availmem = get_mem(attribs.availmem)
   local totused = math.max(totmem - availmem, 0)
   local uram, uswap
   if totused <= physmem then
      uram, uswap = totused, 0
   else
      uram, uswap = physmem, totused - physmem
   end
   data.ram_used_perc = percent(uram, physmem)
   data.swap_used_perc = percent(uswap, swap)
   local loadave = attribs.loadave and tonumber(attribs.loadave) or 0
   data.load_avg_1min_perc = percent(loadave, ncpus)
   data.load_avg_5min_perc = data.load_avg_1min_perc -- FIXME fake!
   data.load_avg_15min_perc = data.load_avg_1min_perc -- FIXME fake!
   -- FIXME: how to get number of jobs? (attribs.jobs is currently unused)
   data.number_of_jobs = 0

   self.nodes_data[nname] = data
end

---
-- Gets available execution nodes and their resources.
-- @return A table indexed by node name, each entry contains the node's
-- resources ("static" data), or nil and a message
function pbs.get_nodes(self)
   local ninfo = self.exec:run(cmds.nodeinfo)
   if not ninfo or ninfo == "" then
      return nil, "Failed reading data after "..cmds.nodeinfo
   end
   ninfo = "\n"..ninfo -- for uniform matching of node names
   ninfo = ninfo:gsub("\n(%S+)\n", "<%1>") -- <name>
   for nname, data in ninfo:gmatch("<(%S+)>([^<]+)") do
      nname = nname:gsub("%..*", "") -- remove domain, if exists
      update_node(self, nname, data)
   end
   -- for simplicity, also returns current status data
   return self.nodes_data
end

---
-- Get nodes current status.
-- @return A table indexed by node name, each entry contains the node's
-- available resources ("monitoring" data)
function pbs.get_nodes_status(self)
   for nname in pairs(self.nodes_data) do
      local cmd = cmds.nodeinfo..nname
      local ninfo = self.exec:run(cmd)
      -- Empty output is a failure too (same rule as pbs.get_nodes); parsing
      -- it would overwrite the node's data with zeros.
      if not ninfo or ninfo == "" then
         self.logger:error("Failed reading data after "..cmd)
      else
         update_node(self, nname, ninfo)
      end
   end
   -- for simplicity, also returns nodes resources
   return self.nodes_data
end

---
-- Gets a local path attributes.
-- @param path_name The path filename
-- @return A table with the file attributes corresponding to path_name, or
-- nil and a message
function pbs.check_path(self, path_name)
   -- FIXME either check path remotely or assume that
   -- the configuration is correct.
   return util.check_local_path(path_name)
end

---
-- Creates a new PBS driver instance.
-- @param config The driver configuration, also passed to exec.init
-- @param logger The logger used by the driver
-- @return The driver instance, or nil and an error message
function pbs.new(config, logger)
   local exec_driver, err = exec.init(config)
   if err then
      return nil, err
   end
   local self = safer.table {
      config = config,
      logger = logger,
      nodes_data = {},
      exec = exec_driver,
   }
   return self
end

return pbs