feat: bigquery_fetcher entrypoint#67
Conversation
| app_feature=app_feature, | ||
| amount=int(amount), | ||
| usage_type=unit, | ||
| timestamp=None if timestamp is None else int(timestamp), |
There was a problem hiding this comment.
Bug: The code calls int(timestamp) on a value from BigQuery. This will raise a TypeError because the BigQuery client returns datetime.datetime objects for timestamp columns, which cannot be cast to an integer directly.
Severity: CRITICAL
Suggested Fix
Before converting the timestamp to an integer, check if it is a datetime.datetime object. If it is, convert it to a Unix epoch timestamp first (e.g., by using timestamp.timestamp()) before casting to int. This will correctly handle the type returned by the BigQuery client.
Prompt for AI Agent
Review the code at the location below. A potential bug has been identified by an AI
agent. Verify if this is a real issue. If it is, propose a fix; if not, explain why it's
not valid.
Location: py/usageaccountant/bigquery_fetcher.py#L75
Potential issue: The code at `py/usageaccountant/bigquery_fetcher.py:75` attempts to
convert a `timestamp` value from a BigQuery result to an integer using `int(timestamp)`.
However, the BigQuery Python client library automatically converts `TIMESTAMP` columns
into `datetime.datetime` objects. Calling `int()` directly on a `datetime.datetime`
object will raise a `TypeError`, causing the data fetching process to fail whenever a
query includes a standard timestamp column. The existing tests do not detect this
because they use mock dictionaries with integer timestamps, not actual BigQuery `Row`
objects.
Did we get this right? 👍 / 👎 to inform future reviews.
|
|
||
| [project.optional-dependencies] | ||
| # Only needed for the `bigquery_fetcher` entrypoint | ||
| bigquery = ['google-cloud-bigquery>=3.0.0'] |
There was a problem hiding this comment.
Do we need to update the uv.lock as well? Although not sure if that's used anywhere in practice, maybe only for development.
| bucket_time = now if timestamp is None else timestamp | ||
| key = UsageKey( | ||
| floor(now / self.__granularity_sec) * self.__granularity_sec, | ||
| floor(bucket_time / self.__granularity_sec) | ||
| * self.__granularity_sec, |
There was a problem hiding this comment.
Maybe better?
| bucket_time = now if timestamp is None else timestamp | |
| key = UsageKey( | |
| floor(now / self.__granularity_sec) * self.__granularity_sec, | |
| floor(bucket_time / self.__granularity_sec) | |
| * self.__granularity_sec, | |
| bucket_time = timestamp if timestamp is not None else floor(now / self.__granularity_sec) * self.__granularity_sec | |
| key = UsageKey( | |
| bucket_time, |
| query_list = parse_and_assert_query_file(query_file) | ||
| record_list: List[UsageAccumulatorRecord] = [] | ||
| for query_dict in query_list: | ||
| query = query_dict["query"] |
There was a problem hiding this comment.
I hope the query is pretty concise, otherwise I don't really love the idea of putting a large sql query in a JSON.
There was a problem hiding this comment.
the Bigtable query for us will look something like:
SELECT
SPLIT(rowkey, '/')[OFFSET(0)] AS app_feature, -- TODO mapping
SUM(
-- `m` field size: object metadata
IFNULL(BYTE_LENGTH(COALESCE(fg.m.cell.value, fm.m.cell.value)), 0)
-- `p` field size: object payload
-- check `m` for precomputed size, fallback to scanning `p`
+ IFNULL(
COALESCE(
LAX_INT64(
JSON_QUERY(
TO_JSON(COALESCE(fg.m.cell.value, fm.m.cell.value)), "$.size")),
BYTE_LENGTH(COALESCE(fg.p.cell.value, fm.p.cell.value))),
0)
-- `t` field size: tombstone metadata
+ IFNULL(BYTE_LENGTH(COALESCE(fg.t.cell.value, fm.t.cell.value)), 0)
-- `r` field size: tombstone
+ IFNULL(BYTE_LENGTH(COALESCE(fg.r.cell.value, fm.r.cell.value)), 0))
AS amount
FROM `eng-dev-sbx--files-1.objectstore.bigtable_objectstore`
GROUP BY usecase
ORDER BY total_bytes DESC;
GCS one should be shorter bc size is precomputed
lcian
left a comment
There was a problem hiding this comment.
Left some suggestions. Please address the CI checks as well.
as title
timestampaccumulator for times when you're querying a snapshot. you can put in the snapshot timedatadog_fetcher.pyto newfetcher_utils.py(and same for tests)bigquery_fetcher.pyentrypoint which works basically the same as the datadog one but with a BQ clientDockerfileboilerplatei'm wiring some inventory data into BigQuery for objectstore COGS. reusing
usage-accountantto push BQ rollups through Kafka saves us a good deal of trouble.Ref FS-210