Skip to content

Commit e45f201

Browse files
committed
Upgrade Digitraffic weathercam freshness publishing
1 parent 6bdcf66 commit e45f201

4 files changed

Lines changed: 298 additions & 10 deletions

File tree

publishers/digitraffic_weathercam/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,19 @@
22

33
Publishes image-reference observations for a curated set of Fintraffic Digitraffic road-weather camera presets associated with the Phase 1 Finland road-weather stations.
44

5+
The publisher emits heartbeat observations every cycle, including freshness and source-health fields (`imageChanged`, `lastSeenTime`, `lastChangedTime`, `sourceAgeSeconds`, `stalenessStatus`) so Explorer can distinguish source checks from image-content changes.
6+
57
Source endpoints:
68

79
- Camera metadata: `https://tie.digitraffic.fi/api/weathercam/v1/stations`
810
- Station-specific latest preset metadata: `https://tie.digitraffic.fi/api/weathercam/v1/stations/{id}/data`
911
- Latest preset image: `https://weathercam.digitraffic.fi/{presetId}.jpg`
1012
- Latest preset thumbnail: `https://weathercam.digitraffic.fi/{presetId}.jpg?thumbnail=true`
1113

14+
License and attribution reference:
15+
16+
- https://www.digitraffic.fi/en/terms-of-service/
17+
1218
The publisher attaches one companion `digitrafficWeatherCamImage` datastream to each existing Digitraffic road-weather station system. This lets deployed-system cards render the camera imagery for the same station card.
1319

1420
## Bootstrap

publishers/digitraffic_weathercam/bootstrap_digitraffic_weathercam.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
DS_OUTPUT_NAME = "digitrafficWeatherCamImage"
2424

2525
DIGITRAFFIC_HOME = "https://www.digitraffic.fi/en/road-traffic/"
26-
DIGITRAFFIC_LICENSE = "https://www.digitraffic.fi/en/terms-of-use/"
26+
DIGITRAFFIC_LICENSE = "https://www.digitraffic.fi/en/terms-of-service/"
2727
DIGITRAFFIC_WEATHERCAM_STATIONS = "https://tie.digitraffic.fi/api/weathercam/v1/stations"
2828
DIGITRAFFIC_WEATHERCAM_IMAGE_BASE = "https://weathercam.digitraffic.fi"
2929

@@ -119,10 +119,27 @@ def _datastream_schema(camera: dict) -> dict:
119119
"fields": [
120120
{"type": "Text", "name": "stationId", "label": "Road Weather Station ID", "definition": "http://sensorml.com/ont/swe/property/StationID"},
121121
{"type": "Text", "name": "camId", "label": "Weather Camera Preset ID", "definition": "http://sensorml.com/ont/swe/property/SensorID"},
122+
{"type": "Text", "name": "cameraStationId", "label": "Camera Station ID", "definition": "http://sensorml.com/ont/swe/property/StationID"},
123+
{"type": "Text", "name": "cameraStationName", "label": "Camera Station Name", "definition": "http://purl.org/dc/elements/1.1/title"},
122124
{"type": "Text", "name": "imageUrl", "label": "Full-Size Image URL", "definition": "http://www.opengis.net/def/property/OGC/0/ImageURL"},
123125
{"type": "Text", "name": "thumbUrl", "label": "Thumbnail Image URL", "definition": "http://www.opengis.net/def/property/OGC/0/ImageURL"},
124126
{"type": "Text", "name": "latestImageUrl", "label": "Latest Image URL", "definition": "http://www.opengis.net/def/property/OGC/0/ImageURL"},
125127
{"type": "Text", "name": "mediaType", "label": "Media Type", "definition": "http://purl.org/dc/elements/1.1/format"},
128+
{"type": "Text", "name": "sourceType", "label": "Source Type", "definition": "http://sensorml.com/ont/swe/property/ProcessingType"},
129+
{"type": "Boolean", "name": "live", "label": "Live Source", "definition": "http://sensorml.com/ont/swe/property/Status"},
130+
{"type": "Count", "name": "httpStatus", "label": "HTTP Status", "definition": "http://sensorml.com/ont/swe/property/StatusCode"},
131+
{"type": "Text", "name": "etag", "label": "HTTP ETag", "definition": "http://sensorml.com/ont/swe/property/Identifier"},
132+
{"type": "Text", "name": "lastModified", "label": "HTTP Last-Modified", "definition": "http://sensorml.com/ont/swe/property/Timestamp"},
133+
{"type": "Text", "name": "sourceLastModifiedTime", "label": "Parsed Source Last-Modified Time", "definition": "http://sensorml.com/ont/swe/property/Timestamp"},
134+
{"type": "Text", "name": "contentLength", "label": "Content Length", "definition": "http://sensorml.com/ont/swe/property/Size"},
135+
{"type": "Text", "name": "imageToken", "label": "Image Change Token", "definition": "http://sensorml.com/ont/swe/property/Identifier"},
136+
{"type": "Boolean", "name": "imageChanged", "label": "Image Changed", "definition": "http://sensorml.com/ont/swe/property/Status"},
137+
{"type": "Text", "name": "firstSeenTime", "label": "Current Image First Seen Time", "definition": "http://sensorml.com/ont/swe/property/Timestamp"},
138+
{"type": "Text", "name": "lastSeenTime", "label": "Source Last Checked Time", "definition": "http://sensorml.com/ont/swe/property/Timestamp"},
139+
{"type": "Text", "name": "lastChangedTime", "label": "Image Last Changed Time", "definition": "http://sensorml.com/ont/swe/property/Timestamp"},
140+
{"type": "Count", "name": "unchangedPollCount", "label": "Unchanged Poll Count", "definition": "http://sensorml.com/ont/swe/property/Count"},
141+
{"type": "Text", "name": "stalenessStatus", "label": "Staleness Status", "definition": "http://sensorml.com/ont/swe/property/Status"},
142+
{"type": "Count", "name": "sourceAgeSeconds", "label": "Seconds Since Image Changed", "definition": "http://sensorml.com/ont/swe/property/ElapsedTime"},
126143
{"type": "Text", "name": "sourceUrl", "label": "Source URL", "definition": "http://sensorml.com/ont/swe/property/ReferenceURL"},
127144
],
128145
},

