pbs.lua 9.13 KiB
local pbs = {}
local safer = require("safer")
local util = require("sga.util")
local exec = require("sga.exec")
local cmds = {
   nodeinfo = "pbsnodes -a ",
   qsub = "qsub ",
   qstat = "qstat -fx ",
--- Type of the SGA, returned to the server during registration.
pbs.type = "cluster"
---
-- Execute a new command.
-- @param job The job object: job.data is a writable table for driver data.
-- @param cmd_string The command string
-- @return True if succeded or nil and an error message
function pbs.execute_command(self, job, cmd_string)
   local script_filename = self.config.runtime_data_dir.."/qsub_"..job.jid..".script"
   local out_filename = self.config.runtime_data_dir.."/qsub_"..job.jid..".out"
   local err_filename = self.config.runtime_data_dir.."/qsub_"..job.jid..".err"
   self.exec:write_file(script_filename, "#!/bin/sh\n"..cmd_string.."\n")
   local pbsjid, stderr = self.exec:run(("%s -o %s -e %s %s"):format(cmds.qsub, out_filename, err_filename, script_filename))
   if pbsjid ~= "" then
      pbsjid = pbsjid:gsub("\n", "")
      self.logger:debug("Submitted PBS job: "..pbsjid)
      job.data.script_filename = script_filename
      job.data.out_filename = out_filename
      job.data.err_filename = err_filename
      job.data.pbsjid = pbsjid
      return true
   else
      local err = "Failed submitting job: "..stderr
      return nil, err
   end
end
-- Deletes resources (files) created for a command.
-- @param job The job object
function pbs.cleanup_job(self, job)
   self.exec:remove(job.data.script_filename)
   self.exec:remove(job.data.out_filename)
   self.exec:remove(job.data.err_filename)
end
local function mini_xml(input, current_tag)
   local out = {}
   while true do
      local close, tag = input:match("^<(/?)([^/>]+)>")
      if close == "/" then
         input = input:sub(#tag + 4)
         if tag == current_tag then
            return out, input
         end
      elseif tag then
         out[tag], input = mini_xml(input:sub(#tag + 3), tag)
      elseif current_tag then
         local value = input:match("^([^<]+)")
         if value then
            return value:gsub("&lt;", "<"):gsub("&gt;", ">"):gsub("&amp;", "&"), input:sub(#value + 1)
         end
      else
         return out
7172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
end end end local function qstat(self, job) local jdata, stderr = self.exec:run(cmds.qstat.." "..job.data.pbsjid) if not jdata or jdata == "" then return nil, "Failed running job status command"..(stderr and " - "..stderr) end local data = mini_xml(jdata) if not (data and data.Data) then return nil, "Failed parsing job status data" end job.data.qstat_data = data.Data return data.Data end local pbs_to_sga_state = { C = "FINISHED", E = "FINISHED", R = "RUNNING", Q = "WAITING", W = "WAITING", T = "WAITING", H = "WAITING", } local function get_seconds(time) local h, m, s = time:match("(%d+):(%d%d):(%d%d)") if not h then return 0 end return s + m * 60 + h * 3600 end -- -- Checks if a command has finished. -- @param job The job object -- @return True plus command times if command has terminated, False if command is still running, or nil and a message function pbs.is_command_done(self, job) local data, err = qstat(self, job) if err then return nil, err end job.data.qstat_data = data if pbs_to_sga_state[data.Job.job_state] == "FINISHED" then local donetime = get_seconds(data.Job.resources_used.walltime) return true, donetime, donetime, donetime -- FIXME detailed times else return false end end local mem_fact = {b = 1, k = 1024, m = 1024^2, g = 1024^3, t = 1024^4} local function get_mem(minfo) local m, um = minfo:match("(%d+)(%a+)") m = m and tonumber(m) or 0 um = um and um:sub(1, 1):lower() or "b" local f = mem_fact[um] if f then return m * f else return m end end pbs.actions = { -- Gets a command current status -- @param job The job object -- @return A table with information for each command component (process) status = function(self, job) -- This function reuses the data periodically fetched by pbs.is_command_done. local data, err = job.data.qstat_data if not data then