Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support min_by group by aggregate #16163

Draft
wants to merge 12 commits into
base: branch-24.08
Choose a base branch
from

Conversation

thirtiseven
Copy link
Contributor

@thirtiseven thirtiseven commented Jul 2, 2024

Description

Closes #16139

This pr adds support for min_by, which is used to return the value of a column associated with the minimum value of another column. It will be useful for spark-rapids.

Currently this pr only supports sort based group by, will try to add a hash group by too, but I'm not very clear how to do it right now because the input column from spark will be a struct column of value and order.

Related pr in spark-rapids: NVIDIA/spark-rapids#11123

For Spark, all orderable types (basic types and array/struct) are supported, except float and double with NaN values, because Spark has a special handling for NaN in non-nested floating types.

Checklist

  • I am familiar with the Contributing Guidelines.
  • New or existing tests cover these changes.
  • The documentation is up to date with these changes.

Copy link

copy-pr-bot bot commented Jul 2, 2024

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@github-actions github-actions bot added libcudf Affects libcudf (C++/CUDA) code. CMake CMake build issue Java Affects Java cuDF API. labels Jul 2, 2024
@firestarman firestarman added feature request New feature or request non-breaking Non-breaking change labels Jul 3, 2024
Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven thirtiseven marked this pull request as ready for review July 5, 2024 10:31
@thirtiseven thirtiseven requested review from a team as code owners July 5, 2024 10:31
Copy link
Contributor

@bdice bdice left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain an example of where this new aggregation is a better choice than argmin + gather? libcudf already provides the essential building blocks for this kind of operation, and I don't see how this specialized implementation provides a significant benefit. I'm not sure if I find the claims in #16139 about memory pressure, single-pass aggregation, and performance to be compelling from a surface level view.

If we do go this route and decide this feature is necessary, we should also implement a max_by aggregation at the same time, for symmetry.

@thirtiseven
Copy link
Contributor Author

Can you explain an example of where this new aggregation is a better choice than argmin + gather? libcudf already provides the essential building blocks for this kind of operation, and I don't see how this specialized implementation provides a significant benefit. I'm not sure if I find the claims in #16139 about memory pressure, single-pass aggregation, and performance to be compelling from a surface level view.

min_by is an aggregation for Spark but not in Pandas, so we would like to match Spark's behavior directly from the cuDF side. argmin/max + gather have few features gap between Spark's min/max_by that are difficult to handle from the spark-rapids side, such as

  • all nulls in a grouped order column, Spark returns null, argmin/max + gather returns first element in grouped value column.
  • NaN for float aggregation. In Spark, Nan is the maximum float value, but in cuDF, the calculation involving Nan is undefined.
  • min_by and max_by return the last minimum order value in Spark. For argmin it's matched, but for argmax cuDF will return the first maximum order value.

Now we have handled the all nulls case by modifying the null masks and using argmin+gather route to quickly support our customer's need. But for the next step, we'd like to have an independent implementation to support float aggregation and max_by.

Another reason is that min_by and max_by are two special aggregations in Spark that need to perform aggregation on two different columns. So we need to package the two columns into one struct column for cuDF to handle because AFAIK cuDF only supports aggregation on one column. Otherwise we need to do some special post-processing in spark-rapids to check if there are min/max_bys in aggs, gather them to their value column and concat the results back into the original agg result table, which would be much slower and harder to implement.

If we do go this route and decide this feature is necessary, we should also implement a max_by aggregation at the same time, for symmetry.

Will do if we go this route.

@bdice
Copy link
Contributor

bdice commented Jul 8, 2024

In terms of semantics, the proposed min_by would need to match the argmin + gather implementation exactly. In libcudf, null and NaN control are handled by separate arguments. For example, the collect set aggregation (https://docs.rapids.ai/api/libcudf/stable/group__aggregation__factories#gaebe680a414f3c942a631f609bcfb5781) accepts null_policy, null_equality, and nan_equality arguments. This is a better route to address the desired semantics if you can express it. There are also some examples of ordering policies in libcudf, like null_order.

We need to work on some improvements to argmin and argmax anyway, so this would be a good joint project for us. Currently we have “experimental” row comparators that we use for everything except for argmin calls, and we need to adopt those to expand support for nested list and struct columns. I can dig up the issue where this is discussed.

Signed-off-by: Haoyang Li <[email protected]>
@thirtiseven thirtiseven marked this pull request as draft July 9, 2024 08:26
@bdice
Copy link
Contributor

bdice commented Jul 9, 2024

@thirtiseven Here is the other issue I was thinking of: #14412 (comment)

@thirtiseven
Copy link
Contributor Author

thirtiseven commented Jul 10, 2024

In terms of semantics, the proposed min_by would need to match the argmin + gather implementation exactly. In libcudf, null and NaN control are handled by separate arguments. For example, the collect set aggregation (https://docs.rapids.ai/api/libcudf/stable/group__aggregation__factories#gaebe680a414f3c942a631f609bcfb5781) accepts null_policy, null_equality, and nan_equality arguments. This is a better route to address the desired semantics if you can express it. There are also some examples of ordering policies in libcudf, like null_order.

That sounds good, adding those arguments seems to work. I will try to do this, maybe in another pr just for argmin/argmax and make min/max_by just: 1. unpack the struct column. 2. call argmin/argmax with different arguments. 3. do a gather with the struct column. Does this make sense to you?

We need to work on some improvements to argmin and argmax anyway, so this would be a good joint project for us. Currently we have “experimental” row comparators that we use for everything except for argmin calls, and we need to adopt those to expand support for nested list and struct columns. I can dig up the issue where this is discussed.
@thirtiseven Here is the other issue I was thinking of: #14412 (comment)

Thank you! I was wondering how to write a hash-based implementation for min_by since the value will always be a struct (but no idea).
Currently for spark-rapids the sort-based way is good enough according to perf tests. We'd love to write a hash-based min/max_by as the next step after argmin/max supports nested types with row comparators.

Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
@github-actions github-actions bot added Python Affects Python cuDF API. cudf.pandas Issues specific to cudf.pandas cudf.polars Issues specific to cudf.polars pylibcudf Issues specific to the pylibcudf package labels Jul 11, 2024
Signed-off-by: Haoyang Li <[email protected]>
@github-actions github-actions bot removed cudf.pandas Issues specific to cudf.pandas cudf.polars Issues specific to cudf.polars labels Jul 11, 2024
Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
@github-actions github-actions bot removed Python Affects Python cuDF API. pylibcudf Issues specific to the pylibcudf package labels Jul 11, 2024
Signed-off-by: Haoyang Li <[email protected]>
Signed-off-by: Haoyang Li <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
CMake CMake build issue feature request New feature or request Java Affects Java cuDF API. libcudf Affects libcudf (C++/CUDA) code. non-breaking Non-breaking change
Projects
Status: In Progress
Development

Successfully merging this pull request may close these issues.

[FEA] Add min_by aggregate support
3 participants