-- Origin: csbase/sgarest-daemon, commit 1f51b666, authored by Isabella Almeida da Silva
-- [SOMA-74][SOMA-398][WA] Erro no monitoramento de comando (see merge request csbase/sgarest-daemon!6)
local posix = {}
local signal = require("posix.signal")
local wait = require("posix.sys.wait")
local unistd = require("posix.unistd")
local stat = require("posix.sys.stat")
local procdata = require("procdata")
local safer = require("safer")
local util = require("sga.util")
local filemonitor = require("sga.filemonitor")
--- Build the path of a job's ".done" marker file.
-- @param self The driver instance; reads self.config.runtime_data_dir
-- @param jid The job identifier
-- @return The path "<runtime_data_dir>/<jid>.done"
local function done_file(self, jid)
   local runtime_dir = self.config.runtime_data_dir
   return runtime_dir .. "/" .. jid .. ".done"
end
--- Build the path of a job's ".start" marker file.
-- @param self The driver instance; reads self.config.runtime_data_dir
-- @param jid The job identifier
-- @return The path "<runtime_data_dir>/<jid>.start"
local function start_file(self, jid)
   local runtime_dir = self.config.runtime_data_dir
   return runtime_dir .. "/" .. jid .. ".start"
end
---
-- Execute a new command.
-- Creates the job's sandbox directories and forks a child process. The child
-- writes its start time to the job's ".start" file, runs the command through
-- os.execute and then writes the elapsed wall time to the job's ".done" file.
-- The parent records the child's pid in job.data.pid.
-- @param job The job object: job.data is a writable table for driver data.
-- @param cmd_string The command string
-- @return True if succeeded or nil and an error message
function posix.execute_command(self, job, cmd_string)
   filemonitor.init(job)

   -- NOTE(review): `lfs` is not required in the visible part of this file;
   -- presumably it is reachable as a global -- confirm.
   for _, sandbox_path in ipairs(job.sandboxes) do
      local created = lfs.mkdir(sandbox_path)
      if not created then
         -- A failed mkdir is acceptable as long as the directory is there.
         local attr = lfs.attributes(sandbox_path)
         if not (attr and attr.mode == "directory") then
            return nil, "Failed creating job's sandbox "..sandbox_path
         end
      end
   end

   local pid, errmsg = unistd.fork()
   if pid == nil then
      return nil, "Failed forking subprocess"..(errmsg and (": "..tostring(errmsg)) or "")
   end

   if pid == 0 then
      -- FIXME In the forked process, just dump errors to stderr
      -- until we have a better procedure.
      local function dump(err)
         -- tostring() keeps a non-string error value from raising here, which
         -- would let the child escape back into the caller's code.
         io.stderr:write(tostring(err).."\n")
      end

      -- Close every descriptor above stdin/stdout/stderr.
      for fd = 3, 65535 do
         unistd.close(fd)
      end

      local ok, err = pcall(function()
         signal.signal(signal.SIGHUP, signal.SIG_IGN)
         local child_pid = unistd.getpid()
         -- assert() forwards the error message when no info is available.
         local pinfo = assert(procdata.get_process_info(child_pid))
         local written, werr = util.write_file(start_file(self, job.jid), pinfo.starttime)
         if not written then
            dump(werr)
            os.exit(0) -- don't run a command we can't monitor
         end
         local start_time = os.time()
         os.execute(cmd_string)
         local walltime_s = os.difftime(os.time(), start_time)
         written, werr = util.write_file(done_file(self, job.jid), walltime_s)
         if not written then
            dump(werr)
         end
      end)
      if not ok then
         dump(err)
      end
      -- The child must never return into the caller's code.
      os.exit(0)
   end

   -- Parent process: count the command only once it has really been started,
   -- so a failed sandbox creation or fork does not inflate the counter.
   self.active_commands = self.active_commands + 1
   job.data.pid = pid
   self.logger:debug("Created PID "..pid.." for jid "..job.jid)
   return true
end
--- Check whether the process started for a job still exists.
-- A zombie process that this call manages to reap is reported as not alive;
-- in that case the last process info is kept in job.data.pinfo.
-- @param self The driver instance (currently unused)
-- @param job The job object; job.data.pid is the pid to check
-- @return true if the process is still there, false otherwise
local function is_command_alive(self, job)
   local pid = job.data.pid
   local pinfo = procdata.get_process_info(pid)
   if not pinfo then
      return false
   end

   -- The pid-rotation check is currently disabled. It compared the start time
   -- saved in the job's ".start" file with pinfo.starttime:
   --    local pidstart = util.read_file(start_file(self, job.jid))
   --    if pidstart and tonumber(pidstart) == tonumber(pinfo.starttime) then
   -- While it stays disabled the file is not read at all.

   if pinfo.state == "Z" then
      job.data.pinfo = pinfo
      -- Try to reap the zombie without blocking.
      local term = wait.wait(pid, wait.WNOHANG)
      if term == pid then
         return false
      end
   end
   return true
