PARQUET-116: Pass a filter object to user defined predicate in filter2 api#73
PARQUET-116: Pass a filter object to user defined predicate in filter2 api#73saucam wants to merge 15 commits intoapache:masterfrom
Conversation
|
@julienledem, please advise . I have tested this by passing a custom filter for where in clause query (select * from t1 where c1 in (f1,f2,f3) ) in spark sql . |
|
@isnotinvain should probably review as well. |
|
Hello, thanks for the PR! I get the motivation for this and you're right that we need to fix it, but I wonder if this is the right API. One problem with that is, currently parquet-column doesn't depend on hadoop, so we'd be breaking that separation. And if we come up with our own configuration / context object, it will be yet another configuration floating around the codebase... The way scalding solves this problem on the other hand, is it doesn't load classes via reflection, it serializes instances, so instead your UDP could contain some state (eg the HashSet) and that would get serialized + shipped. The downside there is, we don't currently do anything like that in parquet, so we'd probably be adding a dependency on kryo or chill or something like that. The way this PR is written however, I'm not sure it will work if I understand correctly. The FilterPredicate needs to be serialized into the hadoop configuration so that it can be read remotely. Currently this isn't a problem, because all of the nodes in a predicate tree implement If we do decide to take this approach, I think we should at least add type bounds, and add a generic parameter to UserDefinedPredicate, eg: |
|
thanks for the input. I understand the problem. What do you think will be the best approach ? Its easy to add type bounds, also in case of accepting an instance of UDP , the api will change ... |
|
Another option we might want to consider is an API that delegates serializing predicate data to the predicate class. We implemented this in Kite as A registered predicate registers a name, like "contains", and a factory method that will produce instances from a String, which is formatted by the predicate class. In addition, predicate class has a method that converts its data to such a string (we use |
|
Enforced that a serializable object can only be passed as a filter object to UserDefinedPredicate. Please suggest changes/mods as appropriate |
|
I like the idea of adding a String that will be kept with the object. It leaves the serialization decisions to the UDP writer, and it maps easily into the fact that we ultimately have to stuff this into the hadoop configuration object without exposing the user to the hadoop configuration object. We could take a similar approach to Pig and support a constructor that takes a string as an argument, or we can just have a getter + setter for the String in the UDP. This approach is very similar to how this PR is written already -- we could also just stick with Serializable and leave it up to the UDP writer to ensure that their Serializable is actually serializable. (String is Serializable anyway). |
There was a problem hiding this comment.
lets change the above to:
public abstract class UserDefinedPredicate<T extends Comparable<T>, S extends Serializable> {
public UserDefinedPredicate(S state) { }
then the keep method doesn't need to take an extra arg?
There was a problem hiding this comment.
If we change the constructor, we need to change the reflective invocation part of the class 's default constructor as well. Is that ok ?
public U getUserDefinedPredicate() {
try {
return udpClass.newInstance();
} catch (InstantiationException e) {
throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
} catch (IllegalAccessException e) {
throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
}
… user defined predicate
|
Added generic parameter for the filter object , bound by Serializable |
|
Thanks for updating and for fixing this. Lets get the naming consistent -- I think filter object is ambiguous (a UDP is a filter object after all). Let's just call this a also instead of adding a parameter to the |
I don't understand. A String is Serializable, but a Serializable isn't necessarily a String. What I was proposing was to allow the UDP to serialize its data as a String and pass that String to the UDP when initializing. We can pass the data to the UDP using a method that accepts Serializable, but don't we also need the to-string part to be added? |
|
I was saying if you're going to go through the trouble of keeping a companion state object around for a UDP (in your description a String), it's not that different to generalize to a Serializable instead of limiting to only a String. The upside of using String is parquet doesn't have to get in the business of serializing things, it just has to shuffle a string around and give it back to the UDP at the right time, and parquet never has to worry about the String not being serializable... because it's a String. The downside is every UDP has to have some code in it for serializing its state into a String. The other option I was mentioning, which is how this PR is currently written, is exactly the same as your suggestion, except the state object isn't a String it's any Serializable. The upside here is you can add a type param to UDP and UDP writers don't have to write any serialization code, the downside is parquet has to deal with the fact that just because a class implements Serializable doesn't mean that it can actually be serialized. I don't feel that strongly either way. |
…ethod in udp predicate
|
|
Thanks for clarifying. I see your point that we can think of the Serializable and String doing the same work, with String being a more restrictive case. I think that it is good to have the more restrictive case because it means we have a lot more flexibility when sending the UDP around. It's easy to put a UDP into a Configuration this way and we end up with a human-readable verison if not a human-writable verison. And, as you noted, there's always the chance that although a class implements Serializable, it actually isn't and produces runtime errors. The conversion to a String usually isn't bad, the heavy-lifting is done by instantiating the UDP class itself. The data matches the column type most of the time, which is why we pass the column type to the method. We could even add a set of helper methods that convert each primitive type to a String. |
|
Hi @isnotinvain , there is some enforcer plugin error... can you help with this one ? Locally have tested everything. [WARNING] Rule 0: org.semver.enforcer.RequireSemanticVersioningConformance failed with message: |
|
Hi @isnotinvain |
|
@saucam sorry for the slow reply, taking another look |
|
ok ! thanks ! |
|
Hi @isnotinvain any updates on this one ? |
|
@rdblue makes some pretty good arguments for using String, and it would simplify a lot of this. @julienledem @egonina do you have any strong opinions on how to store this kind of state? |
|
Hi @saucam
|
|
Hi @julienledem , @isnotinvain Thanks for the comments, added the changes mentioned. How does it look now ? |
|
@julienledem is there value in making ConfiguredUserDefinedPredicate compose another UDP, or instead just make: |
|
Sorry to keep piling on this review, one more question though. It seems like we shouldn't need to add a new operator to the filter tree, and then update every single visitor for filter trees. We just want 2 ways of serializing a UDP, either by class name, or by serializing an instance. I think we can handle this just at the layer that deals with building a filter predicate + serializing it. The rest should be transparent. I think we can do this by adding a method to FilterApi that accepts an Instance of a UDP that is Serializable, similar to what you have, eg: Note that we are still instantiating a The reason is, UserDefined is already serializable. If we make it act like a union type that holds either a class, or a reference to a serializable UDP, it should be fine, eg: Now this is the only change we have to make, none of the visitors need to care about a new type to visit. Does that make sense? |
|
@isnotinvain UserDefined could be abstract with just the column field and we would have 2 subclasses. One with the field udpClass and the other with the field udpInstance. The visitor only needs to know about UserDefined and an abstract definition of getUdpInstance() that would either return the serialized instance or instantiate it from the class depending on the implementation. |
|
@julienledem yes, that would work too. Either way the main thing is it's a new getUdpInstance() method behavior, but none of the visitors need to be updated (I don't feel strongly whether we do this through subclassing, or just an if statement in UserDefined). |
…ting udp class, other accepting serializable udp instance
|
@isnotinvain , @julienledem , made UserDefined abstract, added subclasses SimpleUserDefined , ConfiguredUserDefined please suggest... |
|
oops, I was playing around with this too. I'll take a look, thanks! |
|
Cool, looks like we did the same thing :) I sent you a PR: saucam#2 This combines what you just submitted, plus updates to the scala DSL to support this new form of UDP, and some tests that exercise the serialization codepath. Let me know what you think. |
Simplify user defined predicates with state, Add more test cases
|
LGTM |
|
I like it |
|
OK, let's merge once the tests pass. I'm trying to reproduce the test failure locally (we can't hit the rebuild button in travis unfortunately) |
|
Tests passed for me locally. |
|
I'm re-running the tests via this PR: https://github.com/apache/incubator-parquet-mr/pull/116, if those pass, I'll merge this PR instead. |
|
OK, I ran the tests successfully via my github fork here: |
Ignore binary incompatibility in private filter2 class
|
Done |
|
Thanks! I'll merge this now (finally :p) |
|
Thanks :) |
…2 api Currently for creating a user defined predicate using the new filter api, no value can be passed to create a dynamic filter at runtime. This reduces the usefulness of the user defined predicate, and meaningful predicates cannot be created. We can add a generic Object value that is passed through the api, which can internally be used in the keep function of the user defined predicate for creating many different types of filters. For example, in spark sql, we can pass in a list of filter values for a where IN clause query and filter the row values based on that list. Author: Yash Datta <[email protected]> Author: Alex Levenson <[email protected]> Author: Yash Datta <[email protected]> Closes apache#73 from saucam/master and squashes the following commits: 7231a3b [Yash Datta] Merge pull request #3 from isnotinvain/alexlevenson/fix-binary-compat dcc276b [Alex Levenson] Ignore binary incompatibility in private filter2 class 7bfa5ad [Yash Datta] Merge pull request #2 from isnotinvain/alexlevenson/simplify-udp-state 0187376 [Alex Levenson] Resolve merge conflicts 25aa716 [Alex Levenson] Simplify user defined predicates with state 51952f8 [Yash Datta] PARQUET-116: Fix whitespace d7b7159 [Yash Datta] PARQUET-116: Make UserDefined abstract, add two subclasses, one accepting udp class, other accepting serializable udp instance 40d394a [Yash Datta] PARQUET-116: Fix whitespace 9a63611 [Yash Datta] PARQUET-116: Fix whitespace 7caa4dc [Yash Datta] PARQUET-116: Add ConfiguredUserDefined that takes a serialiazble udp directly 0eaabf4 [Yash Datta] PARQUET-116: Move the config object from keep method to a configure method in udp predicate f51a431 [Yash Datta] PARQUET-116: Adding type safety for the filter object to be passed to user defined predicate d5a2b9e [Yash Datta] PARQUET-116: Enforce that the filter object to be passed must be Serializable dfd0478 [Yash Datta] PARQUET-116: Add a test case for passing a filter object to user defined predicate 4ab46ec [Yash Datta] PARQUET-116: Pass a filter object to user defined predicate in filter2 api
Currently for creating a user defined predicate using the new filter api, no value can be passed to create a dynamic filter at runtime. This reduces the usefulness of the user defined predicate, and meaningful predicates cannot be created. We can add a generic Object value that is passed through the api, which can internally be used in the keep function of the user defined predicate for creating many different types of filters.
For example, in spark sql, we can pass in a list of filter values for a where IN clause query and filter the row values based on that list.