Skip to content

Commit c1287cc

Browse files
committed
Publish Storebaelt webcam freshness heartbeats
1 parent 6208578 commit c1287cc

3 files changed

Lines changed: 168 additions & 12 deletions

File tree

publishers/storebaelt_webcams/bootstrap_storebaelt_webcams.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,13 @@ def _datastream_schema(camera: dict) -> dict:
190190
{"type": "Text", "name": "sourceLastModifiedTime", "label": "Parsed Source Last-Modified Time", "definition": "http://sensorml.com/ont/swe/property/Timestamp"},
191191
{"type": "Text", "name": "contentLength", "label": "Content Length", "definition": "http://sensorml.com/ont/swe/property/Size"},
192192
{"type": "Text", "name": "imageSha256", "label": "Image SHA-256", "definition": "http://sensorml.com/ont/swe/property/Identifier"},
193+
{"type": "Boolean", "name": "imageChanged", "label": "Image Changed", "definition": "http://sensorml.com/ont/swe/property/Status"},
194+
{"type": "Text", "name": "firstSeenTime", "label": "Current Image First Seen Time", "definition": "http://sensorml.com/ont/swe/property/Timestamp"},
195+
{"type": "Text", "name": "lastSeenTime", "label": "Source Last Checked Time", "definition": "http://sensorml.com/ont/swe/property/Timestamp"},
196+
{"type": "Text", "name": "lastChangedTime", "label": "Image Last Changed Time", "definition": "http://sensorml.com/ont/swe/property/Timestamp"},
197+
{"type": "Count", "name": "unchangedPollCount", "label": "Unchanged Poll Count", "definition": "http://sensorml.com/ont/swe/property/Count"},
198+
{"type": "Text", "name": "stalenessStatus", "label": "Staleness Status", "definition": "http://sensorml.com/ont/swe/property/Status"},
199+
{"type": "Count", "name": "sourceAgeSeconds", "label": "Seconds Since Image Changed", "definition": "http://sensorml.com/ont/swe/property/ElapsedTime"},
193200
{"type": "Text", "name": "sourceUrl", "label": "Source URL", "definition": "http://sensorml.com/ont/swe/property/ReferenceURL"},
194201
],
195202
},

publishers/storebaelt_webcams/storebaelt_webcams_publisher.py

Lines changed: 94 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
USER_AGENT = "OS4CSAPI Storebaelt Webcams Publisher/1.0"
2323
DS_OUTPUT_NAME = "storebaeltWebcamImage"
24+
DEFAULT_STALE_SECONDS = 15 * 60
2425

2526

2627
def _load_cameras() -> list[dict]:
@@ -51,6 +52,19 @@ def _parse_http_date(value: str | None) -> str | None:
5152
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
5253

5354