end
--- Quote a value so that the shell treats it as a single literal word.
-- @param arg A string (or number) to be passed on a command line
-- @return The value wrapped in single quotes, with embedded quotes escaped
local function shell_quote(arg)
   return "'" .. string.gsub(arg, "'", "'\\''") .. "'"
end

--- Run the job's optional "collect_execution_data" script and iterate over
-- the key/value pairs it prints.
-- The script is looked up in job.parameters.csbase_command_path and executed
-- with ksh. Each output line is scanned for "key = value" pairs.
-- @param self The driver instance; uses self.logger
-- @param job The job object
-- @return An iterator suitable for a generic `for`, yielding key, value.
-- It yields nothing when the script does not exist.
local function collect_exec_data(self, job)
   local script = job.parameters.csbase_command_path.."/collect_execution_data"
   if not stat.stat(script) then
      return pairs({})
   end

   return coroutine.wrap(function ()
      -- Every argument is quoted so that paths containing spaces or shell
      -- metacharacters reach the script unchanged.
      local params = {
         shell_quote(job.cmd_id),
         shell_quote(job.sandboxes[1]),
         shell_quote(job.parameters.csbase_command_path),
         shell_quote(job.parameters.csbase_command_output_path),
         shell_quote(job.parameters.csbase_command_root_path),
      }
      local collect_cmd = "ksh "..shell_quote(script).." "..table.concat(params, " ")
      self.logger:debug("Executing data collection script: "..collect_cmd)
      local stdout = assert(io.popen(collect_cmd, 'r'))
      for line in stdout:lines() do
         for key, value in string.gmatch(line, "(.-)%s*=%s*([^,]+)") do
            coroutine.yield(key, value)
         end
      end
      stdout:close()
   end)

   -- Previous implementation, kept for reference:
   -- local params = {
   --    job.parameters.csbase_command_path,
   --    job.cmd_id,
   --    job.sandboxes[1],
   --    job.parameters.csbase_command_output_path
   -- }
   -- local collect_cmd = "ksh "..script.." "..table.concat(params," ")
   -- self.logger:debug("Executing data collection script: "..collect_cmd)
   -- local stdout = localhost:run(collect_cmd)
   -- local tab = {}
   -- for key, value in string.gmatch(stdout, "([%w%p]+)%s*=%s*([%w%p]+)") do
   --    tab[key] = value
   -- end
   -- return pairs(tab)
end
--
-- Checks if a command has finished.
-- When the command has finished (or was killed) the data collection script
-- is run once more and its output is discarded.
-- @param job The job object
-- @return True plus command times if command has terminated, False if command is still running, or nil and a message.
-- If times are nil the command was killed
function posix.is_command_done(self, job)
   if job.data.killed then
      -- A killed command is no longer active either; without this the
      -- counter reported by get_nodes_status would only ever grow.
      self.active_commands = self.active_commands - 1
      for _ in collect_exec_data(self, job) do
      end
      return true, nil
   end

   if is_command_alive(self, job) then
      return false
   end

   -- The ".done" file holds the wall time written by the forked child.
   local donetime, err = util.read_file(done_file(self, job.jid))
   if not donetime then
      return nil, err
   end
   self.active_commands = self.active_commands - 1
   for _ in collect_exec_data(self, job) do
   end
   return true, donetime, donetime, donetime --FIXME detailed times
end
--
-- Deletes resources (files) created for a command.
-- Removes the job's ".done" and ".start" files and every sandbox directory.
-- Removal is best-effort: a sandbox that cannot be removed is logged and the
-- remaining ones are still processed.
-- @param job The job object
function posix.cleanup_job(self, job)
   os.remove(done_file(self, job.jid))
   os.remove(start_file(self, job.jid))
   for _, sandbox_path in ipairs(job.sandboxes) do
      local ok, err = util.removedir(sandbox_path)
      if not ok then
         -- Include the reason, when one is given, instead of dropping it.
         local detail = err and (": "..tostring(err)) or ""
         self.logger:error("Failed removing job's sandbox "..sandbox_path..detail)
      end
   end
