Skip to content

Comments

PARQUET-116: Pass a filter object to user defined predicate in filter2 api#73

Closed
saucam wants to merge 15 commits intoapache:masterfrom
saucam:master
Closed

PARQUET-116: Pass a filter object to user defined predicate in filter2 api#73
saucam wants to merge 15 commits intoapache:masterfrom
saucam:master

Conversation

@saucam
Copy link

@saucam saucam commented Oct 18, 2014

Currently for creating a user defined predicate using the new filter api, no value can be passed to create a dynamic filter at runtime. This reduces the usefulness of the user defined predicate, and meaningful predicates cannot be created. We can add a generic Object value that is passed through the api, which can internally be used in the keep function of the user defined predicate for creating many different types of filters.
For example, in spark sql, we can pass in a list of filter values for a where IN clause query and filter the row values based on that list.

@saucam
Copy link
Author

saucam commented Oct 21, 2014

@julienledem, please advise . I have tested this by passing a custom filter for where in clause query (select * from t1 where c1 in (f1,f2,f3) ) in spark sql .

@julienledem
Copy link
Member

@isnotinvain should probably review as well.

@isnotinvain
Copy link
Contributor

Hello, thanks for the PR!

I get the motivation for this and you're right that we need to fix it, but I wonder if this is the right API.
The way hadoop handles this situation is it instantiates classes via reflection, then hands them the Configuration object which they can pull any state that they need out of. We could maybe do something similar, so a UDP would have an additional method called configure or something like that.

One problem with that is, currently parquet-column doesn't depend on hadoop, so we'd be breaking that separation. And if we come up with our own configuration / context object, it will be yet another configuration floating around the codebase...

The way scalding solves this problem on the other hand, is it doesn't load classes via reflection, it serializes instances, so instead your UDP could contain some state (eg the HashSet) and that would get serialized + shipped. The downside there is, we don't currently do anything like that in parquet, so we'd probably be adding a dependency on kryo or chill or something like that.

The way this PR is written however, I'm not sure it will work if I understand correctly. The FilterPredicate needs to be serialized into the hadoop configuration so that it can be read remotely. Currently this isn't a problem, because all of the nodes in a predicate tree implement java.io.Serializable and contain no references to user defined instances (user defined classes yes, but instances no). So at a minimum, I think we would need to add some bounds to the Object passed to the UDP that ensures that it implements Serializable. But, now we're in the business of serializing user defined instances (not just classes), which comes with a bunch of its own problems (just because you say you're Serializable doesn't mean you actually are). I think if we're going to go that route, we might as well go the full scalding approach and just accept an instance of a UDP (with its own state) than accepting a (class, state) pair.

If we do decide to take this approach, I think we should at least add type bounds, and add a generic parameter to UserDefinedPredicate, eg:
public abstract class UserDefinedPredicate<T extends Comparable<T>, S extends Serializable>
so instead of taking an Object we would take something of type S, which would then verify that a UserDefinedPredicate<T, S> is only passed an S (this would also eliminate the cast).

@saucam
Copy link
Author

saucam commented Oct 22, 2014

thanks for the input. I understand the problem. What do you think will be the best approach ? Its easy to add type bounds, also in case of accepting an instance of UDP , the api will change ...

@rdblue
Copy link
Contributor

rdblue commented Oct 24, 2014

Another option we might want to consider is an API that delegates serializing predicate data to the predicate class. We implemented this in Kite as RegisteredPrediate (the tests are a good place to start).

A registered predicate registers a name, like "contains", and a factory method that will produce instances from a String, which is formatted by the predicate class. In addition, predicate class has a method that converts its data to such a string (we use toString(Schema)). Using those building blocks, Kite can convert an instance into a String by using the name and the string representation, and can get an instance from a string by taking the name, looking up the right factory, and calling the factory on the predicate-specific string. We end up with a serialization that can be used with a Configuration, has a readable representation ("contains(abc*)"), but doesn't use Serializable.

@saucam
Copy link
Author

saucam commented Oct 28, 2014

Enforced that a serializable object can only be passed as a filter object to UserDefinedPredicate. Please suggest changes/mods as appropriate

@isnotinvain
Copy link
Contributor