55+
def _format_utc(dt: datetime) -> str:
56+
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
57+
58+
59+
def _parse_iso_utc(value: str | None) -> datetime | None:
60+
if not value:
61+
return None
62+
try:
63+
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
64+
except ValueError:
65+
return None
66+
67+
5468
def _probe_image_url(url: str) -> dict:
5569
headers = {
5670
"Accept": "image/jpeg,*/*;q=0.8",
@@ -136,8 +150,9 @@ def __init__(self, camera_filter: list[str] | None = None):
136150
self._base_url = os.environ.get("OSH_BASE_URL", f"https://{self.osh_address}/{self.osh_root}/api")
137151
self._auth = "Basic " + base64.b64encode(f"{self.osh_user}:{self.osh_pass}".encode()).decode()
138152
self._ds_ids: dict[str, str] = {}
139-
self._seen: set[str] = set()
153+
self._image_state: dict[str, dict] = {}
140154
self._request_delay = float(os.environ.get("STOREBAELT_WEBCAMS_REQUEST_DELAY", "0.5"))
155+
self._stale_seconds = int(os.environ.get("STOREBAELT_WEBCAMS_STALE_SECONDS", str(DEFAULT_STALE_SECONDS)))
141156
self.stats = {"published": 0, "errors": 0, "reconnects": 0, "skipped": 0}
142157

143158
def connect(self):
@@ -155,6 +170,7 @@ def connect(self):
155170
print(f" [WARN] Datastream {DS_OUTPUT_NAME} not found for camera {camera['id']}")
156171
continue
157172
self._ds_ids[camera["id"]] = ds_id
173+
self._seed_image_state(camera["id"], ds_id)
158174
connected += 1
159175
print(f" Connected: {camera['id']} -> sys={sys_id} ds={ds_id}")
160176
print(f" Ready: {connected}/{len(self.cameras)} cameras connected")
@@ -197,6 +213,78 @@ def _post_observation(self, ds_id: str, obs: dict):
197213
body_text = exc.read().decode("utf-8", errors="replace")[:500]
198214
raise RuntimeError(f"HTTP {exc.code} POST {url}: {body_text}") from exc
199215

216+
def _seed_image_state(self, camera_id: str, ds_id: str):
217+
try:
218+
data = api_get(self._base_url, f"datastreams/{ds_id}/observations?limit=1&resultTime=latest", self._auth)
219+
except Exception as exc:
220+
print(f" [WARN] Could not seed freshness state for {camera_id}: {exc}")
221+
return
222+
items = (data or {}).get("items") or []
223+
if not items:
224+
return
225+
obs = items[0]
226+
result = obs.get("result") or {}
227+
image_sha256 = result.get("imageSha256") or ""
228+
if not image_sha256:
229+
return
230+
observed_time = obs.get("phenomenonTime") or obs.get("resultTime") or _format_utc(datetime.now(timezone.utc))
231+
first_seen = result.get("firstSeenTime") or result.get("lastChangedTime") or observed_time
232+
last_changed = result.get("lastChangedTime") or first_seen
233+
last_seen = result.get("lastSeenTime") or observed_time
234+
self._image_state[camera_id] = {
235+
"imageSha256": image_sha256,
236+
"firstSeenTime": first_seen,
237+
"lastChangedTime": last_changed,
238+
"lastSeenTime": last_seen,
239+
"unchangedPollCount": int(result.get("unchangedPollCount") or 0),
240+
}
241+
242+
def _apply_freshness_status(self, camera_id: str, latest: dict, poll_time: datetime) -> dict:
243+
result = dict(latest["result"])
244+
poll_time_iso = latest["phenomenonTime"]
245+
effective_poll_time = _parse_iso_utc(poll_time_iso) or poll_time
246+
image_sha256 = result.get("imageSha256") or latest["dedupeKey"].split("|", 1)[-1]
247+
previous = self._image_state.get(camera_id)
248+
image_changed = not previous or previous.get("imageSha256") != image_sha256
249+
250+
if image_changed:
251+
first_seen = poll_time_iso
252+
last_changed = poll_time_iso
253+
unchanged_count = 0
254+
else:
255+
first_seen = previous.get("firstSeenTime") or poll_time_iso
256+
last_changed = previous.get("lastChangedTime") or first_seen
257+
unchanged_count = int(previous.get("unchangedPollCount") or 0) + 1
258+
259+
last_changed_dt = _parse_iso_utc(last_changed) or effective_poll_time
260+
source_age_seconds = max(0, int((effective_poll_time - last_changed_dt).total_seconds()))
261+
if image_changed:
262+
staleness_status = "fresh"
263+
elif source_age_seconds >= self._stale_seconds:
264+
staleness_status = "stale"
265+
else:
266+
staleness_status = "unchanged"
267+
268+
result.update({
269+
"imageChanged": image_changed,
270+
"firstSeenTime": first_seen,
271+
"lastSeenTime": poll_time_iso,
272+
"lastChangedTime": last_changed,
273+
"unchangedPollCount": unchanged_count,
274+
"stalenessStatus": staleness_status,
275+
"sourceAgeSeconds": source_age_seconds,
276+
})
277+
self._image_state[camera_id] = {
278+
"imageSha256": image_sha256,
279+
"firstSeenTime": first_seen,
280+
"lastChangedTime": last_changed,
281+
"lastSeenTime": poll_time_iso,
282+
"unchangedPollCount": unchanged_count,
283+
}
284+
latest = dict(latest)
285+
latest["result"] = result
286+
return latest
287+
200288
def publish_cycle(self, dry_run: bool = False) -> int:
201289
published = 0
202290
now = datetime.now(timezone.utc)
@@ -213,25 +301,23 @@ def publish_cycle(self, dry_run: bool = False) -> int:
213301
self.stats["skipped"] += 1
214302
print(f" [{ts_label}] {camera_id}: no image metadata")
215303
continue
216-
if latest["dedupeKey"] in self._seen:
217-
self.stats["skipped"] += 1
218-
print(f" [{ts_label}] {camera_id}: unchanged, skipping")
219-
continue
304+
latest = self._apply_freshness_status(camera_id, latest, now)
220305
obs = {
221306
"phenomenonTime": latest["phenomenonTime"],
222307
"resultTime": now.strftime("%Y-%m-%dT%H:%M:%SZ"),
223308
"result": latest["result"],
224309
}
225-
label = f"{latest['phenomenonTime']} {latest['result']['imageUrl']}"
310+
status = latest["result"].get("stalenessStatus", "unknown")
311+
changed = "changed" if latest["result"].get("imageChanged") else "unchanged"
312+
age = latest["result"].get("sourceAgeSeconds", 0)
313+
label = f"{latest['phenomenonTime']} {changed}/{status} age={age}s {latest['result']['imageUrl']}"
226314
if dry_run:
227315
print(f" [{ts_label}] {camera_id}: [DRY] {label}")
228-
self._seen.add(latest["dedupeKey"])
229316
else:
230317
try:
231318
self._post_observation(ds_id, obs)
232319
self.stats["published"] += 1
233320
published += 1
234-
self._seen.add(latest["dedupeKey"])
235321
print(f" [{ts_label}] {camera_id}: OK {label}")
236322
except Exception as exc:
237323
self.stats["errors"] += 1

tests/test_storebaelt_webcams_publisher.py

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,12 +53,13 @@ def fake_probe(url):
5353
assert latest["result"]["imageSha256"] == "f" * 64
5454

5555

56-
def test_publish_cycle_dry_run_dedupes(monkeypatch):
56+
def test_publish_cycle_dry_run_publishes_unchanged_heartbeat(monkeypatch):
5757
publisher = StorebaeltWebcamsPublisher.__new__(StorebaeltWebcamsPublisher)
5858
publisher.cameras = [{"id": "sprogo", "title": "Sprogo Webcam"}]
5959
publisher._ds_ids = {}
60-
publisher._seen = set()
60+
publisher._image_state = {}
6161
publisher._request_delay = 0
62+
publisher._stale_seconds = 900
6263
publisher.stats = {"published": 0, "errors": 0, "reconnects": 0, "skipped": 0}
6364

6465
def fake_fetch(camera):
@@ -68,15 +69,77 @@ def fake_fetch(camera):
6869
"result": {
6970
"cameraId": camera["id"],
7071
"imageUrl": "https://stream.sob.m-dn.net/res/sb2-live.jpg",
72+
"imageSha256": "abc",
7173
},
7274
}
7375

