-- posix.lua (10.28 KiB)
local posix = {}
local signal = require("posix.signal")
local wait = require("posix.sys.wait")
local unistd = require("posix.unistd")
local stat = require("posix.sys.stat")
local lfs = require("lfs")
local procdata = require("procdata")
local safer = require("safer")
local util = require("sga.util")
local filemonitor = require("sga.filemonitor")
--- Path of the marker file written when a job's command has finished.
-- @param self Driver instance; its config.runtime_data_dir is used.
-- @param jid Job identifier.
-- @return The string "<runtime_data_dir>/<jid>.done".
local function done_file(self, jid)
   local dir = self.config.runtime_data_dir
   local suffix = ".done"
   return dir .. "/" .. jid .. suffix
end
--- Path of the file that records the start time of a job's child process.
-- @param self Driver instance; its config.runtime_data_dir is used.
-- @param jid Job identifier.
-- @return The string "<runtime_data_dir>/<jid>.start".
local function start_file(self, jid)
   local dir = self.config.runtime_data_dir
   local suffix = ".start"
   return dir .. "/" .. jid .. suffix
end
---
-- Body of the child process forked by posix.execute_command.
-- Records the process start time in the job's ".start" file, runs the
-- command through the shell and writes the elapsed wall time (seconds) to
-- the job's ".done" file. Always terminates the process; never returns.
local function run_child(self, job, cmd_string)
   -- FIXME In the forked process, just dump errors to stderr
   -- until we have a better procedure.
   local function dump(err)
      io.stderr:write(tostring(err).."\n")
   end
   -- Do not leak the daemon's descriptors into the command.
   for fd = 3, 65535 do
      unistd.close(fd)
   end
   local ok, err = pcall(function()
      signal.signal(signal.SIGHUP, signal.SIG_IGN)
      local pinfo = assert(procdata.get_process_info(unistd.getpid()))
      local written, werr = util.write_file(start_file(self, job.jid), pinfo.starttime)
      if not written then
         dump(werr)
         return -- don't run a command we can't monitor
      end
      local start_time = os.time()
      os.execute(cmd_string)
      local walltime_s = os.difftime(os.time(), start_time)
      written, werr = util.write_file(done_file(self, job.jid), walltime_s)
      if not written then
         dump(werr)
      end
   end)
   if not ok then
      dump(err)
   end
   os.exit(0)
end

---
-- Execute a new command.
-- @param job The job object: job.data is a writable table for driver data.
-- @param cmd_string The command string
-- @return True if succeeded or nil and an error message
function posix.execute_command(self, job, cmd_string)
   filemonitor.init(job)
   for _, sandbox_path in ipairs(job.sandboxes) do
      local ok, err = lfs.mkdir(sandbox_path)
      if not ok then
         -- mkdir also fails when the directory already exists, which is fine.
         local attr = lfs.attributes(sandbox_path)
         if not (attr and attr.mode == "directory") then
            return nil, "Failed creating job's sandbox "..sandbox_path..": "..tostring(err)
         end
      end
   end
   local pid, errmsg = unistd.fork()
   if pid == nil then
      return nil, "Failed forking subprocess: "..tostring(errmsg)
   end
   if pid == 0 then
      run_child(self, job, cmd_string) -- does not return
   end
   -- Count the command only once it is actually running, so the failure
   -- paths above do not leave active_commands permanently inflated.
   self.active_commands = self.active_commands + 1
   job.data.pid = pid
   self.logger:debug("Created PID "..pid.." for jid "..job.jid)
   return true
end

---
-- Tells whether the process started for a job is still running.
-- A zombie process is reaped with a non-blocking wait and reported as
-- finished; its last process info is stored in job.data.pinfo.
-- @param job The job object (uses job.data.pid)
-- @return true while the process exists and has not been reaped
local function is_command_alive(self, job)
   local pid = job.data.pid
   local pinfo = procdata.get_process_info(pid)
   if not pinfo then
      return false
   end
   -- NOTE(review): a pid-reuse check comparing pinfo.starttime with the value
   -- stored in start_file() is currently disabled.
   if pinfo.state == "Z" then
      job.data.pinfo = pinfo
      if wait.wait(pid, wait.WNOHANG) == pid then
         return false
      end
   end
   return true
