Skip to content

Commit d0f37f0

Browse files
authored
Added temporary XREADGROUP_CLAIM command (#1608)
* Added temporary XREADGROUP_CLAIM command * Updated CHANGELOG.md * Added deprecation annotation
1 parent f11e855 commit d0f37f0

File tree

10 files changed

+471
-3
lines changed

10 files changed

+471
-3
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
### Added
55
- Added cluster support for `XADD`, `XDEL` and `XRANGE` (#1587)
66
- Added prefixable interface for `HEXPIRE` and `HEXPIRETIME` (#1592)
7+
- Added temporary XREADGROUP_CLAIM command (#1608)
78
- Added support for MSET command (#1610)
89

910
### Changed

src/ClientContextInterface.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@
300300
* @method $this xrange(string $key, string $start, string $end, ?int $count = null)
301301
* @method $this xread(int $count = null, int $block = null, array $streams = null, string ...$id)
302302
* @method $this xreadgroup(string $group, string $consumer, ?int $count = null, ?int $blockMs = null, bool $noAck = false, string ...$keyOrId)
303+
* @method $this xreadgroup_claim(string $group, string $consumer, array $keyIdDict, ?int $count = null, ?int $blockMs = null, bool $noAck = false, ?int $claim = null)
303304
* @method $this xsetid(string $key, string $lastId, ?int $entriesAdded = null, ?string $maxDeleteId = null)
304305
* @method $this xtrim(string $key, array|string $strategy, string $threshold, array $options = null)
305306
* @method $this zadd($key, array $membersAndScoresDictionary)

src/ClientInterface.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,7 @@
311311
* @method array xrange(string $key, string $start, string $end, ?int $count = null)
312312
* @method array|null xread(int $count = null, int $block = null, array $streams = null, string ...$id)
313313
* @method array xreadgroup(string $group, string $consumer, ?int $count = null, ?int $blockMs = null, bool $noAck = false, string ...$keyOrId)
314+
* @method array xreadgroup_claim(string $group, string $consumer, array $keyIdDict, ?int $count = null, ?int $blockMs = null, bool $noAck = false, ?int $claim = null)
314315
* @method Status xsetid(string $key, string $lastId, ?int $entriesAdded = null, ?string $maxDeleteId = null)
315316
* @method string xtrim(string $key, array|string $strategy, string $threshold, array $options = null)
316317
* @method int zadd(string $key, array $membersAndScoresDictionary)

src/Command/Redis/XGROUP.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212

1313
namespace Predis\Command\Redis;
1414

15-
use Predis\Command\Command as RedisCommand;
15+
use Predis\Command\PrefixableCommand as RedisCommand;
1616

1717
/**
1818
* @see https://redis.io/commands/?name=xgroup
@@ -78,4 +78,11 @@ private function setSetIdArguments(array $arguments): void
7878

7979
parent::setArguments($processedArguments);
8080
}
81+
82+
public function prefixKeys($prefix)
83+
{
84+
$arguments = $this->getArguments();
85+
$arguments[1] = $prefix . $arguments[1];
86+
$this->setRawArguments($arguments);
87+
}
8188
}

src/Command/Redis/XREADGROUP.php

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,12 @@
1212

1313
namespace Predis\Command\Redis;
1414

15-
use Predis\Command\Command as RedisCommand;
15+
use Predis\Command\PrefixableCommand as RedisCommand;
1616

17+
/**
18+
* @deprecated Public API will be changed in the next major version.
19+
* XREADGROUP_CLAIM API will be used instead.
20+
*/
1721
class XREADGROUP extends RedisCommand
1822
{
1923
public function getId()
@@ -58,4 +62,18 @@ public function parseResponse($data)
5862

5963
return $result;
6064
}
65+
66+
public function prefixKeys($prefix)
67+
{
68+
$arguments = $this->getArguments();
69+
$keyIdsStartingIndex = array_search('STREAMS', $arguments) + 1;
70+
$keysAndIdsCount = count($arguments) - $keyIdsStartingIndex;
71+
$keysCount = $keysAndIdsCount / 2;
72+
73+
for ($i = $keyIdsStartingIndex; $i < $keyIdsStartingIndex + $keysCount; $i++) {
74+
$arguments[$i] = $prefix . $arguments[$i];
75+
}
76+
77+
parent::setRawArguments($arguments);
78+
}
6179
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Predis package.
5+
*
6+
* (c) 2009-2020 Daniele Alessandri
7+
* (c) 2021-2025 Till Krüss
8+
*
9+
* For the full copyright and license information, please view the LICENSE
10+
* file that was distributed with this source code.
11+
*/
12+
13+
namespace Predis\Command\Redis;
14+
15+
use Predis\Command\PrefixableCommand as RedisCommand;
16+
17+
/**
18+
* This is a transitional command. In the next major version this command will replace XREADGROUP.
19+
*/
20+
class XREADGROUP_CLAIM extends RedisCommand
21+
{
22+
public function getId()
23+
{
24+
return 'XREADGROUP';
25+
}
26+
27+
public function setArguments(array $arguments)
28+
{
29+
$processedArguments = ['GROUP', $arguments[0], $arguments[1]];
30+
31+
if (count($arguments) >= 4 && null !== $arguments[3]) {
32+
array_push($processedArguments, 'COUNT', $arguments[3]);
33+
}
34+
35+
if (count($arguments) >= 5 && null !== $arguments[4]) {
36+
array_push($processedArguments, 'BLOCK', $arguments[4]);
37+
}
38+
39+
if (count($arguments) >= 6 && false !== $arguments[5]) {
40+
$processedArguments[] = 'NOACK';
41+
}
42+
43+
if (count($arguments) >= 7 && false !== $arguments[6]) {
44+
array_push($processedArguments, 'CLAIM', $arguments[6]);
45+
}
46+
47+
array_push($processedArguments, 'STREAMS', ...array_keys($arguments[2]), ...array_values($arguments[2]));
48+
49+
parent::setArguments($processedArguments);
50+
}
51+
52+
public function parseResponse($data)
53+
{
54+
if (!is_array($data) || $data === array_values($data)) {
55+
return $data;
56+
}
57+
58+
// Relay
59+
$result = [];
60+
foreach ($data as $key => $value) {
61+
$group = [$key, $value];
62+
$result[] = $group;
63+
}
64+
65+
return $result;
66+
}
67+
68+
public function prefixKeys($prefix)
69+
{
70+
$arguments = $this->getArguments();
71+
$keyIdsStartingIndex = array_search('STREAMS', $arguments) + 1;
72+
$keysAndIdsCount = count($arguments) - $keyIdsStartingIndex;
73+
$keysCount = $keysAndIdsCount / 2;
74+
75+
for ($i = $keyIdsStartingIndex; $i < $keyIdsStartingIndex + $keysCount; $i++) {
76+
$arguments[$i] = $prefix . $arguments[$i];
77+
}
78+
79+
parent::setRawArguments($arguments);
80+
}
81+
}

tests/PHPUnit/PredisTestCase.php

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
use PHPUnit\Util\Test as TestUtil;
1717
use Predis\Client;
1818
use Predis\Command;
19+
use Predis\Command\Processor\KeyPrefixProcessor;
1920
use Predis\Connection;
2021

2122
/**
@@ -262,8 +263,14 @@ protected function createClient(?array $parameters = null, ?array $options = nul
262263
$parameters ?: []
263264
);
264265

266+
$commandsFactory = $this->getCommandFactory();
267+
268+
if (null !== $options && array_key_exists('prefix', $options)) {
269+
$commandsFactory->setProcessor(new KeyPrefixProcessor($options['prefix']));
270+
}
271+
265272
$options = array_merge(
266-
['commands' => $this->getCommandFactory()],
273+
['commands' => $commandsFactory],
267274
$options ?: [],
268275
getenv('USE_RELAY') ? ['connections' => 'relay'] : []
269276
);

tests/Predis/Command/Processor/KeyPrefixProcessorTest.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -808,6 +808,18 @@ public function commandArgumentsDataProvider(): array
808808
['key', 'MAXLEN', 100],
809809
['prefix:key', 'MAXLEN', 100],
810810
],
811+
['XGROUP',
812+
['CREATE', 'key', 'group', '$'],
813+
['CREATE', 'prefix:key', 'group', '$'],
814+
],
815+
['XREADGROUP',
816+
['group', 'consumer', 10, 10, true, 'stream', 'stream1', '0-0', '0-0'],
817+
['GROUP', 'group', 'consumer', 'COUNT', 10, 'BLOCK', 10, 'NOACK', 'STREAMS', 'prefix:stream', 'prefix:stream1', '0-0', '0-0'],
818+
],
819+
['XREADGROUP_CLAIM',
820+
['group', 'consumer', ['stream' => '0-0', 'stream1' => '0-0'], 10, 10, true, 10],
821+
['GROUP', 'group', 'consumer', 'COUNT', 10, 'BLOCK', 10, 'NOACK', 'CLAIM', 10, 'STREAMS', 'prefix:stream', 'prefix:stream1', '0-0', '0-0'],
822+
],
811823
['ZPOPMIN',
812824
['key'],
813825
['prefix:key'],

0 commit comments

Comments
 (0)