Skip to content

Commit f690fc8 — "Remove dict_id from schema and make it an IPC concern only" (parent: 6f3a8f0)

File tree: 24 files changed (+480 / −589 lines)

arrow-flight/src/decode.rs

+14-1
Original file line numberDiff line numberDiff line change
@@ -282,6 +282,7 @@ impl FlightDataDecoder {
282282

283283
self.state = Some(FlightStreamState {
284284
schema: Arc::clone(&schema),
285+
schema_message: data.clone(),
285286
dictionaries_by_field,
286287
});
287288
Ok(Some(DecodedFlightData::new_schema(data, schema)))
@@ -302,10 +303,15 @@ impl FlightDataDecoder {
302303
)
303304
})?;
304305

306+
let ipc_schema = arrow_ipc::root_as_message(&state.schema_message.data_header)
307+
.unwrap()
308+
.header_as_schema()
309+
.unwrap();
310+
305311
arrow_ipc::reader::read_dictionary(
306312
&buffer,
307313
dictionary_batch,
308-
&state.schema,
314+
ipc_schema,
309315
&mut state.dictionaries_by_field,
310316
&message.version(),
311317
)
@@ -325,8 +331,14 @@ impl FlightDataDecoder {
325331
));
326332
};
327333

334+
let ipc_schema = arrow_ipc::root_as_message(&state.schema_message.data_header)
335+
.unwrap()
336+
.header_as_schema()
337+
.unwrap();
338+
328339
let batch = flight_data_to_arrow_batch(
329340
&data,
341+
ipc_schema,
330342
Arc::clone(&state.schema),
331343
&state.dictionaries_by_field,
332344
)
@@ -382,6 +394,7 @@ impl futures::Stream for FlightDataDecoder {
382394
#[derive(Debug)]
383395
struct FlightStreamState {
384396
schema: SchemaRef,
397+
schema_message: FlightData,
385398
dictionaries_by_field: HashMap<i64, ArrayRef>,
386399
}
387400

arrow-flight/src/encode.rs

+6-23
Original file line numberDiff line numberDiff line change
@@ -535,15 +535,11 @@ fn prepare_field_for_flight(
535535
)
536536
.with_metadata(field.metadata().clone())
537537
} else {
538-
#[allow(deprecated)]
539-
let dict_id = dictionary_tracker.set_dict_id(field.as_ref());
540-
541-
#[allow(deprecated)]
538+
dictionary_tracker.next_dict_id();
542539
Field::new_dict(
543540
field.name(),
544541
field.data_type().clone(),
545542
field.is_nullable(),
546-
dict_id,
547543
field.dict_is_ordered().unwrap_or_default(),
548544
)
549545
.with_metadata(field.metadata().clone())
@@ -585,14 +581,11 @@ fn prepare_schema_for_flight(
585581
)
586582
.with_metadata(field.metadata().clone())
587583
} else {
588-
#[allow(deprecated)]
589-
let dict_id = dictionary_tracker.set_dict_id(field.as_ref());
590-
#[allow(deprecated)]
584+
dictionary_tracker.next_dict_id();
591585
Field::new_dict(
592586
field.name(),
593587
field.data_type().clone(),
594588
field.is_nullable(),
595-
dict_id,
596589
field.dict_is_ordered().unwrap_or_default(),
597590
)
598591
.with_metadata(field.metadata().clone())
@@ -654,16 +647,10 @@ struct FlightIpcEncoder {
654647

655648
impl FlightIpcEncoder {
656649
fn new(options: IpcWriteOptions, error_on_replacement: bool) -> Self {
657-
#[allow(deprecated)]
658-
let preserve_dict_id = options.preserve_dict_id();
659650
Self {
660651
options,
661652
data_gen: IpcDataGenerator::default(),
662-
#[allow(deprecated)]
663-
dictionary_tracker: DictionaryTracker::new_with_preserve_dict_id(
664-
error_on_replacement,
665-
preserve_dict_id,
666-
),
653+
dictionary_tracker: DictionaryTracker::new(error_on_replacement),
667654
}
668655
}
669656

@@ -1547,9 +1534,8 @@ mod tests {
15471534
async fn verify_flight_round_trip(mut batches: Vec<RecordBatch>) {
15481535
let expected_schema = batches.first().unwrap().schema();
15491536

1550-
#[allow(deprecated)]
15511537
let encoder = FlightDataEncoderBuilder::default()
1552-
.with_options(IpcWriteOptions::default().with_preserve_dict_id(false))
1538+
.with_options(IpcWriteOptions::default())
15531539
.with_dictionary_handling(DictionaryHandling::Resend)
15541540
.build(futures::stream::iter(batches.clone().into_iter().map(Ok)));
15551541

@@ -1575,8 +1561,7 @@ mod tests {
15751561
HashMap::from([("some_key".to_owned(), "some_value".to_owned())]),
15761562
);
15771563

1578-
#[allow(deprecated)]
1579-
let mut dictionary_tracker = DictionaryTracker::new_with_preserve_dict_id(false, true);
1564+
let mut dictionary_tracker = DictionaryTracker::new(false);
15801565

15811566
let got = prepare_schema_for_flight(&schema, &mut dictionary_tracker, false);
15821567
assert!(got.metadata().contains_key("some_key"));
@@ -1606,9 +1591,7 @@ mod tests {
16061591
options: &IpcWriteOptions,
16071592
) -> (Vec<FlightData>, FlightData) {
16081593
let data_gen = IpcDataGenerator::default();
1609-
#[allow(deprecated)]
1610-
let mut dictionary_tracker =
1611-
DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
1594+
let mut dictionary_tracker = DictionaryTracker::new(false);
16121595

16131596
let (encoded_dictionaries, encoded_batch) = data_gen
16141597
.encoded_batch(batch, &mut dictionary_tracker, options)

arrow-flight/src/lib.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -149,9 +149,7 @@ pub struct IpcMessage(pub Bytes);
149149

150150
fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
151151
let data_gen = writer::IpcDataGenerator::default();
152-
#[allow(deprecated)]
153-
let mut dict_tracker =
154-
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
152+
let mut dict_tracker = writer::DictionaryTracker::new(false);
155153
data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
156154
}
157155

arrow-flight/src/sql/client.rs

+2
Original file line numberDiff line numberDiff line change
@@ -707,6 +707,7 @@ pub enum ArrowFlightData {
707707
pub fn arrow_data_from_flight_data(
708708
flight_data: FlightData,
709709
arrow_schema_ref: &SchemaRef,
710+
ipc_schema: arrow_ipc::Schema,
710711
) -> Result<ArrowFlightData, ArrowError> {
711712
let ipc_message = root_as_message(&flight_data.data_header[..])
712713
.map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))?;
@@ -723,6 +724,7 @@ pub fn arrow_data_from_flight_data(
723724
let record_batch = read_record_batch(
724725
&Buffer::from(flight_data.data_body),
725726
ipc_record_batch,
727+
ipc_schema,
726728
arrow_schema_ref.clone(),
727729
&dictionaries_by_field,
728730
None,

arrow-flight/src/utils.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result<Vec<RecordBa
4444
let mut batches = vec![];
4545
let dictionaries_by_id = HashMap::new();
4646
for datum in flight_data[1..].iter() {
47-
let batch = flight_data_to_arrow_batch(datum, schema.clone(), &dictionaries_by_id)?;
47+
let batch =
48+
flight_data_to_arrow_batch(datum, ipc_schema, schema.clone(), &dictionaries_by_id)?;
4849
batches.push(batch);
4950
}
5051
Ok(batches)
@@ -53,6 +54,7 @@ pub fn flight_data_to_batches(flight_data: &[FlightData]) -> Result<Vec<RecordBa
5354
/// Convert `FlightData` (with supplied schema and dictionaries) to an arrow `RecordBatch`.
5455
pub fn flight_data_to_arrow_batch(
5556
data: &FlightData,
57+
ipc_schema: arrow_ipc::Schema,
5658
schema: SchemaRef,
5759
dictionaries_by_id: &HashMap<i64, ArrayRef>,
5860
) -> Result<RecordBatch, ArrowError> {
@@ -71,6 +73,7 @@ pub fn flight_data_to_arrow_batch(
7173
reader::read_record_batch(
7274
&Buffer::from(data.data_body.as_ref()),
7375
batch,
76+
ipc_schema,
7477
schema,
7578
dictionaries_by_id,
7679
None,
@@ -90,9 +93,7 @@ pub fn batches_to_flight_data(
9093
let mut flight_data = vec![];
9194

9295
let data_gen = writer::IpcDataGenerator::default();
93-
#[allow(deprecated)]
94-
let mut dictionary_tracker =
95-
writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
96+
let mut dictionary_tracker = writer::DictionaryTracker::new(false);
9697

9798
for batch in batches.iter() {
9899
let (encoded_dictionaries, encoded_batch) =

arrow-integration-test/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ all-features = true
3838
[dependencies]
3939
arrow = { workspace = true }
4040
arrow-buffer = { workspace = true }
41+
arrow-ipc = { workspace = true }
4142
hex = { version = "0.4", default-features = false, features = ["std"] }
4243
serde = { version = "1.0", default-features = false, features = ["rc", "derive"] }
4344
serde_json = { version = "1.0", default-features = false, features = ["std"] }

arrow-integration-test/src/field.rs

+15-20
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use crate::{data_type_from_json, data_type_to_json};
1919
use arrow::datatypes::{DataType, Field};
2020
use arrow::error::{ArrowError, Result};
21+
use arrow_ipc::writer::DictionaryTracker;
2122
use std::collections::HashMap;
2223
use std::sync::Arc;
2324

@@ -218,7 +219,6 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
218219
_ => data_type,
219220
};
220221

221-
let mut dict_id = 0;
222222
let mut dict_is_ordered = false;
223223

224224
let data_type = match map.get("dictionary") {
@@ -231,14 +231,6 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
231231
));
232232
}
233233
};
234-
dict_id = match dictionary.get("id") {
235-
Some(Value::Number(n)) => n.as_i64().unwrap(),
236-
_ => {
237-
return Err(ArrowError::ParseError(
238-
"Field missing 'id' attribute".to_string(),
239-
));
240-
}
241-
};
242234
dict_is_ordered = match dictionary.get("isOrdered") {
243235
Some(&Value::Bool(n)) => n,
244236
_ => {
@@ -252,8 +244,7 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
252244
_ => data_type,
253245
};
254246

255-
#[allow(deprecated)]
256-
let mut field = Field::new_dict(name, data_type, nullable, dict_id, dict_is_ordered);
247+
let mut field = Field::new_dict(name, data_type, nullable, dict_is_ordered);
257248
field.set_metadata(metadata);
258249
Ok(field)
259250
}
@@ -264,27 +255,28 @@ pub fn field_from_json(json: &serde_json::Value) -> Result<Field> {
264255
}
265256

266257
/// Generate a JSON representation of the `Field`.
267-
pub fn field_to_json(field: &Field) -> serde_json::Value {
258+
pub fn field_to_json(dict_tracker: &mut DictionaryTracker, field: &Field) -> serde_json::Value {
268259
let children: Vec<serde_json::Value> = match field.data_type() {
269-
DataType::Struct(fields) => fields.iter().map(|x| field_to_json(x.as_ref())).collect(),
260+
DataType::Struct(fields) => fields
261+
.iter()
262+
.map(|x| field_to_json(dict_tracker, x.as_ref()))
263+
.collect(),
270264
DataType::List(field)
271265
| DataType::LargeList(field)
272266
| DataType::FixedSizeList(field, _)
273-
| DataType::Map(field, _) => vec![field_to_json(field)],
267+
| DataType::Map(field, _) => vec![field_to_json(dict_tracker, field)],
274268
_ => vec![],
275269
};
276270

277271
match field.data_type() {
278272
DataType::Dictionary(ref index_type, ref value_type) => {
279-
#[allow(deprecated)]
280-
let dict_id = field.dict_id().unwrap();
281273
serde_json::json!({
282274
"name": field.name(),
283275
"nullable": field.is_nullable(),
284276
"type": data_type_to_json(value_type),
285277
"children": children,
286278
"dictionary": {
287-
"id": dict_id,
279+
"id": dict_tracker.next_dict_id(),
288280
"indexType": data_type_to_json(index_type),
289281
"isOrdered": field.dict_is_ordered().unwrap(),
290282
}
@@ -345,7 +337,8 @@ mod tests {
345337
}"#,
346338
)
347339
.unwrap();
348-
assert_eq!(value, field_to_json(&f));
340+
let mut dictionary_tracker = DictionaryTracker::new(false);
341+
assert_eq!(value, field_to_json(&mut dictionary_tracker, &f));
349342
}
350343

351344
#[test]
@@ -398,7 +391,8 @@ mod tests {
398391
}"#,
399392
)
400393
.unwrap();
401-
assert_eq!(value, field_to_json(&f));
394+
let mut dictionary_tracker = DictionaryTracker::new(false);
395+
assert_eq!(value, field_to_json(&mut dictionary_tracker, &f));
402396
}
403397

404398
#[test]
@@ -415,7 +409,8 @@ mod tests {
415409
}"#,
416410
)
417411
.unwrap();
418-
assert_eq!(value, field_to_json(&f));
412+
let mut dictionary_tracker = DictionaryTracker::new(false);
413+
assert_eq!(value, field_to_json(&mut dictionary_tracker, &f));
419414
}
420415
#[test]
421416
fn parse_struct_from_json() {

Comments (0)