end
--- Iterate over a process and all of its descendants, breadth-first.
-- Children are obtained from procdata.get_children; the starting pid is
-- yielded first.
-- @param pid The pid of the root process
-- @return An iterator function yielding one pid per call
local function process_tree(pid)
   local function walk()
      local pending = { pid }
      local seen = {}
      local head = 1
      -- `head` advances through `pending` instead of shifting the array.
      while head <= #pending do
         local current = pending[head]
         head = head + 1
         seen[current] = true
         for _, child in ipairs(procdata.get_children(current)) do
            if not seen[child] then
               pending[#pending + 1] = child
            end
         end
         coroutine.yield(current)
      end
   end
   return coroutine.wrap(walk)
end
-- Actions that can be requested for a job.
posix.actions = {

   -- Terminates a command
   -- Sends SIGTERM to the job's process and all of its descendants and marks
   -- the job as killed.
   -- @param job The job object
   -- @return True, or false and a message if the job is not running
   terminate = function(self, job)
      if not is_command_alive(self, job) then
         return false, "Job is not running"
      end
      local pids = {}
      for pid in process_tree(job.data.pid) do
         -- Inserting at the front reverses the traversal order, so the root
         -- process is signalled last.
         table.insert(pids, 1, pid)
      end
      for _, pid in ipairs(pids) do
         signal.kill(pid, signal.SIGTERM)
      end
      job.data.killed = true
      return true
   end,

   -- Gets a command current status
   -- @param job The job object
   -- @return True and a table with information for each command component
   -- (process), or false and a message if no process could be inspected
   status = function(self, job)
      local processes = {}
      self.logger:debug("Got a status request for command " .. job.jid .. " (pid " ..job.data.pid .. ")")
      for pid in process_tree(job.data.pid) do
         local pinfo = procdata.get_process_info(pid)
         -- A process may exit between the tree walk and this lookup.
         if pinfo then
            -- FIXME return more information, don't return fake information
            processes[#processes + 1] = {
               pid = pid,
               ppid = pinfo.ppid,
               exec_host = "",
               string = pinfo.comm,
               state = "RUNNING",
               processor_id = "",
               memory_ram_size_mb = pinfo.rss / (1024 * 1024),
               memory_swap_size_mb = 0,
               cpu_perc = 0,
               cpu_time_sec = 0,
               wall_time_sec = pinfo.walltime,
               system_time_sec = pinfo.stime,
               user_time_sec = pinfo.utime,
               virtual_memory_size_mb = pinfo.vmsize / (1024 * 1024),
               bytes_in_kb = 0,
               bytes_out_kb = 0,
               disk_bytes_read_kb = 0,
               disk_bytes_write_kb = 0,
            }
         end
      end

      local main = processes[1]
      if not main then
         return false, "Job is not running"
      end

      -- Extra data is attached to the first process entry.
      for k, v in collect_exec_data(self, job) do
         main[k] = v
         -- copas.step(1)
      end
      for k, v in pairs(filemonitor.read_files(job)) do
         main[k] = v
         -- copas.step(1)
      end
      return true, { processes = processes }
   end,
}
--
-- Gets available execution nodes and their resources
-- This driver reports a single node, named after self.config.sga_name.
-- @return A table indexed by node name, each entry contains the node's resources ("static" data),
-- or nil and an error message if any value could not be obtained
function posix.get_nodes(self)
   local mem, mem_err = procdata.get_total_memory()
   if not mem then
      return nil, mem_err
   end

   local ncpus, cpu_err = procdata.get_num_cpus()
   if not ncpus then
      return nil, cpu_err
   end

   local clock, clock_err = procdata.get_clock_speed()
   if not clock then
      return nil, clock_err
   end

   local mb = 1024 * 1024
   return {
      [self.config.sga_name] = {
         num_of_cpus = ncpus,
         ram_mb = mem.ram / mb,
         swap_mb = mem.swap / mb,
         clock_mhz = clock,
      }
   }
end
--
-- Get nodes current status
-- This driver reports a single node, named after self.config.sga_name.
-- Failures are logged through self.logger.
-- @return A table indexed by node name, each entry contains the node's available resources ("monitoring" data),
-- or nil and an error message if any value could not be obtained
function posix.get_nodes_status(self)
   -- Percentage of `used` over `total`; a zero total (for instance a host
   -- without swap) yields 0 instead of NaN.
   local function percent(used, total)
      if total == 0 then
         return 0
      end
      return used * 100 / total
   end

   local mem, err = procdata.get_total_memory()
   if not mem then
      self.logger:error(err)
      return nil, err
   end

   local umem
   umem, err = procdata.get_used_memory()
   if not umem then
      self.logger:error(err)
      return nil, err
   end

   local cpuload
   cpuload, err = procdata.get_cpu_load()
   if not cpuload then
      self.logger:error(err)
      return nil, err
   end

   local ncpus
   ncpus, err = procdata.get_num_cpus()
   if not ncpus then
      self.logger:error(err)
      return nil, err
   end

   return {
      [self.config.sga_name] = {
         ram_used_perc = percent(umem.ram, mem.ram),
         swap_used_perc = percent(umem.swap, mem.swap),
         load_avg_1min_perc = cpuload.l1 * 100 / ncpus,
         load_avg_5min_perc = cpuload.l5 * 100 / ncpus,
         load_avg_15min_perc = cpuload.l15 * 100 / ncpus,
         number_of_jobs = self.active_commands,
      }
   }
end
--
-- Gets the attributes of a local path.
-- Delegates to util.check_local_path and returns its results unchanged.
-- @param self The driver instance (not used by this function)
-- @param path_name The path filename
-- @return A table with the file attributes corresponding to path_name, or nil and a message
function posix.check_path(self, path_name)
return util.check_local_path(path_name)
end
--- Create a new driver instance.
-- The instance is built with safer.table and starts with no active commands.
-- @param config The driver configuration table (other functions in this
-- module read config.runtime_data_dir and config.sga_name)
-- @param logger The logger object (used through its debug and error methods)
-- @return The new driver instance
function posix.new(config, logger)
   local self = safer.table {
      config = config,
      logger = logger,
      active_commands = 0,
   }
   return self
end
return posix