I like the idea of adding a String that will be kept with the object. It leaves the serialization decisions to the UDP writer, and it maps easily into the fact that we ultimately have to stuff this into the hadoop configuration object without exposing the user to the hadoop configuration object.

We could take a similar approach to Pig and support a constructor that takes a string as an argument, or we can just have a getter + setter for the String in the UDP.

This approach is very similar to how this PR is written already -- we could also just stick with Serializable and leave it up to the UDP writer to ensure that their Serializable is actually serializable. (String is Serializable anyway).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets change the above to:

public abstract class UserDefinedPredicate<T extends Comparable<T>, S extends Serializable> {
  public UserDefinedPredicate(S state) { }

then the keep method doesn't need to take an extra arg?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we change the constructor, we need to change the reflective invocation part of the class 's default constructor as well. Is that ok ?

public U getUserDefinedPredicate() {
  try {
    return udpClass.newInstance();
  } catch (InstantiationException e) {
    throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e); 
  } catch (IllegalAccessException e) {
    throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e); 
  }   

@saucam
Copy link
Author

saucam commented Oct 30, 2014

Added generic parameter for the filter object , bound by Serializable

@isnotinvain
Copy link
Contributor

Thanks for updating and for fixing this.

Lets get the naming consistent -- I think filter object is ambiguous (a UDP is a filter object after all). Let's just call this a Configuration or State or UDPConfig or something like that.

also instead of adding a parameter to the keep method, lets just add a constructor arg to UserDefinedPredicate or a configure method that acts like a setter.

@rdblue
Copy link
Contributor

rdblue commented Oct 31, 2014

This approach [passing data a UDP-managed string] is very similar to how this PR is written already -- we could also just stick with Serializable and leave it up to the UDP writer to ensure that their Serializable is actually serializable.

I don't understand. A String is Serializable, but a Serializable isn't necessarily a String. What I was proposing was to allow the UDP to serialize its data as a String and pass that String to the UDP when initializing. We can pass the data to the UDP using a method that accepts Serializable, but don't we also need the to-string part to be added?

@isnotinvain
Copy link
Contributor

I was saying if you're going to go through the trouble of keeping a companion state object around for a UDP (in your description a String), it's not that different to generalize to a Serializable instead of limiting to only a String.

The upside of using String is parquet doesn't have to get in the business of serializing things, it just has to shuffle a string around and give it back to the UDP at the right time, and parquet never has to worry about the String not being serializable... because it's a String. The downside is every UDP has to have some code in it for serializing its state into a String.

The other option I was mentioning, which is how this PR is currently written, is exactly the same as your suggestion, except the state object isn't a String it's any Serializable. The upside here is you can add a type param to UDP and UDP writers don't have to write any serialization code, the downside is parquet has to deal with the fact that just because a class implements Serializable doesn't mean that it can actually be serialized. I don't feel that strongly either way.

@saucam
Copy link
Author

saucam commented Oct 31, 2014

  • Added a configure method to UserDefinedPredicate
  • renamed the object to udpConfig

@rdblue
Copy link
Contributor

rdblue commented Oct 31, 2014

Thanks for clarifying. I see your point that we can think of the Serializable and String doing the same work, with String being a more restrictive case. I think that it is good to have the more restrictive case because it means we have a lot more flexibility when sending the UDP around. It's easy to put a UDP into a Configuration this way and we end up with a human-readable verison if not a human-writable verison. And, as you noted, there's always the chance that although a class implements Serializable, it actually isn't and produces runtime errors.

The conversion to a String usually isn't bad, the heavy-lifting is done by instantiating the UDP class itself. The data matches the column type most of the time, which is why we pass the column type to the method. We could even add a set of helper methods that convert each primitive type to a String.

@saucam
Copy link
Author

saucam commented Nov 3, 2014

Hi @isnotinvain ,

there is some enforcer plugin error... can you help with this one ? Locally have tested everything.

[WARNING] Rule 0: org.semver.enforcer.RequireSemanticVersioningConformance failed with message:
Current codebase is incompatible with version <1.6.0rc1>. Version should be at least <2.0.0>.

@saucam
Copy link
Author

saucam commented Nov 23, 2014

Hi @isnotinvain
please review or suggest what changes are needed ?

@isnotinvain
Copy link
Contributor

@saucam sorry for the slow reply, taking another look

@saucam
Copy link
Author

saucam commented Nov 28, 2014

