feat: Add datafusion-spark crate #15168

Open · shehabgamin wants to merge 13 commits into main

Conversation

@shehabgamin (Contributor) commented Mar 11, 2025

Which issue does this PR close?

Closes #5600

Rationale for this change

See discussion in #5600

TL;DR: Many projects want Spark-compatible expressions for use with DataFusion. Some already exist in Comet and some in the Sail project.

What changes are included in this PR?

This PR adds the datafusion-spark crate.

Are these changes tested?

Are there any user-facing changes?

The github-actions bot added the functions (Changes to functions implementation) label on Mar 11, 2025.
@shehabgamin marked this pull request as ready for review on March 13, 2025 at 02:15.
The github-actions bot added the core (Core DataFusion crate) and sqllogictest (SQL Logic Tests (.slt)) labels on Mar 13, 2025.

Contributor Author (@shehabgamin):

Testing instructions here

# datafusion-spark: Spark-compatible Expressions

This crate provides Apache Spark-compatible expressions for use with DataFusion.
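
For readers who want to try the crate out, a minimal sketch of what registration might look like, assuming all_default_scalar_functions() returns a Vec<Arc<ScalarUDF>> like the other DataFusion function crates (the exact API surface is still being settled in this PR):

```rust
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();

    // Assumption: all_default_scalar_functions() returns Vec<Arc<ScalarUDF>>,
    // mirroring datafusion-functions and the other function crates.
    for udf in datafusion_spark::all_default_scalar_functions() {
        ctx.register_udf(udf.as_ref().clone());
    }

    // The functions in this PR are registered under spark_-prefixed names,
    // e.g. spark_ascii and spark_expm1.
    let df = ctx.sql("SELECT spark_expm1(1)").await?;
    df.show().await?;
    Ok(())
}
```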

Contributor Author (@shehabgamin):

Testing instructions here


Contributor Author (@shehabgamin):

Example test here

Member:

The expm1 probably works identically across all Spark versions and is not affected by different configuration settings, but many expressions are affected by settings such as ANSI mode and different date/time formats and timezones.

How do you think we should handle these different cases with this test approach?

Member:

A related question - There will be some functions that we can support with 100% compatibility and some that we cannot. It would be good to think about how we express that.

Member:

Sorry for all the questions, but I am really excited about this 😄 ... what would be involved in being able to run these same sqllogictest files in Spark (either in CI or manually locally) to confirm same/similar results?

Contributor Author (@shehabgamin):

These are great questions and really good things to discuss. I'm about to go into a meeting, but I have a bunch of thoughts that I'll share afterwards in a couple of hours.

Member:

No rush!

Contributor Author (@shehabgamin):

> The expm1 probably works identically across all Spark versions and is not affected by different configuration settings, but many expressions are affected by settings such as ANSI mode and different date/time formats and timezones.
>
> How do you think we should handle these different cases with this test approach?

In the Sail code base, auxiliary information is passed into new() and stored within the struct. For example:
https://github.com/lakehq/sail/blob/be54362ce8bd79bfb3e55214c5a1b80c2c3d2492/crates/sail-plan/src/extension/function/datetime/timestamp_now.rs#L9-L32

```rust
#[derive(Debug)]
pub struct TimestampNow {
    signature: Signature,
    timezone: Arc<str>,
    time_unit: TimeUnit,
}

impl TimestampNow {
    pub fn new(timezone: Arc<str>, time_unit: TimeUnit) -> Self {
        Self {
            signature: Signature::nullary(Volatility::Stable),
            timezone,
            time_unit,
        }
    }

    pub fn timezone(&self) -> &str {
        &self.timezone
    }

    pub fn time_unit(&self) -> &TimeUnit {
        &self.time_unit
    }
}
```

And then in our PhysicalExtensionCodec we can do the following:
https://github.com/lakehq/sail/blob/be54362ce8bd79bfb3e55214c5a1b80c2c3d2492/crates/sail-execution/src/codec.rs#L946-L953

```rust
if let Some(func) = node.inner().as_any().downcast_ref::<TimestampNow>() {
    let timezone = func.timezone().to_string();
    let time_unit: gen_datafusion_common::TimeUnit = func.time_unit().into();
    let time_unit = time_unit.as_str_name().to_string();
    UdfKind::TimestampNow(gen::TimestampNowUdf {
        timezone,
        time_unit,
    })
```

If we decide not to use sqllogictest (per #15168 (comment)), then we will have no problem testing UDFs with auxiliary information. There are already tests in DataFusion core for this type of pattern as well:

```rust
async fn test_parameterized_scalar_udf() -> Result<()> {
    let batch = RecordBatch::try_from_iter([(
        "text",
        Arc::new(StringArray::from(vec!["foo", "bar", "foobar", "barfoo"])) as ArrayRef,
    )])?;
    let ctx = SessionContext::new();
    ctx.register_batch("t", batch)?;
    let t = ctx.table("t").await?;
    let foo_udf = ScalarUDF::from(MyRegexUdf::new("fo{2}"));
    let bar_udf = ScalarUDF::from(MyRegexUdf::new("[Bb]ar"));
    let plan = LogicalPlanBuilder::from(t.into_optimized_plan()?)
        .filter(
            foo_udf
                .call(vec![col("text")])
                .and(bar_udf.call(vec![col("text")])),
        )?
        .filter(col("text").is_not_null())?
        .build()?;
    assert_eq!(
        format!("{plan}"),
        "Filter: t.text IS NOT NULL\n  Filter: regex_udf(t.text) AND regex_udf(t.text)\n    TableScan: t projection=[text]"
    );
    let actual = DataFrame::new(ctx.state(), plan).collect().await?;
    let expected = [
        "+--------+",
        "| text   |",
        "+--------+",
        "| foobar |",
        "| barfoo |",
        "+--------+",
    ];
    assert_batches_eq!(expected, &actual);
    ctx.deregister_table("t")?;
    Ok(())
}
```

Contributor Author (@shehabgamin):

> A related question - There will be some functions that we can support with 100% compatibility and some that we cannot. It would be good to think about how we express that.

Throw errors when possible, and provide documentation. Depending on how shaky the compatibility for the function is, we may want to avoid implementing it altogether.
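
For the cases where we cannot match Spark, one option is to fail loudly rather than silently return different results. A minimal sketch of that idea (the guard function and its ansi_mode flag are hypothetical, not part of this PR):

```rust
use datafusion_common::{not_impl_err, Result};

/// Hypothetical compatibility guard: refuse to evaluate under settings we
/// know we cannot reproduce faithfully, instead of diverging from Spark.
fn check_spark_compat(ansi_mode: bool) -> Result<()> {
    if ansi_mode {
        return not_impl_err!("this Spark-compatible function does not support ANSI mode yet");
    }
    Ok(())
}

fn main() -> Result<()> {
    // Non-ANSI evaluation is allowed; ANSI mode would return a NotImplemented error.
    check_spark_compat(false)
}
```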

Contributor Author (@shehabgamin):

> ... what would be involved in being able to run these same sqllogictest files in Spark (either in CI or manually locally) to confirm same/similar results?

One option:

  1. A Python script to automatically generate Spark SQL function test cases and their results using PySpark.
  2. A README for developers explaining how to run the script and commit the test cases.
  3. An optional CI workflow to verify the correctness of the test cases' ground truth on demand.

Separate topic: do you have ideas about fuzz testing and its suitability in DataFusion?

Member:

Here is my suggestion.

  1. Write a Python script that generates interesting test data (probably in Parquet format) with edge cases using PySpark
  2. Create files containing SQL queries that operate on these test files
  3. Write a Python script to run those queries via PySpark and write results out to file
  4. Write a Rust script to run those queries using datafusion-spark and write results out to file (a sketch of this step follows below)
  5. Write a script that can compare the Spark and datafusion-spark output and report on any differences
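
A minimal Rust sketch of step 4, under a few assumptions: the queries live one per line in a hypothetical queries.sql, the generated data is a hypothetical spark_test_data.parquet, and all_default_scalar_functions() returns Vec<Arc<ScalarUDF>> as in the other function crates:

```rust
use std::fs;

use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    // Register the Spark-compatible functions (return type assumed, see above).
    for udf in datafusion_spark::all_default_scalar_functions() {
        ctx.register_udf(udf.as_ref().clone());
    }
    // Hypothetical test data produced by the PySpark script from step 1.
    ctx.register_parquet("t", "spark_test_data.parquet", ParquetReadOptions::default())
        .await?;

    // Run each query and collect its pretty-printed result so the output can
    // be diffed against the file produced by the PySpark runner in step 3.
    let mut out = String::new();
    for query in fs::read_to_string("queries.sql")?.lines() {
        let batches = ctx.sql(query).await?.collect().await?;
        out.push_str(&format!("{query}\n{}\n\n", pretty_format_batches(&batches)?));
    }
    fs::write("datafusion_spark_results.txt", out)?;
    Ok(())
}
```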


Contributor Author (@shehabgamin):

Example test here

Contributor Author (@shehabgamin) commented Mar 13, 2025:

Example invoke test here. Direct invocation tests should only be used to verify that the function is correctly implemented. Further tests are required in the sqllogictest crate (examples for ascii can be found in this PR).

Contributor Author (@shehabgamin):

Example invoke test here. Direct invocation tests should only be used to verify that the function is correctly implemented. Further tests are required in the sqllogictest crate (examples for expm1 can be found in this PR).

```rust
fn name(&self) -> &str {
    "spark_ascii"
}
```
Contributor Author (@shehabgamin) commented Mar 13, 2025:

Prefix with spark_ because sqllogictest evaluates both implementations of ascii.

```rust
fn name(&self) -> &str {
    "spark_expm1"
}
```
Contributor Author (@shehabgamin):

Prefix with spark_ because sqllogictest may evaluate more than one implementation of expm1.

Comment on lines +24 to +26
SELECT spark_expm1(1::INT);
----
1.718281828459
Member:

I tried running this in Spark 3.5.3 and did not get the same answer.

scala> spark.sql("select expm1(1)").show()
+-----------------+
|         EXPM1(1)|
+-----------------+
|1.718281828459045|
+-----------------+

Contributor Author (@shehabgamin):

@andygrove I believe sqllogictest is truncating the answer.

I initially had this test, which tested for the value 1.7182818284590453 (slightly more precise than your result) but removed it because cargo test (amd64) was giving the value 1.718281828459045 (https://github.com/apache/datafusion/actions/runs/13825914914/job/38680868216) while the rest of the cargo tests on different architectures were passing.

Member:

Does sqllogictest have a way to test floating point results within some tolerance?

Contributor Author (@shehabgamin):

Doesn't look like it:

> Remember, the purpose of sqllogictest is to validate the logic behind the evaluation of SQL statements, not the ability to handle extreme values. So keep content in a reasonable range: small integers, short strings, and floating point numbers that use only the most significant bits of a 32-bit IEEE float.

https://www.sqlite.org/sqllogictest/doc/trunk/about.wiki

Contributor Author (@shehabgamin):

We don't have to stick with sqllogictest.

We can create test helpers similar to:
https://github.com/lakehq/datafusion/blob/d78877a55c5e835a07a7ebf23a7bd515faf7d827/datafusion/optimizer/src/analyzer/type_coercion.rs#L2137-L2208

The above link is from an old PR that didn't end up getting merged in, but the general idea seems useful here.
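
For illustration, one shape such a helper could take (a sketch only: the helper name and tolerance are made up, it calls f64::exp_m1 directly rather than invoking the UDF, and the expected value is the Spark 3.5.3 output quoted earlier in this thread):

```rust
/// Hypothetical test helper: compare a Float64 result against Spark's output
/// within a relative tolerance, instead of relying on sqllogictest's textual,
/// truncated comparison.
fn assert_close(actual: f64, expected: f64, rel_tol: f64) {
    let diff = (actual - expected).abs();
    let scale = expected.abs().max(1.0);
    assert!(
        diff <= rel_tol * scale,
        "expected {expected}, got {actual} (diff {diff})"
    );
}

#[test]
fn expm1_of_one_matches_spark_within_tolerance() {
    // 1.718281828459045 is the Spark 3.5.3 result quoted above.
    assert_close(1.0_f64.exp_m1(), 1.718281828459045, 1e-12);
}
```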

Contributor Author (@shehabgamin):

lol

query R
SELECT spark_expm1(1::INT);
----
1.718281828459

query T
SELECT spark_expm1(1::INT)::STRING;
----
1.7182818284590453


query R
SELECT spark_expm1(a) FROM (VALUES (0::INT), (1::INT)) AS t(a);
Member:

I'd suggest adding tests for a wider range of values and edge cases, such as negative numbers, large positive and negative numbers, NaN, null, and so on.

Contributor Author (@shehabgamin):

Good call, will do!


query I
SELECT spark_ascii(a) FROM (VALUES ('Spark'), ('PySpark'), ('Pandas API')) AS t(a);
Member (@andygrove) commented Mar 14, 2025:

Could you add tests for edge cases?

Some ideas from ChatGPT (the results are from me actually running them in Spark):

scala> spark.sql("select ascii('😀')").show()
+---------+                                                                     
|ascii(😀)|
+---------+
|   128512|
+---------+

scala> spark.sql("select ascii('\n')").show()
+---------+
|ascii(\n)|
+---------+
|       10|
+---------+


scala> spark.sql("select ascii('\t')").show()
+---------+
|ascii(\t)|
+---------+
|        9|
+---------+

Contributor Author (@shehabgamin):

Good call, will do!

Comment on lines +116 to +118
#[cfg(feature = "spark")]
functions.append(&mut datafusion_spark::all_default_scalar_functions());

Member:

I don't have a strong opinion on this, but I suspect many DataFusion users may not be keen on having Spark-specific work affecting the core.

Contributor Author (@shehabgamin):

I don't have a strong opinion on this either. I'm open to alternatives.

Labels: core (Core DataFusion crate), functions (Changes to functions implementation), sqllogictest (SQL Logic Tests (.slt))

Successfully merging this pull request may close these issues: [DISCUSSION] Add separate crate to cover spark builtin functions

2 participants