Skip to content

Commit d09b8fc

Browse files
committed
perf: use byteorder on DataValue::to_raw & DataValue::from_raw
1 parent 9f8a58f commit d09b8fc

File tree

12 files changed, with 458 additions (+458) and 483 deletions (-483).

Cargo.toml

+1-4
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,10 @@ harness = false
3535
[dependencies]
3636
ahash = { version = "0.8" }
3737
bincode = { version = "1" }
38-
bytes = { version = "1" }
38+
byteorder = { version = "1" }
3939
chrono = { version = "0.4" }
4040
comfy-table = { version = "7" }
4141
csv = { version = "1" }
42-
encode_unicode = { version = "1" }
4342
dirs = { version = "5" }
4443
itertools = { version = "0.12" }
4544
ordered-float = { version = "4" }
@@ -68,10 +67,8 @@ tokio = { version = "1.36", features = ["full"], optional = true
6867

6968

7069
[dev-dependencies]
71-
cargo-tarpaulin = { version = "0.27" }
7270
criterion = { version = "0.5", features = ["html_reports"] }
7371
indicatif = { version = "0.17" }
74-
rand_distr = { version = "0.4" }
7572
tempfile = { version = "3.10" }
7673
# Benchmark
7774
sqlite = { version = "0.34" }

README.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,13 @@ run `cargo run -p tpcc --release` to run tpcc
7373
- Tips: TPC-C currently only supports single thread
7474
```shell
7575
<90th Percentile RT (MaxRT)>
76-
New-Order : 0.003 (0.012)
77-
Payment : 0.001 (0.003)
78-
Order-Status : 0.054 (0.188)
79-
Delivery : 0.021 (0.049)
80-
Stock-Level : 0.004 (0.006)
76+
New-Order : 0.003 (0.031)
77+
Payment : 0.001 (0.028)
78+
Order-Status : 0.053 (0.144)
79+
Delivery : 0.020 (0.032)
80+
Stock-Level : 0.004 (0.007)
8181
<TpmC>
82-
7345 Tpmc
82+
7457 Tpmc
8383
```
8484
#### 👉[check more](tpcc/README.md)
8585

src/errors.rs

+7-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::types::LogicalType;
44
use chrono::ParseError;
55
use sqlparser::parser::ParserError;
66
use std::num::{ParseFloatError, ParseIntError, TryFromIntError};
7-
use std::str::ParseBoolError;
7+
use std::str::{ParseBoolError, Utf8Error};
88
use std::string::FromUtf8Error;
99

1010
#[derive(thiserror::Error, Debug)]
@@ -161,6 +161,12 @@ pub enum DatabaseError {
161161
UnsupportedBinaryOperator(LogicalType, BinaryOperator),
162162
#[error("unsupported statement: {0}")]
163163
UnsupportedStmt(String),
164+
#[error("utf8: {0}")]
165+
Utf8(
166+
#[source]
167+
#[from]
168+
Utf8Error,
169+
),
164170
#[error("values length not match, expect {0}, got {1}")]
165171
ValuesLenMismatch(usize, usize),
166172
#[error("the view already exists")]

src/serdes/char.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use crate::errors::DatabaseError;
22
use crate::serdes::{ReferenceSerialization, ReferenceTables};
33
use crate::storage::{TableCache, Transaction};
4-
use encode_unicode::CharExt;
54
use std::io::{Read, Write};
65

76
impl ReferenceSerialization for char {
@@ -11,20 +10,21 @@ impl ReferenceSerialization for char {
1110
_: bool,
1211
_: &mut ReferenceTables,
1312
) -> Result<(), DatabaseError> {
14-
let (bytes, _) = self.to_utf8_array();
13+
let mut buf = [0u8; 2];
14+
self.encode_utf8(&mut buf);
1515

16-
Ok(writer.write_all(&bytes)?)
16+
Ok(writer.write_all(&buf)?)
1717
}
1818

1919
fn decode<T: Transaction, R: Read>(
2020
reader: &mut R,
2121
_: Option<(&T, &TableCache)>,
2222
_: &ReferenceTables,
2323
) -> Result<Self, DatabaseError> {
24-
let mut buf = [0u8; 4];
24+
let mut buf = [0u8; 2];
2525
reader.read_exact(&mut buf)?;
2626

2727
// SAFETY
28-
Ok(char::from_utf8_array(buf).unwrap())
28+
Ok(std::str::from_utf8(&buf)?.chars().next().unwrap())
2929
}
3030
}

src/serdes/data_value.rs

+2-113
Original file line numberDiff line numberDiff line change
@@ -1,118 +1,7 @@
1-
use crate::errors::DatabaseError;
2-
use crate::serdes::{ReferenceSerialization, ReferenceTables};
3-
use crate::storage::{TableCache, Transaction};
1+
use crate::implement_serialization_by_bincode;
42
use crate::types::value::DataValue;
5-
use crate::types::LogicalType;
6-
use std::io::{Read, Write};
73

8-
impl DataValue {
9-
// FIXME: redundant code
10-
pub(crate) fn inner_encode<W: Write>(
11-
&self,
12-
writer: &mut W,
13-
ty: &LogicalType,
14-
) -> Result<(), DatabaseError> {
15-
writer.write_all(&[if self.is_null() { 0u8 } else { 1u8 }])?;
16-
17-
if self.is_null() {
18-
return Ok(());
19-
}
20-
if let DataValue::Tuple(values) = self {
21-
match values {
22-
None => writer.write_all(&[0u8])?,
23-
Some((values, is_upper)) => {
24-
writer.write_all(&[1u8])?;
25-
writer.write_all(&(values.len() as u32).to_le_bytes())?;
26-
for value in values.iter() {
27-
value.inner_encode(writer, &value.logical_type())?
28-
}
29-
writer.write_all(&[if *is_upper { 1u8 } else { 0u8 }])?;
30-
}
31-
}
32-
33-
return Ok(());
34-
}
35-
if ty.raw_len().is_none() {
36-
let mut bytes = Vec::new();
37-
writer.write_all(&(self.to_raw(&mut bytes)? as u32).to_le_bytes())?;
38-
writer.write_all(&bytes)?;
39-
} else {
40-
let _ = self.to_raw(writer)?;
41-
}
42-
43-
Ok(())
44-
}
45-
46-
pub(crate) fn inner_decode<R: Read>(
47-
reader: &mut R,
48-
ty: &LogicalType,
49-
) -> Result<Self, DatabaseError> {
50-
let mut bytes = [0u8; 1];
51-
reader.read_exact(&mut bytes)?;
52-
if bytes[0] == 0 {
53-
return Ok(DataValue::none(ty));
54-
}
55-
if let LogicalType::Tuple(types) = ty {
56-
let mut bytes = [0u8; 1];
57-
reader.read_exact(&mut bytes)?;
58-
let values = match bytes[0] {
59-
0 => None,
60-
1 => {
61-
let mut bytes = [0u8; 4];
62-
reader.read_exact(&mut bytes)?;
63-
let len = u32::from_le_bytes(bytes) as usize;
64-
let mut vec = Vec::with_capacity(len);
65-
66-
for ty in types.iter() {
67-
vec.push(Self::inner_decode(reader, ty)?);
68-
}
69-
let mut bytes = [0u8];
70-
reader.read_exact(&mut bytes)?;
71-
Some((vec, bytes[0] == 1))
72-
}
73-
_ => unreachable!(),
74-
};
75-
76-
return Ok(DataValue::Tuple(values));
77-
}
78-
let value_len = match ty.raw_len() {
79-
None => {
80-
let mut bytes = [0u8; 4];
81-
reader.read_exact(&mut bytes)?;
82-
u32::from_le_bytes(bytes) as usize
83-
}
84-
Some(len) => len,
85-
};
86-
let mut buf = vec![0u8; value_len];
87-
reader.read_exact(&mut buf)?;
88-
89-
Ok(DataValue::from_raw(&buf, ty))
90-
}
91-
}
92-
93-
impl ReferenceSerialization for DataValue {
94-
fn encode<W: Write>(
95-
&self,
96-
writer: &mut W,
97-
is_direct: bool,
98-
reference_tables: &mut ReferenceTables,
99-
) -> Result<(), DatabaseError> {
100-
let ty = self.logical_type();
101-
ty.encode(writer, is_direct, reference_tables)?;
102-
103-
self.inner_encode(writer, &ty)
104-
}
105-
106-
fn decode<T: Transaction, R: Read>(
107-
reader: &mut R,
108-
drive: Option<(&T, &TableCache)>,
109-
reference_tables: &ReferenceTables,
110-
) -> Result<Self, DatabaseError> {
111-
let logical_type = LogicalType::decode(reader, drive, reference_tables)?;
112-
113-
Self::inner_decode(reader, &logical_type)
114-
}
115-
}
4+
implement_serialization_by_bincode!(DataValue);
1165

1176
#[cfg(test)]
1187
pub(crate) mod test {

src/serdes/mod.rs

+6-23
Original file line numberDiff line numberDiff line change
@@ -33,37 +33,20 @@ macro_rules! implement_serialization_by_bincode {
3333
fn encode<W: std::io::Write>(
3434
&self,
3535
writer: &mut W,
36-
is_direct: bool,
37-
reference_tables: &mut $crate::serdes::ReferenceTables,
36+
_: bool,
37+
_: &mut $crate::serdes::ReferenceTables,
3838
) -> Result<(), $crate::errors::DatabaseError> {
39-
let bytes = bincode::serialize(self)?;
40-
$crate::serdes::ReferenceSerialization::encode(
41-
&bytes.len(),
42-
writer,
43-
is_direct,
44-
reference_tables,
45-
)?;
46-
std::io::Write::write_all(writer, &bytes)?;
39+
bincode::serialize_into(writer, self)?;
4740

4841
Ok(())
4942
}
5043

5144
fn decode<T: $crate::storage::Transaction, R: std::io::Read>(
5245
reader: &mut R,
53-
drive: Option<(&T, &$crate::storage::TableCache)>,
54-
reference_tables: &$crate::serdes::ReferenceTables,
46+
_: Option<(&T, &$crate::storage::TableCache)>,
47+
_: &$crate::serdes::ReferenceTables,
5548
) -> Result<Self, $crate::errors::DatabaseError> {
56-
let mut buf = vec![
57-
0u8;
58-
<usize as $crate::serdes::ReferenceSerialization>::decode(
59-
reader,
60-
drive,
61-
reference_tables
62-
)?
63-
];
64-
std::io::Read::read_exact(reader, &mut buf)?;
65-
66-
Ok(bincode::deserialize::<Self>(&buf)?)
49+
Ok(bincode::deserialize_from(reader)?)
6750
}
6851
}
6952
};

src/storage/mod.rs

+24-20
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,12 @@ use crate::errors::DatabaseError;
99
use crate::expression::range_detacher::Range;
1010
use crate::optimizer::core::statistics_meta::{StatisticMetaLoader, StatisticsMeta};
1111
use crate::serdes::ReferenceTables;
12-
use crate::storage::table_codec::TableCodec;
12+
use crate::storage::table_codec::{Bytes, TableCodec};
1313
use crate::types::index::{Index, IndexId, IndexMetaRef, IndexType};
1414
use crate::types::tuple::{Tuple, TupleId};
1515
use crate::types::value::DataValue;
1616
use crate::types::{ColumnId, LogicalType};
1717
use crate::utils::lru::SharedLruCache;
18-
use bytes::Bytes;
1918
use itertools::Itertools;
2019
use std::collections::Bound;
2120
use std::io::Cursor;
@@ -64,8 +63,9 @@ pub trait Transaction: Sized {
6463
let pk_indices = table.primary_keys_indices();
6564
let table_types = table.types();
6665
if columns.is_empty() {
67-
let (i, column) = &table.primary_keys()[0];
68-
columns.push((*i, column.clone()));
66+
for (i, column) in table.primary_keys() {
67+
columns.push((*i, column.clone()));
68+
}
6969
}
7070
let mut tuple_columns = Vec::with_capacity(columns.len());
7171
let mut projections = Vec::with_capacity(columns.len());
@@ -535,7 +535,7 @@ pub trait Transaction: Sized {
535535
// Tips: only `Column`, `IndexMeta`, `TableMeta`
536536
while let Some((key, value)) = column_iter.try_next().ok().flatten() {
537537
if key.starts_with(&table_min) {
538-
let mut cursor = Cursor::new(value.as_ref());
538+
let mut cursor = Cursor::new(value);
539539
columns.push(TableCodec::decode_column::<Self, _>(
540540
&mut cursor,
541541
&reference_tables,
@@ -693,15 +693,18 @@ impl<T: Transaction> IndexImplParams<'_, T> {
693693
) -> Result<Option<Tuple>, DatabaseError> {
694694
let key = TableCodec::encode_tuple_key(self.table_name, tuple_id)?;
695695

696-
Ok(self.tx.get(&key)?.map(|bytes| {
697-
TableCodec::decode_tuple(
698-
&self.table_types,
699-
pk_indices,
700-
&self.projections,
701-
&self.tuple_schema_ref,
702-
&bytes,
703-
)
704-
}))
696+
self.tx
697+
.get(&key)?
698+
.map(|bytes| {
699+
TableCodec::decode_tuple(
700+
&self.table_types,
701+
pk_indices,
702+
&self.projections,
703+
&self.tuple_schema_ref,
704+
&bytes,
705+
)
706+
})
707+
.transpose()
705708
}
706709
}
707710

@@ -760,13 +763,13 @@ impl<T: Transaction> IndexImpl<T> for PrimaryKeyIndexImpl {
760763
pk_indices: &PrimaryKeyIndices,
761764
params: &IndexImplParams<T>,
762765
) -> Result<Tuple, DatabaseError> {
763-
Ok(TableCodec::decode_tuple(
766+
TableCodec::decode_tuple(
764767
&params.table_types,
765768
pk_indices,
766769
&params.projections,
767770
&params.tuple_schema_ref,
768771
bytes,
769-
))
772+
)
770773
}
771774

772775
fn eq_to_res<'a>(
@@ -786,7 +789,8 @@ impl<T: Transaction> IndexImpl<T> for PrimaryKeyIndexImpl {
786789
&params.tuple_schema_ref,
787790
&bytes,
788791
)
789-
});
792+
})
793+
.transpose()?;
790794
Ok(IndexResult::Tuple(tuple))
791795
}
792796

@@ -804,7 +808,7 @@ fn secondary_index_lookup<T: Transaction>(
804808
pk_indices: &PrimaryKeyIndices,
805809
params: &IndexImplParams<T>,
806810
) -> Result<Tuple, DatabaseError> {
807-
let tuple_id = TableCodec::decode_index(bytes, &params.index_meta.pk_ty)?;
811+
let tuple_id = TableCodec::decode_index(bytes)?;
808812
params
809813
.get_tuple_by_id(pk_indices, &tuple_id)?
810814
.ok_or(DatabaseError::TupleIdNotFound(tuple_id))
@@ -829,7 +833,7 @@ impl<T: Transaction> IndexImpl<T> for UniqueIndexImpl {
829833
let Some(bytes) = params.tx.get(&self.bound_key(params, value)?)? else {
830834
return Ok(IndexResult::Tuple(None));
831835
};
832-
let tuple_id = TableCodec::decode_index(&bytes, &params.index_meta.pk_ty)?;
836+
let tuple_id = TableCodec::decode_index(&bytes)?;
833837
let tuple = params
834838
.get_tuple_by_id(pk_indices, &tuple_id)?
835839
.ok_or(DatabaseError::TupleIdNotFound(tuple_id))?;
@@ -950,7 +954,7 @@ impl<'a, T: Transaction + 'a> Iter for TupleIter<'a, T> {
950954
&self.projections,
951955
&self.tuple_columns,
952956
&value,
953-
);
957+
)?;
954958

955959
if let Some(num) = self.limit.as_mut() {
956960
num.sub_assign(1);

0 commit comments

Comments
 (0)