end

-- Quotes a value as a single POSIX shell word.
local function shell_quote(value)
   return "'"..tostring(value):gsub("'", [['\'']]).."'"
end

-- Strips leading and trailing whitespace.
local function trim(s)
   return (s:match("^%s*(.-)%s*$"))
end

---
-- Runs the command's optional "collect_execution_data" script and iterates
-- over the "key = value" pairs it prints (pairs may be comma-separated).
-- @param job The job object
-- @return An iterator yielding (key, value) strings; empty if the script
-- does not exist or cannot be started.
local function collect_exec_data(self, job)
   local script = job.parameters.csbase_command_path.."/collect_execution_data"
   if not stat.stat(script) then
      return pairs({})
   end
   return coroutine.wrap(function()
      local params = {
         job.cmd_id,
         job.sandboxes[1],
         job.parameters.csbase_command_path,
         job.parameters.csbase_command_output_path,
         job.parameters.csbase_command_root_path,
      }
      local args = {}
      for i, param in ipairs(params) do
         args[i] = shell_quote(param)
      end
      local collect_cmd = "ksh "..shell_quote(script).." "..table.concat(args, " ")
      self.logger:debug("Executing data collection script: "..collect_cmd)
      local stdout, err = io.popen(collect_cmd, "r")
      if not stdout then
         self.logger:error("Failed running data collection script: "..tostring(err))
         return
      end
      for line in stdout:lines() do
         -- Keys may not contain ',' or '='; values end at the next ','.
         for key, value in line:gmatch("([^,=]+)=([^,]+)") do
            key, value = trim(key), trim(value)
            if key ~= "" then
               coroutine.yield(key, value)
            end
         end
      end
      stdout:close()
   end)
end

-- Decrements active_commands at most once per job.
local function release_command_slot(self, job)
   if not job.data.slot_released then
      job.data.slot_released = true
      self.active_commands = self.active_commands - 1
   end
end

---
-- Checks if a command has finished.
-- @param job The job object
-- @return True plus command times if command has terminated, False if command is still running, or nil and a message.
-- If times are nil the command was killed
function posix.is_command_done(self, job)
   if job.data.killed then
      -- Reap the terminated child if it has already exited (non-blocking).
      wait.wait(job.data.pid, wait.WNOHANG)
      release_command_slot(self, job)
      for _ in collect_exec_data(self, job) do end
      return true, nil
   end
   if is_command_alive(self, job) then
      return false
   end
   local donetime, err = util.read_file(done_file(self, job.jid))
   if not donetime then
      return nil, err
   end
   release_command_slot(self, job)
   for _ in collect_exec_data(self, job) do end
   return true, donetime, donetime, donetime --FIXME detailed times
end

---
-- Deletes resources (files) created for a command.
-- @param job The job object
function posix.cleanup_job(self, job)
   os.remove(done_file(self, job.jid))
   os.remove(start_file(self, job.jid))
   for _, sandbox_path in ipairs(job.sandboxes) do
      local ok, err = util.removedir(sandbox_path)
      if not ok then
         self.logger:error("Failed removing job's sandbox "..sandbox_path..": "..tostring(err))
      end
   end
end

