feat: Add datafusion-spark crate #15168
base: main
Conversation
specific language governing permissions and limitations
under the License.
-->
Testing instructions here
# datafusion-spark: Spark-compatible Expressions

This crate provides Apache Spark-compatible expressions for use with DataFusion.
Testing instructions here
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Example test here
The expm1 probably works identically across all Spark versions and is not affected by different configuration settings, but many expressions are affected by settings such as ANSI mode and different date/time formats and timezones.
How do you think we should handle these different cases with this test approach?
A related question - There will be some functions that we can support with 100% compatibility and some that we cannot. It would be good to think about how we express that.
Sorry for all the questions, but I am really excited about this 😄 ... what would be involved in being able to run these same sqllogic test files in Spark (either in CI or manually locally) to confirm same/similar results?
These are great questions and really good things to discuss. I'm about to go into a meeting but I have a bunch of thoughts that I'll share afterwards in a couple of hours.
No rush!
The expm1 probably works identically across all Spark versions and is not affected by different configuration settings, but many expressions are affected by settings such as ANSI mode and different date/time formats and timezones.
How do you think we should handle these different cases with this test approach?
In the Sail code base, auxiliary information is passed into new() and stored within the struct. For example:
https://github.com/lakehq/sail/blob/be54362ce8bd79bfb3e55214c5a1b80c2c3d2492/crates/sail-plan/src/extension/function/datetime/timestamp_now.rs#L9-L32
#[derive(Debug)]
pub struct TimestampNow {
    signature: Signature,
    timezone: Arc<str>,
    time_unit: TimeUnit,
}

impl TimestampNow {
    pub fn new(timezone: Arc<str>, time_unit: TimeUnit) -> Self {
        Self {
            signature: Signature::nullary(Volatility::Stable),
            timezone,
            time_unit,
        }
    }

    pub fn timezone(&self) -> &str {
        &self.timezone
    }

    pub fn time_unit(&self) -> &TimeUnit {
        &self.time_unit
    }
}
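(For completeness, here is a minimal registration sketch. It is not taken from Sail: the ScalarUDFImpl implementation for TimestampNow is elided from the excerpt above, and the function name and config values below are only illustrative of how session-level settings get baked into the UDF instance.)

// Sketch only: resolve session-level settings first, then construct the UDF with them.
// Assumes TimestampNow implements ScalarUDFImpl (not shown in the excerpt above).
use std::sync::Arc;

use datafusion::arrow::datatypes::TimeUnit;
use datafusion::logical_expr::ScalarUDF;
use datafusion::prelude::SessionContext;

fn register_timestamp_now(ctx: &SessionContext) {
    // These values would normally come from the session/plan configuration.
    let timezone: Arc<str> = Arc::from("UTC");
    let time_unit = TimeUnit::Microsecond;
    ctx.register_udf(ScalarUDF::from(TimestampNow::new(timezone, time_unit)));
}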
And then in our PhysicalExtensionCodec we can do the following:
https://github.com/lakehq/sail/blob/be54362ce8bd79bfb3e55214c5a1b80c2c3d2492/crates/sail-execution/src/codec.rs#L946-L953
if let Some(func) = node.inner().as_any().downcast_ref::<TimestampNow>() {
    let timezone = func.timezone().to_string();
    let time_unit: gen_datafusion_common::TimeUnit = func.time_unit().into();
    let time_unit = time_unit.as_str_name().to_string();
    UdfKind::TimestampNow(gen::TimestampNowUdf {
        timezone,
        time_unit,
    })
If we decide not to use sqllogictest (per #15168 (comment)), then we will have no problem testing UDFs with auxiliary information. There are already tests in DataFusion core for this type of pattern as well:
datafusion/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
Lines 1265 to 1304 in 8061485
async fn test_parameterized_scalar_udf() -> Result<()> {
    let batch = RecordBatch::try_from_iter([(
        "text",
        Arc::new(StringArray::from(vec!["foo", "bar", "foobar", "barfoo"])) as ArrayRef,
    )])?;

    let ctx = SessionContext::new();
    ctx.register_batch("t", batch)?;
    let t = ctx.table("t").await?;
    let foo_udf = ScalarUDF::from(MyRegexUdf::new("fo{2}"));
    let bar_udf = ScalarUDF::from(MyRegexUdf::new("[Bb]ar"));

    let plan = LogicalPlanBuilder::from(t.into_optimized_plan()?)
        .filter(
            foo_udf
                .call(vec![col("text")])
                .and(bar_udf.call(vec![col("text")])),
        )?
        .filter(col("text").is_not_null())?
        .build()?;

    assert_eq!(
        format!("{plan}"),
        "Filter: t.text IS NOT NULL\n  Filter: regex_udf(t.text) AND regex_udf(t.text)\n    TableScan: t projection=[text]"
    );

    let actual = DataFrame::new(ctx.state(), plan).collect().await?;
    let expected = [
        "+--------+",
        "| text   |",
        "+--------+",
        "| foobar |",
        "| barfoo |",
        "+--------+",
    ];
    assert_batches_eq!(expected, &actual);

    ctx.deregister_table("t")?;
    Ok(())
}
A related question - There will be some functions that we can support with 100% compatibility and some that we cannot. It would be good to think about how we express that.
Throw errors when possible, and provide documentation. Depending on how shaky the compatibility for the function is, we may want to avoid implementing it altogether.
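To make that concrete, here is a hedged sketch of surfacing an unsupported Spark behavior as an error instead of silently diverging. The function and flag names are invented for illustration and are not part of this PR.

// Illustrative only: reject configurations we cannot emulate faithfully.
use datafusion_common::{not_impl_err, Result};

fn check_spark_compat(ansi_mode: bool) -> Result<()> {
    if ansi_mode {
        // Better to fail loudly than to return results that differ from Spark.
        return not_impl_err!("this Spark-compatible function does not support ANSI mode yet");
    }
    Ok(())
}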
... what would be involved in being able to run these same sqllogic test files in Spark (either in CI or manually locally) to confirm same/similar results
One option:
- A Python script to automatically generate Spark SQL function test cases and their results using PySpark.
- A README for developers explaining how to run the script and commit the test cases.
- An optional CI workflow to verify the correctness of the test cases' ground truth on demand.
Separate topic... Do you have ideas about fuzzy testing and its suitability in DataFusion?
Here is my suggestion.
- Write a Python script that generates interesting test data (probably in Parquet format) with edge cases using PySpark
- Create files containing SQL queries that operate on these test files
- Write a Python script to run those queries via PySpark and write results out to file
- Write a Rust script to run those queries using datafusion-spark and write results out to file (a rough sketch follows this list)
- Write a script that can compare the Spark and datafusion-spark output and report on any differences
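As a rough illustration of the Rust step in that list: the file paths, file layout, and output format below are assumptions, and it leans on the all_default_scalar_functions() helper visible in this PR's diff.

// Sketch: run a file of semicolon-separated queries with the datafusion-spark
// functions registered and dump the results so they can be diffed against the
// PySpark output. Assumes the helper returns Arc<ScalarUDF> values like the
// other function packages.
use datafusion::arrow::util::pretty::pretty_format_batches;
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    let ctx = SessionContext::new();
    for udf in datafusion_spark::all_default_scalar_functions() {
        ctx.register_udf(udf.as_ref().clone());
    }

    let queries = std::fs::read_to_string("spark_compat/queries.sql")?;
    let mut output = String::new();
    for query in queries.split(';').map(str::trim).filter(|q| !q.is_empty()) {
        let batches = ctx.sql(query).await?.collect().await?;
        output.push_str(&format!("-- {query}\n{}\n", pretty_format_batches(&batches)?));
    }
    std::fs::write("spark_compat/datafusion_results.txt", output)?;
    Ok(())
}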
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Example test here
};
}

#[test]
Example invoke test here. Direct invocation tests should only be used to verify that the function is correctly implemented. Further tests are required in the sqllogictest crate (examples for ascii can be found in this PR).
};
}

#[test]
Example invoke test here. Direct invocation tests should only be used to verify that the function is correctly implemented. Further tests are required in the sqllogictest crate (examples for expm1 can be found in this PR).
}

fn name(&self) -> &str {
    "spark_ascii"
Prefix with spark_ because sqllogictest evaluates both implementations of ascii.
}

fn name(&self) -> &str {
    "spark_expm1"
Prefix with spark_ because sqllogictest may evaluate more than one implementation of expm1.
SELECT spark_expm1(1::INT);
----
1.718281828459
I tried running this in Spark 3.5.3 and did not get the same answer.
scala> spark.sql("select expm1(1)").show()
+-----------------+
| EXPM1(1)|
+-----------------+
|1.718281828459045|
+-----------------+
@andygrove I believe sqllogictest is truncating the answer. I initially had this test, which tested for the value 1.7182818284590453 (slightly more precise than your result), but removed it because cargo test (amd64) was giving the value 1.718281828459045 (https://github.com/apache/datafusion/actions/runs/13825914914/job/38680868216) while the rest of the cargo tests on different architectures were passing.
Does sqllogictest have a way to test floating point results within some tolerance?
Doesn't look like it:
Remember, the purpose of sqllogictest is to validate the logic behind the evaluation of SQL statements, not the ability to handle extreme values. So keep content in a reasonable range: small integers, short strings, and floating point numbers that use only the most significant bits of a 32-bit IEEE float.
We don't have to stick with sqllogictest.
We can create test helpers similar to:
https://github.com/lakehq/datafusion/blob/d78877a55c5e835a07a7ebf23a7bd515faf7d827/datafusion/optimizer/src/analyzer/type_coercion.rs#L2137-L2208
The above link is from an old PR that didn't end up getting merged in, but the general idea seems useful here.
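For example, something along these lines (a sketch, not an existing DataFusion utility) would let a plain Rust test accept the tiny cross-platform differences discussed above:

// Sketch of a relative-tolerance comparison helper for float results.
fn assert_approx_eq(expected: f64, actual: f64, rel_tol: f64) {
    if expected.is_nan() {
        assert!(actual.is_nan(), "expected NaN, got {actual}");
        return;
    }
    let diff = (expected - actual).abs();
    let scale = expected.abs().max(actual.abs()).max(1.0);
    assert!(
        diff <= rel_tol * scale,
        "expected {expected}, got {actual} (diff {diff} exceeds tolerance)"
    );
}

#[test]
fn expm1_of_one_within_tolerance() {
    // 1.7182818284590453 was the value originally encoded in the test;
    // 1.718281828459045 is what the amd64 CI runner (and Spark's show() above) reported.
    assert_approx_eq(1.718_281_828_459_045_3, 1.718_281_828_459_045, 1e-12);
}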
lol
query R
SELECT spark_expm1(1::INT);
----
1.718281828459
query T
SELECT spark_expm1(1::INT)::STRING;
----
1.7182818284590453
1.718281828459

query R
SELECT spark_expm1(a) FROM (VALUES (0::INT), (1::INT)) AS t(a);
I'd suggest adding tests for a wider range of values and edge cases, such as negative numbers, large positive and negative numbers, NaN, null, and so on.
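For reference, a sketch of what some of those edge cases look like with Rust's std f64::exp_m1. These are only Rust-side expectations; the actual Spark results, including NULL handling, should still be confirmed against a real Spark session before being encoded in the .slt files.

#[test]
fn expm1_edge_case_expectations() {
    // Rust std behavior; verify Spark agrees before turning these into slt tests.
    assert_eq!(0.0_f64.exp_m1(), 0.0);
    assert_eq!(f64::NEG_INFINITY.exp_m1(), -1.0);
    assert_eq!(f64::INFINITY.exp_m1(), f64::INFINITY);
    assert!(f64::NAN.exp_m1().is_nan());
    // Large negative inputs saturate at -1; large positive inputs overflow to +inf.
    assert_eq!((-1000.0_f64).exp_m1(), -1.0);
    assert_eq!(1000.0_f64.exp_m1(), f64::INFINITY);
}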
Good call, will do!
50

query I
SELECT spark_ascii(a) FROM (VALUES ('Spark'), ('PySpark'), ('Pandas API')) AS t(a);
Could you add tests for edge cases?
Some ideas from ChatGPT (the results are from me actually running them in Spark):
scala> spark.sql("select ascii('😀')").show()
+---------+
|ascii(😀)|
+---------+
| 128512|
+---------+
scala> spark.sql("select ascii('\n')").show()
+---------+
|ascii(\n)|
+---------+
| 10|
+---------+
scala> spark.sql("select ascii('\t')").show()
+---------+
|ascii(\t)|
+---------+
| 9|
+---------+
Good call, will do!
#[cfg(feature = "spark")]
functions.append(&mut datafusion_spark::all_default_scalar_functions());
I don't have a strong opinion on this, but I suspect many DataFusion users may not be keen on having Spark-specific work affecting the core.
I don't have a strong opinion on this either. I'm open to alternatives.
Which issue does this PR close?
Closes #5600
Rationale for this change
See discussion in #5600
TL;DR: Many projects want Spark-compatible expressions for use with DataFusion. Some already exist in Comet and some in the Sail project.
What changes are included in this PR?
This PR adds the new datafusion-spark crate.
Are these changes tested?
Are there any user-facing changes?