
Commit af34416 (1 parent: 03bdb2a)

GH-46222: [Python] Allow specifying footer metadata when opening an IPC file for writing (#46222)

File tree (4 files changed: +42 −9 lines)

  python/pyarrow/includes/libarrow.pxd
  python/pyarrow/ipc.pxi
  python/pyarrow/ipc.py
  python/pyarrow/tests/test_ipc.py

python/pyarrow/includes/libarrow.pxd (+3 −1)

@@ -1976,13 +1976,15 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:

         CIpcReadStats stats()

+        shared_ptr[const CKeyValueMetadata] metadata()
+
     CResult[shared_ptr[CRecordBatchWriter]] MakeStreamWriter(
         shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema,
         CIpcWriteOptions& options)

     CResult[shared_ptr[CRecordBatchWriter]] MakeFileWriter(
         shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema,
-        CIpcWriteOptions& options)
+        CIpcWriteOptions& options, shared_ptr[const CKeyValueMetadata] metadata)

     CResult[unique_ptr[CMessage]] ReadMessage(CInputStream* stream,
                                               CMemoryPool* pool)

python/pyarrow/ipc.pxi (+15 −2)

@@ -1020,15 +1020,21 @@ cdef class _RecordBatchStreamReader(RecordBatchReader):
 cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):

     def _open(self, sink, Schema schema not None,
-              IpcWriteOptions options=IpcWriteOptions()):
+              IpcWriteOptions options=IpcWriteOptions(),
+              metadata=None):
         cdef:
             shared_ptr[COutputStream] c_sink
+            shared_ptr[const CKeyValueMetadata] c_meta

         self.options = options.c_options
         get_writer(sink, &c_sink)
+
+        metadata = ensure_metadata(metadata, allow_none=True)
+        c_meta = pyarrow_unwrap_metadata(metadata)
+
         with nogil:
             self.writer = GetResultValue(
-                MakeFileWriter(c_sink, schema.sp_schema, self.options))
+                MakeFileWriter(c_sink, schema.sp_schema, self.options, c_meta))

 _RecordBatchWithMetadata = namedtuple(
     'RecordBatchWithMetadata',

@@ -1192,6 +1198,13 @@ cdef class _RecordBatchFileReader(_Weakrefable):
             raise ValueError("Operation on closed reader")
         return _wrap_read_stats(self.reader.get().stats())

+    @property
+    def metadata(self):
+        """
+        File-level custom KeyValueMetadata written via ``ipc.new_file(..., metadata=)``.
+        """
+        return pyarrow_wrap_metadata(self.reader.get().metadata())
+

 def get_tensor_size(Tensor tensor):
     """

python/pyarrow/ipc.py (+10 −6)

@@ -109,11 +109,16 @@ class RecordBatchFileWriter(lib._RecordBatchFileWriter):

     __doc__ = """Writer to create the Arrow binary file format

-{}""".format(_ipc_writer_class_doc)
+{}
+metadata : dict | pyarrow.KeyValueMetadata, optional
+    Key/value pairs (both **must** be bytes-like) that will be stored
+    in the file footer and are retrievable with
+    ``ipc.open_file(...).metadata``.
+""".format(_ipc_writer_class_doc)

-    def __init__(self, sink, schema, *, options=None):
+    def __init__(self, sink, schema, *, options=None, metadata=None):
         options = _get_legacy_format_default(options)
-        self._open(sink, schema, options=options)
+        self._open(sink, schema, options=options, metadata=metadata)


 def _get_legacy_format_default(options):

@@ -180,9 +185,8 @@ def open_stream(source, *, options=None, memory_pool=None):
                                    memory_pool=memory_pool)


-def new_file(sink, schema, *, options=None):
-    return RecordBatchFileWriter(sink, schema,
-                                 options=options)
+def new_file(sink, schema, *, metadata=None, options=None):
+    return RecordBatchFileWriter(sink, schema, options=options, metadata=metadata)


 new_file.__doc__ = """\
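As the docstring and the new_file signature above indicate, the keyword is optional (it defaults to None, so existing callers are unchanged) and accepts either a plain dict of bytes-like pairs or a pyarrow.KeyValueMetadata. A minimal writer-side sketch with illustrative names and values; the test added below exercises the full round trip.

    import pyarrow as pa

    schema = pa.schema([("x", pa.int64())])
    # Either a pyarrow.KeyValueMetadata or a plain dict of bytes-like pairs.
    meta = pa.KeyValueMetadata({b"origin": b"sensor-7", b"rev": b"42"})

    sink = pa.BufferOutputStream()
    with pa.ipc.new_file(sink, schema, metadata=meta) as writer:
        writer.write_batch(pa.record_batch({"x": [1, 2, 3]}))
    # The key/value pairs are stored in the file footer alongside the schema.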

python/pyarrow/tests/test_ipc.py (+14)

@@ -1304,3 +1304,17 @@ def test_record_batch_reader_cast_nulls():
     casted_reader = reader.cast(schema_dst)
     with pytest.raises(pa.lib.ArrowInvalid, match="Can't cast array"):
         casted_reader.read_all()
+
+
+def test_record_batch_file_writer_with_metadata():
+    # https://github.com/apache/arrow/issues/46222
+    tbl = pa.table({"a": [1, 2, 3]})
+    meta = {b"creator": b"test", b"version": b"0.1.0"}
+    sink = pa.BufferOutputStream()
+
+    with pa.ipc.new_file(sink, tbl.schema, metadata=meta) as w:
+        w.write_table(tbl)
+
+    buffer = sink.getvalue()
+    with pa.ipc.open_file(buffer) as r:
+        assert dict(r.metadata) == meta
