[BEAM-3446] Fixes RedisIO non-prefix read operations #5841
iemejia merged 8 commits into apache:master
Conversation

Run Java Precommit
    public void processElement(ProcessContext processContext) throws Exception {
      String key = processContext.element();
      String value = jedis.get(key);
As mentioned in the previous PR, I am a bit concerned about losing the multiple-data-request capability. Any chance you can work on this with the approach you mentioned, based on MGET, for ReadFn?
The simplest approach is probably to do like other IOs and have a default size that can be parametrized via a withBatchSize method. WDYT?
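The withBatchSize idea is usually implemented as an immutable builder method on the read transform. A minimal framework-free sketch (class and method names here are illustrative, not the actual RedisIO API; the real transform is an AutoValue PTransform, but the pattern is the same):

```java
// Illustrative sketch of a withBatchSize-style builder. Names are
// hypothetical; only the immutable-copy pattern mirrors Beam IOs.
final class ReadAll {
    // Default chosen per the suggestion in this thread (1000).
    private final int batchSize;

    private ReadAll(int batchSize) {
        this.batchSize = batchSize;
    }

    static ReadAll create() {
        return new ReadAll(1000);
    }

    // Returns a copy rather than mutating, mirroring Beam's immutable transforms.
    ReadAll withBatchSize(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        return new ReadAll(batchSize);
    }

    int batchSize() {
        return batchSize;
    }
}
```

A caller would then configure the read with `ReadAll.create().withBatchSize(500)` and keep the default otherwise.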
Sure, I think it should be straightforward; will update the PR over the week.
@iemejia right now there are two operators exposed for read operations:
- ReadKeysWithPattern, which is two-stepped: (a) for each pattern prefix in the PCollection, fetch all matching keys; (b) fetch values for each key.
- ReadFn, which gets values for each key in the PCollection.
I can change ReadKeysWithPattern to use a parameterized batch to fetch the data in step (b), but for ReadFn I am not sure how to use the batch parameter.
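The two-step ReadKeysWithPattern flow described above can be sketched without any Beam or Jedis dependency. Here Redis is stubbed with a Map, so only the control flow is shown; real code would use jedis.keys(...) and jedis.get(...):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free sketch of the two-step pattern read:
// (a) expand a prefix to its matching keys, (b) fetch each value.
// The Map stands in for a Jedis connection; this is not the RedisIO code.
final class PatternReader {
    private final Map<String, String> redis;

    PatternReader(Map<String, String> redis) { this.redis = redis; }

    // Step (a): all keys matching a "prefix*" style pattern.
    List<String> keysForPrefix(String prefix) {
        List<String> keys = new ArrayList<>();
        for (String k : redis.keySet()) {
            if (k.startsWith(prefix)) {
                keys.add(k);
            }
        }
        return keys;
    }

    // Step (b): fetch the value for each key.
    List<String> valuesFor(List<String> keys) {
        List<String> values = new ArrayList<>();
        for (String k : keys) {
            values.add(redis.get(k));
        }
        return values;
    }
}
```

The batching question in the thread only concerns step (b): step (a) already returns keys in bulk, while step (b) issues one request per key unless batched.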
Hi, sorry, I missed your message. The idea is that we should add the DoFn startBundle and finishBundle methods, and create a method in the Read to define the maximum number of elements that we will request per batch. You would build the collection of keys to be requested in processElement, but do the actual request not in processElement but in finishBundle, by issuing an MGET request with the defined number of elements of the batch. We should choose a sensible default size, e.g. 1000. It is similar to what other IOs do in the Write (see withBatchSize in ElasticsearchIO or SolrIO, for reference):
beam/sdks/java/io/solr/src/main/java/org/apache/beam/sdk/io/solr/SolrIO.java
Lines 805 to 829 in c14c975
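The buffer-then-flush control flow suggested above can be sketched without Beam or Jedis. In this sketch the Redis connection is stubbed with a Map and the flush does per-key lookups; the real code would issue a single jedis.mget(...) for the whole batch instead:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free sketch of the batched-read control flow: buffer keys in
// processElement, flush once the batch is full, and drain the remainder in
// finishBundle. Not the actual RedisIO implementation.
final class BatchedReader {
    private final Map<String, String> redis; // stand-in for a Jedis connection
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<Map.Entry<String, String>> output = new ArrayList<>();

    BatchedReader(Map<String, String> redis, int batchSize) {
        this.redis = redis;
        this.batchSize = batchSize;
    }

    // processElement: only buffer the key; flush when the batch is full.
    void processElement(String key) {
        buffer.add(key);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // finishBundle: flush whatever is left, so no key is dropped.
    void finishBundle() {
        flush();
    }

    // Real code would issue one jedis.mget(buffer) here; the per-key get on
    // the stub keeps the sketch simple.
    private void flush() {
        for (String key : buffer) {
            output.add(Map.entry(key, redis.get(key)));
        }
        buffer.clear();
    }

    List<Map.Entry<String, String>> results() {
        return output;
    }
}
```

The point of the design is that the number of round trips to Redis drops from one per key to roughly one per batchSize keys.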
    public void finishBundle(FinishBundleContext context) throws Exception {
      List<KV<String, String>> kvs = fetchAndFlush();
      for (KV<String, String> kv : kvs) {
        context.output(kv, lastMsg, window);
@iemejia Not sure about this, since I am using the Instant and window from the last processed message to produce output in the finishBundle method.
Oh, so silly of me, I misread the motivation for keeping the window; you are right, it makes total sense. In that case it is probably a better idea to store the elements in a Map with the window as key and the list of elements as value, use window.maxTimestamp() (you don't need lastMsg), and flush when there are enough elements. Similar to what is done here (but with the count logic):
beam/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
Lines 825 to 844 in 70b6531
Though here, messages with the same window are being bundled together, stored, and finally processed in finishBundle.
In the case of ReadFn, the idea was to buffer requests until the batch size is reached and process them at that point. Hence the output is pushed in both the processElement method and finishBundle.
I am not sure I understand how to use a map with the window as the key.
@iemejia could you please advise on the above?
iemejia left a comment:
Almost ready. Please excuse me for being so slow with this review; many things at the same time are taking my bandwidth. Please take a look at my comment; if you can address it, this is almost good to merge. Also, can you please squash your commits and rename the commit properly (as the PR title that I just changed)? Thanks.
Sure, will make the change. Sorry about the delay.
No problem, thanks a lot for taking care of this, we are really close.
I am going to do the review as well.
Hi @vvarma, it seems the changes in the other PR produced a conflict. Can you please rebase so we can merge this one (+ add the minor fixes from the review)? Thanks!
Just pinging about the status of this one, @vvarma. We are quite close, so hopefully you can fix the last bits so we can merge it. Sorry if this has taken too long.
BaseReadFn to abstract general Jedis operations. Separated key fetch using prefix and get by key into separate DoFns.
…d sequentially pushing to output collector
Hi @iemejia, I have made the requested change. I used the window as you suggested. Please let me know if it is as expected. Apologies for the delay.
Friendly ping for review!
@huygaa11 sorry, I forgot. Resuming my review.
iemejia left a comment:
Hi, I took a quick look and it looks almost done, thanks. Just two questions (nothing major, just things that I didn't understand immediately).
    String key = processContext.element();
    bundles.put(window, key);
    if (batchCount.incrementAndGet() > getBatchSize()) {
      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
Why do you need to deal with windows here? (Note, I only looked quickly and didn't get the intuition.) If we can avoid this, it is probably better, no?
The window stored in the key here is used in finishBundle to output the keys, since the context in finishBundle takes the window as a parameter: context.output(kv, w.maxTimestamp(), w);
    @FinishBundle
    public void finishBundle(FinishBundleContext context) throws Exception {
      Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
Is this extra flush needed? Without an equivalent startBundle, I don't see why this would be needed.
The reason for this extra flush is that we have a batch size: once the number of messages reaches this value, we invoke flush.
When finishBundle is invoked at the end of the bundle, there may be a few messages left in the buffer (fewer than the batch size), so we invoke flush from here as well. For this reason we also need to store the window of each message.
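That double-flush logic, together with the window-keyed buffer, can be sketched in plain Java. The window is stubbed as a String; in the real DoFn the flushed entries are emitted with context.output(kv, window.maxTimestamp(), window), and the fetch would be an MGET:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of the window-aware batching discussed above. BoundedWindow is
// stubbed as a String and the MGET is elided; only the buffering and the
// two flush paths (batch-full and end-of-bundle) are shown.
final class WindowedBatcher {
    private final int batchSize;
    private final AtomicInteger batchCount = new AtomicInteger();
    // Keys buffered per window, so the flush can emit into the right window.
    private final Map<String, List<String>> bundles = new HashMap<>();
    final List<String> flushedWindows = new ArrayList<>();

    WindowedBatcher(int batchSize) { this.batchSize = batchSize; }

    void processElement(String key, String window) {
        bundles.computeIfAbsent(window, w -> new ArrayList<>()).add(key);
        if (batchCount.incrementAndGet() >= batchSize) {
            fetchAndFlush(); // flush path 1: the batch filled up
        }
    }

    void finishBundle() {
        fetchAndFlush(); // flush path 2: drain anything smaller than a batch
    }

    private void fetchAndFlush() {
        // Real code would MGET each window's keys and output the KVs here,
        // using that window's maxTimestamp as the element timestamp.
        flushedWindows.addAll(bundles.keySet());
        bundles.clear();
        batchCount.set(0);
    }
}
```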
Thanks for answering. I had somehow misread startBundle as a setup-only method. I see how everything fits now.
Thanks @iemejia. Do suggest if there are any other changes needed.
iemejia left a comment:
LGTM, sorry for taking so long. I still think we can simplify the iteration a bit, and maybe extract a method for the repeated flush part, but we can address this in the future (not a blocker for merge). Thanks a lot @vvarma and sorry for the delay.
@iemejia Thank you!
Rebase of #4656
BaseReadFn to abstract general Jedis operations.
Moved key fetch given a prefix to the ReadKeysWithPattern DoFn.
ReadFn is a pure fetch from Redis given a key.
URL: https://issues.apache.org/jira/browse/BEAM-3446
@iemejia @jbonofre