Skip to content

Commit 80e1e53

Browse files
fix: Fix RuntimeError when serializing the same DataFrame from multiple threads (#22844)
1 parent b71e079 commit 80e1e53

File tree

2 files changed

+21
-2
lines changed

2 files changed

+21
-2
lines changed

crates/polars-python/src/dataframe/serde.rs

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -6,18 +6,28 @@ use polars_io::mmap::ReaderBytes;
66
use pyo3::prelude::*;
77

88
use super::PyDataFrame;
9+
use crate::error::PyPolarsErr;
910
use crate::exceptions::ComputeError;
1011
use crate::file::{get_file_like, get_mmap_bytes_reader};
1112
use crate::utils::EnterPolarsExt;
1213

1314
#[pymethods]
1415
impl PyDataFrame {
1516
/// Serialize into binary data.
16-
fn serialize_binary(&mut self, py: Python<'_>, py_f: PyObject) -> PyResult<()> {
17+
fn serialize_binary(slf: Bound<'_, Self>, py_f: PyObject) -> PyResult<()> {
1718
let file = get_file_like(py_f, true)?;
1819
let mut writer = BufWriter::new(file);
1920

20-
py.enter_polars(|| self.df.serialize_into_writer(&mut writer))
21+
let mut slf_1 = slf.try_borrow_mut();
22+
let slf_1: Result<&mut PyDataFrame, _> = slf_1.as_deref_mut();
23+
let mut slf_2: Option<PyDataFrame> = (slf_1.is_err()).then(|| (*slf.borrow()).clone());
24+
25+
let slf: &mut PyDataFrame = slf_1.unwrap_or_else(|_| slf_2.as_mut().unwrap());
26+
27+
Ok(slf
28+
.df
29+
.serialize_into_writer(&mut writer)
30+
.map_err(PyPolarsErr::from)?)
2131
}
2232

2333
/// Deserialize a file-like object containing binary data into a DataFrame.

py-polars/tests/unit/dataframe/test_serde.py

Lines changed: 9 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,10 @@
11
from __future__ import annotations
22

33
import io
4+
import pickle
45
from datetime import date, datetime, timedelta
56
from decimal import Decimal as D
7+
from multiprocessing.pool import ThreadPool
68
from typing import TYPE_CHECKING, Any
79

810
import pytest
@@ -211,3 +213,10 @@ def test_df_serde_list_of_null_17230() -> None:
211213
ser = df.serialize(format="json")
212214
result = pl.DataFrame.deserialize(io.StringIO(ser), format="json")
213215
assert_frame_equal(result, df)
216+
217+
218+
def test_df_serialize_from_multiple_python_threads_22364() -> None:
219+
df = pl.DataFrame({"A": [1, 2, 3, 4]})
220+
221+
with ThreadPool(4) as tp:
222+
tp.map(pickle.dumps, [df] * 1_000)

0 commit comments

Comments
 (0)