publishers/digitraffic_weathercam/digitraffic_weathercam_publisher.py

Lines changed: 163 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import argparse
55
import base64
6+
import email.utils
67
import gzip
78
import json
89
import os
@@ -21,6 +22,7 @@
2122

2223
USER_AGENT = "OS4CSAPI Digitraffic Weathercam Publisher/1.0"
2324
DS_OUTPUT_NAME = "digitrafficWeatherCamImage"
25+
DEFAULT_STALE_SECONDS = 15 * 60
2426

2527

2628
def _load_cameras() -> list[dict]:
@@ -45,6 +47,31 @@ def _thumb_url(preset_id: str) -> str:
4547
return f"{_image_url(preset_id)}?thumbnail=true"
4648

4749

50+
def _parse_http_date(value: str | None) -> str | None:
51+
if not value:
52+
return None
53+
try:
54+
dt = email.utils.parsedate_to_datetime(value)
55+
except (TypeError, ValueError):
56+
return None
57+
if dt.tzinfo is None:
58+
dt = dt.replace(tzinfo=timezone.utc)
59+
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
60+
61+
62+
def _format_utc(dt: datetime) -> str:
63+
return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
64+
65+
66+
def _parse_iso_utc(value: str | None) -> datetime | None:
67+
if not value:
68+
return None
69+
try:
70+
return datetime.fromisoformat(value.replace("Z", "+00:00")).astimezone(timezone.utc)
71+
except ValueError:
72+
return None
73+
74+
4875
def _parse_time(value: str) -> tuple[float, str]:
4976
if not value:
5077
raise ValueError("missing timestamp")
@@ -64,6 +91,22 @@ def _get_json(url: str) -> dict:
6491
return json.loads(raw.decode("utf-8"))
6592

6693

94+
def _probe_image_url(url: str) -> dict:
95+
req = Request(url, headers={"Accept": "image/jpeg,*/*;q=0.8", "User-Agent": USER_AGENT, "Digitraffic-User": "OS4CSAPI/DigitrafficWeathercamPublisher 1.0"}, method="HEAD")
96+
with urlopen(req, timeout=30) as resp:
97+
headers = dict(resp.headers)
98+
etag = headers.get("ETag") or headers.get("etag") or ""
99+
content_length = headers.get("Content-Length") or headers.get("content-length") or ""
100+
last_modified = headers.get("Last-Modified") or headers.get("last-modified") or ""
101+
return {
102+
"status": resp.status,
103+
"etag": etag,
104+
"contentLength": str(content_length),
105+
"lastModified": last_modified,
106+
"sourceLastModifiedTime": _parse_http_date(last_modified) or "",
107+
}
108+
109+
67110
def fetch_latest_image(camera: dict) -> dict | None:
68111
source_url = _station_data_url(camera["cameraStationId"])
69112
try:
@@ -84,18 +127,47 @@ def fetch_latest_image(camera: dict) -> dict | None:
84127
_, phenomenon_time = _parse_time(preset.get("measuredTime") or data.get("dataUpdatedTime"))
85128
preset_id = camera["presetId"]
86129
image_url = _image_url(preset_id)
130+
image_probe = {
131+
"status": 0,
132+
"etag": "",
133+
"contentLength": "",
134+
"lastModified": "",
135+
"sourceLastModifiedTime": "",
136+
}
137+
try:
138+
image_probe = _probe_image_url(image_url)
139+
except Exception as exc:
140+
print(f" [WARN] Digitraffic image HEAD probe failed for {preset_id}: {exc}")
141+
142+
observed_token = "|".join(
143+
[
144+
preset_id,
145+
phenomenon_time,
146+
image_probe.get("etag") or "",
147+
image_probe.get("contentLength") or "",
148+
]
149+
)
87150
return {
88151
"phenomenonTime": phenomenon_time,
89152
"result": {
90153
"stationId": camera["roadWeatherStationId"],
91154
"camId": preset_id,
155+
"cameraStationId": camera["cameraStationId"],
156+
"cameraStationName": camera["cameraStationName"],
92157
"imageUrl": image_url,
93158
"thumbUrl": _thumb_url(preset_id),
94159
"latestImageUrl": image_url,
95160
"mediaType": "image/jpeg",
161+
"sourceType": "road-weather-camera-image",
162+
"live": True,
163+
"httpStatus": int(image_probe.get("status") or 0),
164+
"etag": image_probe.get("etag") or "",
165+
"lastModified": image_probe.get("lastModified") or "",
166+
"sourceLastModifiedTime": image_probe.get("sourceLastModifiedTime") or "",
167+
"contentLength": image_probe.get("contentLength") or "",
96168
"sourceUrl": source_url,
97169
},
98-
"dedupeKey": f"{camera['roadWeatherStationId']}|{preset_id}|{phenomenon_time}",
170+
"dedupeKey": observed_token,
99171
}
100172

101173

@@ -120,8 +192,9 @@ def __init__(self, camera_filter: list[str] | None = None):
120192
self._is_go_server = "csapi-go" in self._base_url
121193
self._auth = "Basic " + base64.b64encode(f"{self.osh_user}:{self.osh_pass}".encode()).decode()
122194
self._ds_ids: dict[str, str] = {}
123-
self._seen: set[str] = set()
195+
self._image_state: dict[str, dict] = {}
124196
self._request_delay = float(os.environ.get("DIGITRAFFIC_WEATHERCAM_REQUEST_DELAY", "0.5"))
197+
self._stale_seconds = int(os.environ.get("DIGITRAFFIC_WEATHERCAM_STALE_SECONDS", str(DEFAULT_STALE_SECONDS)))
125198
self.stats = {"published": 0, "errors": 0, "reconnects": 0, "skipped": 0}
126199

127200
def _raw_datastream_ids(self, sys_id: str) -> dict[str, str]:
@@ -161,6 +234,7 @@ def connect(self):
161234
print(f" [WARN] Datastream {DS_OUTPUT_NAME} not found for station {station_id}")
162235
continue
163236
self._ds_ids[camera["presetId"]] = ds_id
237+
self._seed_image_state(camera["presetId"], ds_id)
164238
connected += 1
165239
print(f" Connected: {station_id}/{camera['presetId']} -> sys={sys_id} ds={ds_id}")
166240
print(f" Ready: {connected}/{len(self.cameras)} cameras connected")
@@ -193,6 +267,88 @@ def _post_observation(self, ds_id: str, obs: dict):
193267
body_text = exc.read().decode("utf-8", errors="replace")[:500]
194268
raise RuntimeError(f"HTTP {exc.code} POST {url}: {body_text}") from exc
195269

