
GH-46222: [Python] Allow specifying footer metadata when opening an IPC file for writing #46354

Merged: 3 commits, May 12, 2025

4 changes: 3 additions & 1 deletion python/pyarrow/includes/libarrow.pxd
@@ -1976,13 +1976,15 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
 
         CIpcReadStats stats()
 
+        shared_ptr[const CKeyValueMetadata] metadata()
+
     CResult[shared_ptr[CRecordBatchWriter]] MakeStreamWriter(
         shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema,
         CIpcWriteOptions& options)
 
     CResult[shared_ptr[CRecordBatchWriter]] MakeFileWriter(
         shared_ptr[COutputStream] sink, const shared_ptr[CSchema]& schema,
-        CIpcWriteOptions& options)
+        CIpcWriteOptions& options, shared_ptr[const CKeyValueMetadata] metadata)
 
     CResult[unique_ptr[CMessage]] ReadMessage(CInputStream* stream,
                                               CMemoryPool* pool)
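
As context for the binding above, a minimal sketch (not part of this diff) of the Python-level counterpart of CKeyValueMetadata, using only public pyarrow API:

import pyarrow as pa

# pyarrow.KeyValueMetadata mirrors the C++ KeyValueMetadata type bound
# above; keys and values are normalized to bytes on construction.
meta = pa.KeyValueMetadata({"creator": "test", "version": "0.1.0"})
assert meta.to_dict() == {b"creator": b"test", b"version": b"0.1.0"}
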
19 changes: 17 additions & 2 deletions python/pyarrow/ipc.pxi
@@ -1020,15 +1020,21 @@ cdef class _RecordBatchStreamReader(RecordBatchReader):
 cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter):
 
     def _open(self, sink, Schema schema not None,
-              IpcWriteOptions options=IpcWriteOptions()):
+              IpcWriteOptions options=IpcWriteOptions(),
+              metadata=None):
         cdef:
             shared_ptr[COutputStream] c_sink
+            shared_ptr[const CKeyValueMetadata] c_meta
 
         self.options = options.c_options
         get_writer(sink, &c_sink)
+
+        metadata = ensure_metadata(metadata, allow_none=True)
+        c_meta = pyarrow_unwrap_metadata(metadata)
+
         with nogil:
             self.writer = GetResultValue(
-                MakeFileWriter(c_sink, schema.sp_schema, self.options))
+                MakeFileWriter(c_sink, schema.sp_schema, self.options, c_meta))
 
 _RecordBatchWithMetadata = namedtuple(
     'RecordBatchWithMetadata',
@@ -1192,6 +1198,15 @@ cdef class _RecordBatchFileReader(_Weakrefable):
             raise ValueError("Operation on closed reader")
         return _wrap_read_stats(self.reader.get().stats())
 
+    @property
+    def metadata(self):
+        """
+        File-level custom metadata as dict, where both keys and values are byte-like.
+        This kind of metadata can be written via ``ipc.new_file(..., metadata=...)``.
+        """
+        wrapped = pyarrow_wrap_metadata(self.reader.get().metadata())
+        return wrapped.to_dict() if wrapped is not None else None
+
 
 def get_tensor_size(Tensor tensor):
     """
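
A usage sketch for the new reader property (illustrative names, mirroring the tests added below): write a file with footer metadata, reopen it, and read the metadata back as a dict of byte strings:

import pyarrow as pa

table = pa.table({"a": [1, 2, 3]})
sink = pa.BufferOutputStream()
# str keys/values are converted to bytes on the way in.
with pa.ipc.new_file(sink, table.schema, metadata={"creator": "demo"}) as writer:
    writer.write_table(table)

with pa.ipc.open_file(sink.getvalue()) as reader:
    assert reader.metadata == {b"creator": b"demo"}
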
25 changes: 18 additions & 7 deletions python/pyarrow/ipc.py
@@ -69,6 +69,17 @@ def __init__(self, source, *, options=None, memory_pool=None):
     ARROW_PRE_1_0_METADATA_VERSION=1."""
 
 
+_ipc_file_writer_class_doc = (
+    _ipc_writer_class_doc
+    + "\n"
+    + """\
+    metadata : dict | pyarrow.KeyValueMetadata, optional
+        Key/value pairs (both must be bytes-like) that will be stored
+        in the file footer and are retrievable via
+        pyarrow.ipc.open_file(...).metadata."""
+)
+
+
 class RecordBatchStreamWriter(lib._RecordBatchStreamWriter):
     __doc__ = """Writer for the Arrow streaming binary format
 
@@ -109,11 +120,12 @@ class RecordBatchFileWriter(lib._RecordBatchFileWriter):
 
     __doc__ = """Writer to create the Arrow binary file format
 
-    {}""".format(_ipc_writer_class_doc)
+    {}
+    """.format(_ipc_file_writer_class_doc)
 
-    def __init__(self, sink, schema, *, options=None):
+    def __init__(self, sink, schema, *, options=None, metadata=None):
         options = _get_legacy_format_default(options)
-        self._open(sink, schema, options=options)
+        self._open(sink, schema, options=options, metadata=metadata)
 
 
 def _get_legacy_format_default(options):
@@ -180,9 +192,8 @@ def open_stream(source, *, options=None, memory_pool=None):
                                    memory_pool=memory_pool)
 
 
-def new_file(sink, schema, *, options=None):
-    return RecordBatchFileWriter(sink, schema,
-                                 options=options)
+def new_file(sink, schema, *, options=None, metadata=None):
+    return RecordBatchFileWriter(sink, schema, options=options, metadata=metadata)
 
 
 new_file.__doc__ = """\
@@ -194,7 +205,7 @@ def new_file(sink, schema, *, options=None):
 -------
 writer : RecordBatchFileWriter
     A writer for the given sink
-""".format(_ipc_writer_class_doc)
+""".format(_ipc_file_writer_class_doc)
 
 
 def open_file(source, footer_offset=None, *, options=None, memory_pool=None):
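
Per the docstring added above, metadata may also be passed as a pyarrow.KeyValueMetadata instance instead of a plain dict; a short sketch with illustrative names:

import pyarrow as pa

# Equivalent to passing a dict; KeyValueMetadata stores bytes keys/values.
meta = pa.KeyValueMetadata({"creator": "demo", "version": "0.1.0"})
sink = pa.BufferOutputStream()
schema = pa.schema([("a", pa.int64())])
with pa.ipc.new_file(sink, schema, metadata=meta) as writer:
    writer.write_table(pa.table({"a": [1, 2, 3]}))
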
27 changes: 27 additions & 0 deletions python/pyarrow/tests/test_ipc.py
@@ -1304,3 +1304,30 @@ def test_record_batch_reader_cast_nulls():
     casted_reader = reader.cast(schema_dst)
     with pytest.raises(pa.lib.ArrowInvalid, match="Can't cast array"):
         casted_reader.read_all()
+
+
+def test_record_batch_file_writer_with_metadata():
+    # https://github.com/apache/arrow/issues/46222
+    tbl = pa.table({"a": [1, 2, 3]})
+    meta = {b"creator": b"test", b"version": b"0.1.0"}
+    sink = pa.BufferOutputStream()
+
+    with pa.ipc.new_file(sink, tbl.schema, metadata=meta) as w:
+        w.write_table(tbl)
+
+    buffer = sink.getvalue()
+    with pa.ipc.open_file(buffer) as r:
+        assert r.metadata == meta
+
+
+def test_record_batch_file_writer_with_empty_metadata():
+    # https://github.com/apache/arrow/issues/46222
+    tbl = pa.table({"a": [1, 2, 3]})
+    sink = pa.BufferOutputStream()
+
+    with pa.ipc.new_file(sink, tbl.schema) as w:
+        w.write_table(tbl)
+
+    buffer = sink.getvalue()
+    with pa.ipc.open_file(buffer) as r:
+        assert r.metadata is None