Skip to content

Commit 1aaf637

Browse files
Totktonadalocker
authored andcommitted
net.box: add skip_header option to use with buffer
Needed for #3276. @TarantoolBot document Title: net.box: skip_header option This option instructs net.box to skip {[IPROTO_DATA_KEY] = ...} wrapper from a buffer. This may be needed to pass this buffer to some C function when it expects some specific msgpack input. Usage example: ```lua local net_box = require('net.box') local buffer = require('buffer') local ffi = require('ffi') local msgpack = require('msgpack') local yaml = require('yaml') box.cfg{listen = 3301} box.once('load_data', function() box.schema.user.grant('guest', 'read,write,execute', 'universe') box.schema.space.create('s') box.space.s:create_index('pk') box.space.s:insert({1}) box.space.s:insert({2}) box.space.s:insert({3}) box.space.s:insert({4}) end) local function foo() return box.space.s:select() end _G.foo = foo local conn = net_box.connect('localhost:3301') local buf = buffer.ibuf() conn.space.s:select(nil, {buffer = buf}) local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos) local buf_lua = msgpack.decode(buf_str) print('select:\n' .. yaml.encode(buf_lua)) -- {48: [[1], [2], [3], [4]]} local buf = buffer.ibuf() conn.space.s:select(nil, {buffer = buf, skip_header = true}) local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos) local buf_lua = msgpack.decode(buf_str) print('select:\n' .. yaml.encode(buf_lua)) -- [[1], [2], [3], [4]] local buf = buffer.ibuf() conn:call('foo', nil, {buffer = buf}) local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos) local buf_lua = msgpack.decode(buf_str) print('call:\n' .. yaml.encode(buf_lua)) -- {48: [[[1], [2], [3], [4]]]} local buf = buffer.ibuf() conn:call('foo', nil, {buffer = buf, skip_header = true}) local buf_str = ffi.string(buf.rpos, buf.wpos - buf.rpos) local buf_lua = msgpack.decode(buf_str) print('call:\n' .. yaml.encode(buf_lua)) -- [[[1], [2], [3], [4]]] os.exit() ```
1 parent b1b2e7b commit 1aaf637

File tree

3 files changed

+333
-31
lines changed

3 files changed

+333
-31
lines changed

src/box/lua/net_box.lua

Lines changed: 35 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,12 @@ local urilib = require('uri')
1010
local internal = require('net.box.lib')
1111
local trigger = require('internal.trigger')
1212

13-
local band = bit.band
14-
local max = math.max
15-
local fiber_clock = fiber.clock
16-
local fiber_self = fiber.self
17-
local decode = msgpack.decode_unchecked
13+
local band = bit.band
14+
local max = math.max
15+
local fiber_clock = fiber.clock
16+
local fiber_self = fiber.self
17+
local decode = msgpack.decode_unchecked
18+
local decode_map_header = msgpack.decode_map_header
1819

1920
local table_new = require('table.new')
2021
local check_iterator_type = box.internal.check_iterator_type
@@ -483,8 +484,8 @@ local function create_transport(host, port, user, password, callback,
483484
-- @retval nil, error Error occured.
484485
-- @retval not nil Future object.
485486
--
486-
local function perform_async_request(buffer, method, on_push, on_push_ctx,
487-
...)
487+
local function perform_async_request(buffer, skip_header, method, on_push,
488+
on_push_ctx, ...)
488489
if state ~= 'active' and state ~= 'fetch_schema' then
489490
return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
490491
reason = last_error})
@@ -497,12 +498,13 @@ local function create_transport(host, port, user, password, callback,
497498
local id = next_request_id
498499
method_encoder[method](send_buf, id, ...)
499500
next_request_id = next_id(id)
500-
-- Request in most cases has maximum 8 members:
501-
-- method, buffer, id, cond, errno, response, on_push,
502-
-- on_push_ctx.
503-
local request = setmetatable(table_new(0, 8), request_mt)
501+
-- Request in most cases has maximum 9 members:
502+
-- method, buffer, skip_header, id, cond, errno, response,
503+
-- on_push, on_push_ctx.
504+
local request = setmetatable(table_new(0, 9), request_mt)
504505
request.method = method
505506
request.buffer = buffer
507+
request.skip_header = skip_header
506508
request.id = id
507509
request.cond = fiber.cond()
508510
requests[id] = request
@@ -516,10 +518,11 @@ local function create_transport(host, port, user, password, callback,
516518
-- @retval nil, error Error occured.
517519
-- @retval not nil Response object.
518520
--
519-
local function perform_request(timeout, buffer, method, on_push,
520-
on_push_ctx, ...)
521+
local function perform_request(timeout, buffer, skip_header, method,
522+
on_push, on_push_ctx, ...)
521523
local request, err =
522-
perform_async_request(buffer, method, on_push, on_push_ctx, ...)
524+
perform_async_request(buffer, skip_header, method, on_push,
525+
on_push_ctx, ...)
523526
if not request then
524527
return nil, err
525528
end
@@ -551,6 +554,15 @@ local function create_transport(host, port, user, password, callback,
551554
if buffer ~= nil then
552555
-- Copy xrow.body to user-provided buffer
553556
local body_len = body_end - body_rpos
557+
if request.skip_header then
558+
-- Skip {[IPROTO_DATA_KEY] = ...} wrapper.
559+
local map_len, key
560+
map_len, body_rpos = decode_map_header(body_rpos, body_len)
561+
assert(map_len == 1)
562+
key, body_rpos = decode(body_rpos)
563+
assert(key == IPROTO_DATA_KEY)
564+
body_len = body_end - body_rpos
565+
end
554566
local wpos = buffer:alloc(body_len)
555567
ffi.copy(wpos, body_rpos, body_len)
556568
body_len = tonumber(body_len)
@@ -1047,18 +1059,19 @@ end
10471059

10481060
function remote_methods:_request(method, opts, ...)
10491061
local transport = self._transport
1050-
local on_push, on_push_ctx, buffer, deadline
1062+
local on_push, on_push_ctx, buffer, skip_header, deadline
10511063
-- Extract options, set defaults, check if the request is
10521064
-- async.
10531065
if opts then
10541066
buffer = opts.buffer
1067+
skip_header = opts.skip_header
10551068
if opts.is_async then
10561069
if opts.on_push or opts.on_push_ctx then
10571070
error('To handle pushes in an async request use future:pairs()')
10581071
end
10591072
local res, err =
1060-
transport.perform_async_request(buffer, method, table.insert,
1061-
{}, ...)
1073+
transport.perform_async_request(buffer, skip_header, method,
1074+
table.insert, {}, ...)
10621075
if err then
10631076
box.error(err)
10641077
end
@@ -1084,8 +1097,9 @@ function remote_methods:_request(method, opts, ...)
10841097
transport.wait_state('active', timeout)
10851098
timeout = deadline and max(0, deadline - fiber_clock())
10861099
end
1087-
local res, err = transport.perform_request(timeout, buffer, method,
1088-
on_push, on_push_ctx, ...)
1100+
local res, err = transport.perform_request(timeout, buffer, skip_header,
1101+
method, on_push, on_push_ctx,
1102+
...)
10891103
if err then
10901104
box.error(err)
10911105
end
@@ -1288,10 +1302,10 @@ function console_methods:eval(line, timeout)
12881302
end
12891303
if self.protocol == 'Binary' then
12901304
local loader = 'return require("console").eval(...)'
1291-
res, err = pr(timeout, nil, 'eval', nil, nil, loader, {line})
1305+
res, err = pr(timeout, nil, false, 'eval', nil, nil, loader, {line})
12921306
else
12931307
assert(self.protocol == 'Lua console')
1294-
res, err = pr(timeout, nil, 'inject', nil, nil, line..'$EOF$\n')
1308+
res, err = pr(timeout, nil, false, 'inject', nil, nil, line..'$EOF$\n')
12951309
end
12961310
if err then
12971311
box.error(err)

0 commit comments

Comments
 (0)