-- application.lua
local application = {}
local safer = require("safer")
local client = require("sga.client")
local server = require("sga.server")
local logging = require("logging")
local copas = require("copas")
local lfs = require("lfs")
local schema = require("schema")
--- Wrap `text` in ANSI escape sequences so a terminal renders it bold.
-- @param text string (or number) to emphasise
-- @treturn string the text between the "bold on" and "reset" sequences
local function bold(text)
   local bold_on, reset = "\27[1m", "\27[0m"
   return bold_on .. text .. reset
end
-- ANSI SGR parameters per log level name. lualogging names its warning
-- level "WARN" (and also has "FATAL"); "WARNING" is kept so any caller
-- passing that spelling is still colored.
local level_styles = {
   DEBUG = "0;35",
   INFO = "0;36",
   WARN = "1;33",
   WARNING = "1;33",
   ERROR = "1;31",
   FATAL = "1;31",
}

--- Colorize a log level name for terminal output.
-- @param level level name as handed to the logger's append function
-- @return `level` wrapped in ANSI color codes, or `level` unchanged when
--   it is not one of the known level names
local function color(level)
   -- Reading t[nil] is legal and yields nil, so a nil level falls through.
   local style = level_styles[level]
   if not style then
      return level
   end
   return "\27[" .. style .. "m" .. level .. "\27[0m"
end
--- Build a logger that writes formatted lines to stderr.
-- Each line is "<bold date> [<colored level>] <bold ident>: <message>".
-- @tparam string ident tag printed on every line to identify the source
-- @return a logging.new() logger wrapped with safer.readonly
local function make_logger(ident)
   local function write_to_stderr(_, level, message)
      local line = bold(os.date()) .. " [" .. color(level) .. "] "
         .. bold(ident) .. ": " .. message .. "\n"
      io.stderr:write(line)
      return true
   end
   return safer.readonly(logging.new(write_to_stderr))
end
--[[
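   Application architecture. Diamonds use UML notation: ◇ (aggregation) and
   ◆ (composition) sit on the side of the component that holds the other one,
   e.g. the server is given the config, client, logger and driver.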
                  ┌──────────┐
         ┌───────◇│  client  │◇───────┐
         │        └────┬─────┘        │
         │             │              │
         │             ◇              │
   ┌─────┴────┐   ┌──────────┐   ┌────┴─────┐
   │  config  ├──◇│  server  │◇──┤  logger  │
   └─────┬────┘   └──────────┘   └────┬─────┘
         │             ◇   ◆          │
         │             │   │          │
         │             │ ┌─┴────────┐ │
         │             │ │ job list │ │
         │             │ └──────────┘ │
         │        ┌────┴─────┐        │
         └───────◇│  driver  │◇───────┘
                  └──────────┘
--]]
--- Create an application instance.
--
-- Makes sure `config.runtime_data_dir` exists as a directory, creates the
-- client, loads the driver module named by `config.driver`, checks it
-- against the expected driver API, instantiates the driver and the server,
-- and returns the assembled object (wrapped with `safer.readonly`, with
-- `application` as its method table).
-- @tparam table config application configuration
-- @return the application object, or `nil` and an error message
function application.new(config)
   if not lfs.mkdir(config.runtime_data_dir) then
      -- mkdir also fails when the path already exists; accept that case as
      -- long as the existing entry is a directory.
      local attr = lfs.attributes(config.runtime_data_dir)
      if not (attr and attr.mode == "directory") then
         return nil, "Failed creating runtime data dir "..config.runtime_data_dir
      end
   end

   local client_object = client.new(config, make_logger("sga.client"))

   local loaded, driver_module = pcall(require, config.driver)
   if not loaded then
      -- tostring(): the value raised while loading is not guaranteed to be
      -- a string, and concatenating a table would raise a second error.
      return nil, "Failed loading SGA driver '"..tostring(config.driver).."': "..tostring(driver_module)
   end

   -- API every driver module must expose.
   local driver_schema = schema.Record {
      actions = schema.Map(schema.String, schema.Function),
      execute_command = schema.Function,
      get_nodes = schema.Function,
      get_nodes_status = schema.Function,
      is_command_done = schema.Function,
      cleanup_job = schema.Function,
      check_path = schema.Function,
      new = schema.Function,
      type = schema.Optional(schema.String),
   }
   local driver_err = schema.CheckSchema(driver_module, driver_schema)
   if driver_err then
      return nil, "Error in driver API: "..tostring(driver_err)
   end

   local logger = make_logger("sga.application")

   local called, orig_driver, init_err = pcall(driver_module.new, config, make_logger(config.driver))
   if not called then
      -- orig_driver holds the raised error value here; it may be a non-string.
      return nil, "Error initializing driver: "..tostring(orig_driver)
   end
   if not orig_driver then
      return nil, "Failed initializing driver: "..tostring(init_err)
   end

   -- Proxy given to the server and used by run(): reads look at the driver
   -- instance first and fall back to the driver module (e.g. for the
   -- optional module-level `type`); writes always go to the instance.
   local driver = setmetatable({}, {
      __index = function(_, key)
         return orig_driver[key] or driver_module[key]
      end,
      __newindex = function(_, key, value)
         orig_driver[key] = value
      end,
   })

   local self = setmetatable({
      config = config,
      logger = logger,
      client = client_object,
      driver = driver,
      server = server.new(config, make_logger("sga.server"), client_object, driver),
   }, { __index = application })
   return safer.readonly(self)
end

--- Register through the client, then start the periodic tasks and server.
--
-- Fetches the node list from the driver and registers it (together with the
-- server's persisted jobs), schedules two copas threads (heartbeat and status
-- update) and starts the server.
-- @return `true`, or `nil` and an error message if the initial node lookup
--   or registration fails
function application.run(self)
   local nodes, n_err = self.driver:get_nodes()
   if not nodes then
      return nil, "Failed getting node configuration: " .. tostring(n_err)
   end
   local sga_type = self.config.sga_type or self.driver.type or "machine"
   local reg_ok, reg_err = self.client:register(sga_type, nodes, self.server:get_persisted_jobs())
   if not reg_ok then
      return nil, "Failed registering: " .. tostring(reg_err)
   end

   -- Try to register again after a periodic operation failed. The node list
   -- is fetched afresh; if that lookup fails, registering with a nil node
   -- list is skipped and the failure is logged instead.
   local function reregister()
      local current_nodes, nodes_err = self.driver:get_nodes()
      if not current_nodes then
         self.logger:error("Failed getting node configuration: " .. tostring(nodes_err))
         return
      end
      local ok, err = self.client:register(sga_type, current_nodes, self.server:get_persisted_jobs())
      if not ok then
         -- tostring(): a nil/non-string error must not kill this thread.
         self.logger:error("Failed registering: " .. tostring(err))
      end
   end

   -- Run `fn` every `interval` seconds in a copas thread until the server
   -- goes down. A falsy result from `fn` is logged and triggers a
   -- re-registration attempt.
   -- NOTE(review): operations only run while `client.registered` is true;
   -- if the client clears that flag after a failed register, the loop only
   -- sleeps and never retries registering — confirm against sga.client.
   local function loop_operation(msg, interval, fn)
      return copas.addthread(function()
         while not self.server.down do
            if self.client.registered then
               self.logger:debug(msg)
               local ok, err = fn()
               if not ok then
                  self.logger:warn(msg .. " failed: " .. tostring(err) .. "; re-registering")
                  reregister()
                  collectgarbage()
               end
               copas.sleep(interval)
            else
               copas.sleep(1)
            end
         end
      end)
   end

   loop_operation("Heartbeat", tonumber(self.client.actions["heartbeat"].interval_s), function()
      return self.client:heartbeat()
   end)
   -- tonumber() for consistency with the heartbeat interval, so a numeric
   -- string in the config is accepted as well.
   loop_operation("Status update", tonumber(self.config.status_interval_s), function()
      return self.client:status(self.driver:get_nodes_status())
   end)

   self.server:start()
   return true
end

return application