ok ! thanks !

@saucam
Copy link
Author

saucam commented Jan 28, 2015

Hi @isnotinvain any updates on this one ?
This is useful for many different use cases, for example , we can pass in a bloom filter and get all rows not passing...

@isnotinvain
Copy link
Contributor

@rdblue makes some pretty good arguments for using String, and it would simplify a lot of this.

@julienledem @egonina do you have any strong opinions on how to store this kind of state?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does o refer to here?

@julienledem
Copy link
Member

Hi @saucam
Adding a serializable configuration to the current UserDefinedPredicate is a little awkward because the point of using a class name was not make explicit there was no serialized state.
Now if we want to add this capability we may as well pass directly an instance of a UserDefinedPredicate that is serializable.
I would recommend the following:

  • leave UserDefined as is.
  • similar to UserDefined, define a new ConfiguredUserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable>
    with a constructor: ConfiguredUserDefined(Column<T> column, U udp)
  • add in FilterApi:
public static <T extends Comparable<T>, U extends UserDefinedPredicate<T> & Serializable>
   UserDefined<T, U> userDefined(Column<T> column, U udp) {
     return new ConfiguredUserDefined<T, U>(column, udp);
  }
}
  • add the new ConfiguredUserDefined in the visitor pattern and add the appropriate implementation that will be similar as UserDefined (you can even make it common by provided a base class to both)
    I hope this helps

@saucam
Copy link
Author

saucam commented Feb 2, 2015

Hi @julienledem , @isnotinvain

Thanks for the comments, added the changes mentioned. How does it look now ?

@isnotinvain
Copy link
Contributor

@julienledem is there value in making ConfiguredUserDefinedPredicate compose another UDP, or instead just make:

asbstract class ConfiguredUserDefinedPredicate extend UserDefinedPredicate<...> implements Serializable

@isnotinvain
Copy link
Contributor

Sorry to keep piling on this review, one more question though. It seems like we shouldn't need to add a new operator to the filter tree, and then update every single visitor for filter trees.

We just want 2 ways of serializing a UDP, either by class name, or by serializing an instance. I think we can handle this just at the layer that deals with building a filter predicate + serializing it. The rest should be transparent.

I think we can do this by adding a method to FilterApi that accepts an Instance of a UDP that is Serializable, similar to what you have, eg:

/**
 * Keeps records that pass the provided {@link UserDefinedPredicate}
 */
public static <T extends Comparable<T>, U extends UserDefinedPredicate<T>>
 UserDefined<T, U> userDefined(Column<T> column, Class<U> clazz) {
 return new UserDefined<T, U>(column, clazz);
}

/**
 * Similar to above but allows to pass Serializable {@link UserDefinedPredicate}
 */
public static <T extends Comparable<T>, U extends UserDefinedPredicate<T>> & Serializable
 UserDefined<T, U> userDefined(Column<T> column, U udp) {
 return new UserDefined<T, U>(column, udp);
}

Note that we are still instantiating a UserDefined, not a new type like ConfigurableUserDefined.

The reason is, UserDefined is already serializable. If we make it act like a union type that holds either a class, or a reference to a serializable UDP, it should be fine, eg:

public static final class UserDefined<T extends Comparable<T>, U extends UserDefinedPredicate<T>> implements FilterPredicate, Serializable {
    private final Column<T> column;
    private final Class<U> udpClass;
    private final U udpInstance;
    ...

    public U getUserDefinedPredicate() {
      if (udpInstance != null) { 
        return udpInstance; 
      } else {
      try {
        return udpClass.newInstance();
      } catch (InstantiationException e) {
        throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
      } catch (IllegalAccessException e) {
        throw new RuntimeException(String.format(INSTANTIATION_ERROR_MESSAGE, udpClass), e);
      }
    }
   }
  }

Now this is the only change we have to make, none of the visitors need to care about a new type to visit. Does that make sense?

@julienledem
Copy link
Member

@isnotinvain UserDefined could be abstract with just the column field and we would have 2 subclasses. One with the field udpClass and the other with the field udpInstance. The visitor only needs to know about UserDefined and an abstract definition of getUdpInstance() that would either return the serialized instance or instantiate it from the class depending on the implementation.

@isnotinvain
Copy link
Contributor

