Skip to content
Open
7 changes: 5 additions & 2 deletions app/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ def _decode_token(token: str):


def get_current_user_id(token: str = Depends(oauth2_scheme)):
user: dict = _decode_token(token)
return user["sub"]
return get_current_user_claims(token)["sub"]


def get_current_user_claims(token: str = Depends(oauth2_scheme)) -> Dict[str, Any]:
return _decode_token(token)


async def websocket_authenticate(websocket: WebSocket) -> str | None:
Expand Down
194 changes: 145 additions & 49 deletions app/platforms/implementations/ogc_api_process.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import re
from typing import List
from app.auth import exchange_token
from app.auth import exchange_token, get_current_user_claims
from fastapi import Response
from loguru import logger

Expand Down Expand Up @@ -34,11 +34,14 @@
@register_platform(ProcessTypeEnum.OGC_API_PROCESS)
class OGCAPIProcessPlatform(BaseProcessingPlatform):
input_type_map = {
"date-time": ParamTypeEnum.DATETIME,
"date-interval": ParamTypeEnum.DATE_INTERVAL,
"bounding-box": ParamTypeEnum.BOUNDING_BOX,
"boolean": ParamTypeEnum.BOOLEAN,
"integer": ParamTypeEnum.INTEGER,
"double": ParamTypeEnum.DOUBLE,
"number": ParamTypeEnum.DOUBLE,
"string": ParamTypeEnum.STRING,
}

status_mapping = {
Expand All @@ -53,6 +56,12 @@ class OGCAPIProcessPlatform(BaseProcessingPlatform):
r"(?P<namespace>.+)/processes/(?P<process_id>[^/]+)$"
)

geojson_schema_references = {
GEOJSON_FEATURECOLLECTION_SCHEMA,
"https://geojson.org/schema/FeatureCollection.json",
"https://geojson.org/schema/Feature.json",
}

