Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement tree explain for ArrowFileSink #15206

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions datafusion/core/src/datasource/file_format/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ impl DisplayAs for ArrowFileSink {
write!(f, ")")
}
DisplayFormatType::TreeRender => {
// TODO: collect info
write!(f, "")
writeln!(f, "format: arrow")?;
write!(f, "file={}", &self.config.original_url)
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions datafusion/core/src/datasource/file_format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1373,6 +1373,7 @@ mod tests {
let object_store_url = ObjectStoreUrl::local_filesystem();

let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: object_store_url.clone(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse(table_path)?],
Expand Down Expand Up @@ -1458,6 +1459,7 @@ mod tests {

// set file config to include partitioning on field_a
let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: object_store_url.clone(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
Expand Down Expand Up @@ -1541,6 +1543,7 @@ mod tests {
let object_store_url = ObjectStoreUrl::local_filesystem();

let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: object_store_url.clone(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1037,6 +1037,7 @@ impl TableProvider for ListingTable {

// Sink related option, apart from format
let config = FileSinkConfig {
original_url: String::default(),
object_store_url: self.table_paths()[0].object_store(),
table_paths: self.table_paths().clone(),
file_groups,
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,7 @@ impl DefaultPhysicalPlanner {
partition_by,
options: source_option_tuples,
}) => {
let original_url = output_url.clone();
let input_exec = children.one()?;
let parsed_url = ListingTableUrl::parse(output_url)?;
let object_store_url = parsed_url.object_store();
Expand Down Expand Up @@ -531,6 +532,7 @@ impl DefaultPhysicalPlanner {

// Set file sink related options
let config = FileSinkConfig {
original_url,
object_store_url,
table_paths: vec![parsed_url],
file_groups: vec![],
Expand Down
6 changes: 2 additions & 4 deletions datafusion/datasource-csv/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -666,10 +666,8 @@ impl DisplayAs for CsvSink {
write!(f, ")")
}
DisplayFormatType::TreeRender => {
if !self.config.file_groups.is_empty() {
FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
}
Ok(())
writeln!(f, "format: csv")?;
write!(f, "file={}", &self.config.original_url)
}
}
}
Expand Down
6 changes: 2 additions & 4 deletions datafusion/datasource-json/src/file_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,8 @@ impl DisplayAs for JsonSink {
write!(f, ")")
}
DisplayFormatType::TreeRender => {
if !self.config.file_groups.is_empty() {
FileGroupDisplay(&self.config.file_groups).fmt_as(t, f)?;
}
Ok(())
writeln!(f, "format: json")?;
write!(f, "file={}", &self.config.original_url)
}
}
}
Expand Down
2 changes: 2 additions & 0 deletions datafusion/datasource/src/file_sink_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ pub trait FileSink: DataSink {
/// The base configurations to provide when creating a physical plan for
/// writing to any given file format.
pub struct FileSinkConfig {
/// The unresolved URL specified by the user
pub original_url: String,
/// Object store URL, used to get an ObjectStore instance
pub object_store_url: ObjectStoreUrl,
/// A vector of [`PartitionedFile`] structs, each representing a file partition
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto/src/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,7 @@ impl TryFrom<&protobuf::FileSinkConfig> for FileSinkConfig {
protobuf::InsertOp::Replace => InsertOp::Replace,
};
Ok(Self {
original_url: String::default(),
object_store_url: ObjectStoreUrl::parse(&conf.object_store_url)?,
file_groups,
table_paths,
Expand Down
3 changes: 3 additions & 0 deletions datafusion/proto/tests/cases/roundtrip_physical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,7 @@ fn roundtrip_json_sink() -> Result<()> {
let input = Arc::new(PlaceholderRowExec::new(schema.clone()));

let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
Expand Down Expand Up @@ -1317,6 +1318,7 @@ fn roundtrip_csv_sink() -> Result<()> {
let input = Arc::new(PlaceholderRowExec::new(schema.clone()));

let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
Expand Down Expand Up @@ -1372,6 +1374,7 @@ fn roundtrip_parquet_sink() -> Result<()> {
let input = Arc::new(PlaceholderRowExec::new(schema.clone()));

let file_sink_config = FileSinkConfig {
original_url: String::default(),
object_store_url: ObjectStoreUrl::local_filesystem(),
file_groups: vec![PartitionedFile::new("/tmp".to_string(), 1)],
table_paths: vec![ListingTableUrl::parse("file:///")?],
Expand Down
70 changes: 54 additions & 16 deletions datafusion/sqllogictest/test_files/explain_tree.slt
Original file line number Diff line number Diff line change
Expand Up @@ -1716,14 +1716,21 @@ TO 'test_files/scratch/explain_tree/1.json';
physical_plan
01)┌───────────────────────────┐
02)│ DataSinkExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ DataSourceExec │
06)│ -------------------- │
07)│ bytes: 2672 │
08)│ format: memory │
09)│ rows: 1 │
10)└───────────────────────────┘
03)│ -------------------- │
04)│ file: │
05)│ test_files/scratch │
06)│ /explain_tree/1 │
07)│ .json │
08)│ │
09)│ format: json │
10)└─────────────┬─────────────┘
11)┌─────────────┴─────────────┐
12)│ DataSourceExec │
13)│ -------------------- │
14)│ bytes: 2672 │
15)│ format: memory │
16)│ rows: 1 │
17)└───────────────────────────┘

query TT
explain COPY (VALUES (1, 'foo', 1, '2023-01-01'), (2, 'bar', 2, '2023-01-02'), (3, 'baz', 3, '2023-01-03'))
Expand All @@ -1732,14 +1739,45 @@ TO 'test_files/scratch/explain_tree/2.csv';
physical_plan
01)┌───────────────────────────┐
02)│ DataSinkExec │
03)└─────────────┬─────────────┘
04)┌─────────────┴─────────────┐
05)│ DataSourceExec │
06)│ -------------------- │
07)│ bytes: 2672 │
08)│ format: memory │
09)│ rows: 1 │
10)└───────────────────────────┘
03)│ -------------------- │
04)│ file: │
05)│ test_files/scratch │
06)│ /explain_tree/2 │
07)│ .csv │
08)│ │
09)│ format: csv │
10)└─────────────┬─────────────┘
11)┌─────────────┴─────────────┐
12)│ DataSourceExec │
13)│ -------------------- │
14)│ bytes: 2672 │
15)│ format: memory │
16)│ rows: 1 │
17)└───────────────────────────┘

query TT
explain COPY (VALUES (1, 'foo', 1, '2023-01-01'), (2, 'bar', 2, '2023-01-02'), (3, 'baz', 3, '2023-01-03'))
TO 'test_files/scratch/explain_tree/3.arrow';
----
physical_plan
01)┌───────────────────────────┐
02)│ DataSinkExec │
03)│ -------------------- │
04)│ file: │
05)│ test_files/scratch │
06)│ /explain_tree/3 │
07)│ .arrow │
08)│ │
09)│ format: arrow │
10)└─────────────┬─────────────┘
11)┌─────────────┴─────────────┐
12)│ DataSourceExec │
13)│ -------------------- │
14)│ bytes: 2672 │
15)│ format: memory │
16)│ rows: 1 │
17)└───────────────────────────┘


# Test explain tree rendering for CoalesceBatchesExec with limit
statement ok
Expand Down