Skip to content

Commit b18c56e

Browse files
Added support for XREADGROUP command (#1327)
* Codestyle changes related to php-cs-fixer update (#1311) * Codestyle changes * Added missing type-hints * Added GETDEL command to KeyPrefixProcessor (#1306) * Added GETDEL command to KeyPrefixProcessor * Added test coverage * Codestyle fixes * Added timeout after FT.CREATE call * Added support for JSON.MERGE command (#1304) * Added support for JSON.MSET command (#1307) * Fixed subcommand test bug (#1313) * Update CHANGELOG.md * Update CHANGELOG.md * Added support for XGROUP container commands * Added support for XREADGROUP command * Fixed bug with incorrect multiple words processing (#1325) * Fixed bug with incorrect multiple words processing * Convert subcommand string to lower case * Update SubcommandStrategyResolver.php * Added test coverage * Codestyle fixes --------- Co-authored-by: Till Krüss <[email protected]> * Added split words handling * Fixed command id to be lowercase * Fixed test decorator * Marked test as realy incompatible * Added support for FUNCTION DUMP, FUNCTION FLUSH, FUNCTION RESTORE commands (#1332) * Added test case with multiple streams * Removed old files * Move back missing tests * Removed blank space * Added method signature --------- Co-authored-by: Till Krüss <[email protected]>
1 parent a5cc48d commit b18c56e

File tree

3 files changed

+236
-0
lines changed

3 files changed

+236
-0
lines changed

src/ClientInterface.php

+1
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,7 @@
284284
* @method int xlen(string $key)
285285
* @method array xrevrange(string $key, string $end, string $start, ?int $count = null)
286286
* @method array xrange(string $key, string $start, string $end, ?int $count = null)
287+
* @method array xreadgroup(string $group, string $consumer, ?int $count = null, ?int $blockMs = null, bool $noAck = false, string ...$keyOrId)
287288
* @method string xtrim(string $key, array|string $strategy, string $threshold, array $options = null)
288289
* @method int zadd(string $key, array $membersAndScoresDictionary)
289290
* @method int zcard(string $key)

src/Command/Redis/XREADGROUP.php

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Predis package.
5+
*
6+
* (c) 2009-2020 Daniele Alessandri
7+
* (c) 2021-2023 Till Krüss
8+
*
9+
* For the full copyright and license information, please view the LICENSE
10+
* file that was distributed with this source code.
11+
*/
12+
13+
namespace Predis\Command\Redis;
14+
15+
use Predis\Command\Command as RedisCommand;
16+
17+
class XREADGROUP extends RedisCommand
18+
{
19+
public function getId()
20+
{
21+
return 'XREADGROUP';
22+
}
23+
24+
public function setArguments(array $arguments)
25+
{
26+
$processedArguments = ['GROUP', $arguments[0], $arguments[1]];
27+
28+
if (count($arguments) >= 3 && null !== $arguments[2]) {
29+
array_push($processedArguments, 'COUNT', $arguments[2]);
30+
}
31+
32+
if (count($arguments) >= 4 && null !== $arguments[3]) {
33+
array_push($processedArguments, 'BLOCK', $arguments[3]);
34+
}
35+
36+
if (count($arguments) >= 5 && false !== $arguments[4]) {
37+
$processedArguments[] = 'NOACK';
38+
}
39+
40+
$processedArguments[] = 'STREAMS';
41+
$keyOrIds = array_slice($arguments, 5);
42+
43+
parent::setArguments(array_merge($processedArguments, $keyOrIds));
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Predis package.
5+
*
6+
* (c) 2009-2020 Daniele Alessandri
7+
* (c) 2021-2023 Till Krüss
8+
*
9+
* For the full copyright and license information, please view the LICENSE
10+
* file that was distributed with this source code.
11+
*/
12+
13+
namespace Predis\Command\Redis;
14+
15+
use Predis\Response\ServerException;
16+
17+
class XREADGROUP_Test extends PredisCommandTestCase
18+
{
19+
/**
20+
* {@inheritDoc}
21+
*/
22+
protected function getExpectedCommand(): string
23+
{
24+
return XREADGROUP::class;
25+
}
26+
27+
/**
28+
* {@inheritDoc}
29+
*/
30+
protected function getExpectedId(): string
31+
{
32+
return 'XREADGROUP';
33+
}
34+
35+
/**
36+
* @dataProvider argumentsProvider
37+
* @group disconnected
38+
*/
39+
public function testFilterArguments(array $actualArguments, array $expectedArguments): void
40+
{
41+
$command = $this->getCommand();
42+
$command->setArguments($actualArguments);
43+
44+
$this->assertSame($expectedArguments, $command->getArguments());
45+
}
46+
47+
/**
48+
* @group disconnected
49+
*/
50+
public function testParseResponse(): void
51+
{
52+
$this->assertSame(1, $this->getCommand()->parseResponse(1));
53+
}
54+
55+
/**
56+
* @group connected
57+
* @group relay-incompatible
58+
* @return void
59+
* @requiresRedisVersion >= 5.0.0
60+
*/
61+
public function testReadsFromGivenConsumerGroup(): void
62+
{
63+
$redis = $this->getClient();
64+
65+
$streamInitId = $redis->xadd('stream', ['field' => 'value']);
66+
$this->assertEquals('OK', $redis->xgroup->create('stream', 'group', $streamInitId));
67+
68+
$nextId = $redis->xadd('stream', ['newField' => 'newValue']);
69+
$expectedResponse = [
70+
[
71+
'stream',
72+
[
73+
[$nextId, ['newField', 'newValue']],
74+
],
75+
],
76+
];
77+
78+
$this->assertSame(
79+
$expectedResponse,
80+
$redis->xreadgroup(
81+
'group',
82+
'consumer',
83+
null,
84+
null,
85+
false,
86+
'stream',
87+
'>')
88+
);
89+
}
90+
91+
/**
92+
* @group connected
93+
* @group relay-incompatible
94+
* @return void
95+
* @requiresRedisVersion >= 5.0.0
96+
*/
97+
public function testReadsFromConsumerGroupFromMultipleStreams(): void
98+
{
99+
$redis = $this->getClient();
100+
101+
$streamInitId = $redis->xadd('stream', ['field' => 'value']);
102+
$this->assertEquals('OK', $redis->xgroup->create('stream', 'group', $streamInitId));
103+
104+
$anotherStreamInitId = $redis->xadd('another_stream', ['field' => 'value']);
105+
$this->assertEquals('OK', $redis->xgroup->create('another_stream', 'group', $anotherStreamInitId));
106+
107+
$nextId = $redis->xadd('stream', ['newField' => 'newValue']);
108+
$anotherNextId = $redis->xadd('another_stream', ['newField' => 'newValue']);
109+
110+
$expectedResponse = [
111+
[
112+
'stream',
113+
[
114+
[$nextId, ['newField', 'newValue']],
115+
],
116+
],
117+
[
118+
'another_stream',
119+
[
120+
[$anotherNextId, ['newField', 'newValue']],
121+
],
122+
],
123+
];
124+
125+
$this->assertSame(
126+
$expectedResponse,
127+
$redis->xreadgroup(
128+
'group',
129+
'consumer',
130+
null,
131+
null,
132+
false,
133+
'stream',
134+
'another_stream',
135+
'>',
136+
'>'
137+
)
138+
);
139+
}
140+
141+
/**
142+
* @group connected
143+
* @return void
144+
* @requiresRedisVersion >= 5.0.0
145+
*/
146+
public function testThrowsExceptionOnNonExistingConsumerGroupOrStream(): void
147+
{
148+
$redis = $this->getClient();
149+
150+
$this->expectException(ServerException::class);
151+
$this->expectExceptionMessage(
152+
"NOGROUP No such key 'stream' or consumer group 'group' in XREADGROUP with GROUP option"
153+
);
154+
155+
$redis->xreadgroup(
156+
'group',
157+
'consumer',
158+
null,
159+
null,
160+
false,
161+
'stream',
162+
'>');
163+
}
164+
165+
public function argumentsProvider(): array
166+
{
167+
return [
168+
'with default arguments' => [
169+
['group', 'consumer', null, null, false, 'stream', '0-0'],
170+
['GROUP', 'group', 'consumer', 'STREAMS', 'stream', '0-0'],
171+
],
172+
'with COUNT modifier' => [
173+
['group', 'consumer', 10, null, false, 'stream', '0-0'],
174+
['GROUP', 'group', 'consumer', 'COUNT', 10, 'STREAMS', 'stream', '0-0'],
175+
],
176+
'with BLOCK modifier' => [
177+
['group', 'consumer', null, 10, false, 'stream', '0-0'],
178+
['GROUP', 'group', 'consumer', 'BLOCK', 10, 'STREAMS', 'stream', '0-0'],
179+
],
180+
'with NOACK modifier' => [
181+
['group', 'consumer', null, null, true, 'stream', '0-0'],
182+
['GROUP', 'group', 'consumer', 'NOACK', 'STREAMS', 'stream', '0-0'],
183+
],
184+
'with all arguments' => [
185+
['group', 'consumer', 10, 10, true, 'stream', '0-0', '10-0'],
186+
['GROUP', 'group', 'consumer', 'COUNT', 10, 'BLOCK', 10, 'NOACK', 'STREAMS', 'stream', '0-0', '10-0'],
187+
],
188+
];
189+
}
190+
}

0 commit comments

Comments
 (0)