
Commit 7261909

Minor: Add a parquet row_filter test, reduce some test boilerplate (#7522)
* Add single row filter test
* DRY up repeated TestReader construction
* Update parquet/src/arrow/async_reader/mod.rs
* Improve comments for other tests
1 parent d6a2351 commit 7261909
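
For context on the feature the new test covers: a RowFilter lets the async Parquet reader evaluate predicates during the scan, so rows that fail a predicate are skipped before the projected columns are decoded. Below is a minimal, self-contained sketch of that usage, assembled from the same parquet crate APIs the diff exercises (ArrowPredicateFn, RowFilter, ProjectionMask, ParquetRecordBatchStreamBuilder). It is not part of the commit; the file path and the use of leaf column 0 are illustrative, and it assumes the parquet crate's "async" feature plus the tokio and futures crates.

use arrow::array::{Scalar, StringArray};
use arrow::compute::kernels::cmp::eq;
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical input file; any Parquet file whose first leaf column
    // is a string column would work here.
    let file = tokio::fs::File::open("data.parquet").await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;

    // Predicate over leaf column 0: keep rows where the value equals "b".
    let scalar = StringArray::from_iter_values(["b"]);
    let predicate = ArrowPredicateFn::new(
        ProjectionMask::leaves(builder.parquet_schema(), vec![0]),
        move |batch| eq(batch.column(0), &Scalar::new(&scalar)),
    );

    // Rows failing the predicate are skipped before the output is decoded.
    let stream = builder
        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
        .build()?;
    let batches: Vec<_> = stream.try_collect().await?;
    println!("matching batches: {}", batches.len());
    Ok(())
}

The new test in this commit does the same thing against an in-memory TestReader, which additionally records the byte-range requests the reader issues so the test can assert on them.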

File tree

1 file changed: +95 -75 lines

  • parquet/src/arrow/async_reader/mod.rs

parquet/src/arrow/async_reader/mod.rs

Lines changed: 95 additions & 75 deletions
@@ -1135,6 +1135,16 @@ mod tests {
         requests: Arc<Mutex<Vec<Range<usize>>>>,
     }

+    impl TestReader {
+        fn new(data: Bytes) -> Self {
+            Self {
+                data,
+                metadata: Default::default(),
+                requests: Default::default(),
+            }
+        }
+    }
+
     impl AsyncFileReader for TestReader {
         fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, Result<Bytes>> {
             let range = range.clone();
@@ -1167,11 +1177,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());

         let requests = async_reader.requests.clone();
         let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
@@ -1220,11 +1226,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());

         let requests = async_reader.requests.clone();
         let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
@@ -1281,11 +1283,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());

         let options = ArrowReaderOptions::new().with_page_index(true);
         let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1350,11 +1348,7 @@ mod tests {

         assert_eq!(metadata.num_row_groups(), 1);

-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());

         let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
             .await
@@ -1391,11 +1385,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_tiny_pages_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());

         let options = ArrowReaderOptions::new().with_page_index(true);
         let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1469,11 +1459,7 @@ mod tests {

         let selection = RowSelection::from(selectors);

-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());

         let options = ArrowReaderOptions::new().with_page_index(true);
         let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1535,11 +1521,7 @@ mod tests {

         let selection = RowSelection::from(selectors);

-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());

         let options = ArrowReaderOptions::new().with_page_index(true);
         let builder = ParquetRecordBatchStreamBuilder::new_with_options(async_reader, options)
@@ -1566,6 +1548,70 @@ mod tests {

     #[tokio::test]
     async fn test_row_filter() {
+        let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
+        let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
+        let data = RecordBatch::try_from_iter([
+            ("a", Arc::new(a) as ArrayRef),
+            ("b", Arc::new(b) as ArrayRef),
+        ])
+        .unwrap();
+
+        let mut buf = Vec::with_capacity(1024);
+        let mut writer = ArrowWriter::try_new(&mut buf, data.schema(), None).unwrap();
+        writer.write(&data).unwrap();
+        writer.close().unwrap();
+
+        let data: Bytes = buf.into();
+        let metadata = ParquetMetaDataReader::new()
+            .parse_and_finish(&data)
+            .unwrap();
+        let parquet_schema = metadata.file_metadata().schema_descr_ptr();
+
+        let test = TestReader::new(data);
+        let requests = test.requests.clone();
+
+        let a_scalar = StringArray::from_iter_values(["b"]);
+        let a_filter = ArrowPredicateFn::new(
+            ProjectionMask::leaves(&parquet_schema, vec![0]),
+            move |batch| eq(batch.column(0), &Scalar::new(&a_scalar)),
+        );
+
+        let filter = RowFilter::new(vec![Box::new(a_filter)]);
+
+        let mask = ProjectionMask::leaves(&parquet_schema, vec![0, 1]);
+        let stream = ParquetRecordBatchStreamBuilder::new(test)
+            .await
+            .unwrap()
+            .with_projection(mask.clone())
+            .with_batch_size(1024)
+            .with_row_filter(filter)
+            .build()
+            .unwrap();
+
+        let batches: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(batches.len(), 1);
+
+        let batch = &batches[0];
+        assert_eq!(batch.num_columns(), 2);
+
+        // Filter should have kept only rows with "b" in column 0
+        assert_eq!(
+            batch.column(0).as_ref(),
+            &StringArray::from_iter_values(["b", "b", "b"])
+        );
+        assert_eq!(
+            batch.column(1).as_ref(),
+            &StringArray::from_iter_values(["2", "3", "4"])
+        );
+
+        // Should only have made 2 requests:
+        // * First request fetches data for evaluating the predicate
+        // * Second request fetches data for evaluating the projection
+        assert_eq!(requests.lock().unwrap().len(), 2);
+    }
+
+    #[tokio::test]
+    async fn test_two_row_filters() {
         let a = StringArray::from_iter_values(["a", "b", "b", "b", "c", "c"]);
         let b = StringArray::from_iter_values(["1", "2", "3", "4", "5", "6"]);
         let c = Int32Array::from_iter(0..6);
@@ -1587,11 +1633,7 @@ mod tests {
         .unwrap();
         let parquet_schema = metadata.file_metadata().schema_descr_ptr();

-        let test = TestReader {
-            data,
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let test = TestReader::new(data);
         let requests = test.requests.clone();

         let a_scalar = StringArray::from_iter_values(["b"]);
@@ -1634,6 +1676,9 @@ mod tests {
         assert_eq!(val, 3);

         // Should only have made 3 requests
+        // * First request fetches data for evaluating the first predicate
+        // * Second request fetches data for evaluating the second predicate
+        // * Third request fetches data for evaluating the projection
         assert_eq!(requests.lock().unwrap().len(), 3);
     }

@@ -1664,11 +1709,7 @@ mod tests {

         assert_eq!(metadata.num_row_groups(), 2);

-        let test = TestReader {
-            data,
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let test = TestReader::new(data);

         let stream = ParquetRecordBatchStreamBuilder::new(test.clone())
             .await
@@ -1755,11 +1796,7 @@ mod tests {

         assert_eq!(metadata.num_row_groups(), 1);

-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());

         let a_filter =
             ArrowPredicateFn::new(ProjectionMask::leaves(&parquet_schema, vec![1]), |batch| {
@@ -1823,11 +1860,7 @@ mod tests {

         assert_eq!(metadata.num_row_groups(), 1);

-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());

         let requests = async_reader.requests.clone();
         let (_, fields) = parquet_to_arrow_schema_and_fields(
@@ -1893,11 +1926,7 @@ mod tests {
         let path = format!("{testdata}/alltypes_plain.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());

-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());

         let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
             .await
@@ -2036,11 +2065,7 @@ mod tests {
         let testdata = arrow::util::test_util::parquet_test_data();
         let path = format!("{testdata}/data_index_bloom_encoding_stats.parquet");
         let data = Bytes::from(std::fs::read(path).unwrap());
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());
         let builder = ParquetRecordBatchStreamBuilder::new(async_reader)
             .await
             .unwrap();
@@ -2063,11 +2088,7 @@ mod tests {
     }

     async fn test_get_row_group_column_bloom_filter(data: Bytes, with_length: bool) {
-        let async_reader = TestReader {
-            data: data.clone(),
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let async_reader = TestReader::new(data.clone());

         let mut builder = ParquetRecordBatchStreamBuilder::new(async_reader)
             .await
@@ -2206,11 +2227,7 @@ mod tests {
         .unwrap();
         let parquet_schema = metadata.file_metadata().schema_descr_ptr();

-        let test = TestReader {
-            data,
-            metadata: Default::default(),
-            requests: Default::default(),
-        };
+        let test = TestReader::new(data);
         let requests = test.requests.clone();

         let a_scalar = StringArray::from_iter_values(["b"]);
@@ -2261,6 +2278,9 @@ mod tests {
         assert_eq!(val, 3);

         // Should only have made 3 requests
+        // * First request fetches data for evaluating the first predicate
+        // * Second request fetches data for evaluating the second predicate
+        // * Third request fetches data for evaluating the projection
         assert_eq!(requests.lock().unwrap().len(), 3);
     }

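Taken together, the request-count assertions document the reader's fetch strategy for row filters: each ArrowPredicate triggers one fetch covering just the columns in its ProjectionMask, and one final fetch retrieves the projected columns for the rows that survived; hence 2 requests in the single-predicate test and 3 in the two-predicate tests.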
