
[BEAM-3446] Fixes RedisIO non-prefix read operations #5841

Merged
iemejia merged 8 commits into apache:master from vvarma:i3446rebase
Sep 19, 2018

Conversation

@vvarma (Contributor) commented Jun 30, 2018

Rebase of #4656

BaseReadFn abstracts the general Jedis operations.
Moved the key fetch for a given prefix into the ReadKeysWithPattern DoFn.
ReadFn is a pure fetch from Redis given a key.
URL: https://issues.apache.org/jira/browse/BEAM-3446
@iemejia @jbonofre

It will help us expedite review of your Pull Request if you tag someone (e.g. @username) to look at it.

Post-Commit Tests Status (on master branch): [build status badge table]

@iemejia (Member) commented Jul 2, 2018

Run Java Precommit

public void processElement(ProcessContext processContext) throws Exception {
  String key = processContext.element();
  String value = jedis.get(key);
Member:
As mentioned in the previous PR I am a bit concerned about losing the multiple data request capability, any chance you can work on this with the approach you mentioned based on MGET for ReadFn.
The simplest approach probably is to do like other IOs and have a default size that can be parametrized via a withBatchSize method. WDYT ?

Contributor Author:

Sure, I think it should be straightforward, will update the pr over the week.

Member:

👍

Contributor Author:

@iemejia right now there are two operators exposed for read operations

  1. ReadKeysWithPattern - which is two-stepped: (a) for each pattern prefix in the PCollection, fetch all matching keys; (b) fetch the values for each key.
  2. ReadFn - which gets the value for each key in the PCollection.

I can change ReadKeysWithPattern to use a parameterized batch to fetch the data in step (b), but for ReadFn I am not sure how to use the batch parameter.
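The two read shapes described above can be sketched in plain Java against a stubbed in-memory store (all names and the store itself are illustrative stand-ins, not the actual RedisIO code):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of the two read shapes: (1) expand a key prefix to all
// matching keys, then fetch each value; (2) a pure fetch for a given key.
// The in-memory map stands in for Redis; this is not the RedisIO code.
class ReadShapes {
    private final Map<String, String> store = new LinkedHashMap<>();

    public void put(String key, String value) {
        store.put(key, value);
    }

    // Shape 1, step (a): all keys matching a prefix (like KEYS prefix*).
    public List<String> keysWithPrefix(String prefix) {
        List<String> keys = new ArrayList<>();
        for (String k : store.keySet()) {
            if (k.startsWith(prefix)) {
                keys.add(k);
            }
        }
        return keys;
    }

    // Shape 2 (and shape 1, step (b)): fetch the value for a key (like GET).
    public String read(String key) {
        return store.get(key);
    }
}
```

Shape 2 has no natural per-element batching point, which is where the batch-size question below comes from.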

Member:

Hi, sorry I have missed your message. The idea is that we should add the DoFn startBundle and finishBundle methods and create a method in the Read to define the maximum number of elements that we will request. You build the collection of keys to be requested in processElement, but you don't do the request there: you do it in finishBundle, with a single MGET request over the batch. We should choose a sensible default size, e.g. 1000. It is similar to what other IOs do in the Write (see withBatchSize in ElasticsearchIO or SolrIO for reference):

@StartBundle
public void startBundle(StartBundleContext context) {
  batch = new ArrayList<>();
}

@ProcessElement
public void processElement(ProcessContext context) throws Exception {
  SolrInputDocument document = context.element();
  batch.add(document);
  if (batch.size() >= spec.getMaxBatchSize()) {
    flushBatch();
  }
}

@FinishBundle
public void finishBundle(FinishBundleContext context) throws Exception {
  flushBatch();
}

// Flushes the batch, implementing the retry mechanism as configured in the spec.
private void flushBatch() throws IOException, InterruptedException {
  if (batch.isEmpty()) {
    return;
  }
  try {
@iemejia iemejia self-requested a review July 2, 2018 13:12
@vvarma vvarma requested a review from jbonofre as a code owner July 14, 2018 17:44
public void finishBundle(FinishBundleContext context) throws Exception {
  List<KV<String, String>> kvs = fetchAndFlush();
  for (KV<String, String> kv : kvs) {
    context.output(kv, lastMsg, window);
Contributor Author:

@iemejia Not sure about this, since I am using the Instant and window from the last processed message to produce output in the finishBundle method.

Member:

Oh, so silly of me, I misread the motivation for keeping the window; you are right, it makes total sense. In that case it is probably better to store the elements in a Map with the window as key and the list of elements as value, use window.maxTimestamp() (you don't need lastMsg), and flush when there are enough elements. Similar to what is done here (but with the count logic):

private static class GatherBundlesPerWindowFn<T> extends DoFn<T, List<T>> {
  @Nullable private transient Multimap<BoundedWindow, T> bundles = null;

  @StartBundle
  public void startBundle() {
    bundles = ArrayListMultimap.create();
  }

  @ProcessElement
  public void process(ProcessContext c, BoundedWindow w) {
    bundles.put(w, c.element());
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext c) throws Exception {
    for (BoundedWindow w : bundles.keySet()) {
      c.output(Lists.newArrayList(bundles.get(w)), w.maxTimestamp(), w);
    }
  }
}

Contributor Author:

Though here, messages with the same window are bundled together, stored, and finally processed in finishBundle. In the case of ReadFn, the idea was to buffer requests until the batch size is reached and process them at that point, so output is pushed both in the processElement method and in finishBundle. I am not sure I understand how to use a map with the window as the key.
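The buffering being discussed (flush when the batch is full, and flush again at finishBundle) can be sketched in plain Java, ignoring windows for the moment. All names (`BatchedReader`, the fake value fetch) are illustrative, not the RedisIO code; the loop over keys stands in for what would be a single MGET:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the batching pattern: buffer keys until the batch size is
// reached, then fetch all values at once (as one MGET would), and flush
// any remainder when the bundle finishes. Not the actual RedisIO code.
class BatchedReader {
    private final int batchSize;
    private final List<String> buffer = new ArrayList<>();
    private final List<String> emitted = new ArrayList<>();

    public BatchedReader(int batchSize) {
        this.batchSize = batchSize;
    }

    // Per element (like @ProcessElement): buffer, flush when full.
    public void processElement(String key) {
        buffer.add(key);
        if (buffer.size() >= batchSize) {
            fetchAndFlush();
        }
    }

    // At bundle end (like @FinishBundle): flush whatever is left.
    public void finishBundle() {
        fetchAndFlush();
    }

    private void fetchAndFlush() {
        if (buffer.isEmpty()) {
            return;
        }
        // Stand-in for jedis.mget(keys...): one request for the whole batch.
        for (String key : buffer) {
            emitted.add(key + "=value-of-" + key);
        }
        buffer.clear();
    }

    public List<String> emitted() {
        return emitted;
    }
}
```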

Contributor Author:

@iemejia could you please advise on the above?

@iemejia iemejia changed the title Fixes https://issues.apache.org/jira/browse/BEAM-3446. [BEAM-3446] Fixes RedisIO non-prefix read operations Jul 19, 2018
@iemejia (Member) left a comment:

Almost ready. Please excuse me for being so slow with this review; many things at the same time are taking my bandwidth. Please take a look at my comment; if you can address it, this is almost LGTM. Also, can you please squash your commits and rename the commit properly (as the PR title that I just changed)? Thanks.


@vvarma (Contributor Author) commented Jul 26, 2018

Sure will make the change. Sorry about the delay.

@iemejia (Member) commented Jul 26, 2018

No problem, thanks a lot for taking care of this, we are really close.
I just wanted to bring awareness of another PR on Redis, #6045. Conceptually there seems to be no conflict, but it is good to think about any impact.

@jbonofre (Member): I'm going to do the review as well.

@iemejia (Member) commented Aug 20, 2018

Hi @vvarma it seems the changes on the other PR produced a conflict. Can you please rebase so we can merge this one (+ add the minor fixes of the review). Thanks!

@iemejia (Member) commented Sep 3, 2018

Just pinging about the status of this one, @vvarma; we are quite close, so hopefully you can fix the last bits so we can merge it. Sorry if this has taken too long.

@vvarma (Contributor Author) commented Sep 3, 2018

Hi @iemejia , I have made the change requested. I used the window as you suggested. Please let me know if they are as expected. Apologies for the delay.

@huygaa11 (Contributor):
Friendly ping for review!

@jbonofre (Member):
@huygaa11 sorry, I forgot. Resuming my review.

@iemejia (Member) left a comment:

Hi, I took a quick look; it looks almost done, thanks. Just two questions from a quick look (nothing major, just things that I didn't understand immediately).

String key = processContext.element();
bundles.put(window, key);
if (batchCount.incrementAndGet() > getBatchSize()) {
  Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
Member:

Why do you need to deal with windows here? (Note: I only looked quickly, so I didn't get the intuition.) If we can avoid this it is probably better, no?

Contributor Author:

The window stored with the key here is used in FinishBundle to output the keys, since the context in FinishBundle takes the window as a parameter: context.output(kv, w.maxTimestamp(), w);


@FinishBundle
public void finishBundle(FinishBundleContext context) throws Exception {
  Multimap<BoundedWindow, KV<String, String>> kvs = fetchAndFlush();
Member:

Is this extra flush needed? Without an equivalent startBundle, I don't see why it would be.

Contributor Author:

The reason for this extra flush is that we have a batch size: once the number of messages reaches that value, we invoke flush. When finishBundle is invoked at the end of the bundle, there may be a few messages left in the buffer (fewer than the batch size), so we invoke flush from here as well. For the same reason we also need to store the window of each message.
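The two flush paths, with the per-window grouping discussed earlier, can be sketched in plain Java. A `String` stands in for `BoundedWindow`, the Redis fetch is stubbed, and all names are illustrative, not the RedisIO code:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of per-window batching: keys are grouped by window so the final
// flush in finishBundle can emit each value against its own window (in the
// real DoFn, with window.maxTimestamp()). Not the actual RedisIO code.
class WindowedBatcher {
    private final int batchSize;
    private int count = 0;
    private final Map<String, List<String>> bundles = new HashMap<>();
    private final List<String> output = new ArrayList<>();

    public WindowedBatcher(int batchSize) {
        this.batchSize = batchSize;
    }

    public void processElement(String key, String window) {
        bundles.computeIfAbsent(window, w -> new ArrayList<>()).add(key);
        if (++count >= batchSize) {
            fetchAndFlush(); // batch full: fetch and emit now
        }
    }

    public void finishBundle() {
        fetchAndFlush(); // remainder smaller than batchSize still flushed
    }

    private void fetchAndFlush() {
        for (Map.Entry<String, List<String>> e : bundles.entrySet()) {
            for (String key : e.getValue()) {
                // Stand-in for one MGET over the window's keys; each result
                // is emitted tagged with its window.
                output.add(e.getKey() + ":" + key);
            }
        }
        bundles.clear();
        count = 0;
    }

    public List<String> output() {
        return output;
    }
}
```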

Member:

Thanks for answering, I had somehow misread startBundle as a setup-only method. I see how everything fits now.

Contributor Author:

Thanks @iemejia. Do suggest if there are any other changes needed.

@iemejia (Member) left a comment:

LGTM, sorry for taking so long. I still think we can simplify the iteration a bit, and maybe extract a method for the repeated flush part, but we can address this in the future (not a blocker for merge). Thanks a lot @vvarma and sorry for the delay.

@iemejia iemejia merged commit b7c2975 into apache:master Sep 19, 2018
@vvarma (Contributor Author) commented Sep 19, 2018

@iemejia Thank you!

@vvarma vvarma deleted the i3446rebase branch September 19, 2018 15:43