From 4960d281781a06dd094b76b28fdff3d731126457 Mon Sep 17 00:00:00 2001 From: Renato Maia Date: Thu, 26 Nov 2020 17:56:40 -0300 Subject: [PATCH] =?UTF-8?q?Corre=C3=A7=C3=A3o=20na=20execu=C3=A7=C3=A3o=20?= =?UTF-8?q?de=20comandos=20de=20forma=20ass=C3=ADncrona.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit [SOMA-6793][SOMA-6791] --- sga-exec-scm-1.rockspec | 1 - sga/async.lua | 226 ++++++++++++++++++++++++++++++++++++++++ sga/exec/localhost.lua | 2 +- 3 files changed, 227 insertions(+), 2 deletions(-) create mode 100644 sga/async.lua diff --git a/sga-exec-scm-1.rockspec b/sga-exec-scm-1.rockspec index 4e72c1a..2fe3c4b 100644 --- a/sga-exec-scm-1.rockspec +++ b/sga-exec-scm-1.rockspec @@ -20,7 +20,6 @@ dependencies = { "sga-daemon", "lua-schema", "safer", - "copas-async" } build = { diff --git a/sga/async.lua b/sga/async.lua new file mode 100644 index 0000000..3845f3f --- /dev/null +++ b/sga/async.lua @@ -0,0 +1,226 @@ + +local async = {} + +local lanes = require("lanes") +local copas = require("copas") +local socket = require("socket") +local unpack = unpack or table.unpack + +lanes.configure() + +local function pack(...) + local ret = { n = select("#", ...) } + for i = 1, ret.n do + ret[i] = select(i, ...) + end + return ret +end + +local function normalize_exit(ret, typ, cod) + if type(ret) == "number" then + if ret == 0 then + return true, "exit", 0 + elseif ret < 255 then + return nil, "signal", ret + else + return nil, "exit", math.floor(ret / 255) + end + else + return ret, typ, cod + end +end + +local wskt = socket.bind("*", 0) +local whost, wport = wskt:getsockname() +wport = tonumber(wport) + +local waiting = {} + +local function launch_wakeup_server() + copas.addserver(wskt, function(dskt) + local id = "" + while true do + local data, err, partial = dskt:receive() + if data == nil then + if partial then + id = id .. partial + end + else + id = id .. data + end + if err == "closed" then + break + end + end + dskt:close() + local coro = waiting[id] + waiting[id] = nil + if not next(waiting) then + -- HACK to stop copas.removeserver() from calling wskt:close() + local proxy = setmetatable({ socket = wskt, close = function() end }, getmetatable(copas.wrap(wskt))) + copas.removeserver(proxy) + end + if coro then + copas.wakeup(coro) + end + end) +end + +local function add_waiting_coro(id, coro) + local empty = not next(waiting) + waiting[id] = coro + if empty then + launch_wakeup_server() + end +end + +local function awake_future(ch, ch_id, ...) + local socket = require("socket") + ch:send(ch_id, pack(...)) + local cskt = socket.tcp() + local ok = cskt:connect(whost, wport) + if ok then + cskt:send(tostring(ch)) + end + cskt:close() +end + +local function new_future(ch, ch_id) + local future = {} + future.try = function() + if future.getting == true then + error("concurrent access to future") + end + if not future.res then + local key, value = ch:receive(0, ch_id) + if key then + future.res = value + future.dead = true + end + end + if future.res then + return future.dead, unpack(future.res, 1, future.res.n) + end + end + future.get = function() + if future.getting == true then + error("concurrent access to future") + end + local me = coroutine.running() + future.getting = true + if not future.dead then + add_waiting_coro(tostring(ch), me) + copas.sleep(-1) + local key, value = ch:receive(0, ch_id) + if key then + future.res = value + end + future.dead = true + end + future.getting = false + if future.res then + return unpack(future.res, 1, future.res.n) + end + end + setmetatable(future, { __call = future.get }) + return future +end + +function async.addthread(fn) + local ch = lanes.linda() + + lanes.gen("*", function() + -- FIXME PCALL TEST + awake_future(ch, "done", fn()) + end)() + + return new_future(ch, "done") +end + +function async.os_execute(command) + local future = async.addthread(function() + return os.execute(command) + end) + return normalize_exit(future:get()) +end + +function async.channel() + local ch = lanes.linda() + + local receive_operation = function(self, op) + if self.accessing then + error("Concurrent access to channel.") + end + self.accessing = true + local future = new_future(ch, "data") + local res = pack(future[op](future)) + self.accessing = false + return unpack(res, 1, res.n) + end + + return { + send = function(_, ...) + awake_future(ch, "data", ...) + end, + receive = receive_operation("get"), + try_receive = receive_operation("try"), + } +end + +function async.io_popen(command, mode) + local ch = lanes.linda() + + async.addthread(function() + local fd, err = io.popen(command, mode) + if not fd then + return nil, err + end + local op = mode == "r" and "read" or "write" + while true do + local _, fd_cmd = ch:receive("fd_cmd") + if fd_cmd == "close" then + awake_future(ch, "result", normalize_exit(fd:close())) + break + end + awake_future(ch, "result", fd[op](fd, fd_cmd)) + end + fd:close() + end) + + local function operation(valid_mode, errormsg_on_invalid) + return function(_, arg) + if mode ~= valid_mode then + return nil, errormsg_on_invalid + end + local ok = ch:send("fd_cmd", arg) + if ok == true then + return new_future(ch, "result")() + end + end + end + + return { + close = function() + local ok = ch:send("fd_cmd", "close") + if ok == true then + return new_future(ch, "result")() + end + end, + flush = function() + return nil, "Not available." + end, + lines = function() + return nil, "Not available." + end, + read = operation("r", "Pipe not open for reading"), + seek = function() + return nil, "Not available." + end, + setvbuf = function() + return nil, "Not available." + end, + write = operation("w", "Pipe not open for writing"), + } +end + +return async diff --git a/sga/exec/localhost.lua b/sga/exec/localhost.lua index e5d2d59..d60877e 100644 --- a/sga/exec/localhost.lua +++ b/sga/exec/localhost.lua @@ -1,7 +1,7 @@ local localhost = {} -local async = require("copas.async") +local async = require("sga.async") local util = require("sga.util") local posix = require("posix") -- GitLab