Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

from __future__ import annotations

import json
import time
import uuid
from typing import Any, Literal

import requests
from attrs import define
from gooddata_api_client.api.ai_lake_api import AILakeApi
from gooddata_api_client.model.analyze_statistics_request import AnalyzeStatisticsRequest
Expand Down Expand Up @@ -76,6 +78,52 @@ def __init__(self, api_client: GoodDataApiClient) -> None:
self._client = api_client
self._ai_lake_api: AILakeApi = AILakeApi(api_client._api_client)

def refresh_partition(
self,
instance_id: str,
table_name: str,
partition_spec: dict[str, str],
operation_id: str | None = None,
) -> str:
"""(BETA) Delete all rows for the specified Hive partition and re-load from S3.

Triggers a ``refresh-partition`` long-running operation in the AI Lake.
The generated client does not yet expose this endpoint, so the SDK calls
it via a raw HTTP POST using the same URL-construction logic as
:meth:`~gooddata_sdk.client.GoodDataApiClient._do_post_request`.

Args:
instance_id: Database instance name (preferred) or UUID.
table_name: Name of the pipe-backed OLAP table.
partition_spec: Partition column values that identify the partition to
refresh (e.g. ``{"year": "2024", "month": "01"}``).
operation_id: Optional client-supplied operation identifier. If
omitted, a fresh UUID is generated. Pass the same value to
:meth:`wait_for_operation` to poll.

Returns:
The operation ID (UUID string) the platform will track this run
under. Pass it to :meth:`get_operation` / :meth:`wait_for_operation`.
"""
op_id = operation_id or str(uuid.uuid4())
endpoint = f"api/v1/ailake/database/instances/{instance_id}/pipeTables/{table_name}/refresh"
hostname = self._client._hostname
if not hostname.endswith("/"):
endpoint = f"/{endpoint}"
url = f"{hostname}{endpoint}"
body = json.dumps({"partitionSpec": partition_spec}).encode("utf-8")
response = requests.post(
url=url,
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {self._client._token}",
"operation-id": op_id,
},
data=body,
)
response.raise_for_status()
return op_id

def analyze_statistics(
self,
instance_id: str,
Expand Down
35 changes: 35 additions & 0 deletions packages/gooddata-sdk/tests/catalog/test_catalog_ai_lake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# (C) 2026 GoodData Corporation
"""Integration tests for CatalogAILakeService.

These tests exercise the full SDK → HTTP → server round-trip via VCR cassettes.
Each test function maps to exactly one cassette YAML.
"""

from __future__ import annotations

from pathlib import Path

from gooddata_sdk import GoodDataSdk
from tests_support.vcrpy_utils import get_vcr

gd_vcr = get_vcr()

_current_dir = Path(__file__).parent.absolute()
_fixtures_dir = _current_dir / "fixtures" / "ai_lake"


@gd_vcr.use_cassette(str(_fixtures_dir / "test_refresh_partition.yaml"))
def test_refresh_partition(test_config):
"""(BETA) Trigger a refresh-partition operation and verify a valid operation ID is returned.

The test seeds a caller-supplied operation_id so the cassette is deterministic,
then confirms the returned ID is exactly what was supplied.
"""
sdk = GoodDataSdk.create(host_=test_config["host"], token_=test_config["token"])
op_id = sdk.catalog_ai_lake.refresh_partition(
instance_id="demo-db",
table_name="fact_orders",
partition_spec={"year": "2024", "month": "01"},
operation_id="aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee",
)
assert op_id == "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

from __future__ import annotations

import json
from types import SimpleNamespace
from unittest.mock import MagicMock, call, patch

Expand All @@ -23,7 +24,11 @@
def _make_service() -> tuple[CatalogAILakeService, MagicMock]:
"""Build a service whose api-client side is fully mocked."""
fake_ai_lake_api = MagicMock(name="AILakeApi")
fake_client = SimpleNamespace(_api_client=MagicMock(name="ApiClient"))
fake_client = SimpleNamespace(
_api_client=MagicMock(name="ApiClient"),
_hostname="http://localhost:3000",
_token="test-token",
)

with patch("gooddata_sdk.catalog.ai_lake.service.AILakeApi", return_value=fake_ai_lake_api):
service = CatalogAILakeService(fake_client) # type: ignore[arg-type]
Expand Down Expand Up @@ -144,3 +149,64 @@ def test_times_out_when_never_terminal(self) -> None:
):
service.wait_for_operation("op", timeout_s=10.0, poll_s=0.1)
assert "did not finish within 10.0s" in str(exc_info.value)


class TestRefreshPartition:
"""Unit tests for refresh_partition, which uses raw requests.post.

The interesting logic here is UUID seeding, URL construction, and correct
forwarding of the operation-id header — worth verifying without a live stack.
"""

@patch("gooddata_sdk.catalog.ai_lake.service.requests.post")
def test_seeds_caller_supplied_operation_id(self, mock_post: MagicMock) -> None:
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response

service, _ = _make_service()
op_id = service.refresh_partition(
instance_id="demo-db",
table_name="fact_orders",
partition_spec={"year": "2024"},
operation_id="11111111-2222-3333-4444-555555555555",
)
assert op_id == "11111111-2222-3333-4444-555555555555"
call_kwargs = mock_post.call_args
assert call_kwargs.kwargs["headers"]["operation-id"] == "11111111-2222-3333-4444-555555555555"

@patch("gooddata_sdk.catalog.ai_lake.service.requests.post")
def test_generates_uuid_when_not_supplied(self, mock_post: MagicMock) -> None:
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response

service, _ = _make_service()
op_id = service.refresh_partition(
instance_id="demo-db",
table_name="fact_orders",
partition_spec={"year": "2024"},
)
assert len(op_id) == 36 and op_id.count("-") == 4
call_kwargs = mock_post.call_args
assert call_kwargs.kwargs["headers"]["operation-id"] == op_id

@patch("gooddata_sdk.catalog.ai_lake.service.requests.post")
def test_constructs_correct_url_and_body(self, mock_post: MagicMock) -> None:
mock_response = MagicMock()
mock_response.raise_for_status.return_value = None
mock_post.return_value = mock_response

service, _ = _make_service()
service.refresh_partition(
instance_id="demo-db",
table_name="sales",
partition_spec={"region": "eu", "month": "2024-01"},
)
call_kwargs = mock_post.call_args
url = call_kwargs.kwargs["url"]
assert "demo-db" in url
assert "sales" in url
assert url.endswith("/refresh")
body = json.loads(call_kwargs.kwargs["data"])
assert body == {"partitionSpec": {"region": "eu", "month": "2024-01"}}
Loading