Improve KeyboardInterrupt handling during Arrow C stream reads in Python bindings#1
Merged
Merged
Conversation
Updated wait_for_future to surface pending Python exceptions by executing bytecode during signal checks, ensuring that asynchronous interrupts are processed promptly. Enhanced PartitionedDataFrameStreamReader to cancel remaining partition streams on projection errors or Python interrupts, allowing for clean iteration stops. Added regression tests to validate interrupted Arrow C stream reads and improve timing for RecordBatchReader.read_all cancellations.
…g and readability
…k for KeyboardInterrupt
This reverts commit 784929d.
Author
|
The Jupyter notebook is saved in commit 784929d for testing. |
nuno-faria
pushed a commit
that referenced
this pull request
Jun 4, 2026
* Add AGENTS.md and enrich __init__.py module docstring Add python/datafusion/AGENTS.md as a comprehensive DataFrame API guide for AI agents and users. It ships with pip automatically (Maturin includes everything under python-source = "python"). Covers core abstractions, import conventions, data loading, all DataFrame operations, expression building, a SQL-to-DataFrame reference table, common pitfalls, idiomatic patterns, and a categorized function index. Enrich the __init__.py module docstring from 2 lines to a full overview with core abstractions, a quick-start example, and a pointer to AGENTS.md. Closes apache#1394 (PR 1a) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Clarify audience of root vs package AGENTS.md The root AGENTS.md (symlinked as CLAUDE.md) is for contributors working on the project. Add a pointer to python/datafusion/AGENTS.md which is the user-facing DataFrame API guide shipped with the package. Also add the Apache license header to the package AGENTS.md. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Add PR template and pre-commit check guidance to AGENTS.md Document that all PRs must follow .github/pull_request_template.md and that pre-commit hooks must pass before committing. List all configured hooks (actionlint, ruff, ruff-format, cargo fmt, cargo clippy, codespell, uv-lock) and the command to run them manually. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Remove duplicated hook list from AGENTS.md Let the hooks be discoverable from .pre-commit-config.yaml rather than maintaining a separate list that can drift. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Fix AGENTS.md: Arrow C Data Interface, aggregate filter, fluent example - Clarify that DataFusion works with any Arrow C Data Interface implementation, not just PyArrow. - Show the filter keyword argument on aggregate functions (the idiomatic HAVING equivalent) instead of the post-aggregate .filter() pattern. - Update the SQL reference table to show FILTER (WHERE ...) syntax. - Remove the now-incorrect "Aggregate then filter for HAVING" pitfall. - Add .collect() to the fluent chaining example so the result is clearly materialized. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Update agents file after working through the first tpc-h query using only the text description * Add feedback from working through each of the TPC-H queries * Address Copilot review feedback on AGENTS.md - Wrap CASE/WHEN method-chain examples in parentheses and assign to a variable so they are valid Python as shown (Copilot #1, apache#2). - Fix INTERSECT/EXCEPT mapping: the default distinct=False corresponds to INTERSECT ALL / EXCEPT ALL, not the distinct forms. Updated both the Set Operations section and the SQL reference table to show both the ALL and distinct variants (Copilot apache#4). - Change write_parquet / write_csv / write_json examples to file-style paths (output.parquet, etc.) to match the convention used in existing tests and examples. Note that a directory path is also valid for partitioned output (Copilot apache#5). Verified INTERSECT/EXCEPT semantics with a script: df1.intersect(df2) -> [1, 1, 2] (= INTERSECT ALL) df1.intersect(df2, distinct=True) -> [1, 2] (= INTERSECT) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Use short-form comparisons in AGENTS.md examples Drop lit() on the RHS of comparison operators since Expr auto-wraps raw Python values, matching the style the guide recommends (Copilot apache#3, apache#6). Updates examples in the Aggregation, CASE/WHEN, SQL reference table, Common Pitfalls, Fluent Chaining, and Variables-as-CTEs sections, plus the __init__.py quick-start snippet. Prose explanations of the rule (which cite the long form as the thing to avoid) are left unchanged. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Move user guide from python/datafusion/AGENTS.md to SKILL.md The in-wheel AGENTS.md was not a real distribution channel -- no shipping agent walks site-packages for AGENTS.md files. Moving to SKILL.md at the repo root, with YAML frontmatter, lets the skill ecosystems (npx skills, Claude Code plugin marketplaces, community aggregators) discover it. Update the pointers in the contributor AGENTS.md and the __init__.py module docstring accordingly. The docstring now references the GitHub URL since the file no longer ships with the wheel. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Address review feedback: doctest, streaming, date/timestamp - Convert the __init__.py quick-start block to doctest format so it is picked up by `pytest --doctest-modules` (already the project default), preventing silent rot. - Extract streaming into its own SKILL.md subsection with guidance on when to prefer execute_stream() over collect(), sync and async iteration, and execute_stream_partitioned() for per-partition streams. - Generalize the date-arithmetic rule from Date32 to both Date32 and Date64 (both reject Duration at any precision, both accept month_day_nano_interval), and note that Timestamp columns differ and do accept Duration. - Document the PyArrow-inherited type mapping returned by to_pydict()/to_pylist(), including the nanosecond fallback to pandas.Timestamp / pandas.Timedelta and the to_pandas() footgun where date columns come back as an object dtype. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> * Distinguish user guide from agent reference in module docstring The docstring pointed readers at SKILL.md as a "comprehensive guide," but SKILL.md is written in a dense, skill-oriented format for agents — humans are better served by the online user guide. Put the online docs first as the primary reference and label the SKILL.md link as the agent reference. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Rationale for this change
After upgrading to the latest DataFusion version, we observed that long‑running Python operations backed by Arrow C streams were no longer reliably interruptible via
KeyboardInterrupt(e.g. Ctrl+C).The root cause is that simply calling
py.check_signals()is not sufficient to trigger CPython's signal handling for pending signals; it only raises an exception if a signal has already been processed by a previous Python API call. When our async utility sits in a Rust/Tokio loop without executing any Python code, pending signals (likeSIGINT) may never be processed, soKeyboardInterruptis not delivered to user code.This PR fixes the problem by:
Together, these changes restore responsive
KeyboardInterruptbehavior for long‑running DataFusion/Arrow C stream operations in the Python bindings after the DataFusion upgrade.What changes are included in this PR?
Rust async signal handling (
src/utils.rs)Import
std::ffi::CStringto safely construct C string literals for the Python C API.In
future_into_py, replace the barepy.check_signals()call inside thetokio::select!loop with a small closure that:CStringfor the Python code"pass".py.runwith this code, causing the interpreter to execute a no‑op Python statement.py.check_signals()to raiseKeyboardInterrupt(or other signal‑related exceptions) if a signal was processed while running the code.This ensures that pending signals are actually processed while the Rust future is waiting, making async operations interruptible again.
Python test for Arrow C stream interruption (
python/tests/test_dataframe.py)Rework
test_arrow_c_stream_interruptedto:read_startedevent to signal when the background thread begins consuming theRecordBatchReader.read_thread_idusingthreading.get_ident()inside the read thread, instead of assuming that the main thread is doing the read.read_exceptionlist to capture the outcome of the read operation (e.g.KeyboardInterrupt, timeout, or unexpected exception).Add a
read_streamhelper function that:Sets
read_thread_id.Sets the
read_startedevent to inform the interrupt thread that the read is in progress.Calls
reader.read_all()and records the result:RuntimeError("Read completed without interruption")intoread_exception.KeyboardInterruptis raised, it records that.Update
trigger_interruptto:read_startedwith amax_wait_timetimeout.PyThreadState_SetAsyncExcto raiseKeyboardInterruptspecifically in theread_thread_idinstead of the main thread.RuntimeErrorif the injection fails.Start both the read thread and the interrupt thread, then:
jointhe read thread with a 10‑second timeout and fail the test if it is still alive (indicating a hang).KeyboardInterruptor clearly contains"KeyboardInterrupt"in its message, to handle cases where it might be wrapped.jointhe interrupt thread with a short timeout.Overall, the test now:
KeyboardInterrupt.Are these changes tested?
Yes.
The existing
test_arrow_c_stream_interruptedhas been rewritten to more accurately model the real‑world scenario of interrupting a blocking Arrow C stream read running in a background thread.The test validates that:
KeyboardInterrupt(or an exception clearly indicatingKeyboardInterrupt) is raised when we inject the signal into the read thread.No additional tests were required beyond this updated test, as it directly exercises the new behavior introduced by the
future_into_pychanges when integrated with the Python runtime.Test in Jupyter Notebook
Are there any user-facing changes?
Yes, but they are behavioral improvements rather than API changes:
KeyboardInterruptpromptly.No breaking changes to public APIs are introduced by this PR. The only observable change is more reliable and responsive handling of
KeyboardInterruptduring long‑running operations.