Send heartbeat when downloading spooled segments#604
Conversation
|
Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla |
8c23aae to
018afff
Compare
|
Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla |
|
Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla |
1 similar comment
|
Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla |
64e228b to
38cf8a3
Compare
|
Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla |
| return self._delete(url, timeout=self._request_timeout, proxies=PROXIES) | ||
|
|
||
| def head(self, url: str) -> Response: | ||
| return self._http_session.head( |
There was a problem hiding this comment.
| return self._http_session.head( | |
| return self._head( |
head() currently skips the retry wrapper that get/post/delete use. Could you wrap it in max_attempts.setter (self._head = with_retry(self._http_session.head)) and call self._head() from head()?
| for container in client.containers.list(): | ||
| if container.name != "trino": | ||
| continue |
There was a problem hiding this comment.
| for container in client.containers.list(): | |
| if container.name != "trino": | |
| continue | |
| container = client.containers.get("trino") |
Is this loop needed?
| ] | ||
| if str(port) in host_ports: | ||
| return container | ||
| except Exception: |
There was a problem hiding this comment.
Could you narrow exception?
| except Exception: | |
| except docker.errors.DockerException: |
There was a problem hiding this comment.
Right, I've narrowed down both exception and scope in e876eae.
| request: TrinoRequest, | ||
| *, | ||
| heartbeat_interval: Optional[float] = None, | ||
| ) -> None: | ||
| self._segments = iter(segments if isinstance(segments, List) else [segments]) | ||
| self._mapper = mapper | ||
| self._decoder = None | ||
| self._rows: Iterator[List[List[Any]]] = iter([]) | ||
| self._finished = False | ||
| self._current_segment: Optional[DecodableSegment] = None | ||
| self._request = request | ||
| self._heartbeat_interval = heartbeat_interval |
There was a problem hiding this comment.
| request: TrinoRequest, | |
| *, | |
| heartbeat_interval: Optional[float] = None, | |
| ) -> None: | |
| self._segments = iter(segments if isinstance(segments, List) else [segments]) | |
| self._mapper = mapper | |
| self._decoder = None | |
| self._rows: Iterator[List[List[Any]]] = iter([]) | |
| self._finished = False | |
| self._current_segment: Optional[DecodableSegment] = None | |
| self._request = request | |
| self._heartbeat_interval = heartbeat_interval | |
| request: Optional[TrinoRequest] = None, | |
| *, | |
| heartbeat_interval: Optional[float] = None, | |
| ) -> None: | |
| self._segments = iter(segments if isinstance(segments, List) else [segments]) | |
| self._mapper = mapper | |
| self._decoder = None | |
| self._rows: Iterator[List[List[Any]]] = iter([]) | |
| self._finished = False | |
| self._current_segment: Optional[DecodableSegment] = None | |
| self._request = request | |
| self._heartbeat_interval = heartbeat_interval if request is not None else None |
The new request parameter is required and positional. Could it be Optional[TrinoRequest] = None? When missing, the heartbeat is simply disabled — same semantics as heartbeat_interval=None. Keeps the constructor backward-compatible
There was a problem hiding this comment.
Agreed. I wanted to avoid handling situations in which the interval is set but the request is None, but it's better to keep SegmentIterator backwards compatible, as a public class. I've made both optional keyword args in e876eae, I'm also raising ValueError if only one is specified rather than silently treating both as None.
| self._stop_event = threading.Event() | ||
|
|
||
| def __enter__(self) -> _RequestHeartbeat: | ||
| executor.submit(self._run) |
There was a problem hiding this comment.
I wonder if we should use a dedicated daemon thread instead?
threading.Thread(..., daemon=True)
The shared executor (max_workers=4) is already used by SpooledSegment.acknowledge(), so under concurrent queries a heartbeat could be delayed waiting for a free slot.
There was a problem hiding this comment.
Agreed. Switched to a separate thread in e876eae (making self._failures a local var in _run() is unrelated, I've just noticed it's not used outside _run()).
|
Thank you for your pull request and welcome to the Trino community. We require contributors to sign our Contributor License Agreement, and we don't seem to have you on file. Continue to work with us on the review and improvements in this PR, and submit the signed CLA to cla@trino.io. Photos, scans, or digitally-signed PDF files are all suitable. Processing may take a few days. The CLA needs to be on file before we merge your changes. For more information, see https://github.com/trinodb/cla |
Description
Resolves #554
Trino can abandon a query if the client is silent for too long. When results are delivered via the spooling protocol, downloading a segment from S3/MinIO blocks the client and may cause query timeout if the download takes too long (the timeout is set in Trino configuration, current default is 5 min).
This PR implements a heartbeat mechanism: a background thread is sending periodic HEAD requests to the coordinator's nextUri while a spooled segment is being downloaded. The heartbeat is started and stopped around each
SpooledSegment.datafetch insideSegmentIterator._load_next_segment(). Heartbeat thread is not started if spooling is not used.Heartbeat requests are emitted in 30s intervals by default. Custom interval can be set using the new
heartbeat_intervalparam forConnection. Setting it toNonedisables heartbeat.Unit tests are added in the new module
tests/unit/test_client_spooling.py.An integration test is added in
tests/integration/test_dbapi_integration.py.Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
( ) Release notes are required, with the following suggested text: