Skip to content

Commit 915e992

Browse files
authored
arrow-select: Implement concat for RunArrays (#7487)
* arrow-select: Implement concat for `RunArray`s. It's unclear whether the added work and complexity of merging the last run of one array with the first run of the next array being concatenated is worth the potential benefit. For now, this implementation focuses on the ability to concatenate `RunArray`s in the first place. * Address comments
1 parent 6a3ecef commit 915e992

File tree

2 files changed

+267
-1
lines changed

2 files changed

+267
-1
lines changed

arrow-array/src/cast.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -956,6 +956,14 @@ pub trait AsArray: private::Sealed {
956956
self.as_dictionary_opt().expect("dictionary array")
957957
}
958958

959+
/// Downcast this to a [`RunArray`] returning `None` if not possible
960+
fn as_run_opt<K: RunEndIndexType>(&self) -> Option<&RunArray<K>>;
961+
962+
/// Downcast this to a [`RunArray`] panicking if not possible
963+
fn as_run<K: RunEndIndexType>(&self) -> &RunArray<K> {
964+
self.as_run_opt().expect("run array")
965+
}
966+
959967
/// Downcasts this to a [`AnyDictionaryArray`] returning `None` if not possible
960968
fn as_any_dictionary_opt(&self) -> Option<&dyn AnyDictionaryArray>;
961969

@@ -1015,6 +1023,10 @@ impl AsArray for dyn Array + '_ {
10151023
self.as_any().downcast_ref()
10161024
}
10171025