@julienledem yes, that would work too. Either way the main thing is it's a new getUdpInstance() method behavior, but none of the visitors need to be updated (I don't feel strongly whether we do this through subclassing, or just an if statement in UserDefined).

Yash Datta added 2 commits February 4, 2015 13:15
@saucam
Copy link
Author

saucam commented Feb 4, 2015

@isnotinvain , @julienledem , made UserDefined abstract, added subclasses SimpleUserDefined , ConfiguredUserDefined

please suggest...

@isnotinvain
Copy link
Contributor

oops, I was playing around with this too. I'll take a look, thanks!

@isnotinvain
Copy link
Contributor

Cool, looks like we did the same thing :)

I sent you a PR: saucam#2

This combines what you just submitted, plus updates to the scala DSL to support this new form of UDP, and some tests that exercise the serialization codepath. Let me know what you think.

Simplify user defined predicates with state, Add more test cases
@saucam
Copy link
Author

saucam commented Feb 4, 2015

LGTM
Thanks for the PR @isnotinvain :) merged.

@julienledem
Copy link
Member

I like it
+1

@isnotinvain
Copy link
Contributor

OK, let's merge once the tests pass. I'm trying to reproduce the test failure locally (we can't hit the rebuild button in travis unfortunately)

@isnotinvain
Copy link
Contributor

Tests passed for me locally.

@isnotinvain
Copy link
Contributor

I'm re-running the tests via this PR: https://github.com/apache/incubator-parquet-mr/pull/116, if those pass, I'll merge this PR instead.

@isnotinvain
Copy link
Contributor

OK, I ran the tests successfully via my github fork here:
https://travis-ci.org/isnotinvain/incubator-parquet-mr/builds/50141393

This is ready merge once @saucam merges: saucam#3

Ignore binary incompatibility in private filter2 class
@saucam
Copy link
Author

saucam commented Feb 10, 2015

Done

@isnotinvain
Copy link
Contributor

Thanks! I'll merge this now (finally :p)

@asfgit asfgit closed this in 807915b Feb 10, 2015
@saucam
Copy link
Author

saucam commented Feb 10, 2015

Thanks :)

rdblue pushed a commit to rdblue/parquet-mr that referenced this pull request Mar 9, 2015
…2 api

Currently for creating a user defined predicate using the new filter api, no value can be passed to create a dynamic filter at runtime. This reduces the usefulness of the user defined predicate, and meaningful predicates cannot be created. We can add a generic Object value that is passed through the api, which can internally be used in the keep function of the user defined predicate for creating many different types of filters.
For example, in spark sql, we can pass in a list of filter values for a where IN clause query and filter the row values based on that list.

Author: Yash Datta <[email protected]>
Author: Alex Levenson <[email protected]>
Author: Yash Datta <[email protected]>

Closes apache#73 from saucam/master and squashes the following commits:

7231a3b [Yash Datta] Merge pull request #3 from isnotinvain/alexlevenson/fix-binary-compat
dcc276b [Alex Levenson] Ignore binary incompatibility in private filter2 class
7bfa5ad [Yash Datta] Merge pull request #2 from isnotinvain/alexlevenson/simplify-udp-state
0187376 [Alex Levenson] Resolve merge conflicts
25aa716 [Alex Levenson] Simplify user defined predicates with state
51952f8 [Yash Datta] PARQUET-116: Fix whitespace
d7b7159 [Yash Datta] PARQUET-116: Make UserDefined abstract, add two subclasses, one accepting udp class, other accepting serializable udp instance
40d394a [Yash Datta] PARQUET-116: Fix whitespace
9a63611 [Yash Datta] PARQUET-116: Fix whitespace
7caa4dc [Yash Datta] PARQUET-116: Add ConfiguredUserDefined that takes a serialiazble udp directly
0eaabf4 [Yash Datta] PARQUET-116: Move the config object from keep method to a configure method in udp predicate
f51a431 [Yash Datta] PARQUET-116: Adding type safety for the filter object to be passed to user defined predicate
d5a2b9e [Yash Datta] PARQUET-116: Enforce that the filter object to be passed must be Serializable
dfd0478 [Yash Datta] PARQUET-116: Add a test case for passing a filter object to user defined predicate
4ab46ec [Yash Datta] PARQUET-116: Pass a filter object to user defined predicate in filter2 api
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants