Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
194 changes: 194 additions & 0 deletions tests/test_resource_growth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
# Copyright 2025 Adobe. All rights reserved.
# This file is licensed to you under the Apache License,
# Version 2.0 (http://www.apache.org/licenses/LICENSE-2.0)
# or the MIT license (http://opensource.org/licenses/MIT),
# at your option.

# Unless required by applicable law or agreed to in writing,
# this software is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR REPRESENTATIONS OF ANY KIND, either express or
# implied. See the LICENSE-MIT and LICENSE-APACHE files for the
# specific language governing permissions and limitations under
# each license.

"""
Regression tests for resource growth under concurrent Reader/Builder load.

Root causes fixed in c2pa-rs:
- A new tokio multi-thread Runtime (= new OS thread pool) was created per
  Reader FFI call. Under load this produced hundreds of leaked threads.
- A new reqwest::Client (= new TCP connection pool) was created per
  Reader/Builder instance and not pooled.

These tests measure OS-level thread count and RSS before/after a burst of
concurrent Reader operations and assert both stay within reasonable bounds.
They are expected to FAIL against the un-fixed c2pa-rs library.

Both metrics are read from /proc/self/status, so they are only fully
meaningful on Linux: where that file is unavailable the thread check sees
Python-level threads only and the RSS test is skipped.
"""

import gc
import io
import os
import threading
import time
import unittest
import concurrent.futures

from c2pa import Reader

# Fixture files live in a "fixtures" directory next to this test module.
FIXTURES_FOLDER = os.path.join(os.path.split(__file__)[0], "fixtures")
DEFAULT_TEST_FILE = os.path.join(FIXTURES_FOLDER, "C.jpg")

# Size of the measured burst: large enough to reproduce production-scale
# pressure, small enough not to be excessively slow in CI.
BURST_WORKERS = 8
BURST_ITERATIONS = 60

# Allowed growth across a burst. Intentionally generous to avoid flakiness,
# but tight enough to catch the original bugs (which produced 400+ threads
# and hundreds of MB).
MAX_MEMORY_GROWTH_MB = 80  # generous; old code would add 300+ MB
MAX_THREAD_GROWTH = 4 + BURST_WORKERS  # executor threads + small buffer


def _proc_status() -> dict[str, str]:
    """Parse /proc/self/status into a ``{field: raw_value}`` mapping.

    Each line containing a colon is split at the first colon; the text
    before it becomes the key and everything after it (including leading
    whitespace and the trailing newline) becomes the value, so callers are
    expected to strip values themselves.

    Returns an empty dict when the file cannot be opened or read, which is
    the case on platforms without a Linux-style /proc.
    """
    fields: dict[str, str] = {}
    try:
        with open("/proc/self/status") as status_file:
            for raw_line in status_file:
                key, colon, value = raw_line.partition(":")
                if colon:  # skip lines that have no "key: value" shape
                    fields[key] = value
    except OSError:
        # Any I/O failure means "not measurable": report nothing at all.
        return {}
    return fields


def _thread_count() -> int:
    """Return the number of threads in this process.

    Uses the ``Threads`` field of /proc/self/status when it is available,
    which is an OS-level count and therefore includes native (non-Python)
    threads. When that field is missing, falls back to
    ``threading.active_count()``, which only sees Python threads.
    """
    raw_threads = _proc_status().get("Threads")
    if raw_threads is None:
        # macOS / Windows fallback: Python threads only (still catches
        # Python-level leaks).
        return threading.active_count()
    return int(raw_threads.strip())


def _rss_mb() -> float | None:
    """Return the resident set size in MB, or ``None`` if not measurable.

    The value comes from the ``VmRSS`` field of /proc/self/status; its first
    whitespace-separated token is taken as a size in kB and divided by 1024.
    """
    try:
        vm_rss = _proc_status()["VmRSS"]
    except KeyError:
        return None
    kilobytes = int(vm_rss.split()[0])
    return kilobytes / 1024.0


