[BEAM-3446] Fixes RedisIO non-prefix read operations #4656
vvarma wants to merge 1 commit into apache:master
Adds BaseReadFn to abstract general Jedis operations. Separates key fetch by prefix and get by key into separate DoFns.
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 1 week if no further activity occurs. If you think that’s incorrect or this pull request requires a review, please simply write any comment. If closed, you can revive the PR at any time and @mention a reviewer or discuss it on the [email protected] list. Thank you for your contributions.
The fix is in place. @jbonofre, please help review.
We have turned on autoformatting of the codebase, which causes small conflicts across the board. You can probably safely rebase and just keep your changes. Like this: Please ping me if you run into any difficulty.
return input
    .apply(Create.of(keyPattern()))
    .apply(ParDo.of(new ReadKeywsWithPattern(connectionConfiguration())))
s/ReadKeywsWithPattern/ReadKeysWithPattern
private final RedisConnectionConfiguration connectionConfiguration;
private abstract static class BaseReadFn<T> extends DoFn<String, T> {
  protected final RedisConnectionConfiguration connectionConfiguration;
private transient Jedis jedis;
protected transient Jedis jedis;
public ReadFn(RedisConnectionConfiguration connectionConfiguration) {
public BaseReadFn(RedisConnectionConfiguration connectionConfiguration) {
Remove public. In general, it is common Beam practice to restrict access as much as possible. You can use IntelliJ's code analysis to do this.
ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
List<String> keys = scanResult.getResult();
Pipeline pipeline = jedis.pipelined();
Question: What is the reason for removing pipelining altogether? The approach in this PR seems more composable, but wouldn't it perform worse?
The number of keys that need to be looked up in a given window or batch can vary. Ideally, we should have a configurable batch size and use MGET (https://redis.io/commands/mget) if we want to optimize further.
Pipelining an entire window or batch can cause memory spikes in Redis, depending on the number of keys being looked up. For the time being, I removed the pipeline to simplify things.
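For illustration, the configurable-batch-size idea could look roughly like the sketch below. It is not code from this PR: BatchedLookup, lookup, and the multiGet parameter are hypothetical names, and the Redis call is passed in as a function so the sketch stays independent of the client. It assumes the lookup returns one value per key in key order, with null for a missing key, which is how MGET behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Hypothetical helper, not part of RedisIO: looks keys up in bounded batches. */
final class BatchedLookup {
  private BatchedLookup() {}

  /**
   * Splits keys into chunks of at most batchSize and calls multiGet once per chunk,
   * so no single request carries more than batchSize keys.
   * Assumption: multiGet returns one value per key, in key order (null if missing).
   */
  static List<String> lookup(
      List<String> keys, int batchSize, Function<List<String>, List<String>> multiGet) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    List<String> values = new ArrayList<>(keys.size());
    for (int start = 0; start < keys.size(); start += batchSize) {
      int end = Math.min(start + batchSize, keys.size());
      // subList is a view of the current chunk; the callback only needs to read it.
      values.addAll(multiGet.apply(keys.subList(start, end)));
    }
    return values;
  }
}
```

With Jedis, multiGet could be something like batch -> jedis.mget(batch.toArray(new String[0])), which bounds the size of each request without pipelining the whole window.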
Closing this PR; I opened a new one, rebased and with fixes: #5841
URL: https://issues.apache.org/jira/browse/BEAM-3446