Merge pull request #131 from ecmwf-projects/COPDS-1801-custom-types
storing contents into catalogue db
alex75 authored Sep 18, 2024
2 parents ac6d209 + 0d0579f commit 3b6aa61
Showing 10 changed files with 857 additions and 28 deletions.
68 changes: 68 additions & 0 deletions alembic/versions/59fa8a6b0a81_contents_added_to_structure.py
@@ -0,0 +1,68 @@
"""contents added to structure.
Revision ID: 59fa8a6b0a81
Revises: 18ba95eb351a
Create Date: 2024-09-02 11:34:57.475654
"""

import sqlalchemy as sa
from sqlalchemy.dialects import postgresql as dialect_postgresql

import alembic

# revision identifiers, used by Alembic.
revision = "59fa8a6b0a81"
down_revision = "18ba95eb351a"
branch_labels = None
depends_on = None


def upgrade() -> None:
alembic.op.create_table(
"contents",
sa.Column("content_id", sa.Integer, primary_key=True),
sa.Column("content_uid", sa.String, index=True, unique=True, nullable=False),
sa.Column("content_update", sa.TIMESTAMP, nullable=False),
sa.Column("data", dialect_postgresql.JSONB),
sa.Column("description", sa.String, nullable=False),
sa.Column("image", sa.String),
sa.Column("layout", sa.String),
sa.Column("link", sa.String),
sa.Column("publication_date", sa.TIMESTAMP, nullable=False),
sa.Column("site", sa.String, index=True, nullable=False),
sa.Column("title", sa.String, nullable=False),
sa.Column("type", sa.String, nullable=False),
)
alembic.op.create_table(
"content_keywords",
sa.Column("keyword_id", sa.Integer, primary_key=True),
sa.Column("category_name", sa.String),
sa.Column("category_value", sa.String),
sa.Column("keyword_name", sa.String),
)
alembic.op.create_table(
"contents_keywords_m2m",
sa.Column(
"content_id",
sa.Integer,
sa.ForeignKey("contents.content_id"),
primary_key=True,
),
sa.Column(
"keyword_id",
sa.Integer,
sa.ForeignKey("content_keywords.keyword_id"),
primary_key=True,
),
)
alembic.op.add_column(
"catalogue_updates", sa.Column("content_repo_commit", sa.String)
)


def downgrade() -> None:
alembic.op.drop_table("contents_keywords_m2m")
alembic.op.drop_table("content_keywords")
alembic.op.drop_table("contents")
alembic.op.drop_column("catalogue_updates", "content_repo_commit")
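
A minimal sketch of applying (or rolling back) this revision programmatically with Alembic, assuming the project's alembic.ini is available in the working directory (equivalent to the usual "alembic upgrade" command):

from alembic import command
from alembic.config import Config

alembic_cfg = Config("alembic.ini")  # assumed location of the Alembic configuration
command.upgrade(alembic_cfg, "59fa8a6b0a81")  # creates contents, content_keywords, contents_keywords_m2m
# command.downgrade(alembic_cfg, "18ba95eb351a")  # undoes this revision
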
237 changes: 237 additions & 0 deletions cads_catalogue/contents.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
"""utility module to load and store contents in the catalogue database."""
# Copyright 2022, European Union.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import pathlib
from typing import Any, List

import sqlalchemy as sa
import structlog

from cads_catalogue import config, database, object_storage

THIS_PATH = os.path.abspath(os.path.dirname(__file__))
logger = structlog.get_logger(__name__)

OBJECT_STORAGE_UPLOAD_FIELDS = ["layout", "image"]


def content_sync(
session: sa.orm.session.Session,
content: dict[str, Any],
storage_settings: config.ObjectStorageSettings,
) -> database.Content:
"""
Update db record with a content's metadata dictionary.
Parameters
----------
session: opened SQLAlchemy session
content: metadata of loaded content
storage_settings: object with settings to access the object storage
Returns
-------
The created/updated db message
"""
content = content.copy()
content_uid = content["content_uid"]
keywords = content.pop("keywords", [])

subpath = os.path.join("contents", content["content_uid"])
for field in OBJECT_STORAGE_UPLOAD_FIELDS:
file_path = content.get(field)
if not file_path:
continue
content[field] = object_storage.store_file(
file_path,
storage_settings.object_storage_url,
bucket_name=storage_settings.catalogue_bucket,
subpath=subpath,
**storage_settings.storage_kws,
)

    # upsert of the content
db_content = session.scalars(
sa.select(database.Content).filter_by(content_uid=content_uid).limit(1)
).first()
if not db_content:
db_content = database.Content(**content)
session.add(db_content)
logger.debug("added db content %r" % content_uid)
else:
session.execute(
sa.update(database.Content)
.filter_by(content_id=db_content.content_id)
.values(**content)
)
logger.debug("updated db content %r" % content_uid)

# build related keywords
db_content.keywords = [] # type: ignore
for keyword in set(keywords):
category_name, category_value = [r.strip() for r in keyword.split(":")]
kw_md = {
"category_name": category_name,
"category_value": category_value,
"keyword_name": keyword,
}
keyword_obj = session.scalars(
sa.select(database.ContentKeyword).filter_by(**kw_md).limit(1)
).first()
if not keyword_obj:
keyword_obj = database.ContentKeyword(**kw_md)
db_content.keywords.append(keyword_obj)
return db_content
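
A brief usage sketch: the session, storage_settings and content_md objects below are assumed to exist already (content_md shaped like the output of load_content_folder further down), and each keyword string is expected in "category: value" form because it is split on the colon:

with session.begin_nested():
    db_content = content_sync(session, content_md, storage_settings)
session.commit()
print(db_content.content_uid, [kw.keyword_name for kw in db_content.keywords])
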


def load_content_folder(content_folder: str | pathlib.Path) -> dict[str, Any]:
"""
Parse a content folder and returns its metadata dictionary.
Parameters
----------
content_folder: folder path containing content files
Returns
-------
dictionary of information parsed.
"""
metadata_file_path = os.path.join(content_folder, "metadata.json")
with open(metadata_file_path) as fp:
data = json.load(fp)
metadata = {
"site": ",".join(data["site"]),
"type": data["resource_type"],
"content_uid": data["id"],
"title": data["title"],
"description": data["abstract"],
"publication_date": data["publication_date"],
"content_update": data["update_date"],
"link": data.get("link"),
"keywords": data.get("keywords", []),
"data": data.get("data"),
# managed below:
# "image": None,
# "layout": None,
}
for ancillar_file_field in OBJECT_STORAGE_UPLOAD_FIELDS: # image, layout
metadata[ancillar_file_field] = None
rel_path = data.get(ancillar_file_field)
if rel_path:
            ancillar_file_path = os.path.abspath(os.path.join(content_folder, rel_path))
            if os.path.isfile(ancillar_file_path):
                metadata[ancillar_file_field] = ancillar_file_path
else:
raise ValueError(
f"{metadata_file_path} contains reference to {ancillar_file_field} file not found!"
)
return metadata
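
For illustration, a metadata.json that this parser would accept might look like the following (all values here are made up; "site" is a list joined into a comma-separated string, while "image" and "layout" are optional paths relative to the content folder and must exist on disk when referenced):

import json

example_metadata = {
    "id": "example-content",                    # becomes content_uid
    "site": ["cds", "ads"],                     # stored as "cds,ads"
    "resource_type": "page",                    # becomes type
    "title": "Example content",
    "abstract": "Example description.",         # becomes description
    "publication_date": "2024-09-01T00:00:00Z",
    "update_date": "2024-09-02T00:00:00Z",      # becomes content_update
    "link": "https://example.org",              # optional
    "keywords": ["Product type: Page"],         # optional "category: value" strings
    "data": {"extra": "free-form JSON"},        # optional, stored in the JSONB column
    "image": "image.png",                       # optional ancillary file
    "layout": "layout.json",                    # optional ancillary file
}
with open("example-content/metadata.json", "w") as fp:  # hypothetical content folder
    json.dump(example_metadata, fp, indent=2)
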


def load_contents(contents_root_folder: str | pathlib.Path) -> List[dict[str, Any]]:
"""
Load all contents from a folder and return a dictionary of metadata extracted.
Parameters
----------
contents_root_folder: root path where to look for contents (i.e. cads-contents-json root folder)
Returns
-------
List of found contents parsed.
"""
loaded_contents = []
if not os.path.isdir(contents_root_folder):
logger.warning("not found folder {contents_root_folder}!")
return []
exclude_folder_names = [".git"]
for content_folder_name in sorted(os.listdir(contents_root_folder)):
if content_folder_name in exclude_folder_names:
continue
content_folder = os.path.join(contents_root_folder, content_folder_name)
if not os.path.isdir(content_folder):
logger.warning("unknown file %r found" % content_folder)
continue
try:
content_md = load_content_folder(content_folder)
        except Exception:  # noqa
logger.exception(
"failed parsing content in %s, error follows" % content_folder
)
continue
loaded_contents.append(content_md)
return loaded_contents


def update_catalogue_contents(
session: sa.orm.session.Session,
contents_package_path: str | pathlib.Path,
storage_settings: config.ObjectStorageSettings,
remove_orphans: bool = True,
):
"""
Load metadata of contents from files and sync each content in the db.
Parameters
----------
session: opened SQLAlchemy session
contents_package_path: root folder path of the contents package (i.e. cads-contents-json root folder)
storage_settings: object with settings to access the object storage
remove_orphans: if True, remove from the database other contents not involved (default True)
Returns
-------
list: list of content uids involved
"""
contents = load_contents(contents_package_path)
logger.info(
"loaded %s contents from folder %s" % (len(contents), contents_package_path)
)
involved_content_ids = []
for content in contents:
content_uid = content["content_uid"]
involved_content_ids.append(content_uid)
try:
with session.begin_nested():
content_sync(session, content, storage_settings)
logger.info("content '%s' db sync successful" % content_uid)
except Exception: # noqa
logger.exception(
"db sync for content '%s' failed, error follows" % content_uid
)

if not remove_orphans:
return involved_content_ids

# remove not loaded contents from the db
contents_to_delete = (
session.scalars(
sa.select(database.Content).filter(
database.Content.content_uid.notin_(involved_content_ids)
)
)
.unique()
.all()
)
for content_to_delete in contents_to_delete:
content_to_delete.keywords = []
session.delete(content_to_delete)
logger.info("removed old content '%s'" % content_to_delete.content_uid)

return involved_content_ids
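
A minimal end-to-end sketch of using this module; the database URL, the contents folder path and the ObjectStorageSettings constructor arguments below are assumptions for illustration (they mirror the attributes the code above reads, not a confirmed signature):

import sqlalchemy as sa
from sqlalchemy.orm import Session

from cads_catalogue import config, contents

engine = sa.create_engine("postgresql://user:password@localhost/catalogue")  # hypothetical DSN
storage_settings = config.ObjectStorageSettings(  # assumed constructor fields
    object_storage_url="http://localhost:9000",
    catalogue_bucket="cads-catalogue",
    storage_kws={"aws_access_key_id": "key", "aws_secret_access_key": "secret"},
)
with Session(engine) as session:
    uids = contents.update_catalogue_contents(
        session, "/path/to/cads-contents-json", storage_settings
    )
    session.commit()
    print("synced contents:", uids)
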
55 changes: 55 additions & 0 deletions cads_catalogue/database.py
@@ -65,9 +65,64 @@ class CatalogueUpdate(BaseModel):
licence_repo_commit = sa.Column(sa.String)
message_repo_commit = sa.Column(sa.String)
cim_repo_commit = sa.Column(sa.String)
content_repo_commit = sa.Column(sa.String)
override_md = sa.Column(dialect_postgresql.JSONB, default={})


class Content(BaseModel):
"""Content ORM model."""

__tablename__ = "contents"

content_id = sa.Column(sa.Integer, primary_key=True)
content_uid = sa.Column(sa.String, index=True, unique=True, nullable=False)
content_update = sa.Column(sa.TIMESTAMP, nullable=False)
data = sa.Column(dialect_postgresql.JSONB)
description = sa.Column(sa.String, nullable=False)
image = sa.Column(sa.String)
layout = sa.Column(sa.String)
link = sa.Column(sa.String)
publication_date = sa.Column(sa.TIMESTAMP, nullable=False)
site = sa.Column(sa.String, index=True, nullable=False)
title = sa.Column(sa.String, nullable=False)
type = sa.Column(sa.String, nullable=False)

keywords: sa.orm.Mapped[List["ContentKeyword"]] = sa.orm.relationship(
"ContentKeyword", secondary="contents_keywords_m2m", back_populates="contents"
)


class ContentKeyword(BaseModel):
"""ContentKeyword ORM model."""

__tablename__ = "content_keywords"

keyword_id = sa.Column(sa.Integer, primary_key=True)
category_name = sa.Column(sa.String)
category_value = sa.Column(sa.String)
keyword_name = sa.Column(sa.String)

contents: sa.orm.Mapped[List["Content"]] = sa.orm.relationship(
"Content",
secondary="contents_keywords_m2m",
back_populates="keywords",
uselist=True,
)


class ContentsKeywordM2M(BaseModel):
"""many-to-may ORM model for contents-keywords."""

__tablename__ = "contents_keywords_m2m"

content_id = sa.Column(
sa.Integer, sa.ForeignKey("contents.content_id"), primary_key=True
)
keyword_id = sa.Column(
sa.Integer, sa.ForeignKey("content_keywords.keyword_id"), primary_key=True
)
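
As an illustration of the new relationships, a small query sketch (the session object and the filter values are assumptions; Content.keywords and ContentKeyword.contents both go through the contents_keywords_m2m association table):

import sqlalchemy as sa

from cads_catalogue import database

cds_pages = session.scalars(
    sa.select(database.Content).filter_by(site="cds", type="page")  # hypothetical filter values
).all()
for content in cds_pages:
    print(content.content_uid, [kw.keyword_name for kw in content.keywords])
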


class ResourceLicence(BaseModel):
"""many-to-many ORM model for resources-licences."""
