150 lines
4.6 KiB
Lua
150 lines
4.6 KiB
Lua
-- -*- Mode: Lua; indent-tabs-mode: nil; lua-indent-level: 2 -*-
|
|
-- LADI Continuous Integration (ladici)
|
|
-- SPDX-FileCopyrightText: Copyright © 2010-2023 Nedko Arnaudov */
|
|
-- SPDX-License-Identifier: GPL-2.0-or-later
|
|
|
|
local socket = require 'socket'
|
|
-- require 'misc'
|
|
|
|
--module('remotes', package.seeall)
|
|
|
|
local threads = {}
|
|
|
|
function add_thread(fun)
|
|
local thread = coroutine.create(fun)
|
|
table.insert(threads, thread)
|
|
-- print("new " .. tostring(thread))
|
|
-- misc.dump_table(threads)
|
|
end
|
|
|
|
local function receive(sock, block_size)
|
|
local block, status
|
|
while true do
|
|
local partial
|
|
sock:settimeout(0) -- do not block
|
|
block, status, partial = sock:receive(block_size)
|
|
-- print('block [' .. tostring(block) .. ']')
|
|
-- print('status [' .. tostring(status) .. ']')
|
|
-- print('partial [' .. tostring(partial) .. ']')
|
|
if block and string.len(block) == 0 then block = nil end
|
|
block = block or partial
|
|
if block and string.len(block) == 0 then block = nil end
|
|
if block then break end
|
|
if status == 'timeout' then
|
|
-- if data is not available, tell the dispatcher so it can eventually wait on this socket
|
|
coroutine.yield(sock)
|
|
else
|
|
break
|
|
end
|
|
end
|
|
return block, status
|
|
end
|
|
|
|
local function accept_thread_factory(sock, client_thread)
|
|
local accept_enabled = true
|
|
return
|
|
function()
|
|
while accept_enabled do
|
|
sock:settimeout(0) -- do not block
|
|
local client, err = sock:accept()
|
|
if client then
|
|
add_thread(function()
|
|
local ip = client:getpeername()
|
|
local description = ("%s:%s"):format(client:getpeername())
|
|
local peer = {
|
|
send = function(data) client:send(data) end,
|
|
receive = function(block_size) return receive(client, block_size) end,
|
|
get_description = function() return description end,
|
|
get_ip = function() return ip end,
|
|
accept_disable = function() accept_enabled = false end
|
|
}
|
|
client_thread(peer)
|
|
-- print('closing client socket')
|
|
client:close()
|
|
end)
|
|
elseif err == 'timeout' then
|
|
-- if data is not available, tell the dispatcher so it can eventually wait on this socket
|
|
coroutine.yield(sock)
|
|
else
|
|
error("accept failed: " .. err) -- terminate the thread coroutine
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
function dispatch()
|
|
local i
|
|
local sockets = {} -- list of sockets that to wait on
|
|
while true do
|
|
if threads[i] == nil then -- no more threads?
|
|
if threads[1] == nil then print("no more threads to dispatch") break end
|
|
i = 1 -- restart the loop
|
|
sockets = {}
|
|
end
|
|
|
|
-- print('resuming ' .. tostring(threads[i]))
|
|
local status, sock = coroutine.resume(threads[i])
|
|
if not sock then -- thread finished its task?
|
|
-- print(('finished %s'):format(tostring(threads[i])))
|
|
table.remove(threads, i)
|
|
-- misc.dump_table(threads)
|
|
else
|
|
i = i + 1
|
|
assert(type(sock) == 'userdata', tostring(sock))
|
|
table.insert(sockets, sock)
|
|
if #sockets == #threads then -- all threads blocked?
|
|
-- print("all threads blocked")
|
|
-- misc.dump_table(sockets)
|
|
socket.select(sockets)
|
|
-- print("select done")
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
function connect_tcp(host, port)
|
|
local sock, err = socket.connect(host, port)
|
|
if not sock then return nil, err end
|
|
|
|
local local_ip = sock:getsockname()
|
|
local ip, port2 = sock:getpeername()
|
|
assert(port == port2)
|
|
|
|
return {
|
|
get_description = function() return ("%s[%s]:%s"):format(host, ip, port) end,
|
|
get_local_ip = function() return local_ip end,
|
|
send = function(data) sock:send(data) end,
|
|
receive = function(block_size) return receive(sock, block_size) end,
|
|
close = function() sock:close() end,
|
|
}
|
|
end
|
|
|
|
function create_tcp_server(client_thread, binds, backlog)
|
|
assert(client_thread)
|
|
local sock, res, err
|
|
|
|
sock, err = socket.tcp()
|
|
if not sock then return err end
|
|
|
|
res, err = sock:setoption('linger', {on=true, timeout=0})
|
|
--if not res then return err end
|
|
if not res then print(err) end
|
|
|
|
for _, bind in pairs(binds) do
|
|
res, err = sock:bind(bind.host, bind.port)
|
|
if not res then return err end
|
|
print(("Listening on %s:%s"):format(bind.host, bind.port))
|
|
end
|
|
|
|
res, err = sock:listen(backlog)
|
|
if not res then return err end
|
|
|
|
sock:settimeout(0)
|
|
add_thread(accept_thread_factory(sock, client_thread))
|
|
end
|
|
|
|
return {
|
|
create_tcp_server = create_tcp_server,
|
|
dispatch = dispatch,
|
|
}
|