---
-- Iterates breadth-first over a process and all of its descendants.
-- @param pid Root process id
-- @return An iterator yielding pids, root first
local function process_tree(pid)
   return coroutine.wrap(function()
      local queue, head = { pid }, 1
      local seen = { [pid] = true }
      while queue[head] do
         local curpid = queue[head]
         head = head + 1
         for _, child in ipairs(procdata.get_children(curpid) or {}) do
            if not seen[child] then
               seen[child] = true
               queue[#queue + 1] = child
            end
         end
         coroutine.yield(curpid)
      end
   end)
end

posix.actions = {
   ---
   -- Terminates a command
   -- @param job The job object
   terminate = function(self, job)
      if not is_command_alive(self, job) then
         return false, "Job is not running"
      end
      -- Reverse BFS order: descendants are signalled before their ancestors.
      local pids = {}
      for pid in process_tree(job.data.pid) do
         table.insert(pids, 1, pid)
      end
      for _, pid in ipairs(pids) do
         signal.kill(pid, signal.SIGTERM)
      end
      job.data.killed = true
      return true
   end,

   ---
   -- Gets a command current status
   -- @param job The job object
   -- @return A table with information for each command component (process) status
   status = function(self, job)
      local processes = {}
      self.logger:debug("Got a status request for command " .. job.jid .. " (pid " ..job.data.pid .. ")")
      for pid in process_tree(job.data.pid) do
         local pinfo = procdata.get_process_info(pid)
         -- A process may exit between listing the tree and reading its info.
         if pinfo then
            -- FIXME return more information, don't return fake information
            processes[#processes + 1] = {
               pid = pid,
               ppid = pinfo.ppid,
               exec_host = "",
               string = pinfo.comm,
               state = "RUNNING",
               processor_id = "",
               memory_ram_size_mb = pinfo.rss / (1024 * 1024),
               memory_swap_size_mb = 0,
               cpu_perc = 0,
               cpu_time_sec = 0,
               wall_time_sec = pinfo.walltime,
               system_time_sec = pinfo.stime,
               user_time_sec = pinfo.utime,
               virtual_memory_size_mb = pinfo.vmsize / (1024 * 1024),
               bytes_in_kb = 0,
               bytes_out_kb = 0,
               disk_bytes_read_kb = 0,
               disk_bytes_write_kb = 0,
            }
         end
      end
      -- Extra data is attached to the first listed process, if any.
      local main = processes[1]
      if main then
         for k, v in collect_exec_data(self, job) do
            main[k] = v
         end
         for k, v in pairs(filemonitor.read_files(job)) do
            main[k] = v
         end
      end
      return true, { processes = processes }
   end,
}

---
-- Gets available execution nodes and their resources
-- @return A table indexed by node name, each entry contains the node's resources ("static" data)
function posix.get_nodes(self)
   local mem, mem_err = procdata.get_total_memory()
   if not mem then
      return nil, mem_err
   end
   local ncpus, cpus_err = procdata.get_num_cpus()
   if not ncpus then
      return nil, cpus_err
   end
   local clock, clock_err = procdata.get_clock_speed()
   if not clock then
      return nil, clock_err
   end
   return {
      [self.config.sga_name] = {
         num_of_cpus = ncpus,
         ram_mb = mem.ram / (1024 * 1024),
         swap_mb = mem.swap / (1024 * 1024),
         clock_mhz = clock,
      }
   }
end

-- Percentage of part over total; 0 when total is not positive (e.g. no swap).
local function percent(part, total)
   if total and total > 0 then
      return part * 100 / total
   end
   return 0
end

---
-- Get nodes current status
-- @return A table indexed by node name, each entry contains the node's available resources ("monitoring" data)
function posix.get_nodes_status(self)
   local mem, err = procdata.get_total_memory()
   if not mem then
      self.logger:error(err)
      return nil
   end
   local umem
   umem, err = procdata.get_used_memory()
   if not umem then
      self.logger:error(err)
      return nil
   end
   local cpuload
   cpuload, err = procdata.get_cpu_load()
   if not cpuload then
      self.logger:error(err)
      return nil
   end
   local ncpus
   ncpus, err = procdata.get_num_cpus()
   if not ncpus then
      self.logger:error(err)
      return nil
   end
   return {
      [self.config.sga_name] = {
         ram_used_perc = percent(umem.ram, mem.ram),
         swap_used_perc = percent(umem.swap, mem.swap),
         load_avg_1min_perc = percent(cpuload.l1, ncpus),
         load_avg_5min_perc = percent(cpuload.l5, ncpus),
         load_avg_15min_perc = percent(cpuload.l15, ncpus),
         number_of_jobs = self.active_commands,
      }
   }
end

---
-- Gets a local path attributes
-- @param path_name The path filename
-- @return A table with the file attributes corresponding to path_name, or nil and a message
function posix.check_path(self, path_name)
   return util.check_local_path(path_name)
end

---
-- Creates a driver instance.
-- @param config Configuration table (runtime_data_dir and sga_name are read)
-- @param logger Logger object
-- @return The driver instance
function posix.new(config, logger)
   local self = safer.table {
      config = config,
      logger = logger,
      active_commands = 0,
   }
   return self
end

return posix