Skip to content

feat: Set/cancel with job tag and make max broadcast table size configurable #1693

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

wForget
Copy link
Member

@wForget wForget commented Apr 29, 2025

Which issue does this PR close?

Closes #1692, #1691.

Rationale for this change

Improve CometBroadcastExchangeExec:

  • Make max broadcast table size configurable
  • Set/cancel with job tag on spark 3.5+

What changes are included in this PR?

How are these changes tested?

added maxBroadcastTableSize test

@wForget wForget marked this pull request as draft April 29, 2025 08:53
@wForget wForget marked this pull request as ready for review April 30, 2025 08:33
@codecov-commenter
Copy link

codecov-commenter commented Apr 30, 2025

Codecov Report

Attention: Patch coverage is 80.00000% with 1 line in your changes missing coverage. Please review.

Project coverage is 58.72%. Comparing base (f09f8af) to head (b1f6ff7).
Report is 173 commits behind head on main.

Files with missing lines Patch % Lines
...e/spark/sql/comet/CometBroadcastExchangeExec.scala 80.00% 1 Missing ⚠️
Additional details and impacted files
@@             Coverage Diff              @@
##               main    #1693      +/-   ##
============================================
+ Coverage     56.12%   58.72%   +2.59%     
- Complexity      976     1083     +107     
============================================
  Files           119      126       +7     
  Lines         11743    12565     +822     
  Branches       2251     2361     +110     
============================================
+ Hits           6591     7379     +788     
- Misses         4012     4014       +2     
- Partials       1140     1172      +32     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

}

def maxBroadcastTableBytes(conf: SQLConf): Long = {
JavaUtils.byteStringAsBytes(conf.getConfString(SPARK_MAX_BROADCAST_TABLE_SIZE, "8GB"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conf is only available since Spark 4.1, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conf is only available since Spark 4.1, right?

Yes, but we can port it to any version. For the native engine, we may configure a smaller heap memory, which makes broadcast more likely to cause OOM. I think we can use this configuration to limit the maximum bytes of broadcast.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see it being back-ported to the 3.x branch, but as long as this builds against older releases and doesn't throw an exception that the conf could not be found, we are ok.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

feat: Set/cancel with job tag for CometBroadcastExchangeExec
3 participants