Commit e9f71bd7 authored by Renato Figueiro Maia's avatar Renato Figueiro Maia

[OPENBUS-1538] Implementar a nova proposta do SDK OpenBus em Lua

- Novas operações 'openbus.newthread' e 'openbus.sleep' que são aliases para operações com nomes confusos do módulo 'cothread'.

git-svn-id: https://subversion.tecgraf.puc-rio.br/engdist/openbus/sdk/lua/branches/openbus_v2_proto@130376 ae0415b3-e90b-0410-900d-d0be9363c56b
parent 1bfd8053
......@@ -29,7 +29,11 @@ local LRUCache = require "loop.collection.LRUCache"
local Wrapper = require "loop.object.Wrapper"
local cothread = require "cothread"
local resume = cothread.next
local running = cothread.running
local threadtrap = cothread.trap
local unschedule = cothread.unschedule
local Mutex = require "cothread.Mutex"
local giop = require "oil.corba.giop"
......@@ -58,12 +62,8 @@ local receiveBusRequest = CoreInterceptor.receiverequest
-- must be loaded after OiL is loaded because OiL is the one that installs
-- the cothread plug-in that supports the 'now' operation.
local cothread = require "cothread"
local resume = cothread.next
local unschedule = cothread.unschedule
local deferuntil = cothread.defer
local delay = cothread.delay
local time = cothread.now
local threadtrap = cothread.trap
......@@ -220,7 +220,7 @@ local function newRenewer(self, lease)
local access = self.AccessControl
while self.login == login do
self.renewer = thread
deferuntil(time()+lease)
delay(lease)
self.renewer = nil
log:action(msg.RenewLogin:tag{id=login.id,entity=login.entity})
local ok, result = pcall(access.renew, access)
......@@ -623,7 +623,13 @@ end
local openbus = {}
local openbus = { sleep = delay }
function openbus.newthread(func, ...)
local thread = newthread(func)
cothread.next(thread, ...)
return thread
end
function openbus.initORB(configs)
local orb = neworb(copy(configs))
......@@ -657,6 +663,8 @@ argcheck.convertclass(ConnectionManager, {
clearDispatcher = { "string" },
})
argcheck.convertmodule(openbus, {
sleep = { "number" },
newthread = { "function" },
initORB = { "nil|table" },
})
......
local _G = require "_G"
local next = _G.next
local pairs = _G.pairs
local pcall = _G.pcall
local setmetatable = _G.setmetatable
local coroutine = require "coroutine"
local newthread = coroutine.create
local table = require "loop.table"
local copy = table.copy
local memoize = table.memoize
local cothread = require "cothread"
local running = cothread.running
local oo = require "openbus.util.oo"
local class = oo.class
local log = require "openbus.util.logger"
local msg = require "openbus.util.messages"
local idl = require "openbus.core.idl"
local const = idl.const.services.access_control
local Identifier = idl.types.Identifier
local BusObjectKey = idl.const.BusObjectKey
local access = require "openbus.core.Access"
local neworb = access.initORB
local openbus = require "openbus"
local basicORB = openbus.initORB
local connect = openbus.connect
local VersionHeader = "\002\000"
local CredentialContextId = 0x42555300 -- "BUS\0"
local WeakKeyMeta = { __mode = "k" }
local IceptorOp2LogMessage = {
sendrequest = msg.MultiplexedCall,
receivereply = false,
receiverequest = msg.GotMultiplexedCall,
sendreply = false,
}
local function logMultiplexed(conn, op, request, ...)
local message = IceptorOp2LogMessage[op]
if message then
local login = conn.login
log:multiplexed(true, IceptorOp2LogMessage[op]:tag{
bus = conn.busid,
login = login and login.id,
entity = login and login.entity,
operation = request.operation_name,
})
end
conn[op](conn, request, ...)
if not message then
log:multiplexed(false)
end
end
local function getBusId(self, contexts)
local data = contexts[CredentialContextId]
if data ~= nil then
local decoder = self.orb:newdecoder(data)
return decoder:get(self.identifierType)
end
end
local Multiplexer = class()
function Multiplexer:__init()
self.connections = {} -- [conn] = { [thread]=true, ... }
self.connectionOf = setmetatable({}, WeakKeyMeta) -- [thread]=connection
local orb = self.orb
self.identifierType = orb.types:lookup_id(Identifier)
end
function Multiplexer:addConnection(conn)
self.connections[conn] = {}
end
function Multiplexer:removeConnection(conn)
local connections = self.connections
local threads = connections[conn]
if threads then
local connectionOf = self.connectionOf
connectionOf[conn.busid] = nil
for thread in pairs(threads) do
connectionOf[thread] = nil
end
connections[conn] = nil
end
end
function Multiplexer:sendrequest(request, ...)
local conn = self.connectionOf[running()]
if conn ~= nil then
request.multiplexedConnection = conn
logMultiplexed(conn, "sendrequest", request, ...)
else
request.success = false
request.results = {self.orb:newexcept{
"CORBA::NO_PERMISSION",
completed = "COMPLETED_NO",
minor = const.NoLoginCode,
}}
log:badaccess(msg.CallInThreadWithoutConnection:tag{
operation = request.operation_name,
})
end
end
function Multiplexer:receivereply(request, ...)
local conn = request.multiplexedConnection
logMultiplexed(conn, "receivereply", request, ...)
if request.success ~= nil then
request.multiplexedConnection = nil
end
end
function Multiplexer:receiverequest(request, ...)
local ok, busid = pcall(getBusId, self, request.service_context)
if ok then
local conn = self.connectionOf[busid]
if conn ~= nil then
request.multiplexedConnection = conn
self.connectionOf[running()] = conn
logMultiplexed(conn, "receiverequest", request, ...)
else
request.success = false
request.results = {self.orb:newexcept{
"CORBA::NO_PERMISSION",
completed = "COMPLETED_NO",
minor = const.UnknownBusCode,
}}
log:badaccess(msg.DeniedCallFromUnknownBus:tag{
operation = request.operation_name,
bus = busid,
})
end
else
request.success = false
request.results = { busid } -- TODO[maia]: Is a good CORBA SysEx to throw here?
log:badaccess(msg.UnableToDecodeCredential:tag{errmsg=busid})
end
end
function Multiplexer:sendreply(request, ...)
local conn = request.multiplexedConnection
logMultiplexed(conn, "sendreply", request, ...)
request.multiplexedConnection = nil
self.connectionOf[running()] = nil
end
function Multiplexer:setCurrentConnection(conn)
local set = self.connections[conn]
if set == nil then error("invalid connection") end
local thread = running()
self.connectionOf[thread] = conn
set[thread] = true
end
function Multiplexer:getCurrentConnection()
return self.connectionOf[running()]
end
function Multiplexer:setIncomingConnection(busid, conn)
if self.connections[conn] == nil or conn.busid ~= busid then
error("invalid connection")
end
self.connectionOf[busid] = conn
end
function Multiplexer:getIncomingConnection(busid)
return self.connectionOf[busid]
end
local function initORB(...)
local orb = basicORB(...)
local multiplexer = Multiplexer{ orb = orb }
orb.OpenBusConnectionMultiplexer = multiplexer
-- redefine interceptor operations
local iceptor = orb.OpenBusInterceptor
function iceptor:addConnection(conn)
local current = self.connection
if current == nil then
self.connection = conn
else
if current ~= multiplexer then
multiplexer:addConnection(current)
self.connection = multiplexer
log:multiplexed(msg.MultiplexingEnabled)
end
multiplexer:addConnection(conn)
end
end
local removeConnection = iceptor.removeConnection
function iceptor:removeConnection(conn)
local current = self.connection
if self.connection ~= multiplexer then
removeConnection(self, conn)
else
multiplexer:removeConnection(conn)
local connections = multiplexer.connections
local single = next(connections)
if single ~= nil and next(connections, single) == nil then
multiplexer:removeConnection(single)
self.connection = single
log:multiplexed(msg.MultiplexingDisabled)
end
end
end
return orb
end
local openbus = { initORB = initORB }
function openbus.connect(host, port, orb)
if orb == nil then orb = initORB() end
local conn = connect(host, port, orb)
local multiplexer = orb.OpenBusConnectionMultiplexer
local renewer = conn.newrenewer
function conn:newrenewer(lease)
local thread = renewer(self, lease)
multiplexer.connectionOf[thread] = conn
return thread
end
return conn
end
-- insert function argument typing
local argcheck = require "openbus.util.argcheck"
argcheck.convertclass(Multiplexer, {
setCurrentConnection = { --[[Connection]] },
getCurrentConnection = {},
setIncomingConnection = { "string", --[[Connection]] },
getIncomingConnection = { "string" },
})
argcheck.convertmodule(openbus, {
initORB = { "nil|table" },
connect = { "string", "number" },
})
return openbus
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment