
Blog post on using UDFs in python #17

Merged
timsaucer merged 11 commits into apache:main from timsaucer:tsaucer/python-udf-approaches
Nov 19, 2024
Conversation

@timsaucer
Member

This PR adds a blog post describing using UDFs, and in particular on how to combine third party rust UDFs with datafusion-python.

return pa.array(result)


is_of_interest = udf(
Member

Suggested change
is_of_interest = udf(
# Wrap our custom function with `datafusion.udf`, annotating expected
# parameter and return types
is_of_interest = udf(

As a separate note, it wouldn't be hard to convert this udf function wrapper into a Python decorator, so we could do

@udf(args=(pa.int64(), pa.int64(), pa.utf8()), returns=pa.bool_(), volatility="stable")
def is_of_interest(
    partkey_arr: pa.Array,
    suppkey_arr: pa.Array,
    returnflag_arr: pa.Array,
) -> pa.Array: ...

Member Author

Great idea. I've added it to the issue list apache/datafusion-python#806

returnflag_arr: pa.Array,
) -> pa.Array:
results = None
for partkey, suppkey, returnflag in values_of_interest:
Member

I think you can use pyarrow.is_in to speed this up, instead of doing an equality check multiple times: https://arrow.apache.org/docs/python/generated/pyarrow.compute.is_in.html

Comment on lines 479 to 485
let values = partkey_arr
.values()
.iter()
.zip(suppkey_arr.values().iter())
.zip(returnflag_arr.iter())
.map(|((a, b), c)| (a, b, c.unwrap_or_default()))
.map(|v| values_to_search.contains(&v));
Member

This is faster I suppose because it's not doing a boolean check on each individual array in its entirety and then ORing them? It's doing it all at once in a single pass?

Member Author

Yes, I didn't dive any deeper, but my expectation is that doing a single pass through the iteration gives a small speed improvement. In my modest test it only accounted for about a 5% boost.

@timsaucer
Member Author

Huge tip of the hat to @kylebarron for the thorough feedback!

@alamb
Contributor

alamb commented Nov 18, 2024

@timsaucer is this something we should publish? I hadn't seen it before but it looks great

@timsaucer
Member Author

I keep meaning to get back to it and add a portion on window functions but I get distracted with other things. Yes, let me brush it up.

@emgeee

emgeee commented Nov 18, 2024

I definitely found the post helpful!

@timsaucer
Member Author

I don't know if I'll have time soon to write an entire section on how to use the window functions, so I just added a blurb to look at the online documentation. I do think the online docs are actually in a pretty good spot. I think this is ready for review/merge/publish.

Member

@andygrove left a comment

Excellent blog post! Thanks @timsaucer


For a few months now I’ve been working with [Apache DataFusion](https://datafusion.apache.org/), a
fast query engine written in Rust. From my experience the language that nearly all data scientists
are working in is Python. In general, often stick to [Pandas](https://pandas.pydata.org/) for
Contributor

@Omega359 Nov 19, 2024

Suggested change
are working in is Python. In general, often stick to [Pandas](https://pandas.pydata.org/) for
are working in is Python. In general, data scientists often use [Pandas](https://pandas.pydata.org/) for


## User Defined Window Functions

Writing a user defined window function is slighlty more complex than an aggregate function due
Contributor

Suggested change
Writing a user defined window function is slighlty more complex than an aggregate function due
Writing a user defined window function is slightly more complex than an aggregate function due

processing.

In addition to DataFusion, there is another Rust based newcomer to the DataFrame world,
[Polars](https://pola.rs/). It is growing extremely fast, and it serves many of the same use cases
Contributor

Suggested change
[Polars](https://pola.rs/). It is growing extremely fast, and it serves many of the same use cases
[Polars](https://pola.rs/). The latter is growing extremely fast, and it serves many of the same use cases


When it comes to designing UDFs, I strongly recommend seeing if you can write your UDF using
[PyArrow functions](https://arrow.apache.org/docs/python/api/compute.html) rather than pure Python
objects. These will give you enormous speed benefits. If you must do something that isn't well
Contributor

Some users jump directly to the conclusions section, so I'd add a short note on what the speed benefits were.

Contributor

@comphead left a comment

lgtm thanks @timsaucer

Contributor

@alamb left a comment

Thank you @timsaucer -- I think this is great (and I certainly learned a lot)

Once you merge this PR we need to do the publish process here: https://github.com/apache/datafusion-site?tab=readme-ov-file#publish-site

I am happy to do so as well, but I wanted to give you a heads up that it wouldn't just auto publish (no one has hooked that up yet)

- Writing a UDF in Rust and exposing it to Python

Additionally I will demonstrate two variants of this. The first will be nearly identical to the
PyArrow library approach to simplicity of understanding how to connect the Rust code to Python. The
Contributor

Suggested change
PyArrow library approach to simplicity of understanding how to connect the Rust code to Python. The
PyArrow library approach to simplify understanding how to connect the Rust code to Python. In the

DataFrame itself, perform a join, and select the columns from the original DataFrame. If we were
working in PySpark we would probably broadcast join the DataFrame created from the tuple list since
it is tiny. In practice, I have found that with some DataFrame libraries performing a filter rather
than a join can be significantly faster. This is worth profiling for your specific use case.
Contributor

100% agree

BTW this is the kind of project I think is needed to make such queries much faster: apache/datafusion#7955

only to return a DataFrame with a specific combination of these three values. That is, I want
to know if part number 1530 from supplier 4031 was sold (not returned), so I want a specific
combination of `p_partkey = 1530`, `p_suppkey = 4031`, and `p_returnflag = 'N'`. I have a small
handful of these combinations I want to return.
Contributor

I don't think we should change this blog post / example, but I wanted to point out that you can do this same calculation in SQL

> create table foo (x int, y int) as values (1,2), (3,4), (5,6);
0 row(s) fetched.
Elapsed 0.046 seconds.

> select * from foo where (x,y) IN ((3,4), (4,5));
+---+---+
| x | y |
+---+---+
| 3 | 4 |
+---+---+
1 row(s) fetched.
Elapsed 0.018 seconds.


DataFusion rewrites it internally using structs:

> select * from foo where {'x': x, 'y': y} IN ({'x': 3, 'y': 4}, {'x': 4, 'y': 5});
+---+---+
| x | y |
+---+---+
| 3 | 4 |
+---+---+
> explain select * from foo where (x,y) IN ((3,4), (4,5));
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| plan_type     | plan                                                                                                                                                                                                                                                                                                                                                       |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| logical_plan  | Filter: CAST(struct(foo.x, foo.y) AS Struct([Field { name: "c0", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])) IN ([Struct({c0:3,c1:4}), Struct({c0:4,c1:5})])                                         |
|               |   TableScan: foo projection=[x, y]                                                                                                                                                                                                                                                                                                                         |
| physical_plan | CoalesceBatchesExec: target_batch_size=8192                                                                                                                                                                                                                                                                                                                |
|               |   FilterExec: CAST(struct(x@0, y@1) AS Struct([Field { name: "c0", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: "c1", data_type: Int64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }])) IN ([Literal { value: Struct({c0:3,c1:4}) }, Literal { value: Struct({c0:4,c1:5}) }]) |
|               |     MemoryExec: partitions=1, partition_sizes=[1]                                                                                                                                                                                                                                                                                                          |
|               |                                                                                                                                                                                                                                                                                                                                                            |
+---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
2 row(s) fetched.
Elapsed 0.003 seconds.

will do here is write a Rust function to perform our computation and then expose that function to
Python. I know of two use cases where I would recommend this approach. The first is the case when
the PyArrow compute functions are insufficient for your needs. Perhaps your code is too complex or
could be greatly simplified if you pulled in some outside dependency. The second use case is when
Contributor

these are really good points

we will process a single array and update the internal state, which we share with the `state()`
function. For larger batches we may `merge()` these states. It is important to note that the
`states` in the `merge()` function are an array of the values returned from `state()`. It is
entirely possible that the `merge` function is significantly different than the `update`, though in
Contributor

A classic example is "avg" where the state is a sum + count and the output is a value

Member Author

avg was my first example before I found a bug (apache/datafusion-python#797) and switched it to sum to unblock myself!

```

As expected, the conversion to Python objects is by far the worst performance. As soon as we drop
into using any functions that keep the data entirely on the Rust side we see a near 10x speed
Contributor

I think technically using pyarrow doesn't use rust -- it is implemented in C/C++. Possibly this would be slightly more accurate:

Suggested change
into using any functions that keep the data entirely on the Rust side we see a near 10x speed
into using any functions that keep the data entirely on the Native (Rust or C/C++) side we see a near 10x speed

@timsaucer
Member Author

Thank you all for the feedback. I've incorporated the suggestions and will wait until the end of my work day to see if there are any more comments before publishing.

@timsaucer timsaucer merged commit 4b60caf into apache:main Nov 19, 2024
@timsaucer
Member Author

I wasn't able to get the site to build on my machine. I'm going to try setting up a docker container just for building it tomorrow.

@alamb
Contributor

alamb commented Nov 19, 2024

I wasn't able to get the site to build on my machine. I'm going to try setting up a docker container just for building it tomorrow.

I had a docker container setup so I made a PR to publish it:

@alamb
Contributor

alamb commented Nov 20, 2024

The post is now live https://datafusion.apache.org/blog/2024/11/19/datafusion-python-udf-comparisons/
