-- Authored by Hisham (commit 2511a987)
local application = {}
local safer = require("safer")
local client = require("sga.client")
local server = require("sga.server")
local logging = require("logging")
local copas = require("copas")
local lfs = require("lfs")
local schema = require("schema")
local BOLD_ON, ATTR_RESET = "\27[1m", "\27[0m"

--- Wrap `text` in ANSI escape sequences so a terminal renders it in bold.
-- @param text string (or number) to emphasise
-- @treturn string `text` surrounded by the bold-on and attribute-reset codes
local function bold(text)
   local wrapped = BOLD_ON .. text .. ATTR_RESET
   return wrapped
end
-- ANSI SGR colour codes per log level name.
-- The `logging` appender receives LuaLogging level names
-- (DEBUG, INFO, WARN, ERROR, FATAL). "WARNING" is kept so that
-- existing callers passing that spelling are still coloured.
local LEVEL_COLORS = {
   DEBUG = "0;35",   -- magenta
   INFO = "0;36",    -- cyan
   WARN = "1;33",    -- bold yellow
   WARNING = "1;33", -- bold yellow (legacy spelling)
   ERROR = "1;31",   -- bold red
   FATAL = "1;31",   -- bold red
}

--- Colourise a log level name for terminal output.
-- @param level level name, e.g. "ERROR" or "WARN"
-- @return the level wrapped in an ANSI colour sequence when it is a known
--   level, otherwise `level` unchanged
local function color(level)
   local code = LEVEL_COLORS[level]
   if not code then
      return level
   end
   return "\27[" .. code .. "m" .. level .. "\27[0m"
end
--- Build a read-only logger whose messages go to stderr.
-- Every line has the shape "<date> [<LEVEL>] <ident>: <message>", with the
-- date and `ident` in bold and the level name coloured via `color`.
-- @tparam string ident label printed on every line (e.g. a component name)
-- @return the `logging.new` logger wrapped with `safer.readonly`
local function make_logger(ident)
   -- Appender callback handed to logging.new; the first argument (the
   -- logger itself) is unused. Returning true reports a successful write.
   local function write_line(_, level, message)
      local line = bold(os.date()) .. " [" .. color(level) .. "] "
         .. bold(ident) .. ": " .. message .. "\n"
      io.stderr:write(line)
      return true
   end
   local raw_logger = logging.new(write_line)
   return safer.readonly(raw_logger)
end
--[[
Application architecture:
┌──────────┐
┌───────◇│ client │◇───────┐
│ └────┬─────┘ │
│ │ │
│ ◇ │
┌─────┴────┐ ┌──────────┐ ┌────┴─────┐
│ config ├──◇│ server │◇──┤ logger │
└─────┬────┘ └──────────┘ └────┬─────┘
│ ◇ ◆ │
│ │ │ │
│ │ ┌─┴────────┐ │
│ │ │ job list │ │
│ │ └──────────┘ │
│ ┌────┴─────┐ │
└───────◇│ driver │◇───────┘
└──────────┘
--]]
--- Make sure the runtime data directory exists, creating it if needed.
-- @tparam string dir path of the directory
-- @treturn[1] boolean true when the directory exists afterwards
-- @return[2] nil
-- @treturn[2] string error message
local function ensure_runtime_dir(dir)
   local ok, err = lfs.mkdir(dir)
   if ok then
      return true
   end
   -- mkdir also fails when the path already exists; that is only
   -- acceptable if the existing entry is a directory.
   local attr = lfs.attributes(dir)
   if attr and attr.mode == "directory" then
      return true
   end
   return nil, "Failed creating runtime data dir " .. dir .. ": " .. tostring(err)
end

--- Require the driver module named by `config.driver` and check that it
-- exposes the expected driver API.
-- @param config application configuration table
-- @return[1] the driver module
-- @return[2] nil
-- @treturn[2] string error message
local function load_driver_module(config)
   local ok, driver_module = pcall(require, config.driver)
   if not ok then
      return nil, "Failed loading SGA driver '" .. tostring(config.driver) .. "': " .. tostring(driver_module)
   end
   local driver_schema = schema.Record {
      actions = schema.Map(schema.String, schema.Function),
      execute_command = schema.Function,
      get_nodes = schema.Function,
      get_nodes_status = schema.Function,
      is_command_done = schema.Function,
      cleanup_job = schema.Function,
      check_path = schema.Function,
      new = schema.Function,
      type = schema.Optional(schema.String),
   }
   local driver_err = schema.CheckSchema(driver_module, driver_schema)
   if driver_err then
      return nil, "Error in driver API: " .. tostring(driver_err)
   end
   return driver_module
end

--- Instantiate the driver and wrap it in a proxy table.
-- Lookups hit the driver instance first and fall back to the driver
-- module (where the API functions live); writes go to the instance.
-- @param config application configuration table
-- @param driver_module module returned by `load_driver_module`
-- @return[1] proxy table for the driver instance
-- @return[2] nil
-- @treturn[2] string error message
local function init_driver(config, driver_module)
   local ok, orig_driver, err = pcall(driver_module.new, config, make_logger(config.driver))
   if not ok then
      -- orig_driver holds the raised error object, which may not be a string
      return nil, "Error initializing driver: " .. tostring(orig_driver)
   end
   if not orig_driver then
      return nil, "Failed initializing driver: " .. tostring(err)
   end
   return setmetatable({}, {
      __index = function(_, k)
         -- explicit nil check so false-valued instance fields are not
         -- shadowed by the module's entry of the same name
         local v = orig_driver[k]
         if v == nil then
            v = driver_module[k]
         end
         return v
      end,
      __newindex = function(_, k, v) orig_driver[k] = v end,
   })
end

--- Create an application instance.
-- The steps are: ensure `config.runtime_data_dir` exists, create the
-- client, load/validate/instantiate `config.driver`, then create the
-- server.
-- @param config configuration table; reads `runtime_data_dir` and
--   `driver` here and passes it to client, driver and server constructors
-- @return[1] read-only application object (methods from `application`)
-- @return[2] nil
-- @treturn[2] string error message
function application.new(config)
   local ok, err = ensure_runtime_dir(config.runtime_data_dir)
   if not ok then
      return nil, err
   end

   local client_object = client.new(config, make_logger("sga.client"))

   local driver_module
   driver_module, err = load_driver_module(config)
   if not driver_module then
      return nil, err
   end

   local logger = make_logger("sga.application")

   local driver
   driver, err = init_driver(config, driver_module)
   if not driver then
      return nil, err
   end

   local self = {
      config = config,
      logger = logger,
      client = client_object,
      driver = driver,
      server = server.new(config, make_logger("sga.server"), client_object, driver),
   }
   return safer.readonly(setmetatable(self, { __index = application }))
end
--- Register with the client, start the periodic heartbeat and status
-- threads, and start the server.
-- @param self application object returned by `application.new`
-- @treturn[1] boolean true after `self.server:start()` returns
-- @return[2] nil
-- @treturn[2] string error message when the node configuration cannot be
--   obtained or the initial registration fails
function application.run(self)
   local nodes, n_err = self.driver:get_nodes()
   if not nodes then
      return nil, "Failed getting node configuration: " .. tostring(n_err)
   end
   local sga_type = self.config.sga_type or self.driver.type or "machine"
   local reg_ok, reg_err = self.client:register(sga_type, nodes, self.server:get_persisted_jobs())
   if not reg_ok then
      return nil, "Failed registering: " .. tostring(reg_err)
   end

   -- Re-register after a periodic operation failed. Failures are logged,
   -- never raised, so the calling loop thread keeps running.
   local function reregister()
      local cur_nodes, nodes_err = self.driver:get_nodes()
      if not cur_nodes then
         self.logger:error("Failed getting node configuration: " .. tostring(nodes_err))
         return
      end
      local ok, err = self.client:register(sga_type, cur_nodes, self.server:get_persisted_jobs())
      if not ok then
         -- err may be nil; tostring avoids a concat error killing the thread
         self.logger:error("Failed registering: " .. tostring(err))
      end
   end

   -- Spawn a copas thread that runs `fn` every `interval` seconds while
   -- the client is registered and the server is not down.
   local function loop_operation(msg, interval, fn)
      return copas.addthread(function()
         while not self.server.down do
            if self.client.registered then
               self.logger:debug(msg)
               local ok, err = fn()
               if not ok then
                  self.logger:warn(msg .. " failed: " .. tostring(err))
                  reregister()
                  collectgarbage()
               end
               copas.sleep(interval)
               -- the server may have gone down while this thread slept
               if self.server.down then break end
            else
               copas.sleep(1)
            end
         end
      end)
   end

   loop_operation("Heartbeat", tonumber(self.client.actions["heartbeat"].interval_s), function()
      return self.client:heartbeat()
   end)
   loop_operation("Status update", self.config.status_interval_s, function()
      return self.client:status(self.driver:get_nodes_status())
   end)
   self.server:start()
   return true
end
-- Module export: the `application` table providing `new` and `run`.
return application