-- Commit 9d174128 (Isabella Almeida da Silva):
--   "Usado o mesmo código de criação das sandboxes do driver POSIX"
--   (Reuses the sandbox-creation code from the POSIX driver.)
-- Module table: the driver's public functions and fields are attached to it below.
local pbs = {}
local lfs = require("lfs")
local safer = require("safer")
local util = require("sga.util")
local exec = require("sga.exec")
-- Base command lines for the PBS tools. Each entry ends with a space so that
-- arguments can be appended directly; qstat's reply is parsed as XML by qstat().
local cmds = {
  ["nodeinfo"] = "pbsnodes -a ",
  ["qsub"] = "qsub ",
  ["qstat"] = "qstat -fx ",
}
--- SGA type, sent back to the server when this SGA registers.
pbs["type"] = "cluster"
---
-- Execute a new command by submitting it to PBS with qsub.
-- Creates the job's sandbox directories, writes cmd_string into a /bin/sh
-- script under config.runtime_data_dir and submits it with qsub, sending the
-- job's stdout/stderr to files next to the script. On success the script,
-- output and error file names plus the PBS job id are stored in job.data.
-- @param job The job object: job.data is a writable table for driver data.
-- @param cmd_string The command string
-- @return True if succeeded or nil and an error message
function pbs.execute_command(self, job, cmd_string)
  local prefix = self.config.runtime_data_dir.."/qsub_"..job.jid
  local script_filename = prefix..".script"
  local out_filename = prefix..".out"
  local err_filename = prefix..".err"
  for _, sandbox_path in ipairs(job.sandboxes or {}) do
    -- A failing mkdir is tolerated when the path already exists as a directory.
    if not lfs.mkdir(sandbox_path) then
      local attr = lfs.attributes(sandbox_path)
      if not (attr and attr.mode == "directory") then
        return nil, "Failed creating job's sandbox "..sandbox_path
      end
    end
  end
  self.exec:write_file(script_filename, "#!/bin/sh\n"..cmd_string.."\n")
  local pbsjid, stderr = self.exec:run(("%s -o %s -e %s %s"):format(cmds.qsub, out_filename, err_filename, script_filename))
  -- Guard against a nil result as well as an empty one: calling gsub on nil
  -- would raise instead of reporting the submission failure.
  if pbsjid and pbsjid ~= "" then
    pbsjid = pbsjid:gsub("\n", "")
    self.logger:debug("Submitted PBS job: "..pbsjid)
    job.data.script_filename = script_filename
    job.data.out_filename = out_filename
    job.data.err_filename = err_filename
    job.data.pbsjid = pbsjid
    return true
  end
  -- stderr may be nil; tostring keeps the error path from raising.
  return nil, "Failed submitting job: "..tostring(stderr or "")
end
--
-- Deletes resources (files) created for a command.
-- Removes the qsub script/output/error files recorded in job.data by
-- pbs.execute_command (skipping any that were never recorded, e.g. when the
-- submission failed) and then the job's sandbox directories. A sandbox that
-- cannot be removed is logged as an error, not raised.
-- NOTE(review): lfs.rmdir presumably only removes empty directories; a sandbox
-- with leftover files will be reported here — confirm this is intended.
-- @param job The job object
function pbs.cleanup_job(self, job)
  local data = job.data or {}
  for _, key in ipairs({ "script_filename", "out_filename", "err_filename" }) do
    if data[key] then
      self.exec:remove(data[key])
    end
  end
  for _, sandbox_path in ipairs(job.sandboxes or {}) do
    local ok, err = lfs.rmdir(sandbox_path)
    if not ok then
      local detail = err and (": "..tostring(err)) or ""
      self.logger:error("Failed removing job's sandbox "..sandbox_path..detail)
    end
  end
end
-- XML entities decoded in text content; any other "&name;" is left as is.
local xml_entities = { lt = "<", gt = ">", amp = "&", quot = '"', apos = "'" }

--- Minimal XML reader used for qstat's XML output.
-- Supports only nested elements without attributes and plain text content.
-- An element containing child elements becomes a table keyed by child tag
-- name (a repeated child tag keeps only its last value); an element whose body
-- starts with text becomes that text with entities decoded; an empty element
-- becomes an empty table.
-- @param input XML text to parse.
-- @param current_tag Name of the element being parsed (nil at top level).
-- @return At top level, the table of parsed elements. For a nested call, the
--   element's value plus the still-unparsed remainder of the input.
local function mini_xml(input, current_tag)
  local out = {}
  while true do
    local close, tag = input:match("^<(/?)([^/>]+)>")
    if close == "/" then
      -- skip "</tag>"
      input = input:sub(#tag + 4)
      if tag == current_tag then
        return out, input
      end
    elseif tag then
      -- skip "<tag>" and parse the element body
      out[tag], input = mini_xml(input:sub(#tag + 3), tag)
    elseif current_tag then
      local value = input:match("^([^<]+)")
      if not value then
        -- Input exhausted or not parseable here (truncated output, unsupported
        -- construct): stop instead of looping forever; callers unwind with the
        -- data parsed so far.
        return out, input
      end
      -- One gsub pass, so "&amp;lt;" decodes to "&lt;" and not to "<".
      return (value:gsub("&(%a+);", xml_entities)), input:sub(#value + 1)
    else
      return out
    end
  end
end
--
-- Runs qstat for the job's PBS id and parses the XML reply with mini_xml.
-- On success the parsed <Data> element is also cached in job.data.qstat_data.
-- @param job The job object; job.data.pbsjid is the id stored by pbs.execute_command.
-- @return The parsed <Data> table, or nil and an error message.
local function qstat(self, job)
  local pbsjid = job.data.pbsjid
  if not pbsjid then
    return nil, "Failed running job status command - job has no PBS id"
  end
  local jdata, stderr = self.exec:run(cmds.qstat.." "..pbsjid)
  if not jdata or jdata == "" then
    -- stderr may be nil; concatenating nil would raise instead of reporting.
    local detail = (stderr and stderr ~= "") and (" - "..tostring(stderr)) or ""
    return nil, "Failed running job status command"..detail
  end
  local data = mini_xml(jdata)
  if not (data and type(data.Data) == "table") then
    return nil, "Failed parsing job status data"
  end
  job.data.qstat_data = data.Data
  return data.Data
end
-- Maps a PBS job_state letter to the SGA state name used by this driver.
-- Letters not listed here map to nil.
local pbs_to_sga_state = {
  -- PBS states treated as finished
  ["C"] = "FINISHED",
  ["E"] = "FINISHED",
  -- PBS states treated as running
  ["R"] = "RUNNING",
  -- PBS states treated as waiting
  ["H"] = "WAITING",
  ["Q"] = "WAITING",
  ["T"] = "WAITING",
  ["W"] = "WAITING",
}
--- Converts a PBS "HH:MM:SS" duration into seconds.
-- The hours part may have any number of digits.
-- @param time Duration string such as "01:02:03".
-- @return Number of seconds, or 0 when time is not a string or contains no
--   HH:MM:SS part.
local function get_seconds(time)
  if type(time) ~= "string" then
    return 0
  end
  local h, m, s = time:match("(%d+):(%d%d):(%d%d)")
  if not h then
    return 0
  end
  return tonumber(s) + tonumber(m) * 60 + tonumber(h) * 3600
end
--
-- Checks if a command has finished.
-- Queries qstat (which caches the parsed reply in job.data.qstat_data) and
-- maps the PBS job_state through pbs_to_sga_state.
-- @param job The job object
-- @return True plus command times if command has terminated, False if command is still running, or nil and a message
function pbs.is_command_done(self, job)
  local data, err = qstat(self, job)
  if not data then
    return nil, err
  end
  job.data.qstat_data = data
  local pbs_job = data.Job
  if type(pbs_job) ~= "table" then
    return nil, "Job status data has no job information"
  end
  if pbs_to_sga_state[pbs_job.job_state] ~= "FINISHED" then
    return false
  end
  -- resources_used/walltime may be absent (presumably for a job that never
  -- ran — confirm); report 0 instead of raising on a nil index.
  local used = pbs_job.resources_used
  local walltime = type(used) == "table" and used.walltime
  local donetime = type(walltime) == "string" and get_seconds(walltime) or 0
  return true, donetime, donetime, donetime -- FIXME detailed times
end
--- Byte multiplier for the first letter of a PBS memory unit.
local mem_fact = {b = 1, k = 1024, m = 1024^2, g = 1024^3, t = 1024^4}

--- Converts a PBS memory string such as "2048kb" into bytes.
-- Only the first letter of the unit is used (so "kb" and "k" both scale by
-- 1024); a missing or unknown unit leaves the number unscaled.
-- @param minfo Memory string reported by PBS.
-- @return Size in bytes, or 0 when minfo is not a string or contains no digits.
local function get_mem(minfo)
  if type(minfo) ~= "string" then
    return 0
  end
  -- %a* (not %a+) so a bare number such as "123" is still parsed.
  local amount, unit = minfo:match("(%d+)(%a*)")
  if not amount then
    return 0
  end
  local factor = mem_fact[unit:sub(1, 1):lower()]
  return tonumber(amount) * (factor or 1)
end
-- Applies conv (get_seconds or get_mem) to a resources_used field when it is
-- a string; a missing or non-text field counts as 0.
local function used_or_zero(conv, value)
  if type(value) == "string" then
    return conv(value)
  end
  return 0
end

pbs.actions = {
  --
  -- Gets a command current status.
  -- Reuses job.data.qstat_data (the data periodically fetched by
  -- pbs.is_command_done) and only runs qstat when nothing is cached yet.
  -- @param job The job object
  -- @return true and a table { processes = {...} } with one entry describing
  --   the PBS job (one entry per command component), or nil and an error message.
  status = function(self, job)
    local data = job.data.qstat_data
    if not data then
      local err
      data, err = qstat(self, job)
      if not data then
        return nil, err
      end
    end
    self.logger:debug(util.debug_table(data))
    local pbs_job = data.Job
    if type(pbs_job) ~= "table" then
      return nil, "Job status data has no job information"
    end
    local state = pbs_to_sga_state[pbs_job.job_state]
    local walltime, mem, vmem, cput = 0, 0, 0, 0
    if state == "FINISHED" then
      -- resources_used may be absent (presumably for jobs that never ran —
      -- confirm); missing values are reported as 0.
      local used = pbs_job.resources_used
      if type(used) ~= "table" then
        used = {}
      end
      walltime = used_or_zero(get_seconds, used.walltime)
      mem = used_or_zero(get_mem, used.mem)
      vmem = used_or_zero(get_mem, used.vmem)
      cput = used_or_zero(get_seconds, used.cput)
    else
      -- Elapsed time since start_time; 0 when the job has not started yet.
      local now = self.exec:time()
      walltime = now - (tonumber(pbs_job.start_time) or now)
    end
    local job_id = pbs_job.Job_Id
    local processes = {
      {
        pid = type(job_id) == "string" and job_id:match("^(%d+)") or nil,
        ppid = 0,
        exec_host = pbs_job.exec_host,
        string = job.cmd_string,
        state = state,
        processor_id = "",
        -- NOTE(review): get_mem returns bytes while these fields are named
        -- *_mb; confirm the unit the server expects.
        memory_ram_size_mb = mem,
        memory_swap_size_mb = 0,
        cpu_perc = 0,
        cpu_time_sec = cput,
        wall_time_sec = walltime,
        system_time_sec = 0,
        user_time_sec = 0,
        virtual_memory_size_mb = vmem,
        bytes_in_kb = 0,
        bytes_out_kb = 0,
        disk_bytes_read_kb = 0,
        disk_bytes_write_kb = 0,
      }
    }
    return true, { processes = processes }
  end,
}
local function get_node_attributes(ndata)
local attribs = {}
for attr, val in ndata:gmatch("(%S+)%s+=%s+([^\n]+)") do
if attr == "status" then
for attr_st, val_st in val:gmatch("(%w+)=([^,]+)") do
-- do not overwrite attribute defined in higher level
attribs[attr_st] = attribs[attr_st] or val_st