Support min_by group by aggregate #16163
base: branch-24.08
Conversation
Can you give an example of where this new aggregation is a better choice than argmin + gather? libcudf already provides the essential building blocks for this kind of operation, and I don't see how this specialized implementation provides a significant benefit. I'm not sure the claims in #16139 about memory pressure, single-pass aggregation, and performance are compelling from a surface-level view.
If we do go this route and decide this feature is necessary, we should also implement a max_by aggregation at the same time, for symmetry.
min_by is an aggregation in Spark but not in pandas, so we would like to match Spark's behavior directly from the cuDF side. argmin/argmax + gather have a few feature gaps relative to Spark's min/max_by that are difficult to handle from the spark-rapids side, such as the handling of all-null groups described below.
For now we have handled the all-nulls case by modifying the null masks and using the argmin + gather route, to quickly support our customer's need. But as a next step, we'd like an independent implementation to support float aggregation and max_by. Another reason is that min_by and max_by are special aggregations in Spark that operate on two different columns, so we need to package the two columns into one struct column for cuDF to handle, because AFAIK cuDF only supports aggregation on a single column. Otherwise we would need special post-processing in spark-rapids: check whether any min/max_bys appear among the aggregations, gather them to their value columns, and concatenate the results back into the original aggregation result table, which would be much slower and harder to implement.
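For illustration only, a minimal sketch of how the two columns might be packed into a single struct column on the libcudf side. The function name here is hypothetical; `cudf::make_structs_column` is the existing factory, and parent null handling is elided:

```cpp
#include <cudf/column/column.hpp>
#include <cudf/column/column_factories.hpp>
#include <rmm/device_buffer.hpp>

#include <memory>
#include <utility>
#include <vector>

// Hypothetical helper: pack a value column and an order column into one
// struct column so that a single aggregation request can see both.
// Assumes both columns have the same number of rows.
std::unique_ptr<cudf::column> pack_value_order(std::unique_ptr<cudf::column> value,
                                               std::unique_ptr<cudf::column> order)
{
  auto const num_rows = value->size();
  std::vector<std::unique_ptr<cudf::column>> children;
  children.push_back(std::move(value));
  children.push_back(std::move(order));
  // No parent null mask in this sketch: null_count = 0, empty device buffer.
  return cudf::make_structs_column(num_rows, std::move(children), 0, rmm::device_buffer{});
}
```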
Will do if we go this route.
In terms of semantics, the proposed min_by would need to match the argmin + gather implementation exactly. In libcudf, null and NaN handling are controlled by separate arguments. For example, the collect set aggregation (https://docs.rapids.ai/api/libcudf/stable/group__aggregation__factories#gaebe680a414f3c942a631f609bcfb5781) accepts null_policy, null_equality, and nan_equality arguments. If you can express the desired semantics that way, that is a better route. There are also examples of ordering policies in libcudf, like null_order. We need to work on some improvements to argmin and argmax anyway, so this would be a good joint project for us. Currently we have "experimental" row comparators that we use for everything except argmin calls, and we need to adopt those to expand support for nested list and struct columns. I can dig up the issue where this is discussed.
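For reference, this is roughly what that policy-argument pattern looks like on the existing collect set factory (a sketch, not tied to this PR; the wrapper function name is made up):

```cpp
#include <cudf/aggregation.hpp>
#include <cudf/types.hpp>

#include <memory>

// Build a groupby collect-set aggregation with explicit null/NaN semantics.
// A min_by/argmin variant could expose the same kind of knobs.
std::unique_ptr<cudf::groupby_aggregation> make_set_agg()
{
  return cudf::make_collect_set_aggregation<cudf::groupby_aggregation>(
    cudf::null_policy::EXCLUDE,      // drop nulls within each group
    cudf::null_equality::EQUAL,      // nulls compare equal when deduplicating
    cudf::nan_equality::ALL_EQUAL);  // NaNs compare equal when deduplicating
}
```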
@thirtiseven Here is the other issue I was thinking of: #14412 (comment)
That sounds good; adding those arguments seems like it would work. I will try to do this, maybe in a separate PR just for argmin/argmax, and make min/max_by simply: 1. unpack the struct column; 2. call argmin/argmax with the appropriate arguments; 3. do a gather with the struct column (as sketched below). Does this make sense to you?
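A hedged sketch of that composition, with a hypothetical function name; it assumes the struct parent has no nulls (so plain child views are safe) and glosses over the null/NaN policy arguments discussed above:

```cpp
#include <cudf/aggregation.hpp>
#include <cudf/column/column_view.hpp>
#include <cudf/copying.hpp>
#include <cudf/groupby.hpp>
#include <cudf/table/table_view.hpp>

#include <memory>
#include <utility>
#include <vector>

// Compose min_by from ARGMIN + gather: `packed` is a struct column whose
// child 0 is the value column and child 1 is the order column.
std::unique_ptr<cudf::column> min_by_via_argmin(cudf::groupby::groupby& gb,
                                                cudf::column_view const& packed)
{
  // 1. Unpack the struct column.
  auto const value = packed.child(0);
  auto const order = packed.child(1);

  // 2. Run ARGMIN on the order column; the result holds, per group, the row
  //    index of that group's minimum order value.
  std::vector<cudf::groupby::aggregation_request> requests(1);
  requests[0].values = order;
  requests[0].aggregations.push_back(
    cudf::make_argmin_aggregation<cudf::groupby_aggregation>());
  auto [group_keys, results] = gb.aggregate(requests);

  // 3. Gather the value column at those row indices.
  auto gathered = cudf::gather(cudf::table_view{{value}},
                               results.front().results.front()->view());
  return std::move(gathered->release().front());
}
```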
Thank you! I was wondering how to write a hash-based implementation for min_by, since the value will always be a struct, but I have no idea yet.
Description
Closes #16139
This PR adds support for min_by, which returns the value of one column at the row where another column reaches its minimum; for example, given (value, order) pairs (a, 3), (b, 1), (c, 2), min_by returns b. It will be useful for spark-rapids.
Currently this PR only supports sort-based group by. I will try to add a hash-based group by as well, but I'm not yet sure how to do it, because the input column from Spark will be a struct column of value and order.
Related pr in spark-rapids: NVIDIA/spark-rapids#11123
For Spark, all orderable types (basic types and array/struct) are supported, except float and double with NaN values, because Spark has special handling for NaN in non-nested floating-point types (it orders NaN greater than any other numeric value).
Checklist