Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reorganizing the data-collection for new bufr2geojson oaproc output #811

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .zap/rules.tsv
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,4 @@
10036 IGNORE "Server Leaks Version Information via ""Server"" HTTP Response Header Field" Low
10110 IGNORE Dangerous JS Functions Low
10105 IGNORE Authentication Credentials Captured Medium
10003 IGNORE Vulnerable JS Library Medium
15 changes: 10 additions & 5 deletions tests/integration/test_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,14 +214,19 @@ def test_metadata_discovery_publish():
def test_data_ingest():
    """Test data ingest/process publish"""

    # item identifier follows the new bufr2geojson output:
    # <wigos_station_identifier>-<report time>-<index>
    item_api_url = f'{API_URL}/collections/{ID}/items/0-454-2-AWSNAMITAMBO-202107071455-15'  # noqa

    # keep the response object so the HTTP status can be verified; the
    # previous lookup of the source .bufr4 file (which used to define `r`)
    # was tied to the old reportId format
    r = SESSION.get(item_api_url)
    assert r.status_code == codes.ok

    item_api = r.json()

    assert item_api['reportId'] == '0-454-2-AWSNAMITAMBO-202107071455'
    assert item_api['properties']['reportTime'] == '2021-07-07T14:55:00Z'  # noqa
    assert item_api['properties']['wigos_station_identifier'] == '0-454-2-AWSNAMITAMBO'  # noqa
    assert item_api['properties']['name'] == 'global_solar_radiation_integrated_over_period_specified'  # noqa
    assert item_api['properties']['value'] == 0.0
    assert item_api['properties']['unit'] == 'J m-2'
    assert item_api['properties']['phenomenonTime'] == '2021-07-06T14:55:00Z/2021-07-07T14:55:00Z'  # noqa


Expand Down
11 changes: 10 additions & 1 deletion wis2box-management/wis2box/api/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from wis2box import cli_helpers
from wis2box.api.backend import load_backend
from wis2box.api.config import load_config
from wis2box.data_mappings import get_plugins

from wis2box.env import (DOCKER_API_URL, API_URL)

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -258,10 +260,17 @@ def setup(ctx, verbosity):
except Exception as err:
click.echo(f'Issue loading discovery-metadata: {err}')
return False
# loop over records and add data-collection when bufr2geojson is used
for record in records['features']:
metadata_id = record['id']
plugins = get_plugins(record)
LOGGER.info(f'Plugins used by {metadata_id}: {plugins}')
# check if any plugin-names contains 2geojson
has_2geojson = any('2geojson' in plugin for plugin in plugins)
if has_2geojson is False:
continue
if metadata_id not in api_collections:
click.echo(f'Adding collection: {metadata_id}')
click.echo(f'Adding data-collection for: {metadata_id}')
from wis2box.data import gcm
meta = gcm(record)
setup_collection(meta=meta)
Expand Down
62 changes: 59 additions & 3 deletions wis2box-management/wis2box/api/backend/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,57 @@
}
}

def _obs_field_with_raw(field_type: str) -> dict:
    """
    Build a field mapping with an additional `raw` keyword sub-field

    :param field_type: `str` of the Elasticsearch type of the main field

    :returns: `dict` of field mapping (a new object on every call)
    """

    return {
        'type': field_type,
        'fields': {
            'raw': {'type': 'keyword'}
        }
    }


# Index mappings for observation features: a geo_shape geometry plus the
# per-observation properties listed below.
# NOTE(review): the integration test in this change reads a `unit` property
# (singular) while this mapping declares `units` - confirm which name the
# indexed features actually carry.
MAPPINGS_OBS = {
    'properties': {
        'geometry': {'type': 'geo_shape'},
        'properties': {
            'properties': {
                'name': _obs_field_with_raw('text'),
                'reportTime': _obs_field_with_raw('date'),
                'reportId': _obs_field_with_raw('text'),
                'phenomenonTime': {'type': 'text'},
                'wigos_station_identifier': _obs_field_with_raw('text'),
                'units': {'type': 'text'},
                # coerce lets numeric strings be indexed as floats
                'value': {'type': 'float', 'coerce': True},
                'description': {'type': 'text'},
            }
        }
    }
}