"""
OGC API Process processing platform implementation.
This class handles the execution of processing jobs on the OGC API Process platform.
Expand All @@ -64,6 +73,76 @@ def _split_job_id(self, job_id) -> tuple[str, ...]:
return ("", job_id)
return tuple(parts)

def _get_type_from_schema(
self, schema: dict | str | None, input_id: str = ""
) -> ParamTypeEnum:
if isinstance(schema, str):
if schema in self.__class__.geojson_schema_references:
return ParamTypeEnum.POLYGON
return self.__class__.input_type_map.get(schema, ParamTypeEnum.STRING)

if not isinstance(schema, dict):
return ParamTypeEnum.STRING

schema_type = str(schema.get("type"))
schema_format = schema.get("format")
schema_subtype = schema.get("subtype")

if schema_type == "array" and schema_subtype == "date-interval":
return ParamTypeEnum.DATE_INTERVAL
if schema_type == "array" and schema.get("items", {}).get("type") == "string":
return ParamTypeEnum.ARRAY_STRING
if schema_subtype == "geojson":
return ParamTypeEnum.POLYGON
if schema_subtype == "bounding-box":
return ParamTypeEnum.BOUNDING_BOX
if schema_format == "geojson":
return ParamTypeEnum.POLYGON
if schema_format == "date-time":
return ParamTypeEnum.DATETIME
if schema_type == "object":
required = schema.get("required") or []
if "type" in required and "coordinates" in required:
type_properties = schema.get("properties", {}).get("type", {})
type_instance = type_properties
while "actual_instance" in type_instance:
type_instance = type_instance["actual_instance"]
if "Polygon" in type_instance.get("enum", []):
return ParamTypeEnum.POLYGON
if "Point" in type_instance.get("enum", []):
return ParamTypeEnum.POINT
elif "bbox" in required:
return ParamTypeEnum.BOUNDING_BOX

if isinstance(schema.get("$ref"), str):
return self._get_type_from_schema(schema.get("$ref"), input_id)

for variant_key in ("oneOf", "anyOf", "allOf"):
variants = schema.get(variant_key) or []
if not isinstance(variants, list):
continue
for variant in variants:
detected_type = self._get_type_from_schema(variant, input_id)
if detected_type != ParamTypeEnum.STRING:
return detected_type

properties = schema.get("properties") or {}
if (
schema.get("title") == "GeoJSON"
or "geometry" in properties
or "features" in properties
or input_id.lower() in {"aoi", "geometry", "geom", "geojson"}
):
return ParamTypeEnum.POLYGON

return self.__class__.input_type_map.get(schema_type, ParamTypeEnum.STRING)

def _get_options_from_schema(self, schema: dict | str | None) -> list:
if not isinstance(schema, dict):
return []
options = schema.get("enum")
return options if isinstance(options, list) else []

async def _create_api_client_instance(
self,
endpoint: str,
Expand Down Expand Up @@ -91,6 +170,8 @@ async def execute_job(
) -> str:
logger.info(f"Executing OGC API job with title={title}")

parameters = await self._transform_parameters(user_token, details, parameters)

# Exchanging token
logger.debug("Exchanging user token for OGC API Process execution...")
exchanged_token = await exchange_token(
Expand All @@ -112,7 +193,22 @@ async def execute_job(
if exchanged_token:
headers["Authorization"] = f"Bearer {exchanged_token}"

data = {"inputs": {key: value for key, value in parameters.items()}}
user_claims = get_current_user_claims(user_token)
properties = {
"title": title,
"application": details.application,
}
if user_claims.get("sub"):
properties["user_id"] = user_claims["sub"]
if user_claims.get("preferred_username"):
properties["username"] = user_claims["preferred_username"]
if user_claims.get("email"):
properties["email"] = user_claims["email"]

data = {
"inputs": parameters,
"properties": properties,
}

content = api_client.execute_simple(
process_id=details.application, execute=data, _headers=headers
Expand All @@ -125,6 +221,49 @@ async def execute_job(
return f"{details.namespace}:{job_id}"
return job_id

def _transform_bbox_parameter(self, param_name: str, value) -> list[float]:
if isinstance(value, (list, tuple)) and len(value) == 4:
return [float(coord) for coord in value]

if isinstance(value, dict):
if ["east", "north", "south", "west"] == sorted(value.keys()):
return [
float(value["west"]),
float(value["south"]),
float(value["east"]),
float(value["north"]),
]

raise ValueError(
f"Unsupported bounding box value for parameter {param_name}: {value}"
)

async def _transform_parameters(
self, user_token: str, details: ServiceDetails, parameters: dict
) -> dict:
service_params = await self.get_service_parameters(user_token, details)

modifiers = {
ParamTypeEnum.BOUNDING_BOX: self._transform_bbox_parameter,
}

transformed_parameters = parameters.copy()
for param in service_params:
if param.name not in parameters:
continue

modifier = modifiers.get(param.type)

if modifier:
transformed_parameters[param.name] = modifier(
param.name, parameters[param.name]
)

logger.debug(
f"Transformed parameters for OGC API Process: {transformed_parameters}"
)
return transformed_parameters

async def execute_synchronous_job(
self,
user_token: str,
Expand Down Expand Up @@ -303,53 +442,10 @@ async def get_service_parameters(

if process_description.inputs:
for input_id, input_details in process_description.inputs.items():
input_type = (
input_id,
input_details.model_dump()
.get("var_schema", {})
.get("actual_instance", {})
.get("type", ""),
)
if isinstance(input_type, tuple):
input_type_str = next(
(
t
for t in input_type
if t
in [
"date-interval",
"bounding-box",
"boolean",
"integer",
"double",
]
),
None,
)
else:
input_type_str = None

if input_type_str:
input_type_str = self.__class__.input_type_map.get(input_type_str)

if not input_type_str:
input_type_str = ParamTypeEnum.STRING
input_types = (
input_details.model_dump()
.get("var_schema", {})
.get("actual_instance", {})
.get("required")
or []
)
if "bbox" in input_types:
input_type_str = ParamTypeEnum.BOUNDING_BOX

input_options = (
schema = (
input_details.model_dump()
.get("var_schema", {})
.get("actual_instance", {})
.get("enum")
or []
.get("actual_instance")
)
parameters.append(
Parameter(
Expand All @@ -359,8 +455,8 @@ async def get_service_parameters(
else f"Parameter: {input_id}",
default=None,
optional=(input_details.min_occurs == 0),
type=input_type_str,
options=input_options,
type=self._get_type_from_schema(schema, input_id),
options=self._get_options_from_schema(schema),
)
)

Expand Down
1 change: 1 addition & 0 deletions app/schemas/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class ParamTypeEnum(str, Enum):
DATE_INTERVAL = "date-interval"
BOUNDING_BOX = "bounding-box"
POLYGON = "polygon"
POINT = "point"
BOOLEAN = "boolean"
INTEGER = "integer"
DOUBLE = "double"
Expand Down
Loading
Loading