Update migrations for application changes #588

Merged: 5 commits, Nov 28, 2024
29 changes: 16 additions & 13 deletions ckan-backend-dev/src/ckanext-wri/README.md
@@ -241,7 +241,8 @@ Migrates an RW dataset/metadata to CKAN. It maps all supported RW fields to CKAN

**Parameters:**
- **rw_dataset_id** (string) – The RW UUID of the dataset to migrate (required—unless `gfw_dataset` is provided). Example: `c0b5f4b1-4f3b-4f1e-8f1e-3f4b1f3b4f1e`.
- - **application** (string) – The RW application of the dataset to migrate (required). Example: `rw`.
+ - **rw_application** (string) – The RW application of the dataset to migrate (required). Example: `rw`.
+ - **dx_application** (string) – The destination DX application name (group name) to associate the dataset with (required). Example: `land-carbon-lab`.
- **dataset_slug** (string) – The desired slug of the dataset to migrate (optional). If you use this option, you will need to include this parameter each time you call `migrate_dataset` for this dataset. This value will override the `slug` value from the RW/GFW APIs. Example: `my-dataset`.
- **dataset_title** (string) – The desired title of the dataset to migrate (optional). If you use this option, you will need to include this parameter each time you call `migrate_dataset` for this dataset. This value will override the `name` value from the RW API or the `title` value from the GFW API. Example: `My Dataset`.
- **gfw_dataset** (string) – The GFW dataset to migrate (optional). If this dataset also has metadata in the RW API, you should also include `rw_dataset_id`. Example: `gfw_forest_data`.
@@ -260,7 +261,7 @@ A successful request will return the Prefect status of the new migration job.
##### Usage Example

```
- % curl -H "Authorization: YOUR_API_TOKEN" "https://wri.dev.ckan.datopian.com/api/3/action/migrate_dataset?rw_dataset_id=c12446ce-174f-4ffb-b2f7-77ecb0116aba&application=rw&team=migration-test&topics=lucas-topic,nov-16-topic"
+ % curl -H "Authorization: YOUR_API_TOKEN" "https://wri.dev.ckan.datopian.com/api/3/action/migrate_dataset?rw_dataset_id=c12446ce-174f-4ffb-b2f7-77ecb0116aba&rw_application=rw&dx_application=land-carbon-lab&team=migration-test&topics=lucas-topic,nov-16-topic"
{
"help": "https://wri.dev.ckan.datopian.com/api/3/action/help_show?name=migration_status",
"success": true,
@@ -283,7 +284,8 @@ A successful request will return the Prefect status of the new migration job.
"lucas-topic",
"nov-16-topic"
],
"application": "rw"
"rw_application": "rw",
"dx_application": "land-carbon-lab"
}
},
"idempotency_key": null,
@@ -443,7 +445,8 @@ You'll need this ID: `"id": "7cd8a09e-1834-4ab5-8b72-bd638e9392ae"` (`result.id`)
Add a custom file to the `migration/files` directory and commit it to the repo. Once deployed, you can use the `file_name` parameter to specify it. The file should be a CSV with the following columns:

- `rw_dataset_id` (required—unless `gfw_dataset` is provided)
- - `application` (required)
+ - `rw_application` (required)
+ - `dx_application` (required)
- `team` (optional)
- `topics` (optional)
- `geographic_coverage` (optional)
@@ -461,14 +464,13 @@ Add a custom file to the `migration/files` directory and commit it to the repo.
Example:

```csv
- rw_dataset_id,gfw_dataset,application,team,topics,geographic_coverage,authors,maintainers,layer_ids,dataset_title,dataset_slug
- d491f094-ad6e-4015-b248-1d1cd83667fa,,aqueduct-water-risk,aqueduct,"freshwater,surface-water-bodies",Global,,John Smith:[email protected];Jane Smith:[email protected],,An Aqueduct Dataset,an-aqueduct-dataset
- b318381e-485d-46c9-8958-c9a9d75d7e91,,aqueduct-water-risk,aqueduct,"freshwater,water-risks",Global,John Smith:[email protected];Jane Smith:[email protected],,,Another Aqueduct Dataset,another-aqueduct-dataset
- faf79d2c-5e54-4591-9d70-4bd1029c18e6,,crt,agriadapt,atmosphere,Global,John Smith:[email protected],Jane Smith:[email protected],,,
- ,gfw_forest_flux_forest_age_category,gfw,global-forest-watch,"land,ghg-emissions,forest",,,John Smith:[email protected],,,
- ,gfw_forest_flux_removal_forest_type,gfw,global-forest-watch,"land,ghg-emissions,forest",,Jane Smith:[email protected],John Smith:[email protected],,Another Title Example,
- 47a8e6cc-ea40-44a8-b1fc-6cf4fcc7d868,nasa_viirs_fire_alerts,gfw,global-forest-watch,"land,natural-hazards,forest",Global,,,2462cceb-41de-4bd2-8251-a6f75fe4e3d5,,another-slug-example
- c92b6411-f0e5-4606-bbd9-138e40e50eb8,,gfw,global-forest-watch,"land,forest",,Jane Smith:[email protected],,"0cba3c4f-2d3b-4fb1-8c93-c951dc1da84b,2351399c-ef2c-48da-9485-20698190acb0",,
+ rw_dataset_id,gfw_dataset,rw_application,team,topics,geographic_coverage,authors,maintainers,layer_ids,dataset_title,dataset_slug,dx_application
+ d491f094-ad6e-4015-b248-1d1cd83667fa,,aqueduct-water-risk,aqueduct,"freshwater,surface-water-bodies",Global,,John Smith:[email protected];Jane Smith:[email protected],,An Aqueduct Dataset,an-aqueduct-dataset,aqueduct
+ b318381e-485d-46c9-8958-c9a9d75d7e91,,aqueduct-water-risk,aqueduct,"freshwater,water-risks",Global,John Smith:[email protected];Jane Smith:[email protected],,,Another Aqueduct Dataset,another-aqueduct-dataset,aqueduct
+ ,gfw_forest_flux_forest_age_category,gfw,global-forest-watch,"land,ghg-emissions,forest",,,John Smith:[email protected],,,,global-forest-watch
+ ,gfw_forest_flux_removal_forest_type,gfw,global-forest-watch,"land,ghg-emissions,forest",,Jane Smith:[email protected],John Smith:[email protected],,Another Title Example,,global-forest-watch
+ 47a8e6cc-ea40-44a8-b1fc-6cf4fcc7d868,nasa_viirs_fire_alerts,gfw,global-forest-watch,"land,natural-hazards,forest",Global,,,2462cceb-41de-4bd2-8251-a6f75fe4e3d5,,another-slug-example,global-forest-watch
+ c92b6411-f0e5-4606-bbd9-138e40e50eb8,,gfw,global-forest-watch,"land,forest",,Jane Smith:[email protected],,"0cba3c4f-2d3b-4fb1-8c93-c951dc1da84b,2351399c-ef2c-48da-9485-20698190acb0",,,global-forest-watch
```
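
The `authors` and `maintainers` columns take `Name:email` pairs separated by semicolons, as in the rows above. Below is a minimal sketch of how one such value can be parsed, mirroring the `split`-based logic in `unstringify_agents` further down this diff (the helper name and example emails are illustrative, not part of the codebase):

```python
# Illustrative parser for the "Name:email;Name:email" agent format used in
# the authors/maintainers CSV columns. The helper name and example emails
# are hypothetical; the splitting mirrors unstringify_agents.
def parse_agents(value):
    agents = []
    for agent in value.split(";"):
        name, email = agent.split(":")
        name = name.strip() or None
        email = email.strip() or None
        if name and email:
            agents.append({"name": name, "email": email})
    return agents


print(parse_agents("John Smith:john@example.com;Jane Smith:jane@example.com"))
# [{'name': 'John Smith', 'email': 'john@example.com'},
#  {'name': 'Jane Smith', 'email': 'jane@example.com'}]
```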

#### POST /api/3/action/migration_status
@@ -508,7 +510,8 @@ The following uses the flow run ID from the `/migrate_dataset` endpoint example
"lucas-topic",
"nov-16-topic"
],
"application": "rw"
"rw_application": "rw",
"dx_application": "land-carbon-lab"
}
},
"idempotency_key": null,
@@ -113,7 +113,8 @@
"gfw_dataset",
"gfw_only",
"gfw_version",
"application",
"rw_application",
"dx_application",
"team",
"topics",
"layer_ids",
@@ -278,7 +279,8 @@ def trigger_migration(context: Context, data_dict: DataDict):
@logic.side_effect_free
def migrate_dataset(context: Context, data_dict: DataDict):
dataset_id = data_dict.get("rw_dataset_id")
- application = data_dict.get("application")
+ dx_application = data_dict.get("dx_application")
+ rw_application = data_dict.get("rw_application")
gfw_dataset = data_dict.get("gfw_dataset")

data_dict = _black_white_list("whitelist", data_dict)
@@ -295,9 +297,19 @@ def migrate_dataset(context: Context, data_dict: DataDict):
else:
data_dict["gfw_only"] = True

- if not application:
+ if not rw_application:
if not gfw_dataset:
- raise tk.ValidationError(_("Application is required"))
+ raise tk.ValidationError(_("'rw_application' is required when no 'gfw_dataset' is provided"))
+
+ if not dx_application:
+ raise tk.ValidationError(_("'dx_application' is required to associate the dataset with a DX application"))
+
+ try:
+ tk.get_action("group_show")(
+ {"ignore_auth": True}, {"id": dx_application, "type": "application"}
+ )
+ except logic.NotFound:
+ raise tk.ValidationError(_("'dx_application' not found: ") + dx_application)

team = data_dict.get("team")
topics = data_dict.get("topics")
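For reference, the request documented in the README can also be made from Python. This is a minimal sketch using the `ckanapi` client (the URL and token are placeholders, matching the curl example above):

```python
import ckanapi

# Placeholder endpoint and token, mirroring the README's curl example.
ckan = ckanapi.RemoteCKAN(
    "https://wri.dev.ckan.datopian.com", apikey="YOUR_API_TOKEN"
)

# Both parameters are now required: rw_application names the source RW
# application, and dx_application must be an existing CKAN group of type
# "application" (validated via group_show in the hunk above).
result = ckan.action.migrate_dataset(
    rw_dataset_id="c12446ce-174f-4ffb-b2f7-77ecb0116aba",
    rw_application="rw",
    dx_application="land-carbon-lab",
)
print(result)
```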
92 changes: 51 additions & 41 deletions migration/tasks/migration_task.py
@@ -246,7 +246,7 @@ def normalize_value(value):
return value.strip()


- def check_dataset_exists(dataset_id, rw_id=None, application=None):
+ def check_dataset_exists(dataset_id, dx_application, rw_application, rw_id):
"""
Check if dataset exists in CKAN.
"""
@@ -255,9 +255,9 @@ def check_dataset_exists(dataset_id, rw_id=None, application=None):
dataset = ckan.action.package_show(id=dataset_id)
return True, dataset
except ckanapi.errors.NotFound:
- if rw_id and application:
+ if rw_id and dx_application and rw_application:
dataset = ckan.action.package_search(
fq=f"+rw_id:{rw_id} +application:{application}"
fq=f"+rw_id:{rw_id} +(groups:{dx_application} OR application:{rw_application})"
)

dataset_count = dataset.get("count")
@@ -273,6 +273,10 @@ def check_dataset_exists(dataset_id, rw_id=None, application=None):
log.warning("Using the first dataset found.")

return dataset_count > 0, dataset_results[0] if dataset_count > 0 else None
+ else:
+ log.error(
+ f"Missing required parameters: rw_id={rw_id}, dx_application={dx_application}, rw_application={rw_application}"
+ )

return False, None
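
The fallback search above now matches either the DX group or the legacy `application` field. A small sketch of the resulting Solr filter query, with assumed example values:

```python
# Assumed example values; this reproduces the fq string that
# check_dataset_exists() builds when the package is not found by name.
rw_id = "c12446ce-174f-4ffb-b2f7-77ecb0116aba"
dx_application = "land-carbon-lab"
rw_application = "rw"

fq = f"+rw_id:{rw_id} +(groups:{dx_application} OR application:{rw_application})"
print(fq)
# +rw_id:c12446ce-174f-4ffb-b2f7-77ecb0116aba +(groups:land-carbon-lab OR application:rw)
```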

@@ -291,7 +295,8 @@ def get_datasets_from_csv(file_name):
dataset = {}
dataset_id = row.get("rw_dataset_id")
gfw_dataset = row.get("gfw_dataset")
- application = row.get("application")
+ rw_application = row.get("rw_application")
+ dx_application = row.get("dx_application")
gfw_only = row.get("gfw_only") or False

if not dataset_id:
@@ -300,10 +305,10 @@ def get_datasets_from_csv(file_name):
else:
dataset_id = gfw_dataset
gfw_only = True
application = "gfw"
rw_application = "gfw"

- if not application:
- raise ValueError("'application' required")
+ if not rw_application or not dx_application:
+ raise ValueError("Both 'rw_application' and 'dx_application' required")

team = row.get("team")
topics = row.get("topics")
@@ -325,7 +330,8 @@ def get_datasets_from_csv(file_name):
"rw_dataset_id": dataset_id,
"gfw_dataset": gfw_dataset,
"gfw_only": gfw_only,
"application": application,
"rw_application": rw_application,
"dx_application": dx_application,
"team": team,
"topics": topics,
"authors": authors,
@@ -347,7 +353,8 @@ def send_migration_dataset(data_dict):

dataset_id = data_dict.get("rw_dataset_id")
gfw_dataset = data_dict.get("gfw_dataset")
- application = data_dict.get("application")
+ rw_application = data_dict.get("rw_application")
+ dx_application = data_dict.get("dx_application")
gfw_only = data_dict.get("gfw_only")
gfw_version = data_dict.get("gfw_version")
dataset_slug = data_dict.get("dataset_slug")
@@ -359,13 +366,13 @@ def send_migration_dataset(data_dict):
else:
dataset_id = gfw_dataset
gfw_only = True
application = "gfw"
rw_application = "gfw"

- if not application:
- raise ValueError("'application' required")
+ if not rw_application or not dx_application:
+ raise ValueError("Both 'rw_application' and 'dx_application' required")

dataset = get_dataset_from_api(
- dataset_id, application, gfw_dataset, gfw_only, gfw_version
+ dataset_id, rw_application, gfw_dataset, gfw_only, gfw_version
)
external_dataset_slug = (
dataset.get("dataset", {}).get("slug") if not gfw_only else dataset_id
@@ -471,7 +478,10 @@ def migrate_dataset(data_dict):

dataset_name = data_dict.get("name")
dataset_exists, dataset = check_dataset_exists(
- dataset_name, data_dict.get("rw_id"), data_dict.get("application")
+ dataset_name,
+ data_dict.get("dx_application"),
+ data_dict.get("rw_application"),
+ data_dict.get("rw_id"),
)

log_name = f'{dataset_name if dataset_name else "Unknown dataset"} -'
@@ -879,7 +889,11 @@ def unstringify_agents(agents, agent_type, log, log_name):

name, email = agent.split(":")
name = name.strip() if name else None
- email = email.strip() if email and email_validator(email, agent_type, log, log_name) else None
+ email = (
+ email.strip()
+ if email and email_validator(email, agent_type, log, log_name)
+ else None
+ )

if not name or not email:
log.error(
@@ -900,7 +914,11 @@ def unstringify_agents(agents, agent_type, log, log_name):
name = agent.get("name")
email = agent.get("email")
name = name.strip() if name else None
- email = email.strip() if email and email_validator(email, agent_type, log, log_name) else None
+ email = (
+ email.strip()
+ if email and email_validator(email, agent_type, log, log_name)
+ else None
+ )

if not name or not email:
log.error(
@@ -938,7 +956,8 @@ def stringify_agents(data_dict):
def prepare_dataset(data_dict, original_data_dict, gfw_only=False):
log = get_run_logger()

- application = original_data_dict.get("application")
+ rw_application = original_data_dict.get("rw_application")
+ dx_application = original_data_dict.get("dx_application")
team = original_data_dict.get("team")
topics = original_data_dict.get("topics")
whitelist = original_data_dict.get("whitelist")
@@ -979,31 +998,12 @@ def get_value(key, default="", data_object=None):

base_name = dataset_slug or f'{get_value("name", data_object="dataset")}'

- dataset_application = get_value("application")
- requested_application = application

- warnings = []

- if not requested_application:
- warnings.append(
- f"Requested application not found, using application: {application}"
- )
- requested_application = dataset_application

- if dataset_application and type(dataset_application) == list:
- application = [a.lower() for a in dataset_application]

- if requested_application not in application:
- warnings.append(
- f"Requested application not found in dataset applications: {application}"
- )
- warnings.append(f"Requested application: {requested_application}")

- application = requested_application
gfw_title = None

- if gfw_only or application == "gfw":
- application = "gfw"
+ if gfw_only or rw_application == "gfw":
+ rw_application = "gfw"
gfw_title = get_value("title", data_object="metadata")

if not gfw_title and layer_names:
@@ -1012,7 +1012,7 @@ def get_value(key, default="", data_object=None):
if len(layer_name) == 1:
gfw_title = layer_name[0]

- name = munge_title_to_name(f"{base_name} {application}")
+ name = dataset_slug or munge_title_to_name(f"{base_name} {rw_application}")

log_name = f'{name if name else "Unknown dataset"} -'
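
As a quick illustration of the naming change above, assuming `munge_title_to_name` is CKAN's usual helper from `ckan.lib.munge`: an explicit `dataset_slug` now wins outright, and otherwise the munged `"<base name> <rw application>"` string is used.

```python
from ckan.lib.munge import munge_title_to_name  # standard CKAN helper

# Assumed example values.
base_name = "My Dataset"
rw_application = "rw"
dataset_slug = None

# Mirrors the updated naming line above.
name = dataset_slug or munge_title_to_name(f"{base_name} {rw_application}")
print(name)  # my-dataset-rw
```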

@@ -1090,7 +1090,6 @@ def get_value(key, default="", data_object=None):
"approval_status": approval_status,
"is_approved": is_approved,
"draft": is_draft,
"application": application,
"visibility_type": visibility_type,
}

@@ -1156,9 +1155,20 @@ def get_value(key, default="", data_object=None):
if valid_topics:
required_dataset_values["groups"] = valid_topics

+ try:
+ application_dict = ckan.action.group_show(id=dx_application)
+ required_dataset_values["groups"] = required_dataset_values.get(
+ "groups", []
+ ) + [{"name": application_dict["name"]}]
+ except ckanapi.errors.NotFound:
+ log.error(f"{log_name} Application not found: {dx_application}")
+ log.error(
+ f"{log_name} The process will continue, but the dataset will not be associated with the desired application"
+ )

resources = []

- if application not in ["aqueduct", "aqueduct-water-risk"] and not gfw_only:
+ if rw_application not in ["aqueduct", "aqueduct-water-risk"] and not gfw_only:
required_dataset_values["rw_id"] = resource["dataset_id"]

for layer in layers:
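To make the group handling in the final hunk concrete, here is a minimal sketch (with assumed values) of how the dataset's `groups` list is assembled once the `dx_application` group resolves:

```python
# Assumed values: valid_topics as built earlier in prepare_dataset, and the
# group_show result for the dx_application group.
valid_topics = [{"name": "lucas-topic"}, {"name": "nov-16-topic"}]
application_dict = {"name": "land-carbon-lab"}

required_dataset_values = {"groups": valid_topics}
required_dataset_values["groups"] = required_dataset_values.get("groups", []) + [
    {"name": application_dict["name"]}
]
print(required_dataset_values["groups"])
# [{'name': 'lucas-topic'}, {'name': 'nov-16-topic'}, {'name': 'land-carbon-lab'}]
```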