7476
monkeypatch.setattr(storebaelt, "fetch_latest_image", fake_fetch)
7577

7678
assert publisher.publish_cycle(dry_run=True) == 0
7779
assert publisher.publish_cycle(dry_run=True) == 0
78-
assert "sprogo|abc" in publisher._seen
79-
assert publisher.stats["skipped"] == 1
80+
assert publisher._image_state["sprogo"]["imageSha256"] == "abc"
81+
assert publisher._image_state["sprogo"]["unchangedPollCount"] == 1
82+
assert publisher.stats["skipped"] == 0
83+
84+
85+
def test_freshness_status_tracks_changed_and_unchanged_images():
86+
publisher = StorebaeltWebcamsPublisher.__new__(StorebaeltWebcamsPublisher)
87+
publisher._image_state = {}
88+
publisher._stale_seconds = 900
89+
poll_time = datetime(2026, 6, 3, 12, 0, tzinfo=timezone.utc)
90+
latest = {
91+
"phenomenonTime": "2026-06-03T12:00:00Z",
92+
"dedupeKey": "sprogo|abc",
93+
"result": {"cameraId": "sprogo", "imageSha256": "abc"},
94+
}
95+
96+
first = publisher._apply_freshness_status("sprogo", latest, poll_time)
97+
98+
assert first["result"]["imageChanged"] is True
99+
assert first["result"]["stalenessStatus"] == "fresh"
100+
assert first["result"]["unchangedPollCount"] == 0
101+
assert first["result"]["firstSeenTime"] == "2026-06-03T12:00:00Z"
102+
assert first["result"]["lastChangedTime"] == "2026-06-03T12:00:00Z"
103+
104+
second_latest = {
105+
"phenomenonTime": "2026-06-03T12:05:00Z",
106+
"dedupeKey": "sprogo|abc",
107+
"result": {"cameraId": "sprogo", "imageSha256": "abc"},
108+
}
109+
second = publisher._apply_freshness_status("sprogo", second_latest, datetime(2026, 6, 3, 12, 5, tzinfo=timezone.utc))
110+
111+
assert second["result"]["imageChanged"] is False
112+
assert second["result"]["stalenessStatus"] == "unchanged"
113+
assert second["result"]["sourceAgeSeconds"] == 300
114+
assert second["result"]["unchangedPollCount"] == 1
115+
assert second["result"]["firstSeenTime"] == "2026-06-03T12:00:00Z"
116+
assert second["result"]["lastChangedTime"] == "2026-06-03T12:00:00Z"
117+
118+
119+
def test_freshness_status_marks_stale_after_threshold():
120+
publisher = StorebaeltWebcamsPublisher.__new__(StorebaeltWebcamsPublisher)
121+
publisher._image_state = {
122+
"sprogo": {
123+
"imageSha256": "abc",
124+
"firstSeenTime": "2026-06-03T12:00:00Z",
125+
"lastChangedTime": "2026-06-03T12:00:00Z",
126+
"lastSeenTime": "2026-06-03T12:10:00Z",
127+
"unchangedPollCount": 2,
128+
}
129+
}
130+
publisher._stale_seconds = 900
131+
latest = {
132+
"phenomenonTime": "2026-06-03T12:16:00Z",
133+
"dedupeKey": "sprogo|abc",
134+
"result": {"cameraId": "sprogo", "imageSha256": "abc"},
135+
}
136+
137+
updated = publisher._apply_freshness_status("sprogo", latest, datetime(2026, 6, 3, 12, 16, tzinfo=timezone.utc))
138+
139+
assert updated["result"]["imageChanged"] is False
140+
assert updated["result"]["stalenessStatus"] == "stale"
141+
assert updated["result"]["sourceAgeSeconds"] == 960
142+
assert updated["result"]["unchangedPollCount"] == 3
80143

81144

82145
def test_http_date_parser_normalizes_to_utc():

0 commit comments

Comments
 (0)