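-- NOTE: the following lines are residue from the web page this file was copied from, not Lua source:
-- An error occurred while loading the file. Please try again.
-- Ana Lucia de Moura authored ef41e735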
local pbs = {}
local safer = require("safer")
local util = require("sga.util")
local exec = require("sga.exec")
-- Command-line prefixes for the PBS tools used by this driver.
-- Every prefix ends with a space; the code that uses them adds its arguments
-- after the prefix.
local cmds = {
-- Node listing command; not referenced in the part of the file reviewed here.
nodeinfo = "pbsnodes -a ",
-- Job submission command, used by pbs.execute_command.
qsub = "qsub ",
-- Job status command, used by qstat(); its output is handed to mini_xml.
qstat = "qstat -fx ",
}
--- Type of the SGA, returned to the server during registration.
pbs.type = "cluster"
---
-- Execute a new command.
-- The command is written to a shell script under self.config.runtime_data_dir
-- and that script is submitted with qsub, passing sibling ".out" and ".err"
-- paths through the -o and -e options. On success the three file names and
-- the PBS job id are stored in job.data.
-- @param job The job object: job.data is a writable table for driver data.
-- @param cmd_string The command string
-- @return True if succeeded or nil and an error message
function pbs.execute_command(self, job, cmd_string)
  local script_filename = self.config.runtime_data_dir.."/qsub_"..job.jid..".script"
  local out_filename = self.config.runtime_data_dir.."/qsub_"..job.jid..".out"
  local err_filename = self.config.runtime_data_dir.."/qsub_"..job.jid..".err"

  self.exec:write_file(script_filename, "#!/bin/sh\n"..cmd_string.."\n")

  local submit_cmd = ("%s -o %s -e %s %s"):format(
    cmds.qsub, out_filename, err_filename, script_filename)
  -- The first result of exec:run is taken as the PBS job id.
  local pbsjid, stderr = self.exec:run(submit_cmd)

  -- A nil result must be treated as a failure too: calling :gsub on nil
  -- would raise instead of returning the documented nil + message.
  if not pbsjid or pbsjid == "" then
    -- stderr may be nil; tostring keeps the concatenation from raising.
    return nil, "Failed submitting job: "..tostring(stderr or "")
  end

  -- Assigning to a single variable keeps only the string result of gsub
  -- and drops the substitution count.
  pbsjid = pbsjid:gsub("\n", "")
  self.logger:debug("Submitted PBS job: "..pbsjid)

  job.data.script_filename = script_filename
  job.data.out_filename = out_filename
  job.data.err_filename = err_filename
  job.data.pbsjid = pbsjid
  return true
end
--
-- Removes the files recorded in job.data for a command: the submission
-- script first, then the ".out" file, then the ".err" file.
-- @param job The job object
function pbs.cleanup_job(self, job)
  local recorded_keys = { "script_filename", "out_filename", "err_filename" }
  for i = 1, #recorded_keys do
    self.exec:remove(job.data[recorded_keys[i]])
  end
end
-- Minimal recursive reader for plain <tag>...</tag> markup.
-- Builds a table keyed by tag name; an element whose content starts with
-- text yields that text as a string instead of a table. A repeated tag name
-- at the same level overwrites the earlier entry.
-- @param input The text still to be parsed
-- @param current_tag Name of the element being parsed, or nil at the top level
-- @return The parsed table (or the text value of current_tag) and, when
-- called for an element, the remaining unparsed input
local function mini_xml(input, current_tag)
  local out = {}
  while true do
    local close, tag = input:match("^<(/?)([^/>]+)>")
    if close == "/" then
      -- Closing tag: skip "</" .. tag .. ">" (#tag + 3 characters).
      input = input:sub(#tag + 4)
      if tag == current_tag then
        return out, input
      end
      -- Otherwise this closes a child whose text value was already returned;
      -- keep parsing siblings.
    elseif tag then
      -- Opening tag: parse what follows "<" .. tag .. ">" as its content.
      out[tag], input = mini_xml(input:sub(#tag + 3), tag)
    elseif current_tag then
      local value = input:match("^([^<]+)")
      if not value then
        -- Input is exhausted or starts with markup this reader does not
        -- understand (e.g. "<a/>"). Nothing can be consumed, so return what
        -- was parsed instead of spinning in the loop forever.
        return out, input
      end
      -- Undo the XML escaping of text content; "&amp;" is handled last so
      -- that "&amp;lt;" decodes to "&lt;" and not to "<".
      local text = value:gsub("&lt;", "<"):gsub("&gt;", ">"):gsub("&amp;", "&")
      -- The closing tag is left in the input for the caller to consume.
      return text, input:sub(#value + 1)
    else
      return out
    end
  end
end
-- Runs the job status command for job.data.pbsjid and parses its output.
-- On success the parsed "Data" entry is also cached in job.data.qstat_data.
-- @param job The job object; job.data.pbsjid must hold the PBS job id
-- @return The parsed "Data" entry, or nil and an error message
local function qstat(self, job)
  local jdata, stderr = self.exec:run(cmds.qstat.." "..job.data.pbsjid)
  if not jdata or jdata == "" then
    -- stderr may be nil; without the trailing `or ""` the concatenation
    -- itself would raise and hide the real failure.
    return nil, "Failed running job status command"..(stderr and " - "..stderr or "")
  end
  local data = mini_xml(jdata)
  if not (data and data.Data) then
    return nil, "Failed parsing job status data"
  end
  job.data.qstat_data = data.Data
  return data.Data
end
-- Maps the job_state value found in the parsed qstat data to an SGA state name.
-- A job_state that has no entry here yields nil, which pbs.is_command_done
-- treats the same as any state other than "FINISHED".
-- NOTE(review): the keys presumably follow the PBS/Torque one-letter job state
-- codes (C completed, E exiting, R running, Q queued, W waiting, T in transit,
-- H held) -- confirm against the qstat documentation of the deployed version.
local pbs_to_sga_state = {
C = "FINISHED",
E = "FINISHED",
R = "RUNNING",
Q = "WAITING",
W = "WAITING",
T = "WAITING",
H = "WAITING",
}
-- Converts a duration written as "H:MM:SS" into a number of seconds.
-- The hours field may have any number of digits; minutes and seconds must
-- have exactly two. The pattern is not anchored, so the first such group
-- found anywhere in the string is used.
-- @param time The duration string
-- @return Total seconds, or 0 when time is not a string or holds no H:MM:SS group
local function get_seconds(time)
  -- A missing value (nil) or any other non-string cannot be matched; report
  -- zero instead of raising.
  if type(time) ~= "string" then return 0 end
  local h, m, s = time:match("(%d+):(%d%d):(%d%d)")
  if not h then return 0 end
  -- Convert explicitly rather than relying on string arithmetic coercion.
  return tonumber(s) + tonumber(m) * 60 + tonumber(h) * 3600
end
--
-- Checks if a command has finished.
-- Queries the job status, caches the parsed data in job.data.qstat_data and
-- compares the mapped job state against "FINISHED".
-- @param job The job object
-- @return True plus command times if command has terminated, False if command is still running, or nil and a message
function pbs.is_command_done(self, job)
  local data, err = qstat(self, job)
  if err then
    return nil, err
  end

  -- The parsed status may lack a Job entry (or not be a table at all);
  -- report that as an error instead of raising on a nil index.
  local jobinfo = type(data) == "table" and data.Job
  if type(jobinfo) ~= "table" then
    return nil, "Failed parsing job status data: no Job entry"
  end
  job.data.qstat_data = data

  if pbs_to_sga_state[jobinfo.job_state] ~= "FINISHED" then
    return false
  end

  -- A finished job may carry no resources_used.walltime; fall back to 0
  -- seconds in that case.
  local used = jobinfo.resources_used
  local walltime = type(used) == "table" and used.walltime
  local donetime = 0
  if type(walltime) == "string" then
    donetime = get_seconds(walltime)
  end
  return true, donetime, donetime, donetime -- FIXME detailed times
end
-- Multipliers keyed by the lower-cased first letter of a size unit.
local mem_fact = {b = 1, k = 1024, m = 1024^2, g = 1024^3, t = 1024^4}

-- Converts a size written as digits followed by an optional unit (for
-- example "512kb") into a plain number, scaled by the unit's first letter.
-- A missing unit is treated as "b" (factor 1); an unknown unit letter leaves
-- the number unscaled.
-- @param minfo The size string
-- @return The scaled number, or 0 when minfo is nil or contains no digits
local function get_mem(minfo)
  if minfo == nil then return 0 end
  -- "%a*" (not "%a+") lets a bare number match; with "%a+" a unit-less
  -- value failed the whole match and was reported as 0.
  local m, um = tostring(minfo):match("(%d+)(%a*)")
  m = m and tonumber(m) or 0
  local unit = "b"
  if um and um ~= "" then
    unit = um:sub(1, 1):lower()
  end
  local f = mem_fact[unit]
  if f then
    return m * f
  end
  return m
end
pbs.actions = {
-- Gets a command current status
-- @param job The job object
-- @return A table with information for each command component (process)
status = function(self, job)
-- This function reuses the data periodically fetched by pbs.is_command_done.
local data, err = job.data.qstat_data
if not data then