270+
def _seed_image_state(self, preset_id: str, ds_id: str):
271+
try:
272+
data = api_get(self._base_url, f"datastreams/{ds_id}/observations?limit=1&resultTime=latest", self._auth)
273+
except Exception as exc:
274+
print(f" [WARN] Could not seed freshness state for {preset_id}: {exc}")
275+
return
276+
items = (data or {}).get("items") or []
277+
if not items:
278+
return
279+
obs = items[0]
280+
result = obs.get("result") or {}
281+
token = result.get("imageToken") or ""
282+
if not token:
283+
token = "|".join(
284+
[
285+
result.get("camId") or preset_id,
286+
obs.get("phenomenonTime") or "",
287+
result.get("etag") or "",
288+
result.get("contentLength") or "",
289+
]
290+
)
291+
observed_time = obs.get("phenomenonTime") or obs.get("resultTime") or _format_utc(datetime.now(timezone.utc))
292+
first_seen = result.get("firstSeenTime") or result.get("lastChangedTime") or observed_time
293+
last_changed = result.get("lastChangedTime") or first_seen
294+
last_seen = result.get("lastSeenTime") or observed_time
295+
self._image_state[preset_id] = {
296+
"imageToken": token,
297+
"firstSeenTime": first_seen,
298+
"lastChangedTime": last_changed,
299+
"lastSeenTime": last_seen,
300+
"unchangedPollCount": int(result.get("unchangedPollCount") or 0),
301+
}
302+
303+
def _apply_freshness_status(self, preset_id: str, latest: dict, poll_time: datetime) -> dict:
304+
result = dict(latest["result"])
305+
poll_time_iso = latest["phenomenonTime"]
306+
effective_poll_time = _parse_iso_utc(poll_time_iso) or poll_time
307+
image_token = latest["dedupeKey"]
308+
previous = self._image_state.get(preset_id)
309+
image_changed = not previous or previous.get("imageToken") != image_token
310+
311+
if image_changed:
312+
first_seen = poll_time_iso
313+
last_changed = poll_time_iso
314+
unchanged_count = 0
315+
else:
316+
first_seen = previous.get("firstSeenTime") or poll_time_iso
317+
last_changed = previous.get("lastChangedTime") or first_seen
318+
unchanged_count = int(previous.get("unchangedPollCount") or 0) + 1
319+
320+
last_changed_dt = _parse_iso_utc(last_changed) or effective_poll_time
321+
source_age_seconds = max(0, int((effective_poll_time - last_changed_dt).total_seconds()))
322+
if image_changed:
323+
staleness_status = "fresh"
324+
elif source_age_seconds >= self._stale_seconds:
325+
staleness_status = "stale"
326+
else:
327+
staleness_status = "unchanged"
328+
329+
source_url = result.pop("sourceUrl", "")
330+
result.update({
331+
"imageToken": image_token,
332+
"imageChanged": image_changed,
333+
"firstSeenTime": first_seen,
334+
"lastSeenTime": poll_time_iso,
335+
"lastChangedTime": last_changed,
336+
"unchangedPollCount": unchanged_count,
337+
"stalenessStatus": staleness_status,
338+
"sourceAgeSeconds": source_age_seconds,
339+
})
340+
result["sourceUrl"] = source_url
341+
self._image_state[preset_id] = {
342+
"imageToken": image_token,
343+
"firstSeenTime": first_seen,
344+
"lastChangedTime": last_changed,
345+
"lastSeenTime": poll_time_iso,
346+
"unchangedPollCount": unchanged_count,
347+
}
348+
latest = dict(latest)
349+
latest["result"] = result
350+
return latest
351+
196352
def publish_cycle(self, dry_run: bool = False) -> int:
197353
published = 0
198354
now = datetime.now(timezone.utc)
@@ -214,21 +370,19 @@ def publish_cycle(self, dry_run: bool = False) -> int:
214370
self.stats["skipped"] += 1
215371
print(f" [{ts_label}] {preset_id}: no image metadata")
216372
continue
217-
if latest["dedupeKey"] in self._seen:
218-
self.stats["skipped"] += 1
219-
print(f" [{ts_label}] {preset_id}: unchanged, skipping")
220-
continue
373+
latest = self._apply_freshness_status(preset_id, latest, now)
221374
obs = {"phenomenonTime": latest["phenomenonTime"], "resultTime": now.strftime("%Y-%m-%dT%H:%M:%SZ"), "result": latest["result"]}
222-
label = f"{latest['phenomenonTime']} {latest['result']['imageUrl']}"
375+
status = latest["result"].get("stalenessStatus", "unknown")
376+
changed = "changed" if latest["result"].get("imageChanged") else "unchanged"
377+
age = latest["result"].get("sourceAgeSeconds", 0)
378+
label = f"{latest['phenomenonTime']} {changed}/{status} age={age}s {latest['result']['imageUrl']}"
223379
if dry_run:
224380
print(f" [{ts_label}] {preset_id}: [DRY] {label}")
225-
self._seen.add(latest["dedupeKey"])
226381
else:
227382
try:
228383
self._post_observation(ds_id, obs)
229384
self.stats["published"] += 1
230385
published += 1
231-
self._seen.add(latest["dedupeKey"])
232386
print(f" [{ts_label}] {preset_id}: OK {label}")
233387
except Exception as exc:
234388
self.stats["errors"] += 1

0 commit comments

Comments
 (0)