Freeze/Lock when filtering two joined frames scanned from parquet #22641

Open
alexander-rush opened this issue May 7, 2025 · 1 comment · May be fixed by #22672
Labels: accepted (Ready for implementation) · bug (Something isn't working) · P-high (Priority: high) · python (Related to Python Polars) · regression (Issue introduced by a new release)


alexander-rush commented May 7, 2025

Checks

  • I have checked that this issue has not already been reported.
  • I have confirmed this bug exists on the latest version of Polars.

Reproducible example

NUM_THREADS = 1  # Observe deadlock if NUM_THREADS = 1
TEST_PARQUET_PATH_1 = "./test1.parquet"
TEST_PARQUET_PATH_2 = "./test2.parquet"

import os

# POLARS_MAX_THREADS must be set before polars is imported.
os.environ["POLARS_MAX_THREADS"] = str(NUM_THREADS)
os.environ["POLARS_VERBOSE"] = "1"

import polars as pl

print(f"PID: {os.getpid()}")
print(f"Thread pool size: {pl.thread_pool_size()}")
assert pl.thread_pool_size() == NUM_THREADS, "Just in case we're in a notebook and forget to reload"

import numpy as np
import datetime as dt

start_date = dt.date(2021, 1, 1)
end_date = dt.date(2021, 12, 31)

ints = list(range(114))
todays = [dt.date(2021, 1, 1) + dt.timedelta(days=x) for x in range(508)]
times = [dt.datetime.combine(day, dt.time(hour=h)) for day in todays for h in range(11)]
todays_df = pl.DataFrame({"times": times})
ints_df = pl.DataFrame({"ints": ints})

index_df = todays_df.join(ints_df, how="cross").with_columns(pl.col("times").dt.date().alias("today"))
df = index_df.with_columns(
    pl.lit(np.random.rand(index_df.height)).alias(f"floats{x}") for x in range(8)
)
df2 = index_df.with_columns(
    pl.lit(np.random.rand(index_df.height)).alias(f"floats2{x}") for x in range(8)
)

df.write_parquet(TEST_PARQUET_PATH_1)
df2.write_parquet(TEST_PARQUET_PATH_2)
# Observe deadlock at this line
reloaded_filtered_parquet = (
    pl.scan_parquet(TEST_PARQUET_PATH_1)
    .join(pl.scan_parquet(TEST_PARQUET_PATH_2), on=["times", "ints", "today"], how="inner")
    .filter((start_date <= pl.col("today")) & (pl.col("today") <= end_date))
    .filter(pl.sum_horizontal([f"floats{h}" for h in range(8)]).is_nan().not_())
    .collect()
)
print("Parquet round-tripped and filtered successfully")

Log output

join parallel: true
CROSS join dataframes finished
_init_credential_provider_builder(): credential_provider_init = None
Writeable: try_new: local: ./test1.parquet (canonicalize: Ok("test1.parquet"))
_init_credential_provider_builder(): credential_provider_init = None
Writeable: try_new: local: ./test2.parquet (canonicalize: Ok("test2.parquet"))
_init_credential_provider_builder(): credential_provider_init = None
_init_credential_provider_builder(): credential_provider_init = None
join parallel: true
polars-stream: updating graph state
async thread count: 1
polars-stream: running multi-scan[parquet] in subgraph
polars-stream: running in-memory-sink in subgraph
[MultiScanTaskInitializer]: spawn_background_tasks(), 1 sources, reader name: parquet, ReaderCapabilities(ROW_INDEX | PRE_SLICE | NEGATIVE_PRE_SLICE | PARTIAL_FILTER | FULL_FILTER)
[MultiScanTaskInitializer]: predicate: Some("<predicate>"), skip files mask: None, predicate to reader: Some("<predicate>")
[MultiScanTaskInitializer]: scan_source_idx: 0 extra_ops: ExtraOperations { row_index: None, pre_slice: None, cast_columns_policy: ErrorOnMismatch, missing_columns_policy: Raise, include_file_paths: None, predicate: Some(scan_io_predicate) }
[MultiScanTaskInitializer]: Readers init range: 0..1 (1 / 1 files)
[ReaderStarter]: max_concurrent_scans: 1
[MultiScan]: Initialize source 0
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: got reader, n_readers_received: 1
memory prefetch function: madvise_willneed
[ParquetFileReader]: project: 11 / 11, pre_slice: None, resolved_pre_slice: None, row_index: None, predicate: Some("<predicate>")
[ParquetFileReader]: Config { num_pipelines: 1, row_group_prefetch_size: 128, min_values_per_thread: 16777216 }
[ParquetFileReader]: Pre-filtered decode enabled (9 live, 2 non-live)
[ParquetFileReader]: ideal_morsel_size: 100000

Issue description

I do not observe this issue in Polars 1.27.1, but I do observe it in 1.28.0, 1.28.1, and 1.29.0.

If NUM_THREADS is greater than 1, the operation completes successfully; otherwise the script freezes entirely in the collect() call.
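Based on that observation, a possible stopgap until the bug is fixed is to force a pool of at least two threads. A minimal sketch (the variable must be set before polars is first imported; 2 is simply the smallest pool size that completed successfully in the runs above):

import os

# Stopgap sketch: avoid the single-thread pool that triggers the hang.
os.environ["POLARS_MAX_THREADS"] = "2"

import polars as pl

assert pl.thread_pool_size() >= 2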

Expected behavior

Expected to see the operation complete in finite time and print the final log line.

Installed versions

--------Version info---------
Polars:              1.29.0
Index type:          UInt32
Platform:            Linux-5.15.0-91-generic-x86_64-with-glibc2.35
Python:              3.10.12 (main, Feb  4 2025, 14:57:36) [GCC 11.4.0]
LTS CPU:             False

----Optional dependencies----
Azure CLI            <not installed>
adbc_driver_manager  <not installed>
altair               <not installed>
azure.identity       <not installed>
boto3                <not installed>
cloudpickle          <not installed>
connectorx           <not installed>
deltalake            <not installed>
fastexcel            <not installed>
fsspec               <not installed>
gevent               <not installed>
google.auth          <not installed>
great_tables         <not installed>
matplotlib           <not installed>
numpy                2.2.5
openpyxl             <not installed>
pandas               <not installed>
polars_cloud         <not installed>
pyarrow              <not installed>
pydantic             <not installed>
pyiceberg            <not installed>
sqlalchemy           <not installed>
torch                <not installed>
xlsx2csv             <not installed>
xlsxwriter           <not installed>
alexander-rush added labels bug (Something isn't working), python (Related to Python Polars), needs triage (Awaiting prioritization by a maintainer) on May 7, 2025
nameexhaustion added labels regression (Issue introduced by a new release), accepted (Ready for implementation), P-high (Priority: high) and removed needs triage (Awaiting prioritization by a maintainer) on May 7, 2025

nameexhaustion (Collaborator) commented May 7, 2025

sum_horizontal is calling into rayon here from the new-streaming parquet statistics evaluation, but the single rayon thread is blocked on the StreamingQueryExecutor.
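For intuition, this is the classic thread-pool self-deadlock: a task occupying the pool's only worker blocks waiting on work that can only run on that same worker. A minimal Python analogy of the shape of the hang (a stand-in for the single rayon thread, not Polars internals):

from concurrent.futures import ThreadPoolExecutor

# Stand-in for the rayon pool; set max_workers=1 to reproduce the deadlock.
pool = ThreadPoolExecutor(max_workers=2)

def outer():
    # Occupies a pool worker, then blocks on a result that must be
    # produced by another pool worker. With a single worker, the inner
    # task can never be scheduled and outer() waits forever.
    return pool.submit(lambda: 42).result()

print(pool.submit(outer).result())  # 42 with >= 2 workers; hangs with 1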

nameexhaustion self-assigned this May 7, 2025