Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .sampo/changesets/dedicated-ai-endpoint-routing.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
pypi/posthog: patch
---

Add internal-only routing of `$ai_*` events to a dedicated capture endpoint in their own batch, gated behind the unstable `_dedicated_ai_endpoint` client option (off by default, not for general use).
15 changes: 15 additions & 0 deletions posthog/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
)
from posthog.poller import Poller
from posthog.request import (
AI_EVENTS_ENDPOINT,
EVENTS_ENDPOINT,
APIError,
QuotaLimitError,
RequestsConnectionError,
Expand All @@ -62,6 +64,7 @@
determine_server_host,
flags,
get,
is_ai_event,
normalize_host,
remote_config,
reset_sessions,
Expand Down Expand Up @@ -210,6 +213,7 @@ def __init__(
code_variables_mask_patterns=None,
code_variables_ignore_patterns=None,
in_app_modules: list[str] | None = None,
_dedicated_ai_endpoint=False,
):
"""
Initialize a new PostHog client instance.
Expand Down Expand Up @@ -320,6 +324,9 @@ def __init__(
self.disable_geoip = disable_geoip
self.is_server = is_server
self.historical_migration = historical_migration
# Internal, not ready for use: routes `$ai_*` events to a dedicated
# capture-ai endpoint while the backend route + ingress roll out.
self._dedicated_ai_endpoint = _dedicated_ai_endpoint
self.super_properties = super_properties
self.enable_exception_autocapture = enable_exception_autocapture
self.log_captured_exceptions = log_captured_exceptions
Expand Down Expand Up @@ -397,6 +404,7 @@ def __init__(
retries=max_retries,
timeout=timeout,
historical_migration=historical_migration,
dedicated_ai_endpoint=self._dedicated_ai_endpoint,
)
self.consumers.append(consumer)

Expand Down Expand Up @@ -1265,6 +1273,7 @@ def _reinit_after_fork(self):
retries=old.retries,
timeout=old.timeout,
historical_migration=old.historical_migration,
dedicated_ai_endpoint=old.dedicated_ai_endpoint,
)
new_consumers.append(consumer)

Expand Down Expand Up @@ -1361,13 +1370,19 @@ def _enqueue(self, msg, disable_geoip):

if self.sync_mode:
self.log.debug("enqueued with blocking %s.", msg["event"])
path = (
AI_EVENTS_ENDPOINT
if self._dedicated_ai_endpoint and is_ai_event(msg.get("event"))
else EVENTS_ENDPOINT
)
batch_post(
self.api_key,
self.host,
gzip=self.gzip,
timeout=self.timeout,
batch=[msg],
historical_migration=self.historical_migration,
path=path,
)

return sent_uuid
Expand Down
68 changes: 64 additions & 4 deletions posthog/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,14 @@
import time
from threading import Thread

from posthog.request import APIError, DatetimeSerializer, batch_post
from posthog.request import (
AI_EVENTS_ENDPOINT,
EVENTS_ENDPOINT,
APIError,
DatetimeSerializer,
batch_post,
is_ai_event,
)

try:
from queue import Empty
Expand All @@ -14,6 +21,11 @@

MAX_MSG_SIZE = 900 * 1024 # 900KiB per event

# `$ai_*` events carry LLM inputs/outputs and, when routed to the dedicated AI
# endpoint, hit a pipeline that accepts larger messages than analytics ingestion,
# so they get a higher per-event ceiling when that routing is enabled.
AI_MAX_MSG_SIZE = 8 * 1024 * 1024 # 8MiB per `$ai_*` event

# The maximum request body size is currently 20MiB, let's be conservative
# in case we want to lower it in the future.
BATCH_SIZE_LIMIT = 5 * 1024 * 1024
Expand All @@ -36,6 +48,7 @@ def __init__(
retries=10,
timeout=15,
historical_migration=False,
dedicated_ai_endpoint=False,
):
"""Create a consumer thread."""
Thread.__init__(self)
Expand All @@ -48,6 +61,7 @@ def __init__(
self.on_error = on_error
self.queue = queue
self.gzip = gzip
self.dedicated_ai_endpoint = dedicated_ai_endpoint
# It's important to set running in the constructor: if we are asked to
# pause immediately after construction, we might set running to True in
# run() *after* we set it to False in pause... and keep running
Expand Down Expand Up @@ -109,9 +123,12 @@ def next(self):
try:
item = queue.get(block=True, timeout=self.flush_interval - elapsed)
item_size = len(json.dumps(item, cls=DatetimeSerializer).encode())
if item_size > MAX_MSG_SIZE:
max_msg_size = self._max_msg_size(item)
if item_size > max_msg_size:
self.log.error(
"Item exceeds 900kib limit, dropping. (%s)", str(item)
"Item exceeds %dKiB limit, dropping. (%s)",
max_msg_size // 1024,
str(item),
)
queue.task_done()
continue
Expand All @@ -126,7 +143,44 @@ def next(self):
return items

def request(self, batch):
"""Attempt to upload the batch and retry before raising an error"""
"""Upload the batch, routing `$ai_*` events to their own endpoint when enabled.

Each destination is attempted independently so a failure on one does not
skip the other. The first failure is raised (so `upload()` logs it and
invokes `on_error`); a second is logged here so it isn't silently lost.
The batch was already dequeued in `upload()`, so unsent events are dropped
after retries, same as the single-endpoint path.
"""
if not self.dedicated_ai_endpoint:
self._send(batch, EVENTS_ENDPOINT)
return

ai_events: list[Any] = []
analytics_events: list[Any] = []
for item in batch:
target = ai_events if is_ai_event(item.get("event")) else analytics_events
target.append(item)

first_exc = None
for events, path in (
(analytics_events, EVENTS_ENDPOINT),
(ai_events, AI_EVENTS_ENDPOINT),
):
if not events:
continue
try:
self._send(events, path)
except Exception as e:
if first_exc is None:
first_exc = e
else:
self.log.error("error uploading to %s: %s", path, e)

if first_exc is not None:
raise first_exc

def _send(self, batch, path):
"""Attempt to upload a single batch to `path`, retrying before raising an error"""

def is_retryable(exc):
if isinstance(exc, APIError):
Expand All @@ -150,6 +204,7 @@ def is_retryable(exc):
timeout=self.timeout,
batch=batch,
historical_migration=self.historical_migration,
path=path,
)
return
except Exception as e:
Expand All @@ -166,3 +221,8 @@ def is_retryable(exc):

if last_exc:
raise last_exc

def _max_msg_size(self, item):
if self.dedicated_ai_endpoint and is_ai_event(item.get("event")):
return AI_MAX_MSG_SIZE
return MAX_MSG_SIZE
11 changes: 10 additions & 1 deletion posthog/request.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,24 @@ def remote_config(
return response.data


EVENTS_ENDPOINT = "/batch/"
AI_EVENTS_ENDPOINT = "/i/v0/ai/batch/"
Comment on lines +344 to +345

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the whole HTTP contract the very same? as in retry logic, request and response payload, etc?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I just noticed that beyond capping batch sizes on the backend, we also pre-emptively do it on the SDK. Just bumped the AI event size to the 8MB we're gonna be trialing.

Didn't touch batching size for the moment because it was a more comprehensive change, affecting non-AI events too. This means that for events larger than the batch size, we'll just send them as a 1 event batch, this seems reasonable for now.



def is_ai_event(event_name) -> bool:
return isinstance(event_name, str) and event_name.startswith("$ai_")


def batch_post(
api_key: str,
host: Optional[str] = None,
gzip: bool = False,
timeout: int = 15,
path: str = EVENTS_ENDPOINT,
**kwargs,
) -> requests.Response:
"""Post the `kwargs` to the batch API endpoint for events"""
res = post(api_key, host, "/batch/", gzip, timeout, **kwargs)
res = post(api_key, host, path, gzip, timeout, **kwargs)
return _process_response(
res, success_message="data uploaded successfully", return_json=False
)
Expand Down
127 changes: 127 additions & 0 deletions posthog/test/test_dedicated_ai_endpoint.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
import unittest
from queue import Queue
from unittest import mock

from parameterized import parameterized

from posthog.client import Client
from posthog.consumer import AI_MAX_MSG_SIZE, Consumer
from posthog.request import AI_EVENTS_ENDPOINT, EVENTS_ENDPOINT
from posthog.test.test_utils import TEST_API_KEY


def _event(name: str) -> dict:
return {"type": "capture", "event": name, "distinct_id": "distinct_id"}


class TestDedicatedAiEndpointConsumer(unittest.TestCase):
def test_routes_ai_and_analytics_to_separate_endpoints(self) -> None:
consumer = Consumer(None, TEST_API_KEY, dedicated_ai_endpoint=True)
with mock.patch("posthog.consumer.batch_post") as mock_post:
consumer.request([_event("$ai_generation"), _event("button_clicked")])

by_path = {
c.kwargs["path"]: c.kwargs["batch"] for c in mock_post.call_args_list
}
self.assertEqual(set(by_path), {EVENTS_ENDPOINT, AI_EVENTS_ENDPOINT})
self.assertEqual(
[e["event"] for e in by_path[AI_EVENTS_ENDPOINT]], ["$ai_generation"]
)
self.assertEqual(
[e["event"] for e in by_path[EVENTS_ENDPOINT]], ["button_clicked"]
)

def test_only_ai_events_single_call_to_ai_endpoint(self) -> None:
consumer = Consumer(None, TEST_API_KEY, dedicated_ai_endpoint=True)
with mock.patch("posthog.consumer.batch_post") as mock_post:
consumer.request([_event("$ai_generation"), _event("$ai_embedding")])

self.assertEqual(mock_post.call_count, 1)
self.assertEqual(mock_post.call_args.kwargs["path"], AI_EVENTS_ENDPOINT)

def test_disabled_routes_everything_to_batch(self) -> None:
consumer = Consumer(None, TEST_API_KEY, dedicated_ai_endpoint=False)
with mock.patch("posthog.consumer.batch_post") as mock_post:
consumer.request([_event("$ai_generation"), _event("button_clicked")])

self.assertEqual(mock_post.call_count, 1)
self.assertEqual(mock_post.call_args.kwargs["path"], EVENTS_ENDPOINT)


class TestDedicatedAiEndpointSyncMode(unittest.TestCase):
@parameterized.expand(
[
("enabled_ai_event", True, "$ai_generation", AI_EVENTS_ENDPOINT),
("enabled_normal_event", True, "button_clicked", EVENTS_ENDPOINT),
("disabled_ai_event", False, "$ai_generation", EVENTS_ENDPOINT),
]
)
def test_routes_event_to_expected_endpoint(
self, _name, enabled, event, expected_path
) -> None:
client = Client(TEST_API_KEY, sync_mode=True, _dedicated_ai_endpoint=enabled)
with mock.patch("posthog.client.batch_post") as mock_post:
client.capture(event, distinct_id="distinct_id")

self.assertEqual(mock_post.call_args.kwargs["path"], expected_path)


class TestDedicatedAiEndpointSizeLimit(unittest.TestCase):
def _sized_event(self, name: str, payload_bytes: int) -> dict:
event = _event(name)
event["properties"] = {"p": "x" * payload_bytes}
return event

@parameterized.expand(
[
(
"ai_event_under_ai_limit_kept",
True,
"$ai_generation",
2 * 1024 * 1024,
True,
),
(
"ai_event_over_ai_limit_dropped",
True,
"$ai_generation",
AI_MAX_MSG_SIZE,
False,
),
(
"analytics_event_over_analytics_limit_dropped",
True,
"button_clicked",
2 * 1024 * 1024,
False,
),
(
"ai_event_uses_analytics_limit_when_disabled",
False,
"$ai_generation",
2 * 1024 * 1024,
False,
),
]
)
def test_per_event_size_limit(
self, _name, enabled, event, payload_bytes, expect_kept
) -> None:
q = Queue()
consumer = Consumer(
q, TEST_API_KEY, dedicated_ai_endpoint=enabled, flush_interval=0.01
)
q.put(self._sized_event(event, payload_bytes))

batch = consumer.next()

if expect_kept:
self.assertEqual([e["event"] for e in batch], [event])
else:
self.assertEqual(batch, [])
self.assertTrue(q.empty())
self.assertEqual(q.unfinished_tasks, 0)


if __name__ == "__main__":
unittest.main()
Comment thread
carlos-marchal-ph marked this conversation as resolved.
4 changes: 3 additions & 1 deletion sdk_compliance_adapter/adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from flask import Flask, jsonify, request

from posthog import Client
from posthog.request import EVENTS_ENDPOINT
from posthog.request import batch_post as original_batch_post
from posthog.version import VERSION

Expand Down Expand Up @@ -164,6 +165,7 @@ def patched_batch_post(
host: Optional[str] = None,
gzip: bool = False,
timeout: int = 15,
path: str = EVENTS_ENDPOINT,
**kwargs,
):
"""Patched version of batch_post that tracks requests"""
Expand All @@ -172,7 +174,7 @@ def patched_batch_post(

try:
# Call original batch_post
response = original_batch_post(api_key, host, gzip, timeout, **kwargs)
response = original_batch_post(api_key, host, gzip, timeout, path, **kwargs)
# Record successful request
state.record_request(200, batch, batch_id)
return response
Expand Down
Loading