1026+
// Downcast through `Any`; yields `None` unless the concrete type is exactly `RunArray<K>`.
fn as_run_opt<K: RunEndIndexType>(&self) -> Option<&RunArray<K>> {
    let any = self.as_any();
    any.downcast_ref::<RunArray<K>>()
}
1029+
10181030
fn as_any_dictionary_opt(&self) -> Option<&dyn AnyDictionaryArray> {
10191031
let array = self;
10201032
downcast_dictionary_array! {
@@ -1077,6 +1089,14 @@ impl AsArray for ArrayRef {
10771089
fn as_any_dictionary_opt(&self) -> Option<&dyn AnyDictionaryArray> {
10781090
self.as_ref().as_any_dictionary_opt()
10791091
}
1092+
1093+
// Forward to the implementation on the array this reference points at.
fn as_run_opt<K: RunEndIndexType>(&self) -> Option<&RunArray<K>> {
    let inner = self.as_ref();
    inner.as_run_opt::<K>()
}
1096+
1097+
// Forward to the implementation on the array this reference points at.
fn as_string_opt<O: OffsetSizeTrait>(&self) -> Option<&GenericStringArray<O>> {
    let inner = self.as_ref();
    inner.as_string_opt::<O>()
}
10801100
}
10811101

10821102
#[cfg(test)]

arrow-select/src/concat.rs

Lines changed: 247 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@ use arrow_array::types::*;
3737
use arrow_array::*;
3838
use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, OffsetBuffer};
3939
use arrow_data::transform::{Capacities, MutableArrayData};
40+
use arrow_data::ArrayDataBuilder;
4041
use arrow_schema::{ArrowError, DataType, FieldRef, SchemaRef};
41-
use std::{collections::HashSet, sync::Arc};
42+
use std::{collections::HashSet, ops::Add, sync::Arc};
4243

4344
fn binary_capacity<T: ByteArrayType>(arrays: &[&dyn Array]) -> Capacities {
4445
let mut item_capacity = 0;
@@ -230,6 +231,67 @@ fn concat_bytes<T: ByteArrayType>(arrays: &[&dyn Array]) -> Result<ArrayRef, Arr
230231
Ok(Arc::new(builder.finish()))
231232
}
232233

234+
/// Concatenate multiple RunArray instances into a single RunArray.
235+
///
236+
/// This function handles the special case of concatenating RunArrays by:
237+
/// 1. Collecting all run ends and values from input arrays
238+
/// 2. Adjusting run ends to account for the length of previous arrays
239+
/// 3. Creating a new RunArray with the combined data
240+
fn concat_run_arrays<R: RunEndIndexType>(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError>
241+
where
242+
R::Native: Add<Output = R::Native>,
243+
{
244+
let run_arrays: Vec<_> = arrays
245+
.iter()
246+
.map(|x| x.as_run::<R>())
247+
.filter(|x| !x.run_ends().is_empty())
248+
.collect();
249+
250+
// The run ends need to be adjusted by the sum of the lengths of the previous arrays.
251+
let needed_run_end_adjustments = std::iter::once(R::default_value())
252+
.chain(
253+
run_arrays
254+
.iter()
255+
.scan(R::default_value(), |acc, run_array| {
256+
*acc = *acc + *run_array.run_ends().values().last().unwrap();
257+
Some(*acc)
258+
}),
259+
)
260+
.collect::<Vec<_>>();
261+
262+
// This works out nicely to be the total (logical) length of the resulting array.
263+
let total_len = needed_run_end_adjustments.last().unwrap().as_usize();
264+
265+
let run_ends_array =
266+
PrimitiveArray::<R>::from_iter_values(run_arrays.iter().enumerate().flat_map(
267+
move |(i, run_array)| {
268+
let adjustment = needed_run_end_adjustments[i];
269+
run_array
270+
.run_ends()
271+
.values()
272+
.iter()
273+
.map(move |run_end| *run_end + adjustment)
274+
},
275+
));
276+
277+
let all_values = concat(
278+
&run_arrays
279+
.iter()
280+
.map(|x| x.values().as_ref())
281+
.collect::<Vec<_>>(),
282+
)?;
283+
284+
let builder = ArrayDataBuilder::new(run_arrays[0].data_type().clone())
285+
.len(total_len)
286+
.child_data(vec![run_ends_array.into_data(), all_values.into_data()]);
287+
288+
// `build_unchecked` is used to avoid recursive validation of child arrays.
289+
let array_data = unsafe { builder.build_unchecked() };
290+
array_data.validate_data()?;
291+
292+
Ok(Arc::<RunArray<R>>::new(array_data.into()))
293+
}
294+
233295
macro_rules! dict_helper {
234296
($t:ty, $arrays:expr) => {
235297
return Ok(Arc::new(concat_dictionaries::<$t>($arrays)?) as _)
@@ -312,6 +374,16 @@ pub fn concat(arrays: &[&dyn Array]) -> Result<ArrayRef, ArrowError> {
312374
DataType::LargeUtf8 => concat_bytes::<LargeUtf8Type>(arrays),
313375
DataType::Binary => concat_bytes::<BinaryType>(arrays),
314376
DataType::LargeBinary => concat_bytes::<LargeBinaryType>(arrays),
377+
DataType::RunEndEncoded(r, _) => {
378+
// Handle RunEndEncoded arrays with special concat function
379+
// We need to downcast based on the run end type
380+
match r.data_type() {
381+
DataType::Int16 => concat_run_arrays::<Int16Type>(arrays),
382+
DataType::Int32 => concat_run_arrays::<Int32Type>(arrays),
383+
DataType::Int64 => concat_run_arrays::<Int64Type>(arrays),
384+
_ => unreachable!("Unsupported run end index type: {r:?}"),
385+
}
386+
}
315387
_ => {
316388
let capacity = get_capacity(arrays, d);
317389
concat_fallback(arrays, capacity)
@@ -1267,4 +1339,178 @@ mod tests {
12671339
"There are duplicates in the value list (the value list here is sorted which is only for the assertion)"
12681340
);
12691341
}
1342+
1343+
// Concatenating two run arrays keeps every run and shifts the second array's
// run ends by the length of the first.
#[test]
fn test_concat_run_array() {
    // First input: runs ending at [2, 4] holding [10, 20]
    let left = RunArray::try_new(
        &Int32Array::from(vec![2, 4]),
        &Int32Array::from(vec![10, 20]),
    )
    .unwrap();

    // Second input: runs ending at [1, 4] holding [30, 40]
    let right = RunArray::try_new(
        &Int32Array::from(vec![1, 4]),
        &Int32Array::from(vec![30, 40]),
    )
    .unwrap();

    let concatenated = concat(&[&left, &right]).unwrap();
    let concatenated_runs: &arrow_array::RunArray<Int32Type> = concatenated.as_run();

    // Logical length is the sum of both inputs: 4 + 4
    assert_eq!(concatenated_runs.len(), 8);

    // Run ends of the second input are offset by 4
    let run_ends = concatenated_runs.run_ends().values();
    assert_eq!(run_ends.len(), 4);
    assert_eq!(&[2, 4, 5, 8], run_ends);

    // Values are appended in input order
    let values = concatenated_runs
        .values()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(values.len(), 4);
    assert_eq!(&[10, 20, 30, 40], values.values());
}
1376+
1377+
#[test]
fn test_concat_run_array_matching_first_last_value() {
    // First input ends with a run of 30: run ends [2, 4, 7], values [10, 20, 30]
    let left = RunArray::try_new(
        &Int32Array::from(vec![2, 4, 7]),
        &Int32Array::from(vec![10, 20, 30]),
    )
    .unwrap();

    // Second input starts with a run of 30: run ends [3, 5], values [30, 40]
    let right = RunArray::try_new(
        &Int32Array::from(vec![3, 5]),
        &Int32Array::from(vec![30, 40]),
    )
    .unwrap();

    let concatenated = concat(&[&left, &right]).unwrap();
    let concatenated_runs: &arrow_array::RunArray<Int32Type> = concatenated.as_run();

    // Logical length is 7 + 5
    assert_eq!(concatenated_runs.len(), 12);

    // The two adjacent runs of 30 are kept as separate runs
    let run_ends = concatenated_runs.run_ends().values();
    assert_eq!(&[2, 4, 7, 10, 12], run_ends);

    let values = concatenated_runs
        .values()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(&[10, 20, 30, 30, 40], values.values());
}
1411+
1412+
#[test]
fn test_concat_run_array_with_nulls() {
    // Run array with run ends [2, 4, 7] and values [10, null, 30]
    let values1 = Int32Array::from(vec![Some(10), None, Some(30)]);
    let run_ends1 = Int32Array::from(vec![2, 4, 7]);
    let array1 = RunArray::try_new(&run_ends1, &values1).unwrap();

    // Run array with run ends [3, 5] and values [30, null]
    let values2 = Int32Array::from(vec![Some(30), None]);
    let run_ends2 = Int32Array::from(vec![3, 5]);
    let array2 = RunArray::try_new(&run_ends2, &values2).unwrap();

    // Concatenate the two arrays
    let result = concat(&[&array1, &array2]).unwrap();
    let result_run_array: &arrow_array::RunArray<Int32Type> = result.as_run();

    // The result should have length 12 (7 + 5)
    assert_eq!(result_run_array.len(), 12);

    // Check that the run ends are correct
    let run_ends_values = result_run_array.run_ends().values();
    assert_eq!(&[2, 4, 7, 10, 12], run_ends_values);

    // Compare whole arrays so the position of every null is verified, rather
    // than only the null count and the unspecified buffer contents of null slots.
    let expected = Int32Array::from(vec![Some(10), None, Some(30), Some(30), None]);
    let actual = result_run_array
        .values()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(actual, &expected);
}
1452+
1453+
#[test]
fn test_concat_run_array_single() {
    // Only input: run ends [2, 4], values [10, 20]
    let only = RunArray::try_new(
        &Int32Array::from(vec![2, 4]),
        &Int32Array::from(vec![10, 20]),
    )
    .unwrap();

    // Concatenating a single array must hand back equivalent contents
    let concatenated = concat(&[&only]).unwrap();
    let concatenated_runs: &arrow_array::RunArray<Int32Type> = concatenated.as_run();

    assert_eq!(concatenated_runs.len(), 4);

    let run_ends = concatenated_runs.run_ends().values();
    assert_eq!(&[2, 4], run_ends);

    let values = concatenated_runs
        .values()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(&[10, 20], values.values());
}
1482+
1483+
#[test]
fn test_concat_run_array_with_3_arrays() {
    // Three inputs of logical length 4 each
    let first = RunArray::try_new(
        &Int32Array::from(vec![2, 4]),
        &Int32Array::from(vec![10, 20]),
    )
    .unwrap();
    let second = RunArray::try_new(
        &Int32Array::from(vec![1, 4]),
        &Int32Array::from(vec![30, 40]),
    )
    .unwrap();
    let third = RunArray::try_new(
        &Int32Array::from(vec![1, 4]),
        &Int32Array::from(vec![50, 60]),
    )
    .unwrap();

    let concatenated = concat(&[&first, &second, &third]).unwrap();
    let concatenated_runs: &arrow_array::RunArray<Int32Type> = concatenated.as_run();

    // Logical length is 4 + 4 + 4
    assert_eq!(concatenated_runs.len(), 12);

    // Each input's run ends are offset by the lengths of all inputs before it
    let run_ends = concatenated_runs.run_ends().values();
    assert_eq!(run_ends.len(), 6);
    assert_eq!(&[2, 4, 5, 8, 9, 12], run_ends);

    // Values are appended in input order
    let values = concatenated_runs
        .values()
        .as_any()
        .downcast_ref::<Int32Array>()
        .unwrap();
    assert_eq!(values.len(), 6);
    assert_eq!(&[10, 20, 30, 40, 50, 60], values.values());
}
12701516
}

0 commit comments

Comments
 (0)