Blog post on using UDFs in python #17
timsaucer merged 11 commits into apache:main from timsaucer:tsaucer/python-udf-approaches
Conversation
Quoted diff context:

```python
return pa.array(result)
```

```python
is_of_interest = udf(
```

Suggested change:

```python
# Wrap our custom function with `datafusion.udf`, annotating expected
# parameter and return types
is_of_interest = udf(
```
As a separate note, it wouldn't be hard to convert this `udf` function wrapper into a Python decorator, so we could do:

```python
@udf((pa.int64(), pa.int64(), pa.utf8()), pa.bool_(), "stable")
def is_of_interest(
    partkey_arr: pa.Array,
    suppkey_arr: pa.Array,
    returnflag_arr: pa.Array,
) -> pa.Array: ...
```

Great idea. I've added it to the issue list apache/datafusion-python#806
Quoted diff context:

```python
    returnflag_arr: pa.Array,
) -> pa.Array:
    results = None
    for partkey, suppkey, returnflag in values_of_interest:
```

I think you can use `pyarrow.compute.is_in` to speed this up, instead of doing an equality check multiple times: https://arrow.apache.org/docs/python/generated/pyarrow.compute.is_in.html
Quoted diff context:

```rust
let values = partkey_arr
    .values()
    .iter()
    .zip(suppkey_arr.values().iter())
    .zip(returnflag_arr.iter())
    .map(|((a, b), c)| (a, b, c.unwrap_or_default()))
    .map(|v| values_to_search.contains(&v));
```

This is faster, I suppose, because it's not doing a boolean check on each individual array in its entirety and then ORing them? It's doing it all at once in a single pass?
Yes, I didn't dive any deeper, but my expectation is that by doing a single pass through the iteration we'll get a small speed improvement. In my modest test it only accounted for about a 5% boost.
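For readers following along in Python, the same single-pass idea looks roughly like this (hypothetical data mirroring the Rust snippet above; a hash-set membership test replaces the per-candidate equality scans):

```python
# Hypothetical row data for the three columns of interest
partkeys = [1530, 4031, 7]
suppkeys = [4031, 1530, 8]
returnflags = ["N", "R", "N"]

# The combinations we want to keep, as a set for O(1) lookup
values_to_search = {(1530, 4031, "N"), (7, 8, "N")}

# One pass over all three columns at once; each row tuple is checked
# against the set, so no full-array equality pass per candidate is needed
mask = [
    (p, s, r) in values_to_search
    for p, s, r in zip(partkeys, suppkeys, returnflags)
]
# mask == [True, False, True]
```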
Huge tip of the hat to @kylebarron for the thorough feedback!

@timsaucer is this something we should publish? I hadn't seen it before but it looks great.

I keep meaning to get back to it and add a portion on window functions but I get distracted with other things. Yes, let me brush it up.

I definitely found the post helpful!

I don't know if I'll have time soon to write an entire section on how to use the window functions, so I just added a blurb to look at the online documentation. I do think the online docs are actually in a pretty good spot. I think this is ready for review/merge/publish.
andygrove
left a comment
Excellent blog post! Thanks @timsaucer
Quoted diff context:

```
For a few months now I’ve been working with [Apache DataFusion](https://datafusion.apache.org/), a
fast query engine written in Rust. From my experience the language that nearly all data scientists
are working in is Python. In general, often stick to [Pandas](https://pandas.pydata.org/) for
```

Suggested change:

```
are working in is Python. In general, data scientists often use [Pandas](https://pandas.pydata.org/) for
```
Quoted diff context:

```
## User Defined Window Functions

Writing a user defined window function is slighlty more complex than an aggregate function due
processing.
```

Suggested change:

```
Writing a user defined window function is slightly more complex than an aggregate function due
```
Quoted diff context:

```
In addition to DataFusion, there is another Rust based newcomer to the DataFrame world,
[Polars](https://pola.rs/). It is growing extremely fast, and it serves many of the same use cases
```

Suggested change:

```
[Polars](https://pola.rs/). The latter is growing extremely fast, and it serves many of the same use cases
```
Quoted diff context:

```
When it comes to designing UDFs, I strongly recommend seeing if you can write your UDF using
[PyArrow functions](https://arrow.apache.org/docs/python/api/compute.html) rather than pure Python
objects. These will give you enormous speed benefits. If you must do something that isn't well
```

Some users jump directly to the conclusions section, so I'd add a short notice about what the speed benefits were.
alamb
left a comment
Thank you @timsaucer -- I think this is great (and I certainly learned a lot).
Once you merge this PR we need to do the publish process here: https://github.com/apache/datafusion-site?tab=readme-ov-file#publish-site
I am happy to do so as well, but I wanted to give you a heads up that it wouldn't just auto publish (no one has hooked that up yet).
Quoted diff context:

```
- Writing a UDF in Rust and exposing it to Python

Additionally I will demonstrate two variants of this. The first will be nearly identical to the
PyArrow library approach to simplicity of understanding how to connect the Rust code to Python. The
```

Suggested change:

```
PyArrow library approach to simplify understanding how to connect the Rust code to Python. In the
```
Quoted diff context:

```
DataFrame itself, perform a join, and select the columns from the original DataFrame. If we were
working in PySpark we would probably broadcast join the DataFrame created from the tuple list since
it is tiny. In practice, I have found that with some DataFrame libraries performing a filter rather
than a join can be significantly faster. This is worth profiling for your specific use case.
```

100% agree.
BTW this is the kind of project I think is needed to make such queries much faster: apache/datafusion#7955
Quoted diff context:

```
only to return a DataFrame with a specific combination of these three values. That is, I want
to know if part number 1530 from supplier 4031 was sold (not returned), so I want a specific
combination of `p_partkey = 1530`, `p_suppkey = 4031`, and `p_returnflag = 'N'`. I have a small
handful of these combinations I want to return.
```

I don't think we should change this blog post / example, but I wanted to point out that you can do this same calculation in SQL:

```sql
> create table foo (x int, y int) as values (1,2), (3,4), (5,6);
0 row(s) fetched.
Elapsed 0.046 seconds.

> select * from foo where (x,y) IN ((3,4), (4,5));
+---+---+
| x | y |
+---+---+
| 3 | 4 |
+---+---+
1 row(s) fetched.
Elapsed 0.018 seconds.
```

DataFusion rewrites it internally using structs:

```sql
> select * from foo where {'x': x, 'y': y} IN ({'x': 3, 'y': 4}, {'x': 4, 'y': 5});
+---+---+
| x | y |
+---+---+
| 3 | 4 |
+---+---+

> explain select * from foo where (x,y) IN ((3,4), (4,5));
+---------------+--------------------------------------------------------------------------------+
| plan_type     | plan                                                                           |
+---------------+--------------------------------------------------------------------------------+
| logical_plan  | Filter: CAST(struct(foo.x, foo.y) AS Struct([Field { name: "c0", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])) IN ([Struct({c0:3,c1:4}), Struct({c0:4,c1:5})]) |
|               |   TableScan: foo projection=[x, y]                                             |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192                                    |
|               |   FilterExec: CAST(struct(x@0, y@1) AS Struct([Field { name: "c0", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])) IN ([Literal { value: Struct({c0:3,c1:4}) }, Literal { value: Struct({c0:4,c1:5}) }]) |
|               |     MemoryExec: partitions=1, partition_sizes=[1]                              |
+---------------+--------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.003 seconds.
```

Quoted diff context:

```
will do here is write a Rust function to perform our computation and then expose that function to
Python. I know of two use cases where I would recommend this approach. The first is the case when
the PyArrow compute functions are insufficient for your needs. Perhaps your code is too complex or
could be greatly simplified if you pulled in some outside dependency. The second use case is when
```

These are really good points.
Quoted diff context:

```
we will process a single array and update the internal state, which we share with the `state()`
function. For larger batches we may `merge()` these states. It is important to note that the
`states` in the `merge()` function are an array of the values returned from `state()`. It is
entirely possible that the `merge` function is significantly different than the `update`, though in
```

A classic example is "avg" where the state is a sum + count and the output is a value.

avg was my first example before I found a bug (apache/datafusion-python#797) and switched it to sum to unblock myself!
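The avg case mentioned above can be sketched in plain Python. The method names mirror the update/merge/state/evaluate shape the thread describes, but this is an illustrative sketch, not the exact datafusion-python `Accumulator` API.

```python
# Illustrative accumulator for "avg": the *state* is (sum, count),
# while the final output is a single value, so merge() is genuinely
# a different computation from update().
class AvgAccumulator:
    def __init__(self):
        self._sum = 0.0
        self._count = 0

    def update(self, values):
        # Fold one batch of raw input values into the running state
        self._sum += sum(values)
        self._count += len(values)

    def state(self):
        # What gets shipped between partial aggregations
        return (self._sum, self._count)

    def merge(self, states):
        # Combine states produced by other accumulators; note these
        # are (sum, count) pairs, not raw input values
        for s, c in states:
            self._sum += s
            self._count += c

    def evaluate(self):
        return self._sum / self._count if self._count else None

# Two partial accumulators merged into a final result
a, b = AvgAccumulator(), AvgAccumulator()
a.update([1, 2, 3])
b.update([4, 5])
final = AvgAccumulator()
final.merge([a.state(), b.state()])
# final.evaluate() → 3.0
```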
Quoted diff context:

```
As expected, the conversion to Python objects is by far the worst performance. As soon as we drop
into using any functions that keep the data entirely on the Rust side we see a near 10x speed
```

I think technically using pyarrow doesn't use rust -- it is implemented in C/C++. Possibly this would be slightly more accurate:

Suggested change:

```
into using any functions that keep the data entirely on the Native (Rust or C/C++) side we see a near 10x speed
```
Thank you all for the feedback. I've incorporated the suggestions and will wait until the end of my work day to see if there are any more comments before publishing.

I wasn't able to get the site to build on my machine. I'm going to try setting up a docker container just for building it tomorrow.

I had a docker container setup so I made a PR to publish it:

This PR adds a blog post describing using UDFs, and in particular on how to combine third party Rust UDFs with datafusion-python.