Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.

### Added

- Ability to grant tubes to roles. All default tube drivers are supported (#249).
Comment thread
oleg-jukovec marked this conversation as resolved.
Example: `queue.tube.my_tube:grant_role('queue_worker', {call = true})`

### Changed

### Fixed
Expand Down
28 changes: 28 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ align="right">
* [Initialization](#initialization)
* [Get the module version](#get-the-module-version)
* [Creating a new queue](#creating-a-new-queue)
* [Granting access to a queue](#granting-access-to-a-queue)
* [Set queue settings](#set-queue-settings)
* [Session identify](#session-identify)
* [Putting a task in a queue](#putting-a-task-in-a-queue)
Expand Down Expand Up @@ -529,6 +530,33 @@ created.

Example: `queue.create_tube('list_of_sites', 'fifo', {temporary = true})`

## Granting access to a queue

```lua
local tube = queue.create_tube('test_tube', 'fifo')
tube:grant(user name [, {options} ])
```

This function grants the user enough privileges to work with the tube using the
Lua API (tube:put(), tube:take(), tube:ack(), etc.) when the code is executed
inside Tarantool under that user.

Available `options`:
* `call` - boolean - additionally grant permissions required to use the tube via remote
calls (`net.box:call()`). When enabled, the module creates and grants execute privileges
for `queue.identify`, `queue.statistics`, and `queue.tube.<tube_name>:*` functions.
* `truncate` - boolean - additionally grant permission to truncate the tube via
`queue.tube.<tube_name>:truncate` function. Useful when you want to allow clearing the
tube contents.

```lua
local tube = queue.create_tube('test_tube', 'fifo')
tube:grant_role(role name [, {options} ])
```

Same function as `tube:grant()`, but grants privileges to a role instead of a user.
Any user who has this role will be able to access the tube. The options are the same.

## Set queue settings

```lua
Expand Down
90 changes: 59 additions & 31 deletions queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -324,51 +324,79 @@ function tube.on_task_change(self, cb)
return old_cb
end

function tube.grant(self, user, args)
if not check_state("grant") then
return
local function tube_grant_space(grantor, grantee, name, tp)
grantor.grant(grantee, tp or 'read,write', 'space', name, {
if_not_exists = true,
})
end

local function tube_grant_func(grantor, grantee, name)
box.schema.func.create(name, { if_not_exists = true })
grantor.grant(grantee, 'execute', 'function', name, {
if_not_exists = true
})
end

local function grant_system_spaces(grantor, grantee)
tube_grant_space(grantor, grantee, '_queue', 'read')
tube_grant_space(grantor, grantee, '_queue_consumers')
tube_grant_space(grantor, grantee, '_queue_taken_2')
end

local function grant_args(grantor, grantee, args, name)
if args.call then
tube_grant_func(grantor, grantee, 'queue.identify')
tube_grant_func(grantor, grantee, 'queue.statistics')
local prefix = (args.prefix or 'queue.tube') .. ('.%s:'):format(name)
tube_grant_func(grantor, grantee, prefix .. 'put')
tube_grant_func(grantor, grantee, prefix .. 'take')
tube_grant_func(grantor, grantee, prefix .. 'touch')
tube_grant_func(grantor, grantee, prefix .. 'ack')
tube_grant_func(grantor, grantee, prefix .. 'release')
tube_grant_func(grantor, grantee, prefix .. 'peek')
tube_grant_func(grantor, grantee, prefix .. 'bury')
tube_grant_func(grantor, grantee, prefix .. 'kick')
tube_grant_func(grantor, grantee, prefix .. 'delete')
end
local function tube_grant_space(user, name, tp)
box.schema.user.grant(user, tp or 'read,write', 'space', name, {
if_not_exists = true,
})

if args.truncate then
local prefix = (args.prefix or 'queue.tube') .. ('.%s:'):format(name)
tube_grant_func(grantor, grantee, prefix .. 'truncate')
end
end

local function tube_grant_func(user, name)
box.schema.func.create(name, { if_not_exists = true })
box.schema.user.grant(user, 'execute', 'function', name, {
if_not_exists = true
})
function tube.grant(self, user, args)
if not check_state("grant") then
return
end

args = args or {}

tube_grant_space(user, '_queue', 'read')
tube_grant_space(user, '_queue_consumers')
tube_grant_space(user, '_queue_taken_2')
grant_system_spaces(box.schema.user, user)

self.raw:grant(user, {if_not_exists = true})
session.grant(user)

if args.call then
tube_grant_func(user, 'queue.identify')
tube_grant_func(user, 'queue.statistics')
local prefix = (args.prefix or 'queue.tube') .. ('.%s:'):format(self.name)
tube_grant_func(user, prefix .. 'put')
tube_grant_func(user, prefix .. 'take')
tube_grant_func(user, prefix .. 'touch')
tube_grant_func(user, prefix .. 'ack')
tube_grant_func(user, prefix .. 'release')
tube_grant_func(user, prefix .. 'peek')
tube_grant_func(user, prefix .. 'bury')
tube_grant_func(user, prefix .. 'kick')
tube_grant_func(user, prefix .. 'delete')
grant_args(box.schema.user, user, args, self.name)
end

function tube.grant_role(self, role, args)
if not check_state("grant") then
return
end

if args.truncate then
local prefix = (args.prefix or 'queue.tube') .. ('.%s:'):format(self.name)
tube_grant_func(user, prefix .. 'truncate')
args = args or {}

if self.raw.grant_role ~= nil then
self.raw:grant_role(role, { if_not_exists = true })
else
error(('Tube %s driver does not support grant_role()'):format(self.name))
end

grant_system_spaces(box.schema.role, role)
session.grant_role(role)

grant_args(box.schema.role, role, args, self.name)
end

-- methods
Expand Down
4 changes: 4 additions & 0 deletions queue/abstract/driver/fifo.lua
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ function method.grant(self, user, opts)
box.schema.user.grant(user, 'read,write', 'space', self.space.name, opts)
end

function method.grant_role(self, role, opts)
box.schema.role.grant(role, 'read,write', 'space', self.space.name, opts)
end

-- normalize task: cleanup all internal fields
function method.normalize_task(self, task)
return task
Expand Down
4 changes: 4 additions & 0 deletions queue/abstract/driver/fifottl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,10 @@ function method.grant(self, user, opts)
box.schema.user.grant(user, 'read,write', 'space', self.space.name, opts)
end

function method.grant_role(self, role, opts)
box.schema.role.grant(role, 'read,write', 'space', self.space.name, opts)
end

-- cleanup internal fields in task
function method.normalize_task(self, task)
return task and task:transform(3, 5)
Expand Down
8 changes: 8 additions & 0 deletions queue/abstract/driver/utube.lua
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,14 @@ function method.grant(self, user, opts)
end
end

function method.grant_role(self, role, opts)
box.schema.role.grant(role, 'read,write', 'space', self.space.name, opts)
if self.space_ready_buffer ~= nil then
box.schema.role.grant(role, 'read,write', 'space',
self.space_ready_buffer.name, opts)
end
end

-- normalize task: cleanup all internal fields
function method.normalize_task(self, task)
return task and task:transform(3, 1)
Expand Down
8 changes: 8 additions & 0 deletions queue/abstract/driver/utubettl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,14 @@ function method.grant(self, user, opts)
end
end

function method.grant_role(self, role, opts)
box.schema.role.grant(role, 'read,write', 'space', self.space.name, opts)
if self.space_ready_buffer ~= nil then
box.schema.role.grant(role, 'read,write', 'space',
self.space_ready_buffer.name, opts)
end
end

-- cleanup internal fields in task
function method.normalize_task(self, task)
return task and task:transform(i_next_event, i_data - i_next_event)
Expand Down
8 changes: 8 additions & 0 deletions queue/abstract/queue_session.lua
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,13 @@ local function grant(user)
{ if_not_exists = true })
end

local function grant_role(role)
box.schema.role.grant(role, 'read, write', 'space', '_queue_session_ids',
{ if_not_exists = true })
box.schema.role.grant(role, 'read, write', 'space', '_queue_shared_sessions',
{ if_not_exists = true })
end

local function start()
identification_init()
queue_session.sync_chan = fiber.channel()
Expand Down Expand Up @@ -349,6 +356,7 @@ local method = {
identify = identify,
disconnect = disconnect,
grant = grant,
grant_role = grant_role,
on_session_remove = on_session_remove,
start = start,
stop = stop,
Expand Down
Loading
Loading