From 071ab533ab1edec1521506868c05dd76505419b0 Mon Sep 17 00:00:00 2001 From: Carlos Marchal Date: Tue, 9 Jun 2026 11:45:46 +0200 Subject: [PATCH 1/3] feat: dedicated ai endpoint routing --- .../dedicated-ai-endpoint-routing.md | 5 ++ posthog/client.py | 15 ++++ posthog/consumer.py | 31 +++++++- posthog/request.py | 11 ++- posthog/test/test_dedicated_ai_endpoint.py | 74 +++++++++++++++++++ 5 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 .sampo/changesets/dedicated-ai-endpoint-routing.md create mode 100644 posthog/test/test_dedicated_ai_endpoint.py diff --git a/.sampo/changesets/dedicated-ai-endpoint-routing.md b/.sampo/changesets/dedicated-ai-endpoint-routing.md new file mode 100644 index 00000000..e2d70c08 --- /dev/null +++ b/.sampo/changesets/dedicated-ai-endpoint-routing.md @@ -0,0 +1,5 @@ +--- +pypi/posthog: patch +--- + +Add internal-only routing of `$ai_*` events to a dedicated capture endpoint in their own batch, gated behind the unstable `_internal_dedicated_ai_endpoint` client option (off by default, not for general use). diff --git a/posthog/client.py b/posthog/client.py index bf967449..bbc11e69 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -54,6 +54,8 @@ ) from posthog.poller import Poller from posthog.request import ( + AI_EVENTS_ENDPOINT, + EVENTS_ENDPOINT, APIError, QuotaLimitError, RequestsConnectionError, @@ -62,6 +64,7 @@ determine_server_host, flags, get, + is_ai_event, normalize_host, remote_config, reset_sessions, @@ -210,6 +213,7 @@ def __init__( code_variables_mask_patterns=None, code_variables_ignore_patterns=None, in_app_modules: list[str] | None = None, + _internal_dedicated_ai_endpoint=False, ): """ Initialize a new PostHog client instance. @@ -320,6 +324,9 @@ def __init__( self.disable_geoip = disable_geoip self.is_server = is_server self.historical_migration = historical_migration + # Internal, not ready for use: routes `$ai_*` events to a dedicated + # capture-ai endpoint while the backend route + ingress roll out. + self._internal_dedicated_ai_endpoint = _internal_dedicated_ai_endpoint self.super_properties = super_properties self.enable_exception_autocapture = enable_exception_autocapture self.log_captured_exceptions = log_captured_exceptions @@ -397,6 +404,7 @@ def __init__( retries=max_retries, timeout=timeout, historical_migration=historical_migration, + dedicated_ai_endpoint=self._internal_dedicated_ai_endpoint, ) self.consumers.append(consumer) @@ -1265,6 +1273,7 @@ def _reinit_after_fork(self): retries=old.retries, timeout=old.timeout, historical_migration=old.historical_migration, + dedicated_ai_endpoint=old.dedicated_ai_endpoint, ) new_consumers.append(consumer) @@ -1361,6 +1370,11 @@ def _enqueue(self, msg, disable_geoip): if self.sync_mode: self.log.debug("enqueued with blocking %s.", msg["event"]) + path = ( + AI_EVENTS_ENDPOINT + if self._internal_dedicated_ai_endpoint and is_ai_event(msg.get("event")) + else EVENTS_ENDPOINT + ) batch_post( self.api_key, self.host, @@ -1368,6 +1382,7 @@ def _enqueue(self, msg, disable_geoip): timeout=self.timeout, batch=[msg], historical_migration=self.historical_migration, + path=path, ) return sent_uuid diff --git a/posthog/consumer.py b/posthog/consumer.py index 6a9becbe..1789f4f4 100644 --- a/posthog/consumer.py +++ b/posthog/consumer.py @@ -4,7 +4,14 @@ import time from threading import Thread -from posthog.request import APIError, DatetimeSerializer, batch_post +from posthog.request import ( + AI_EVENTS_ENDPOINT, + EVENTS_ENDPOINT, + APIError, + DatetimeSerializer, + batch_post, + is_ai_event, +) try: from queue import Empty @@ -36,6 +43,7 @@ def __init__( retries=10, timeout=15, historical_migration=False, + dedicated_ai_endpoint=False, ): """Create a consumer thread.""" Thread.__init__(self) @@ -48,6 +56,7 @@ def __init__( self.on_error = on_error self.queue = queue self.gzip = gzip + self.dedicated_ai_endpoint = dedicated_ai_endpoint # It's important to set running in the constructor: if we are asked to # pause immediately after construction, we might set running to True in # run() *after* we set it to False in pause... and keep running @@ -126,7 +135,24 @@ def next(self): return items def request(self, batch): - """Attempt to upload the batch and retry before raising an error""" + """Upload the batch, routing `$ai_*` events to their own endpoint when enabled. + + Each destination is sent (and retried) independently so a failure on one + does not re-send the other — the batch was already dequeued in `upload()`. + """ + if not self.dedicated_ai_endpoint: + self._send(batch, EVENTS_ENDPOINT) + return + + ai_events = [item for item in batch if is_ai_event(item.get("event"))] + analytics_events = [item for item in batch if not is_ai_event(item.get("event"))] + if analytics_events: + self._send(analytics_events, EVENTS_ENDPOINT) + if ai_events: + self._send(ai_events, AI_EVENTS_ENDPOINT) + + def _send(self, batch, path): + """Attempt to upload a single batch to `path`, retrying before raising an error""" def is_retryable(exc): if isinstance(exc, APIError): @@ -150,6 +176,7 @@ def is_retryable(exc): timeout=self.timeout, batch=batch, historical_migration=self.historical_migration, + path=path, ) return except Exception as e: diff --git a/posthog/request.py b/posthog/request.py index 76f0a9fe..a3258d12 100644 --- a/posthog/request.py +++ b/posthog/request.py @@ -341,15 +341,24 @@ def remote_config( return response.data +EVENTS_ENDPOINT = "/batch/" +AI_EVENTS_ENDPOINT = "/i/v0/ai/batch/" + + +def is_ai_event(event_name) -> bool: + return isinstance(event_name, str) and event_name.startswith("$ai_") + + def batch_post( api_key: str, host: Optional[str] = None, gzip: bool = False, timeout: int = 15, + path: str = EVENTS_ENDPOINT, **kwargs, ) -> requests.Response: """Post the `kwargs` to the batch API endpoint for events""" - res = post(api_key, host, "/batch/", gzip, timeout, **kwargs) + res = post(api_key, host, path, gzip, timeout, **kwargs) return _process_response( res, success_message="data uploaded successfully", return_json=False ) diff --git a/posthog/test/test_dedicated_ai_endpoint.py b/posthog/test/test_dedicated_ai_endpoint.py new file mode 100644 index 00000000..c4fa129a --- /dev/null +++ b/posthog/test/test_dedicated_ai_endpoint.py @@ -0,0 +1,74 @@ +import unittest +from unittest import mock + +from posthog.client import Client +from posthog.consumer import Consumer +from posthog.request import AI_EVENTS_ENDPOINT, EVENTS_ENDPOINT +from posthog.test.test_utils import TEST_API_KEY + + +def _event(name: str) -> dict: + return {"type": "capture", "event": name, "distinct_id": "distinct_id"} + + +class TestDedicatedAiEndpointConsumer(unittest.TestCase): + def test_routes_ai_and_analytics_to_separate_endpoints(self) -> None: + consumer = Consumer(None, TEST_API_KEY, dedicated_ai_endpoint=True) + with mock.patch("posthog.consumer.batch_post") as mock_post: + consumer.request([_event("$ai_generation"), _event("button_clicked")]) + + by_path = {c.kwargs["path"]: c.kwargs["batch"] for c in mock_post.call_args_list} + self.assertEqual(set(by_path), {EVENTS_ENDPOINT, AI_EVENTS_ENDPOINT}) + self.assertEqual( + [e["event"] for e in by_path[AI_EVENTS_ENDPOINT]], ["$ai_generation"] + ) + self.assertEqual( + [e["event"] for e in by_path[EVENTS_ENDPOINT]], ["button_clicked"] + ) + + def test_only_ai_events_single_call_to_ai_endpoint(self) -> None: + consumer = Consumer(None, TEST_API_KEY, dedicated_ai_endpoint=True) + with mock.patch("posthog.consumer.batch_post") as mock_post: + consumer.request([_event("$ai_generation"), _event("$ai_embedding")]) + + self.assertEqual(mock_post.call_count, 1) + self.assertEqual(mock_post.call_args.kwargs["path"], AI_EVENTS_ENDPOINT) + + def test_disabled_routes_everything_to_batch(self) -> None: + consumer = Consumer(None, TEST_API_KEY, dedicated_ai_endpoint=False) + with mock.patch("posthog.consumer.batch_post") as mock_post: + consumer.request([_event("$ai_generation"), _event("button_clicked")]) + + self.assertEqual(mock_post.call_count, 1) + self.assertEqual(mock_post.call_args.kwargs["path"], EVENTS_ENDPOINT) + + +class TestDedicatedAiEndpointSyncMode(unittest.TestCase): + def test_routes_ai_event_to_ai_endpoint(self) -> None: + client = Client( + TEST_API_KEY, sync_mode=True, _internal_dedicated_ai_endpoint=True + ) + with mock.patch("posthog.client.batch_post") as mock_post: + client.capture("$ai_generation", distinct_id="distinct_id") + + self.assertEqual(mock_post.call_args.kwargs["path"], AI_EVENTS_ENDPOINT) + + def test_routes_normal_event_to_batch(self) -> None: + client = Client( + TEST_API_KEY, sync_mode=True, _internal_dedicated_ai_endpoint=True + ) + with mock.patch("posthog.client.batch_post") as mock_post: + client.capture("button_clicked", distinct_id="distinct_id") + + self.assertEqual(mock_post.call_args.kwargs["path"], EVENTS_ENDPOINT) + + def test_disabled_routes_ai_event_to_batch(self) -> None: + client = Client(TEST_API_KEY, sync_mode=True) + with mock.patch("posthog.client.batch_post") as mock_post: + client.capture("$ai_generation", distinct_id="distinct_id") + + self.assertEqual(mock_post.call_args.kwargs["path"], EVENTS_ENDPOINT) + + +if __name__ == "__main__": + unittest.main() From db530a889e5cabacf7becfa26271dd2e941c52b0 Mon Sep 17 00:00:00 2001 From: Carlos Marchal Date: Wed, 10 Jun 2026 12:02:20 +0200 Subject: [PATCH 2/3] fix: attempt AI and analytics sends independently, rename flag to _dedicated_ai_endpoint Generated-By: PostHog Code Task-Id: 142d3917-41e8-4bb5-ad6c-5778e13041c2 --- .../dedicated-ai-endpoint-routing.md | 2 +- posthog/client.py | 8 ++-- posthog/consumer.py | 36 ++++++++++++---- posthog/test/test_dedicated_ai_endpoint.py | 41 ++++++++----------- sdk_compliance_adapter/adapter.py | 4 +- 5 files changed, 54 insertions(+), 37 deletions(-) diff --git a/.sampo/changesets/dedicated-ai-endpoint-routing.md b/.sampo/changesets/dedicated-ai-endpoint-routing.md index e2d70c08..b6d028ad 100644 --- a/.sampo/changesets/dedicated-ai-endpoint-routing.md +++ b/.sampo/changesets/dedicated-ai-endpoint-routing.md @@ -2,4 +2,4 @@ pypi/posthog: patch --- -Add internal-only routing of `$ai_*` events to a dedicated capture endpoint in their own batch, gated behind the unstable `_internal_dedicated_ai_endpoint` client option (off by default, not for general use). +Add internal-only routing of `$ai_*` events to a dedicated capture endpoint in their own batch, gated behind the unstable `_dedicated_ai_endpoint` client option (off by default, not for general use). diff --git a/posthog/client.py b/posthog/client.py index bbc11e69..6caa0c3c 100644 --- a/posthog/client.py +++ b/posthog/client.py @@ -213,7 +213,7 @@ def __init__( code_variables_mask_patterns=None, code_variables_ignore_patterns=None, in_app_modules: list[str] | None = None, - _internal_dedicated_ai_endpoint=False, + _dedicated_ai_endpoint=False, ): """ Initialize a new PostHog client instance. @@ -326,7 +326,7 @@ def __init__( self.historical_migration = historical_migration # Internal, not ready for use: routes `$ai_*` events to a dedicated # capture-ai endpoint while the backend route + ingress roll out. - self._internal_dedicated_ai_endpoint = _internal_dedicated_ai_endpoint + self._dedicated_ai_endpoint = _dedicated_ai_endpoint self.super_properties = super_properties self.enable_exception_autocapture = enable_exception_autocapture self.log_captured_exceptions = log_captured_exceptions @@ -404,7 +404,7 @@ def __init__( retries=max_retries, timeout=timeout, historical_migration=historical_migration, - dedicated_ai_endpoint=self._internal_dedicated_ai_endpoint, + dedicated_ai_endpoint=self._dedicated_ai_endpoint, ) self.consumers.append(consumer) @@ -1372,7 +1372,7 @@ def _enqueue(self, msg, disable_geoip): self.log.debug("enqueued with blocking %s.", msg["event"]) path = ( AI_EVENTS_ENDPOINT - if self._internal_dedicated_ai_endpoint and is_ai_event(msg.get("event")) + if self._dedicated_ai_endpoint and is_ai_event(msg.get("event")) else EVENTS_ENDPOINT ) batch_post( diff --git a/posthog/consumer.py b/posthog/consumer.py index 1789f4f4..ea759d35 100644 --- a/posthog/consumer.py +++ b/posthog/consumer.py @@ -137,19 +137,39 @@ def next(self): def request(self, batch): """Upload the batch, routing `$ai_*` events to their own endpoint when enabled. - Each destination is sent (and retried) independently so a failure on one - does not re-send the other — the batch was already dequeued in `upload()`. + Each destination is attempted independently so a failure on one does not + skip the other. The first failure is raised (so `upload()` logs it and + invokes `on_error`); a second is logged here so it isn't silently lost. + The batch was already dequeued in `upload()`, so unsent events are dropped + after retries, same as the single-endpoint path. """ if not self.dedicated_ai_endpoint: self._send(batch, EVENTS_ENDPOINT) return - ai_events = [item for item in batch if is_ai_event(item.get("event"))] - analytics_events = [item for item in batch if not is_ai_event(item.get("event"))] - if analytics_events: - self._send(analytics_events, EVENTS_ENDPOINT) - if ai_events: - self._send(ai_events, AI_EVENTS_ENDPOINT) + ai_events: list[Any] = [] + analytics_events: list[Any] = [] + for item in batch: + target = ai_events if is_ai_event(item.get("event")) else analytics_events + target.append(item) + + first_exc = None + for events, path in ( + (analytics_events, EVENTS_ENDPOINT), + (ai_events, AI_EVENTS_ENDPOINT), + ): + if not events: + continue + try: + self._send(events, path) + except Exception as e: + if first_exc is None: + first_exc = e + else: + self.log.error("error uploading to %s: %s", path, e) + + if first_exc is not None: + raise first_exc def _send(self, batch, path): """Attempt to upload a single batch to `path`, retrying before raising an error""" diff --git a/posthog/test/test_dedicated_ai_endpoint.py b/posthog/test/test_dedicated_ai_endpoint.py index c4fa129a..b93a7a77 100644 --- a/posthog/test/test_dedicated_ai_endpoint.py +++ b/posthog/test/test_dedicated_ai_endpoint.py @@ -1,6 +1,8 @@ import unittest from unittest import mock +from parameterized import parameterized + from posthog.client import Client from posthog.consumer import Consumer from posthog.request import AI_EVENTS_ENDPOINT, EVENTS_ENDPOINT @@ -17,7 +19,9 @@ def test_routes_ai_and_analytics_to_separate_endpoints(self) -> None: with mock.patch("posthog.consumer.batch_post") as mock_post: consumer.request([_event("$ai_generation"), _event("button_clicked")]) - by_path = {c.kwargs["path"]: c.kwargs["batch"] for c in mock_post.call_args_list} + by_path = { + c.kwargs["path"]: c.kwargs["batch"] for c in mock_post.call_args_list + } self.assertEqual(set(by_path), {EVENTS_ENDPOINT, AI_EVENTS_ENDPOINT}) self.assertEqual( [e["event"] for e in by_path[AI_EVENTS_ENDPOINT]], ["$ai_generation"] @@ -44,30 +48,21 @@ def test_disabled_routes_everything_to_batch(self) -> None: class TestDedicatedAiEndpointSyncMode(unittest.TestCase): - def test_routes_ai_event_to_ai_endpoint(self) -> None: - client = Client( - TEST_API_KEY, sync_mode=True, _internal_dedicated_ai_endpoint=True - ) - with mock.patch("posthog.client.batch_post") as mock_post: - client.capture("$ai_generation", distinct_id="distinct_id") - - self.assertEqual(mock_post.call_args.kwargs["path"], AI_EVENTS_ENDPOINT) - - def test_routes_normal_event_to_batch(self) -> None: - client = Client( - TEST_API_KEY, sync_mode=True, _internal_dedicated_ai_endpoint=True - ) - with mock.patch("posthog.client.batch_post") as mock_post: - client.capture("button_clicked", distinct_id="distinct_id") - - self.assertEqual(mock_post.call_args.kwargs["path"], EVENTS_ENDPOINT) - - def test_disabled_routes_ai_event_to_batch(self) -> None: - client = Client(TEST_API_KEY, sync_mode=True) + @parameterized.expand( + [ + ("enabled_ai_event", True, "$ai_generation", AI_EVENTS_ENDPOINT), + ("enabled_normal_event", True, "button_clicked", EVENTS_ENDPOINT), + ("disabled_ai_event", False, "$ai_generation", EVENTS_ENDPOINT), + ] + ) + def test_routes_event_to_expected_endpoint( + self, _name, enabled, event, expected_path + ) -> None: + client = Client(TEST_API_KEY, sync_mode=True, _dedicated_ai_endpoint=enabled) with mock.patch("posthog.client.batch_post") as mock_post: - client.capture("$ai_generation", distinct_id="distinct_id") + client.capture(event, distinct_id="distinct_id") - self.assertEqual(mock_post.call_args.kwargs["path"], EVENTS_ENDPOINT) + self.assertEqual(mock_post.call_args.kwargs["path"], expected_path) if __name__ == "__main__": diff --git a/sdk_compliance_adapter/adapter.py b/sdk_compliance_adapter/adapter.py index cb3e168d..2fd8eed9 100644 --- a/sdk_compliance_adapter/adapter.py +++ b/sdk_compliance_adapter/adapter.py @@ -14,6 +14,7 @@ from flask import Flask, jsonify, request from posthog import Client +from posthog.request import EVENTS_ENDPOINT from posthog.request import batch_post as original_batch_post from posthog.version import VERSION @@ -164,6 +165,7 @@ def patched_batch_post( host: Optional[str] = None, gzip: bool = False, timeout: int = 15, + path: str = EVENTS_ENDPOINT, **kwargs, ): """Patched version of batch_post that tracks requests""" @@ -172,7 +174,7 @@ def patched_batch_post( try: # Call original batch_post - response = original_batch_post(api_key, host, gzip, timeout, **kwargs) + response = original_batch_post(api_key, host, gzip, timeout, path, **kwargs) # Record successful request state.record_request(200, batch, batch_id) return response From 5d20ea0c8903f369775364d9936f4ac92f0b701b Mon Sep 17 00:00:00 2001 From: Carlos Marchal Date: Wed, 10 Jun 2026 13:08:28 +0200 Subject: [PATCH 3/3] fix: increase size limit for AI events --- posthog/consumer.py | 17 +++++- posthog/test/test_dedicated_ai_endpoint.py | 60 +++++++++++++++++++++- 2 files changed, 74 insertions(+), 3 deletions(-) diff --git a/posthog/consumer.py b/posthog/consumer.py index ea759d35..80335228 100644 --- a/posthog/consumer.py +++ b/posthog/consumer.py @@ -21,6 +21,11 @@ MAX_MSG_SIZE = 900 * 1024 # 900KiB per event +# `$ai_*` events carry LLM inputs/outputs and, when routed to the dedicated AI +# endpoint, hit a pipeline that accepts larger messages than analytics ingestion, +# so they get a higher per-event ceiling when that routing is enabled. +AI_MAX_MSG_SIZE = 8 * 1024 * 1024 # 8MiB per `$ai_*` event + # The maximum request body size is currently 20MiB, let's be conservative # in case we want to lower it in the future. BATCH_SIZE_LIMIT = 5 * 1024 * 1024 @@ -118,9 +123,12 @@ def next(self): try: item = queue.get(block=True, timeout=self.flush_interval - elapsed) item_size = len(json.dumps(item, cls=DatetimeSerializer).encode()) - if item_size > MAX_MSG_SIZE: + max_msg_size = self._max_msg_size(item) + if item_size > max_msg_size: self.log.error( - "Item exceeds 900kib limit, dropping. (%s)", str(item) + "Item exceeds %dKiB limit, dropping. (%s)", + max_msg_size // 1024, + str(item), ) queue.task_done() continue @@ -213,3 +221,8 @@ def is_retryable(exc): if last_exc: raise last_exc + + def _max_msg_size(self, item): + if self.dedicated_ai_endpoint and is_ai_event(item.get("event")): + return AI_MAX_MSG_SIZE + return MAX_MSG_SIZE diff --git a/posthog/test/test_dedicated_ai_endpoint.py b/posthog/test/test_dedicated_ai_endpoint.py index b93a7a77..1fb4322a 100644 --- a/posthog/test/test_dedicated_ai_endpoint.py +++ b/posthog/test/test_dedicated_ai_endpoint.py @@ -1,10 +1,11 @@ import unittest +from queue import Queue from unittest import mock from parameterized import parameterized from posthog.client import Client -from posthog.consumer import Consumer +from posthog.consumer import AI_MAX_MSG_SIZE, Consumer from posthog.request import AI_EVENTS_ENDPOINT, EVENTS_ENDPOINT from posthog.test.test_utils import TEST_API_KEY @@ -65,5 +66,62 @@ def test_routes_event_to_expected_endpoint( self.assertEqual(mock_post.call_args.kwargs["path"], expected_path) +class TestDedicatedAiEndpointSizeLimit(unittest.TestCase): + def _sized_event(self, name: str, payload_bytes: int) -> dict: + event = _event(name) + event["properties"] = {"p": "x" * payload_bytes} + return event + + @parameterized.expand( + [ + ( + "ai_event_under_ai_limit_kept", + True, + "$ai_generation", + 2 * 1024 * 1024, + True, + ), + ( + "ai_event_over_ai_limit_dropped", + True, + "$ai_generation", + AI_MAX_MSG_SIZE, + False, + ), + ( + "analytics_event_over_analytics_limit_dropped", + True, + "button_clicked", + 2 * 1024 * 1024, + False, + ), + ( + "ai_event_uses_analytics_limit_when_disabled", + False, + "$ai_generation", + 2 * 1024 * 1024, + False, + ), + ] + ) + def test_per_event_size_limit( + self, _name, enabled, event, payload_bytes, expect_kept + ) -> None: + q = Queue() + consumer = Consumer( + q, TEST_API_KEY, dedicated_ai_endpoint=enabled, flush_interval=0.01 + ) + q.put(self._sized_event(event, payload_bytes)) + + batch = consumer.next() + + if expect_kept: + self.assertEqual([e["event"] for e in batch], [event]) + else: + self.assertEqual(batch, []) + self.assertTrue(q.empty()) + self.assertEqual(q.unfinished_tasks, 0) + + if __name__ == "__main__": unittest.main()