Skip to content

Send heartbeat when downloading spooled segments#604

Open
azawlocki-sbdt wants to merge 5 commits into
trinodb:masterfrom
azawlocki-sbdt:azawlocki/query-heartbeat-for-spooling
Open

Send heartbeat when downloading spooled segments#604
azawlocki-sbdt wants to merge 5 commits into
trinodb:masterfrom
azawlocki-sbdt:azawlocki/query-heartbeat-for-spooling

Conversation

@azawlocki-sbdt
Copy link
Copy Markdown

@azawlocki-sbdt azawlocki-sbdt commented May 18, 2026

Description

Resolves #554

Trino can abandon a query if the client is silent for too long. When results are delivered via the spooling protocol, downloading a segment from S3/MinIO blocks the client and may cause query timeout if the download takes too long (the timeout is set in Trino configuration, current default is 5 min).

This PR implements a heartbeat mechanism: a background thread is sending periodic HEAD requests to the coordinator's nextUri while a spooled segment is being downloaded. The heartbeat is started and stopped around each SpooledSegment.data fetch inside SegmentIterator._load_next_segment(). Heartbeat thread is not started if spooling is not used.

Heartbeat requests are emitted in 30s intervals by default. Custom interval can be set using the new heartbeat_interval param for Connection. Setting it to None disables heartbeat.

Unit tests are added in the new module tests/unit/test_client_spooling.py.
An integration test is added in tests/integration/test_dbapi_integration.py.

Release notes

( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text:

* Fix some things. ({issue}`issuenumber`)

@cla-bot
Copy link
Copy Markdown

cla-bot Bot commented May 18, 2026

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

@azawlocki-sbdt azawlocki-sbdt changed the title Senf heartbeat when downloading spooled segments Send heartbeats when downloading spooled segments May 18, 2026
@azawlocki-sbdt azawlocki-sbdt marked this pull request as draft May 18, 2026 17:01
@azawlocki-sbdt azawlocki-sbdt force-pushed the azawlocki/query-heartbeat-for-spooling branch from 8c23aae to 018afff Compare May 18, 2026 17:03
@cla-bot
Copy link
Copy Markdown

cla-bot Bot commented May 18, 2026

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

@cla-bot
Copy link
Copy Markdown

cla-bot Bot commented May 22, 2026

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

1 similar comment
@cla-bot
Copy link
Copy Markdown

cla-bot Bot commented May 22, 2026

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

@azawlocki-sbdt azawlocki-sbdt changed the title Send heartbeats when downloading spooled segments Send heartbeat when downloading spooled segments May 22, 2026
@azawlocki-sbdt azawlocki-sbdt force-pushed the azawlocki/query-heartbeat-for-spooling branch from 64e228b to 38cf8a3 Compare May 25, 2026 08:43
@cla-bot
Copy link
Copy Markdown

cla-bot Bot commented May 25, 2026

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

@azawlocki-sbdt azawlocki-sbdt marked this pull request as ready for review May 25, 2026 09:50
Comment thread trino/client.py Outdated
return self._delete(url, timeout=self._request_timeout, proxies=PROXIES)

def head(self, url: str) -> Response:
return self._http_session.head(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return self._http_session.head(
return self._head(

head() currently skips the retry wrapper that get/post/delete use. Could you wrap it in max_attempts.setter (self._head = with_retry(self._http_session.head)) and call self._head() from head()?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in e876eae

Comment thread tests/development_server.py Outdated
Comment on lines +27 to +29
for container in client.containers.list():
if container.name != "trino":
continue
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
for container in client.containers.list():
if container.name != "trino":
continue
container = client.containers.get("trino")

Is this loop needed?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, I've simplified the code in e876eae.

Comment thread tests/development_server.py Outdated
]
if str(port) in host_ports:
return container
except Exception:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you narrow exception?

Suggested change
except Exception:
except docker.errors.DockerException:

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I've narrowed down both exception and scope in e876eae.

Comment thread trino/client.py Outdated
Comment on lines +1352 to +1363
request: TrinoRequest,
*,
heartbeat_interval: Optional[float] = None,
) -> None:
self._segments = iter(segments if isinstance(segments, List) else [segments])
self._mapper = mapper
self._decoder = None
self._rows: Iterator[List[List[Any]]] = iter([])
self._finished = False
self._current_segment: Optional[DecodableSegment] = None
self._request = request
self._heartbeat_interval = heartbeat_interval
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
request: TrinoRequest,
*,
heartbeat_interval: Optional[float] = None,
) -> None:
self._segments = iter(segments if isinstance(segments, List) else [segments])
self._mapper = mapper
self._decoder = None
self._rows: Iterator[List[List[Any]]] = iter([])
self._finished = False
self._current_segment: Optional[DecodableSegment] = None
self._request = request
self._heartbeat_interval = heartbeat_interval
request: Optional[TrinoRequest] = None,
*,
heartbeat_interval: Optional[float] = None,
) -> None:
self._segments = iter(segments if isinstance(segments, List) else [segments])
self._mapper = mapper
self._decoder = None
self._rows: Iterator[List[List[Any]]] = iter([])
self._finished = False
self._current_segment: Optional[DecodableSegment] = None
self._request = request
self._heartbeat_interval = heartbeat_interval if request is not None else None

The new request parameter is required and positional. Could it be Optional[TrinoRequest] = None? When missing, the heartbeat is simply disabled — same semantics as heartbeat_interval=None. Keeps the constructor backward-compatible

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. I wanted to avoid handling situations in which the interval is set but the request is None, but it's better to keep SegmentIterator backwards compatible, as a public class. I've made both optional keyword args in e876eae, I'm also raising ValueError if only one is specified rather than silently treating both as None.

Comment thread trino/client.py Outdated
self._stop_event = threading.Event()

def __enter__(self) -> _RequestHeartbeat:
executor.submit(self._run)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we should use a dedicated daemon thread instead?
threading.Thread(..., daemon=True)
The shared executor (max_workers=4) is already used by SpooledSegment.acknowledge(), so under concurrent queries a heartbeat could be delayed waiting for a free slot.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Switched to a separate thread in e876eae (making self._failures a local var in _run() is unrelated, I've just noticed it's not used outside _run()).

@cla-bot
Copy link
Copy Markdown

cla-bot Bot commented May 29, 2026

Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla

@azawlocki-sbdt azawlocki-sbdt requested a review from damian3031 May 29, 2026 09:44
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Development

Successfully merging this pull request may close these issues.

Send query heartbeat while downloading results

2 participants