Skip to content

Improve KeyboardInterrupt handling during Arrow C stream reads in Python bindings#1

Merged
nuno-faria merged 15 commits into
nuno-faria:datafusion_51from
kosiew:pr-1311
Nov 29, 2025
Merged

Improve KeyboardInterrupt handling during Arrow C stream reads in Python bindings#1
nuno-faria merged 15 commits into
nuno-faria:datafusion_51from
kosiew:pr-1311

Conversation

@kosiew

@kosiew kosiew commented Nov 29, 2025

Copy link
Copy Markdown

Rationale for this change

After upgrading to the latest DataFusion version, we observed that long‑running Python operations backed by Arrow C streams were no longer reliably interruptible via KeyboardInterrupt (e.g. Ctrl+C).

The root cause is that simply calling py.check_signals() is not sufficient to trigger CPython's signal handling for pending signals; it only raises an exception if a signal has already been processed by a previous Python API call. When our async utility sits in a Rust/Tokio loop without executing any Python code, pending signals (like SIGINT) may never be processed, so KeyboardInterrupt is not delivered to user code.

This PR fixes the problem by:

  • Forcing the Python interpreter to run a trivial no‑op statement periodically from the async loop so that pending signals are processed.
  • Tightening and modernizing the Python test so it actually interrupts the thread performing the Arrow C stream read, rather than assuming the main thread is doing the read. This makes the test robust and aligned with the new behavior.

Together, these changes restore responsive KeyboardInterrupt behavior for long‑running DataFusion/Arrow C stream operations in the Python bindings after the DataFusion upgrade.

What changes are included in this PR?

  1. Rust async signal handling (src/utils.rs)

    • Import std::ffi::CString to safely construct C string literals for the Python C API.

    • In future_into_py, replace the bare py.check_signals() call inside the tokio::select! loop with a small closure that:

      • Constructs a CString for the Python code "pass".
      • Calls py.run with this code, causing the interpreter to execute a no‑op Python statement.
      • Then calls py.check_signals() to raise KeyboardInterrupt (or other signal‑related exceptions) if a signal was processed while running the code.
    • This ensures that pending signals are actually processed while the Rust future is waiting, making async operations interruptible again.

  2. Python test for Arrow C stream interruption (python/tests/test_dataframe.py)

    • Rework test_arrow_c_stream_interrupted to:

      • Introduce a read_started event to signal when the background thread begins consuming the RecordBatchReader.
      • Track the read_thread_id using threading.get_ident() inside the read thread, instead of assuming that the main thread is doing the read.
      • Use a shared read_exception list to capture the outcome of the read operation (e.g. KeyboardInterrupt, timeout, or unexpected exception).
    • Add a read_stream helper function that:

      • Sets read_thread_id.

      • Sets the read_started event to inform the interrupt thread that the read is in progress.

      • Calls reader.read_all() and records the result:

        • If the read unexpectedly completes, it pushes a RuntimeError("Read completed without interruption") into read_exception.
        • If KeyboardInterrupt is raised, it records that.
        • Any other exception is also captured for assertion.
    • Update trigger_interrupt to:

      • Wait for read_started with a max_wait_time timeout.
      • Use PyThreadState_SetAsyncExc to raise KeyboardInterrupt specifically in the read_thread_id instead of the main thread.
      • Reset the exception state and raise a RuntimeError if the injection fails.
    • Start both the read thread and the interrupt thread, then:

      • join the read thread with a 10‑second timeout and fail the test if it is still alive (indicating a hang).
      • Assert that an exception was captured.
      • Assert that the captured exception is either a KeyboardInterrupt or clearly contains "KeyboardInterrupt" in its message, to handle cases where it might be wrapped.
      • Finally, join the interrupt thread with a short timeout.

Overall, the test now:

  • Targets the thread actually performing the blocking read.
  • Is resilient to timing issues.
  • Explicitly verifies that the interruption is manifested as a KeyboardInterrupt.

Are these changes tested?

Yes.

  • The existing test_arrow_c_stream_interrupted has been rewritten to more accurately model the real‑world scenario of interrupting a blocking Arrow C stream read running in a background thread.

  • The test validates that:

    • The read operation does not hang indefinitely (it must complete or be interrupted within the timeout).
    • A KeyboardInterrupt (or an exception clearly indicating KeyboardInterrupt) is raised when we inject the signal into the read thread.

No additional tests were required beyond this updated test, as it directly exercises the new behavior introduced by the future_into_py changes when integrated with the Python runtime.

Test in Jupyter Notebook

Nov-29-2025 20-47-59

Are there any user-facing changes?

Yes, but they are behavioral improvements rather than API changes:

  • Python users running long‑running queries or scans via DataFusion/Arrow C streams should once again be able to interrupt those operations with Ctrl+C and receive a KeyboardInterrupt promptly.
  • There are no changes to public function signatures, modules, or configuration options; this is a bug fix in the underlying async and signal‑handling behavior.

No breaking changes to public APIs are introduced by this PR. The only observable change is more reliable and responsive handling of KeyboardInterrupt during long‑running operations.

nuno-faria and others added 14 commits November 29, 2025 19:36
Updated wait_for_future to surface pending Python exceptions by
executing bytecode during signal checks, ensuring that asynchronous
interrupts are processed promptly. Enhanced PartitionedDataFrameStreamReader
to cancel remaining partition streams on projection errors or Python
interrupts, allowing for clean iteration stops. Added regression tests
to validate interrupted Arrow C stream reads and improve timing
for RecordBatchReader.read_all cancellations.
@kosiew