MAPPINGS_STATIONS = {
'properties': {
'geometry': {
Expand Down Expand Up @@ -216,8 +267,10 @@ def add_collection(self, collection_id: str) -> dict:

if collection_id == 'stations':
mappings = MAPPINGS_STATIONS
else:
elif collection_id in ['discovery-metadata', 'messages']:
mappings = MAPPINGS
else:
mappings = MAPPINGS_OBS

es_index = self.es_id(collection_id)

Expand Down Expand Up @@ -316,8 +369,11 @@ def gendata(features):
'_id': feature['id'],
'_source': feature
}

helpers.bulk(self.conn, gendata(items))
success, errors = helpers.bulk(self.conn, gendata(items), raise_on_error=False) # noqa
if errors:
for error in errors:
LOGGER.error(f"Indexing error: {error}")
raise RuntimeError(f"Upsert failed with {len(errors)} errors")

def delete_collection_item(self, collection_id: str, item_id: str) -> str:
"""
Expand Down
8 changes: 4 additions & 4 deletions wis2box-management/wis2box/data/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,13 +124,13 @@ def gcm(mcf: Union[dict, str]) -> dict:
'id': generated['id'],
'type': 'feature',
'topic_hierarchy': generated['properties']['wmo:topicHierarchy'].replace('origin/a/wis2/', '').replace('/', '.'), # noqa: E501
'title': generated['properties']['title'],
'description': generated['properties']['description'],
'title': f'Observations in json format for {generated["id"]}',
'description': f'Observations in json format for {generated["id"]}', # noqa
'keywords': generated['properties']['keywords'],
'bbox': bbox,
'links': generated['links'],
'id_field': 'id',
'time_field': 'resultTime',
'time_field': 'reportTime',
'title_field': 'id'
}

Expand All @@ -145,7 +145,7 @@ def add_collection_data(metadata: str):
"""

meta = gcm(metadata)

LOGGER.info(f'Adding data-collection for {meta["id"]}')
setup_collection(meta=meta)

return
Expand Down
2 changes: 1 addition & 1 deletion wis2box-management/wis2box/data/bufr2geojson.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ def transform(self, input_data: Union[Path, bytes],
for item in result['items']:
id = item['id']

data_date = item['properties']['resultTime']
data_date = item['properties']['reportTime']
self.output_data[id] = {
'_meta': {
'identifier': id,
Expand Down
2 changes: 1 addition & 1 deletion wis2box-management/wis2box/data/geojson.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def transform(self, input_data: Union[Path, bytes],
LOGGER.debug('Procesing GeoJSON data')
data_ = json.loads(input_data)
identifier = data_['id']
data_date = data_['properties']['resultTime']
data_date = data_['properties']['reportTime']
self.output_data[identifier] = {
'_meta': {
'identifier': identifier,
Expand Down
22 changes: 22 additions & 0 deletions wis2box-management/wis2box/data_mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,28 @@
LOGGER = logging.getLogger(__name__)


def get_plugins(record: dict) -> list:
    """
    Get plugins from record

    Best-effort lookup: collects the `plugin` value of every entry found
    under `record['wis2box']['data_mappings']['plugins'][<filetype>]`.
    If any part of that structure is missing or malformed, the plugins
    collected so far are returned and the problem is logged; no exception
    is propagated to the caller.

    :param record: `dict` of record

    :returns: `list` of plugins
    """

    plugins = []

    try:
        filetypes = record['wis2box']['data_mappings']['plugins']
        for definitions in filetypes.values():
            for definition in definitions:
                plugins.append(definition['plugin'])
    except (AttributeError, LookupError, TypeError) as err:
        # read the identifier defensively: a record without an 'id' must
        # not raise a second error from inside this handler
        record_id = record.get('id') if isinstance(record, dict) else None
        LOGGER.info(f'No plugins found for record-id={record_id} : {err}')

    return plugins


def refresh_data_mappings():
# load plugin for local broker and publish refresh request
defs_local = {
Expand Down
77 changes: 41 additions & 36 deletions wis2box-management/wis2box/metadata/discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@
from wis2box import cli_helpers
from wis2box.api import (delete_collection_item, remove_collection,
setup_collection, upsert_collection_item)
from wis2box.data_mappings import refresh_data_mappings
from wis2box.data_mappings import refresh_data_mappings, get_plugins

from wis2box.env import (API_URL, BROKER_PUBLIC, DOCKER_API_URL,
STORAGE_PUBLIC, STORAGE_SOURCE, URL)
from wis2box.metadata.base import BaseMetadata
Expand Down Expand Up @@ -76,20 +77,13 @@ def generate(self, mcf: dict) -> str:
if md['identification']['extents']['temporal'][0].get('begin', 'BEGIN_DATE') is None: # noqa
today = date.today().strftime('%Y-%m-%d')
md['identification']['extents']['temporal'][0]['begin'] = today

LOGGER.debug('Adding distribution links')
oafeat_link, mqp_link, canonical_link = self.get_distribution_links(
identifier, mqtt_topic, format_='mcf')

md['distribution'] = {
'oafeat': oafeat_link,
'mqtt': mqp_link,
'canonical': canonical_link
}


LOGGER.debug('Adding data policy')
md['identification']['wmo_data_policy'] = mqtt_topic.split('/')[5]

# md set 'distribution' to empty object, we add links later
md['distribution'] = {}

LOGGER.debug('Generating OARec discovery metadata')
record = WMOWCMP2OutputSchema().write(md, stringify=False)
record['properties']['wmo:topicHierarchy'] = mqtt_topic
Expand Down Expand Up @@ -118,26 +112,36 @@ def generate(self, mcf: dict) -> str:

return record

def get_distribution_links(self, identifier: str, topic: str,
def get_distribution_links(self,
record: dict,
format_='mcf') -> list:
"""
Generates distribution links

:param identifier: `str` of metadata identifier
:param topic: `str` of associated topic
:param record: `dict` of discovery metadata record
:param format_: `str` of format (`mcf` or `wcmp2`)

:returns: `list` of distribution links
"""

LOGGER.debug('Adding distribution links')
oafeat_link = {
'href': f"{API_URL}/collections/{identifier}?f=json",
'type': 'application/json',
'name': identifier,
'description': identifier,
'rel': 'collection'
}
LOGGER.info('Adding distribution links')

identifier = record['id']
topic = record['properties']['wmo:topicHierarchy']

links = []
plugins = get_plugins(record)
# check if any plugin-names contains 2geojson
has_2geojson = any('2geojson' in plugin for plugin in plugins)
if has_2geojson:
oafeat_link = {
'href': f"{API_URL}/collections/{identifier}?f=json",
'type': 'application/json',
'name': identifier,
'description': f'Observations in json format for {identifier}',
'rel': 'collection'
}
links.append(oafeat_link)

mqp_link = {
'href': get_broker_public_endpoint(),
Expand All @@ -147,6 +151,7 @@ def get_distribution_links(self, identifier: str, topic: str,
'rel': 'items',
'channel': topic
}
links.append(mqp_link)

canonical_link = {
'href': f"{API_URL}/collections/discovery-metadata/items/{identifier}", # noqa
Expand All @@ -155,12 +160,13 @@ def get_distribution_links(self, identifier: str, topic: str,
'description': identifier,
'rel': 'canonical'
}
links.append(canonical_link)

if format_ == 'mcf':
for link in [oafeat_link, mqp_link, canonical_link]:
for link in links:
link['url'] = link.pop('href')

return oafeat_link, mqp_link, canonical_link
return links


def publish_broker_message(record: dict, storage_path: str,
Expand Down Expand Up @@ -239,22 +245,21 @@ def publish_discovery_metadata(metadata: Union[dict, str]):
LOGGER.info('Adding WCMP2 record from dictionary')
record = metadata
dm = DiscoveryMetadata()
distribution_links = dm.get_distribution_links(
record['id'], record['properties']['wmo:topicHierarchy'],
format_='wcmp2')
# update links, do not extend or we get duplicates
record['links'] = distribution_links
for link in record['links']:
if 'description' in link:
link['title'] = link.pop('description')
if 'url' in link:
link['href'] = link.pop('url')
else:
LOGGER.debug('Transforming MCF into WCMP2 record')
LOGGER.info('Transforming MCF into WCMP2 record')
dm = DiscoveryMetadata()
record_mcf = dm.parse_record(metadata)
record = dm.generate(record_mcf)

distribution_links = dm.get_distribution_links(record, format_='wcmp2')
# update links, do not extend or we get duplicates
record['links'] = distribution_links
for link in record['links']:
if 'description' in link:
link['title'] = link.pop('description')
if 'url' in link:
link['href'] = link.pop('url')

if 'x-wmo' in record['id']:
msg = 'Change x-wmo to wmo in metadata identifier'
LOGGER.error(msg)
Expand Down
Loading