Skip to content

Commit 6a3ecef

Browse files
authored
Speedup filter_bytes ~-20-40%, filter_native low selectivity (~-37%) (#7463)
* Speedup filter_bytes * Speedup filter_bytes * Cleanup * Cleanup * WIP * Move allocation
1 parent 2d8e084 commit 6a3ecef

File tree

1 file changed

+42
-20
lines changed

1 file changed

+42
-20
lines changed

arrow-select/src/filter.rs

Lines changed: 42 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -554,36 +554,33 @@ fn filter_boolean(array: &BooleanArray, predicate: &FilterPredicate) -> BooleanA
554554
fn filter_native<T: ArrowNativeType>(values: &[T], predicate: &FilterPredicate) -> Buffer {
555555
assert!(values.len() >= predicate.filter.len());
556556

557-
let buffer = match &predicate.strategy {
557+
match &predicate.strategy {
558558
IterationStrategy::SlicesIterator => {
559-
let mut buffer = MutableBuffer::with_capacity(predicate.count * T::get_byte_width());
559+
let mut buffer = Vec::with_capacity(predicate.count);
560560
for (start, end) in SlicesIterator::new(&predicate.filter) {
561561
buffer.extend_from_slice(&values[start..end]);
562562
}
563-
buffer
563+
buffer.into()
564564
}
565565
IterationStrategy::Slices(slices) => {
566-
let mut buffer = MutableBuffer::with_capacity(predicate.count * T::get_byte_width());
566+
let mut buffer = Vec::with_capacity(predicate.count);
567567
for (start, end) in slices {
568568
buffer.extend_from_slice(&values[*start..*end]);
569569
}
570-
buffer
570+
buffer.into()
571571
}
572572
IterationStrategy::IndexIterator => {
573573
let iter = IndexIterator::new(&predicate.filter, predicate.count).map(|x| values[x]);
574574

575575
// SAFETY: IndexIterator is trusted length
576-
unsafe { MutableBuffer::from_trusted_len_iter(iter) }
576+
unsafe { MutableBuffer::from_trusted_len_iter(iter) }.into()
577577
}
578578
IterationStrategy::Indices(indices) => {
579579
let iter = indices.iter().map(|x| values[*x]);
580-
// SAFETY: `Vec::iter` is trusted length
581-
unsafe { MutableBuffer::from_trusted_len_iter(iter) }
580+
iter.collect::<Vec<_>>().into()
582581
}
583582
IterationStrategy::All | IterationStrategy::None => unreachable!(),
584-
};
585-
586-
buffer.into()
583+
}
587584
}
588585

589586
/// `filter` implementation for primitive arrays
@@ -656,29 +653,46 @@ where
656653
(start, end, len)
657654
}
658655

659-
/// Extends the in-progress array by the indexes in the provided iterator
660-
fn extend_idx(&mut self, iter: impl Iterator<Item = usize>) {
656+
fn extend_offsets_idx(&mut self, iter: impl Iterator<Item = usize>) {
661657
self.dst_offsets.extend(iter.map(|idx| {
662658
let start = self.src_offsets[idx].as_usize();
663659
let end = self.src_offsets[idx + 1].as_usize();
664660
let len = OffsetSize::from_usize(end - start).expect("illegal offset range");
665661
self.cur_offset += len;
666-
self.dst_values
667-
.extend_from_slice(&self.src_values[start..end]);
662+
668663
self.cur_offset
669664
}));
670665
}
671666

672-
/// Extends the in-progress array by the ranges in the provided iterator
673-
fn extend_slices(&mut self, iter: impl Iterator<Item = (usize, usize)>) {
667+
/// Extends the in-progress array by the indexes in the provided iterator
668+
fn extend_idx(&mut self, iter: impl Iterator<Item = usize>) {
669+
self.dst_values.reserve_exact(self.cur_offset.as_usize());
670+
671+
for idx in iter {
672+
let start = self.src_offsets[idx].as_usize();
673+
let end = self.src_offsets[idx + 1].as_usize();
674+
self.dst_values
675+
.extend_from_slice(&self.src_values[start..end]);
676+
}
677+
}
678+
679+
fn extend_offsets_slices(&mut self, iter: impl Iterator<Item = (usize, usize)>, count: usize) {
680+
self.dst_offsets.reserve_exact(count);
674681
for (start, end) in iter {
675682
// These can only fail if `array` contains invalid data
676683
for idx in start..end {
677684
let (_, _, len) = self.get_value_range(idx);
678685
self.cur_offset += len;
679-
self.dst_offsets.push(self.cur_offset); // push_unchecked?
686+
self.dst_offsets.push(self.cur_offset);
680687
}
688+
}
689+
}
681690

691+
/// Extends the in-progress array by the ranges in the provided iterator
692+
fn extend_slices(&mut self, iter: impl Iterator<Item = (usize, usize)>) {
693+
self.dst_values.reserve_exact(self.cur_offset.as_usize());
694+
695+
for (start, end) in iter {
682696
let value_start = self.get_value_offset(start);
683697
let value_end = self.get_value_offset(end);
684698
self.dst_values
@@ -699,13 +713,21 @@ where
699713

700714
match &predicate.strategy {
701715
IterationStrategy::SlicesIterator => {
716+
filter.extend_offsets_slices(SlicesIterator::new(&predicate.filter), predicate.count);
702717
filter.extend_slices(SlicesIterator::new(&predicate.filter))
703718
}
704-
IterationStrategy::Slices(slices) => filter.extend_slices(slices.iter().cloned()),
719+
IterationStrategy::Slices(slices) => {
720+
filter.extend_offsets_slices(slices.iter().cloned(), predicate.count);
721+
filter.extend_slices(slices.iter().cloned())
722+
}
705723
IterationStrategy::IndexIterator => {
724+
filter.extend_offsets_idx(IndexIterator::new(&predicate.filter, predicate.count));
706725
filter.extend_idx(IndexIterator::new(&predicate.filter, predicate.count))
707726
}
708-
IterationStrategy::Indices(indices) => filter.extend_idx(indices.iter().cloned()),
727+
IterationStrategy::Indices(indices) => {
728+
filter.extend_offsets_idx(indices.iter().cloned());
729+
filter.extend_idx(indices.iter().cloned())
730+
}
709731
IterationStrategy::All | IterationStrategy::None => unreachable!(),
710732
}
711733

0 commit comments

Comments
 (0)