mirror of
https://github.com/neovim/neovim.git
synced 2026-01-04 10:26:42 +10:00
refactor(tests): move lua-client into core and use it for functionaltests
Eliminates lua-client and non-static libluv as test time dependencies Note: the API for a public lua-client is not yet finished. The interface needs to be adjusted to work in the embedded loop of a nvim instance (to use it to talk between instances)
This commit is contained in:
112
test/client/msgpack_rpc_stream.lua
Normal file
112
test/client/msgpack_rpc_stream.lua
Normal file
@@ -0,0 +1,112 @@
|
||||
local mpack = require('mpack')
|
||||
|
||||
-- temporary hack to be able to manipulate buffer/window/tabpage
|
||||
local Buffer = {}
|
||||
Buffer.__index = Buffer
|
||||
function Buffer.new(id) return setmetatable({id=id}, Buffer) end
|
||||
local Window = {}
|
||||
Window.__index = Window
|
||||
function Window.new(id) return setmetatable({id=id}, Window) end
|
||||
local Tabpage = {}
|
||||
Tabpage.__index = Tabpage
|
||||
function Tabpage.new(id) return setmetatable({id=id}, Tabpage) end
|
||||
|
||||
local Response = {}
|
||||
Response.__index = Response
|
||||
|
||||
function Response.new(msgpack_rpc_stream, request_id)
|
||||
return setmetatable({
|
||||
_msgpack_rpc_stream = msgpack_rpc_stream,
|
||||
_request_id = request_id
|
||||
}, Response)
|
||||
end
|
||||
|
||||
function Response:send(value, is_error)
|
||||
local data = self._msgpack_rpc_stream._session:reply(self._request_id)
|
||||
if is_error then
|
||||
data = data .. self._msgpack_rpc_stream._pack(value)
|
||||
data = data .. self._msgpack_rpc_stream._pack(mpack.NIL)
|
||||
else
|
||||
data = data .. self._msgpack_rpc_stream._pack(mpack.NIL)
|
||||
data = data .. self._msgpack_rpc_stream._pack(value)
|
||||
end
|
||||
self._msgpack_rpc_stream._stream:write(data)
|
||||
end
|
||||
|
||||
local MsgpackRpcStream = {}
|
||||
MsgpackRpcStream.__index = MsgpackRpcStream
|
||||
|
||||
function MsgpackRpcStream.new(stream)
|
||||
return setmetatable({
|
||||
_stream = stream,
|
||||
_pack = mpack.Packer({
|
||||
ext = {
|
||||
[Buffer] = function(o) return 0, mpack.encode(o.id) end,
|
||||
[Window] = function(o) return 1, mpack.encode(o.id) end,
|
||||
[Tabpage] = function(o) return 2, mpack.encode(o.id) end
|
||||
}
|
||||
}),
|
||||
_session = mpack.Session({
|
||||
unpack = mpack.Unpacker({
|
||||
ext = {
|
||||
[0] = function(_c, s) return Buffer.new(mpack.decode(s)) end,
|
||||
[1] = function(_c, s) return Window.new(mpack.decode(s)) end,
|
||||
[2] = function(_c, s) return Tabpage.new(mpack.decode(s)) end
|
||||
}
|
||||
})
|
||||
}),
|
||||
}, MsgpackRpcStream)
|
||||
end
|
||||
|
||||
function MsgpackRpcStream:write(method, args, response_cb)
|
||||
local data
|
||||
if response_cb then
|
||||
assert(type(response_cb) == 'function')
|
||||
data = self._session:request(response_cb)
|
||||
else
|
||||
data = self._session:notify()
|
||||
end
|
||||
|
||||
data = data .. self._pack(method) .. self._pack(args)
|
||||
self._stream:write(data)
|
||||
end
|
||||
|
||||
function MsgpackRpcStream:read_start(request_cb, notification_cb, eof_cb)
|
||||
self._stream:read_start(function(data)
|
||||
if not data then
|
||||
return eof_cb()
|
||||
end
|
||||
local type, id_or_cb, method_or_error, args_or_result
|
||||
local pos = 1
|
||||
local len = #data
|
||||
while pos <= len do
|
||||
type, id_or_cb, method_or_error, args_or_result, pos =
|
||||
self._session:receive(data, pos)
|
||||
if type == 'request' or type == 'notification' then
|
||||
if type == 'request' then
|
||||
request_cb(method_or_error, args_or_result, Response.new(self,
|
||||
id_or_cb))
|
||||
else
|
||||
notification_cb(method_or_error, args_or_result)
|
||||
end
|
||||
elseif type == 'response' then
|
||||
if method_or_error == mpack.NIL then
|
||||
method_or_error = nil
|
||||
else
|
||||
args_or_result = nil
|
||||
end
|
||||
id_or_cb(method_or_error, args_or_result)
|
||||
end
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
function MsgpackRpcStream:read_stop()
|
||||
self._stream:read_stop()
|
||||
end
|
||||
|
||||
function MsgpackRpcStream:close(signal)
|
||||
self._stream:close(signal)
|
||||
end
|
||||
|
||||
return MsgpackRpcStream
|
||||
192
test/client/session.lua
Normal file
192
test/client/session.lua
Normal file
@@ -0,0 +1,192 @@
|
||||
local uv = require('luv')
|
||||
local MsgpackRpcStream = require('test.client.msgpack_rpc_stream')
|
||||
|
||||
local Session = {}
|
||||
Session.__index = Session
|
||||
if package.loaded['jit'] then
|
||||
-- luajit pcall is already coroutine safe
|
||||
Session.safe_pcall = pcall
|
||||
else
|
||||
Session.safe_pcall = require'coxpcall'.pcall
|
||||
end
|
||||
|
||||
local function resume(co, ...)
|
||||
local status, result = coroutine.resume(co, ...)
|
||||
|
||||
if coroutine.status(co) == 'dead' then
|
||||
if not status then
|
||||
error(result)
|
||||
end
|
||||
return
|
||||
end
|
||||
|
||||
assert(coroutine.status(co) == 'suspended')
|
||||
result(co)
|
||||
end
|
||||
|
||||
local function coroutine_exec(func, ...)
|
||||
local args = {...}
|
||||
local on_complete
|
||||
|
||||
if #args > 0 and type(args[#args]) == 'function' then
|
||||
-- completion callback
|
||||
on_complete = table.remove(args)
|
||||
end
|
||||
|
||||
resume(coroutine.create(function()
|
||||
local status, result, flag = Session.safe_pcall(func, unpack(args))
|
||||
if on_complete then
|
||||
coroutine.yield(function()
|
||||
-- run the completion callback on the main thread
|
||||
on_complete(status, result, flag)
|
||||
end)
|
||||
end
|
||||
end))
|
||||
end
|
||||
|
||||
function Session.new(stream)
|
||||
return setmetatable({
|
||||
_msgpack_rpc_stream = MsgpackRpcStream.new(stream),
|
||||
_pending_messages = {},
|
||||
_prepare = uv.new_prepare(),
|
||||
_timer = uv.new_timer(),
|
||||
_is_running = false
|
||||
}, Session)
|
||||
end
|
||||
|
||||
function Session:next_message(timeout)
|
||||
local function on_request(method, args, response)
|
||||
table.insert(self._pending_messages, {'request', method, args, response})
|
||||
uv.stop()
|
||||
end
|
||||
|
||||
local function on_notification(method, args)
|
||||
table.insert(self._pending_messages, {'notification', method, args})
|
||||
uv.stop()
|
||||
end
|
||||
|
||||
if self._is_running then
|
||||
error('Event loop already running')
|
||||
end
|
||||
|
||||
if #self._pending_messages > 0 then
|
||||
return table.remove(self._pending_messages, 1)
|
||||
end
|
||||
|
||||
self:_run(on_request, on_notification, timeout)
|
||||
return table.remove(self._pending_messages, 1)
|
||||
end
|
||||
|
||||
function Session:notify(method, ...)
|
||||
self._msgpack_rpc_stream:write(method, {...})
|
||||
end
|
||||
|
||||
function Session:request(method, ...)
|
||||
local args = {...}
|
||||
local err, result
|
||||
if self._is_running then
|
||||
err, result = self:_yielding_request(method, args)
|
||||
else
|
||||
err, result = self:_blocking_request(method, args)
|
||||
end
|
||||
|
||||
if err then
|
||||
return false, err
|
||||
end
|
||||
|
||||
return true, result
|
||||
end
|
||||
|
||||
function Session:run(request_cb, notification_cb, setup_cb, timeout)
|
||||
local function on_request(method, args, response)
|
||||
coroutine_exec(request_cb, method, args, function(status, result, flag)
|
||||
if status then
|
||||
response:send(result, flag)
|
||||
else
|
||||
response:send(result, true)
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
local function on_notification(method, args)
|
||||
coroutine_exec(notification_cb, method, args)
|
||||
end
|
||||
|
||||
self._is_running = true
|
||||
|
||||
if setup_cb then
|
||||
coroutine_exec(setup_cb)
|
||||
end
|
||||
|
||||
while #self._pending_messages > 0 do
|
||||
local msg = table.remove(self._pending_messages, 1)
|
||||
if msg[1] == 'request' then
|
||||
on_request(msg[2], msg[3], msg[4])
|
||||
else
|
||||
on_notification(msg[2], msg[3])
|
||||
end
|
||||
end
|
||||
|
||||
self:_run(on_request, on_notification, timeout)
|
||||
self._is_running = false
|
||||
end
|
||||
|
||||
function Session:stop()
|
||||
uv.stop()
|
||||
end
|
||||
|
||||
function Session:close(signal)
|
||||
if not self._timer:is_closing() then self._timer:close() end
|
||||
if not self._prepare:is_closing() then self._prepare:close() end
|
||||
self._msgpack_rpc_stream:close(signal)
|
||||
end
|
||||
|
||||
function Session:_yielding_request(method, args)
|
||||
return coroutine.yield(function(co)
|
||||
self._msgpack_rpc_stream:write(method, args, function(err, result)
|
||||
resume(co, err, result)
|
||||
end)
|
||||
end)
|
||||
end
|
||||
|
||||
function Session:_blocking_request(method, args)
|
||||
local err, result
|
||||
|
||||
local function on_request(method_, args_, response)
|
||||
table.insert(self._pending_messages, {'request', method_, args_, response})
|
||||
end
|
||||
|
||||
local function on_notification(method_, args_)
|
||||
table.insert(self._pending_messages, {'notification', method_, args_})
|
||||
end
|
||||
|
||||
self._msgpack_rpc_stream:write(method, args, function(e, r)
|
||||
err = e
|
||||
result = r
|
||||
uv.stop()
|
||||
end)
|
||||
|
||||
self:_run(on_request, on_notification)
|
||||
return (err or self.eof_err), result
|
||||
end
|
||||
|
||||
function Session:_run(request_cb, notification_cb, timeout)
|
||||
if type(timeout) == 'number' then
|
||||
self._prepare:start(function()
|
||||
self._timer:start(timeout, 0, function()
|
||||
uv.stop()
|
||||
end)
|
||||
self._prepare:stop()
|
||||
end)
|
||||
end
|
||||
self._msgpack_rpc_stream:read_start(request_cb, notification_cb, function()
|
||||
uv.stop()
|
||||
self.eof_err = {1, "EOF was received from Nvim. Likely the Nvim process crashed."}
|
||||
end)
|
||||
uv.run()
|
||||
self._prepare:stop()
|
||||
self._timer:stop()
|
||||
self._msgpack_rpc_stream:read_stop()
|
||||
end
|
||||
|
||||
return Session
|
||||
164
test/client/uv_stream.lua
Normal file
164
test/client/uv_stream.lua
Normal file
@@ -0,0 +1,164 @@
|
||||
local uv = require('luv')
|
||||
|
||||
local StdioStream = {}
|
||||
StdioStream.__index = StdioStream
|
||||
|
||||
function StdioStream.open()
|
||||
local self = setmetatable({
|
||||
_in = uv.new_pipe(false),
|
||||
_out = uv.new_pipe(false)
|
||||
}, StdioStream)
|
||||
self._in:open(0)
|
||||
self._out:open(1)
|
||||
return self
|
||||
end
|
||||
|
||||
function StdioStream:write(data)
|
||||
self._out:write(data)
|
||||
end
|
||||
|
||||
function StdioStream:read_start(cb)
|
||||
self._in:read_start(function(err, chunk)
|
||||
if err then
|
||||
error(err)
|
||||
end
|
||||
cb(chunk)
|
||||
end)
|
||||
end
|
||||
|
||||
function StdioStream:read_stop()
|
||||
self._in:read_stop()
|
||||
end
|
||||
|
||||
function StdioStream:close()
|
||||
self._in:close()
|
||||
self._out:close()
|
||||
end
|
||||
|
||||
local SocketStream = {}
|
||||
SocketStream.__index = SocketStream
|
||||
|
||||
function SocketStream.open(file)
|
||||
local socket = uv.new_pipe(false)
|
||||
local self = setmetatable({
|
||||
_socket = socket,
|
||||
_stream_error = nil
|
||||
}, SocketStream)
|
||||
uv.pipe_connect(socket, file, function (err)
|
||||
self._stream_error = self._stream_error or err
|
||||
end)
|
||||
return self
|
||||
end
|
||||
|
||||
function SocketStream.connect(host, port)
|
||||
local socket = uv.new_tcp()
|
||||
local self = setmetatable({
|
||||
_socket = socket,
|
||||
_stream_error = nil
|
||||
}, SocketStream)
|
||||
uv.tcp_connect(socket, host, port, function (err)
|
||||
self._stream_error = self._stream_error or err
|
||||
end)
|
||||
return self
|
||||
end
|
||||
|
||||
|
||||
function SocketStream:write(data)
|
||||
if self._stream_error then
|
||||
error(self._stream_error)
|
||||
end
|
||||
uv.write(self._socket, data, function(err)
|
||||
if err then
|
||||
error(self._stream_error or err)
|
||||
end
|
||||
end)
|
||||
end
|
||||
|
||||
function SocketStream:read_start(cb)
|
||||
if self._stream_error then
|
||||
error(self._stream_error)
|
||||
end
|
||||
uv.read_start(self._socket, function(err, chunk)
|
||||
if err then
|
||||
error(err)
|
||||
end
|
||||
cb(chunk)
|
||||
end)
|
||||
end
|
||||
|
||||
function SocketStream:read_stop()
|
||||
if self._stream_error then
|
||||
error(self._stream_error)
|
||||
end
|
||||
uv.read_stop(self._socket)
|
||||
end
|
||||
|
||||
function SocketStream:close()
|
||||
uv.close(self._socket)
|
||||
end
|
||||
|
||||
local ChildProcessStream = {}
|
||||
ChildProcessStream.__index = ChildProcessStream
|
||||
|
||||
function ChildProcessStream.spawn(argv, env, io_extra)
|
||||
local self = setmetatable({
|
||||
_child_stdin = uv.new_pipe(false),
|
||||
_child_stdout = uv.new_pipe(false)
|
||||
}, ChildProcessStream)
|
||||
local prog = argv[1]
|
||||
local args = {}
|
||||
for i = 2, #argv do
|
||||
args[#args + 1] = argv[i]
|
||||
end
|
||||
self._proc, self._pid = uv.spawn(prog, {
|
||||
stdio = {self._child_stdin, self._child_stdout, 2, io_extra},
|
||||
args = args,
|
||||
env = env,
|
||||
}, function()
|
||||
self:close()
|
||||
end)
|
||||
|
||||
if not self._proc then
|
||||
local err = self._pid
|
||||
error(err)
|
||||
end
|
||||
|
||||
return self
|
||||
end
|
||||
|
||||
function ChildProcessStream:write(data)
|
||||
self._child_stdin:write(data)
|
||||
end
|
||||
|
||||
function ChildProcessStream:read_start(cb)
|
||||
self._child_stdout:read_start(function(err, chunk)
|
||||
if err then
|
||||
error(err)
|
||||
end
|
||||
cb(chunk)
|
||||
end)
|
||||
end
|
||||
|
||||
function ChildProcessStream:read_stop()
|
||||
self._child_stdout:read_stop()
|
||||
end
|
||||
|
||||
function ChildProcessStream:close(signal)
|
||||
if self._closed then
|
||||
return
|
||||
end
|
||||
self._closed = true
|
||||
self:read_stop()
|
||||
self._child_stdin:close()
|
||||
self._child_stdout:close()
|
||||
if type(signal) == 'string' then
|
||||
self._proc:kill('sig'..signal)
|
||||
end
|
||||
uv.run('nowait')
|
||||
end
|
||||
|
||||
return {
|
||||
StdioStream = StdioStream;
|
||||
ChildProcessStream = ChildProcessStream;
|
||||
SocketStream = SocketStream;
|
||||
}
|
||||
Reference in New Issue
Block a user