class TestConcurrentReaderResourceGrowth(unittest.TestCase):
    """Verify that concurrent Reader operations do not leak threads or memory.

    Each resource test fires a burst of ``Reader`` construct / ``json()`` /
    ``close()`` cycles from a thread pool and compares a process-level metric
    sampled before and after the burst.

    The burst deliberately mirrors the production pattern: many concurrent
    callers simultaneously going through Reader.json(), which calls into the
    Rust FFI.
    """

    @classmethod
    def setUpClass(cls):
        """Load the fixture image once; each read wraps it in a fresh BytesIO."""
        with open(DEFAULT_TEST_FILE, "rb") as f:
            cls.test_data = f.read()

    def _read_once(self) -> None:
        """Run one full Reader lifecycle against the fixture bytes.

        The reader is closed even when ``json()`` raises, so a failing read
        cannot itself leak native resources and distort the measurements.
        """
        buf = io.BytesIO(self.test_data)
        reader = Reader("image/jpeg", buf)
        try:
            reader.json()
        finally:
            reader.close()

    def _burst(self, iterations: int = BURST_ITERATIONS, workers: int = BURST_WORKERS) -> None:
        """Run ``iterations`` reads across a pool of ``workers`` threads.

        Blocks until every read has finished. The first exception raised by
        any worker is re-raised in the calling thread.
        """
        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
            futures = [pool.submit(self._read_once) for _ in range(iterations)]
            for future in concurrent.futures.as_completed(futures):
                future.result()  # re-raise any exception from the worker

    def test_thread_count_stable_under_load(self):
        """OS thread count must not grow unboundedly during a Reader burst.

        Before fix: each Reader FFI call created a new tokio multi-thread
        Runtime (Builder::new_multi_thread()) = ~8 new OS threads each call.
        60 calls × 8 threads = 480+ leaked threads on top of baseline.

        After fix: one shared static Runtime; thread count stays near baseline.
        """
        # Warm up: let the static runtime and connection pool initialise once,
        # so one-time start-up cost is not counted as growth.
        self._burst(iterations=10, workers=4)
        gc.collect()
        time.sleep(0.2)

        threads_before = _thread_count()

        self._burst()
        gc.collect()
        time.sleep(0.5)  # allow OS to reap any transient threads

        threads_after = _thread_count()
        growth = threads_after - threads_before

        self.assertLessEqual(
            growth,
            MAX_THREAD_GROWTH,
            f"Thread count grew by {growth} (before={threads_before}, after={threads_after}). "
            "Indicates a tokio Runtime is being created per FFI call instead of shared. "
            f"Expected growth ≤ {MAX_THREAD_GROWTH}.",
        )

    def test_memory_stable_under_load(self):
        """RSS must not grow unboundedly during a Reader burst.

        Before fix: each Reader held a private reqwest::Client (TCP connection
        pool ~100-500 KB each). 60 concurrent Readers = 30-300 MB of pools
        accumulating before GC can collect them.

        After fix: one shared static reqwest::Client; RSS stays near baseline.
        """
        # Probe once up front so unsupported platforms skip before doing work.
        if _rss_mb() is None:
            self.skipTest("RSS measurement not available on this platform")

        # Warm up so one-time allocations are part of the baseline.
        self._burst(iterations=10, workers=4)
        gc.collect()
        time.sleep(0.2)

        rss_before = _rss_mb()

        self._burst()
        gc.collect()
        time.sleep(0.5)

        rss_after = _rss_mb()
        growth_mb = rss_after - rss_before

        self.assertLess(
            growth_mb,
            MAX_MEMORY_GROWTH_MB,
            f"RSS grew by {growth_mb:.1f} MB (before={rss_before:.1f} MB, after={rss_after:.1f} MB). "
            "Indicates connection pools or Rust objects are not being shared/freed. "
            f"Expected growth < {MAX_MEMORY_GROWTH_MB} MB.",
        )

    def test_stream_callbacks_released_on_close(self):
        """Stream callbacks must be None after close(), not held until GC.

        Callbacks hold references to the backing BytesIO which delays
        memory reclamation under concurrent load.
        """
        # Accessing the internal Stream via Reader internals is not possible
        # directly; test via the public Stream class used by Builder.
        from c2pa.c2pa import Stream

        stream = Stream(io.BytesIO(self.test_data))
        self.assertIsNotNone(stream._read_cb, "callback should exist before close")
        stream.close()

        # Check every callback independently so one failure does not hide
        # the state of the others.
        for attr in ("_read_cb", "_seek_cb", "_write_cb", "_flush_cb"):
            with self.subTest(callback=attr):
                self.assertIsNone(
                    getattr(stream, attr),
                    f"{attr} must be None after close()",
                )


if __name__ == "__main__":
unittest.main()
Loading