Skip to content

Commit

Permalink
ogc api processes subscriber (#1313)
Browse files Browse the repository at this point in the history
* Exclude None from `get_processor` return type annotation

An exception is raised in case of error, so it can't ever return None

* Add support for OGC API Processes Subscriber

The subscription URLs are passed to the manager, which
then has to call them appropriately.

By default, managers have the attribute `supports_subscribing`
set to `False` in order to not break the API for these. The
subscriptions are only passed to if this is set to `True`

* Add ogc api callback class to conformance

https://docs.ogc.org/is/18-062r2/18-062r2.html#toc67

* Make successUri mandatory in subscriber

It's mandatory in the standard.

Thx @ricardogsilva !

* Use snake case in python for fields which are camel case in the api

Thx @ricardogsilva !

* Add subscriber to method docstring

* Provide default value for subscriber for managers not supporting it

Thanks @ricardogsilva !

* Factor out notification call into methods

This increases reusability by other managers

Thanks @ricardogsilva !

* Add an example call for a process subscriber

* Change test urls to valid urls

* Third party imports in own block
  • Loading branch information
totycro authored Mar 11, 2024
1 parent ab4fe09 commit 94ae782
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 8 deletions.
6 changes: 6 additions & 0 deletions docs/source/data-publishing/ogcapi-processes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,12 @@ Processing examples
-H "Prefer: respond-async"
-d "{\"inputs\":{\"name\": \"hi there2\"}}"
# execute a job for the ``hello-world`` process with a success subscriber
curl -X POST http://localhost:5000/processes/hello-world/execution \
-H "Content-Type: application/json" \
-d "{\"inputs\":{\"name\": \"hi there2\"}, \
\"subscriber\": {\"successUri\": \"https://www.example.com/success\"}}"
.. _`OGC API - Processes`: https://ogcapi.ogc.org/processes
.. _`sample`: https://github.com/geopython/pygeoapi/blob/master/pygeoapi/process/hello_world.py
Expand Down
26 changes: 24 additions & 2 deletions pygeoapi/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@
TEMPLATES, to_json, get_api_rules, get_base_url,
get_crs_from_uri, get_supported_crs_list,
modify_pygeofilter, CrsTransformSpec,
transform_bbox)
transform_bbox, Subscriber)

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -174,6 +174,7 @@
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/core',
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/json',
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/oas30'
'http://www.opengis.net/spec/ogcapi-processes-1/1.0/conf/callback',
],
'edr': [
'http://www.opengis.net/spec/ogcapi-edr-1/1.0/conf/core'
Expand Down Expand Up @@ -3492,6 +3493,23 @@ def execute_process(self, request: Union[APIRequest, Any],
data_dict = data.get('inputs', {})
LOGGER.debug(data_dict)

subscriber = None
subscriber_dict = data.get('subscriber')
if subscriber_dict:
try:
success_uri = subscriber_dict['successUri']
except KeyError:
return self.get_exception(
HTTPStatus.BAD_REQUEST, headers, request.format,
'MissingParameterValue', 'Missing successUri')
else:
subscriber = Subscriber(
# NOTE: successUri is mandatory according to the standard
success_uri=success_uri,
in_progress_uri=subscriber_dict.get('inProgressUri'),
failed_uri=subscriber_dict.get('failedUri'),
)

try:
execution_mode = RequestedProcessExecutionMode(
request.headers.get('Prefer', request.headers.get('prefer'))
Expand All @@ -3501,7 +3519,11 @@ def execute_process(self, request: Union[APIRequest, Any],
try:
LOGGER.debug('Executing process')
result = self.manager.execute_process(
process_id, data_dict, execution_mode=execution_mode)
process_id,
data_dict,
execution_mode=execution_mode,
subscriber=subscriber,
)
job_id, mime_type, outputs, status, additional_headers = result
headers.update(additional_headers or {})
headers['Location'] = f'{self.base_url}/jobs/{job_id}'
Expand Down
56 changes: 50 additions & 6 deletions pygeoapi/process/manager/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
from typing import Any, Dict, Tuple, Optional, OrderedDict
import uuid

import requests

from pygeoapi.plugin import load_plugin
from pygeoapi.process.base import (
BaseProcessor,
Expand All @@ -50,6 +52,7 @@
JobStatus,
ProcessExecutionMode,
RequestedProcessExecutionMode,
Subscriber,
)

LOGGER = logging.getLogger(__name__)
Expand All @@ -70,6 +73,7 @@ def __init__(self, manager_def: dict):

self.name = manager_def['name']
self.is_async = False
self.supports_subscribing = False
self.connection = manager_def.get('connection')
self.output_dir = manager_def.get('output_dir')

Expand All @@ -85,7 +89,7 @@ def __init__(self, manager_def: dict):
for id_, process_conf in manager_def.get('processes', {}).items():
self.processes[id_] = dict(process_conf)

def get_processor(self, process_id: str) -> Optional[BaseProcessor]:
def get_processor(self, process_id: str) -> BaseProcessor:
"""Instantiate a processor.
:param process_id: Identifier of the process
Expand Down Expand Up @@ -178,7 +182,9 @@ def delete_job(self, job_id: str) -> bool:
raise JobNotFoundError()

def _execute_handler_async(self, p: BaseProcessor, job_id: str,
data_dict: dict) -> Tuple[str, None, JobStatus]:
data_dict: dict,
subscriber: Optional[Subscriber] = None,
) -> Tuple[str, None, JobStatus]:
"""
This private execution handler executes a process in a background
thread using `multiprocessing.dummy`
Expand All @@ -194,13 +200,15 @@ def _execute_handler_async(self, p: BaseProcessor, job_id: str,
"""
_process = dummy.Process(
target=self._execute_handler_sync,
args=(p, job_id, data_dict)
args=(p, job_id, data_dict, subscriber)
)
_process.start()
return 'application/json', None, JobStatus.accepted

def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
data_dict: dict) -> Tuple[str, Any, JobStatus]:
data_dict: dict,
subscriber: Optional[Subscriber] = None,
) -> Tuple[str, Any, JobStatus]:
"""
Synchronous execution handler
Expand Down Expand Up @@ -233,6 +241,7 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
}

self.add_job(job_metadata)
self._send_in_progress_notification(subscriber)

try:
if self.output_dir is not None:
Expand Down Expand Up @@ -276,6 +285,7 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,
}

self.update_job(job_id, job_update_metadata)
self._send_success_notification(subscriber, outputs=outputs)

except Exception as err:
# TODO assess correct exception type and description to help users
Expand Down Expand Up @@ -308,13 +318,16 @@ def _execute_handler_sync(self, p: BaseProcessor, job_id: str,

self.update_job(job_id, job_metadata)

self._send_failed_notification(subscriber)

return jfmt, outputs, current_status

def execute_process(
self,
process_id: str,
data_dict: dict,
execution_mode: Optional[RequestedProcessExecutionMode] = None
execution_mode: Optional[RequestedProcessExecutionMode] = None,
subscriber: Optional[Subscriber] = None,
) -> Tuple[str, Any, JobStatus, Optional[Dict[str, str]]]:
"""
Default process execution handler
Expand All @@ -323,6 +336,7 @@ def execute_process(
:param data_dict: `dict` of data parameters
:param execution_mode: `str` optionally specifying sync or async
processing.
:param subscriber: `Subscriber` optionally specifying callback urls
:raises UnknownProcessError: if the input process_id does not
correspond to a known process
Expand Down Expand Up @@ -367,9 +381,39 @@ def execute_process(
response_headers = None
# TODO: handler's response could also be allowed to include more HTTP
# headers
mime_type, outputs, status = handler(processor, job_id, data_dict)
mime_type, outputs, status = handler(
processor,
job_id,
data_dict,
# only pass subscriber if supported, otherwise this breaks existing
# managers
**({'subscriber': subscriber} if self.supports_subscribing else {})
)
return job_id, mime_type, outputs, status, response_headers

def _send_in_progress_notification(self, subscriber: Optional[Subscriber]):
if subscriber and subscriber.in_progress_uri:
response = requests.post(subscriber.in_progress_uri, json={})
LOGGER.debug(
f'In progress notification response: {response.status_code}'
)

def _send_success_notification(
self, subscriber: Optional[Subscriber], outputs: Any
):
if subscriber:
response = requests.post(subscriber.success_uri, json=outputs)
LOGGER.debug(
f'Success notification response: {response.status_code}'
)

def _send_failed_notification(self, subscriber: Optional[Subscriber]):
if subscriber and subscriber.failed_uri:
response = requests.post(subscriber.failed_uri, json={})
LOGGER.debug(
f'Failed notification response: {response.status_code}'
)

def __repr__(self):
return f'<BaseManager> {self.name}'

Expand Down
1 change: 1 addition & 0 deletions pygeoapi/process/manager/mongodb_.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class MongoDBManager(BaseManager):
def __init__(self, manager_def):
super().__init__(manager_def)
self.is_async = True
self.supports_subscribing = True

def _connect(self):
try:
Expand Down
1 change: 1 addition & 0 deletions pygeoapi/process/manager/tinydb_.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def __init__(self, manager_def: dict):

super().__init__(manager_def)
self.is_async = True
self.supports_subscribing = True

@contextmanager
def _db(self):
Expand Down
11 changes: 11 additions & 0 deletions pygeoapi/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,17 @@ class JobStatus(Enum):
dismissed = 'dismissed'


@dataclass(frozen=True)
class Subscriber:
"""Store subscriber urls as defined in:
https://schemas.opengis.net/ogcapi/processes/part1/1.0/openapi/schemas/subscriber.yaml # noqa
"""
success_uri: str
in_progress_uri: Optional[str]
failed_uri: Optional[str]


def read_data(path: Union[Path, str]) -> Union[bytes, str]:
"""
helper function to read data (file or network)
Expand Down
29 changes: 29 additions & 0 deletions tests/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import time
import gzip
from http import HTTPStatus
from unittest import mock

from pyld import jsonld
import pytest
Expand Down Expand Up @@ -1738,6 +1739,16 @@ def test_execute_process(config, api_):
'name': None
}
}
req_body_7 = {
'inputs': {
'name': 'Test'
},
'subscriber': {
'successUri': 'https://example.com/success',
'inProgressUri': 'https://example.com/inProgress',
'failedUri': 'https://example.com/failed',
}
}

cleanup_jobs = set()

Expand Down Expand Up @@ -1865,6 +1876,24 @@ def test_execute_process(config, api_):
assert isinstance(response, dict)
assert code == HTTPStatus.CREATED

cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))

req = mock_request(data=req_body_7)
with mock.patch(
'pygeoapi.process.manager.base.requests.post'
) as post_mocker:
rsp_headers, code, response = api_.execute_process(req, 'hello-world')
assert code == HTTPStatus.OK
post_mocker.assert_any_call(
req_body_7['subscriber']['inProgressUri'], json={}
)
post_mocker.assert_any_call(
req_body_7['subscriber']['successUri'],
json={'id': 'echo', 'value': 'Hello Test!'}
)
assert post_mocker.call_count == 2

cleanup_jobs.add(tuple(['hello-world',
rsp_headers['Location'].split('/')[-1]]))

Expand Down

0 comments on commit 94ae782

Please sign in to comment.