kosiew commented Nov 29, 2025

Copy link
Copy Markdown
Author

The Jupyter notebook is saved in commit 784929d for testing.

@nuno-faria nuno-faria merged commit ab63c6c into nuno-faria:datafusion_51 Nov 29, 2025
nuno-faria pushed a commit that referenced this pull request Jun 4, 2026
* Add AGENTS.md and enrich __init__.py module docstring

Add python/datafusion/AGENTS.md as a comprehensive DataFrame API guide
for AI agents and users. It ships with pip automatically (Maturin includes
everything under python-source = "python"). Covers core abstractions,
import conventions, data loading, all DataFrame operations, expression
building, a SQL-to-DataFrame reference table, common pitfalls, idiomatic
patterns, and a categorized function index.

Enrich the __init__.py module docstring from 2 lines to a full overview
with core abstractions, a quick-start example, and a pointer to AGENTS.md.

Closes apache#1394 (PR 1a)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Clarify audience of root vs package AGENTS.md

The root AGENTS.md (symlinked as CLAUDE.md) is for contributors working
on the project. Add a pointer to python/datafusion/AGENTS.md which is
the user-facing DataFrame API guide shipped with the package. Also add
the Apache license header to the package AGENTS.md.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Add PR template and pre-commit check guidance to AGENTS.md

Document that all PRs must follow .github/pull_request_template.md and
that pre-commit hooks must pass before committing. List all configured
hooks (actionlint, ruff, ruff-format, cargo fmt, cargo clippy, codespell,
uv-lock) and the command to run them manually.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Remove duplicated hook list from AGENTS.md

Let the hooks be discoverable from .pre-commit-config.yaml rather than
maintaining a separate list that can drift.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Fix AGENTS.md: Arrow C Data Interface, aggregate filter, fluent example

- Clarify that DataFusion works with any Arrow C Data Interface
  implementation, not just PyArrow.
- Show the filter keyword argument on aggregate functions (the idiomatic
  HAVING equivalent) instead of the post-aggregate .filter() pattern.
- Update the SQL reference table to show FILTER (WHERE ...) syntax.
- Remove the now-incorrect "Aggregate then filter for HAVING" pitfall.
- Add .collect() to the fluent chaining example so the result is clearly
  materialized.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Update agents file after working through the first tpc-h query using only the text description

* Add feedback from working through each of the TPC-H queries

* Address Copilot review feedback on AGENTS.md

- Wrap CASE/WHEN method-chain examples in parentheses and assign to a
  variable so they are valid Python as shown (Copilot #1, apache#2).
- Fix INTERSECT/EXCEPT mapping: the default distinct=False corresponds to
  INTERSECT ALL / EXCEPT ALL, not the distinct forms. Updated both the
  Set Operations section and the SQL reference table to show both the
  ALL and distinct variants (Copilot apache#4).
- Change write_parquet / write_csv / write_json examples to file-style
  paths (output.parquet, etc.) to match the convention used in existing
  tests and examples. Note that a directory path is also valid for
  partitioned output (Copilot apache#5).

Verified INTERSECT/EXCEPT semantics with a script:
  df1.intersect(df2)                -> [1, 1, 2]  (= INTERSECT ALL)
  df1.intersect(df2, distinct=True) -> [1, 2]     (= INTERSECT)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Use short-form comparisons in AGENTS.md examples

Drop lit() on the RHS of comparison operators since Expr auto-wraps raw
Python values, matching the style the guide recommends (Copilot apache#3, apache#6).

Updates examples in the Aggregation, CASE/WHEN, SQL reference table,
Common Pitfalls, Fluent Chaining, and Variables-as-CTEs sections, plus
the __init__.py quick-start snippet. Prose explanations of the rule
(which cite the long form as the thing to avoid) are left unchanged.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Move user guide from python/datafusion/AGENTS.md to SKILL.md

The in-wheel AGENTS.md was not a real distribution channel -- no shipping
agent walks site-packages for AGENTS.md files. Moving to SKILL.md at the
repo root, with YAML frontmatter, lets the skill ecosystems (npx skills,
Claude Code plugin marketplaces, community aggregators) discover it.

Update the pointers in the contributor AGENTS.md and the __init__.py
module docstring accordingly. The docstring now references the GitHub
URL since the file no longer ships with the wheel.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Address review feedback: doctest, streaming, date/timestamp

- Convert the __init__.py quick-start block to doctest format so it is
  picked up by `pytest --doctest-modules` (already the project default),
  preventing silent rot.
- Extract streaming into its own SKILL.md subsection with guidance on
  when to prefer execute_stream() over collect(), sync and async
  iteration, and execute_stream_partitioned() for per-partition streams.
- Generalize the date-arithmetic rule from Date32 to both Date32 and
  Date64 (both reject Duration at any precision, both accept
  month_day_nano_interval), and note that Timestamp columns differ and
  do accept Duration.
- Document the PyArrow-inherited type mapping returned by
  to_pydict()/to_pylist(), including the nanosecond fallback to
  pandas.Timestamp / pandas.Timedelta and the to_pandas() footgun where
  date columns come back as an object dtype.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

* Distinguish user guide from agent reference in module docstring

The docstring pointed readers at SKILL.md as a "comprehensive guide," but
SKILL.md is written in a dense, skill-oriented format for agents — humans
are better served by the online user guide. Put the online docs first as
the primary reference and label the SKILL.md link as the agent reference.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants