Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/post processing #282

Open
wants to merge 14 commits into
base: develop
Choose a base branch
from
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,5 @@ Pipfile.lock
#backend
backend/data
backend/.env

.DS_Store
85 changes: 79 additions & 6 deletions API/api_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

# Reader imports
from src.app import CustomExport, PolygonStats, RawData, S3FileTransfer
from src.post_processing.processor import PostProcessor
from src.config import ALLOW_BIND_ZIP_FILTER
from src.config import CELERY_BROKER_URL as celery_broker_uri
from src.config import CELERY_RESULT_BACKEND as celery_backend
Expand All @@ -39,6 +40,7 @@
RawDataCurrentParams,
RawDataOutputType,
)
from src.post_processing.processor import PostProcessor

if ENABLE_SOZIP:
# Third party imports
Expand Down Expand Up @@ -75,7 +77,12 @@ def create_readme_content(default_readme, polygon_stats):


def zip_binding(
working_dir, exportname_parts, geom_dump, polygon_stats, default_readme
working_dir,
exportname_parts,
geom_dump,
polygon_stats,
geojson_stats,
default_readme,
):
logging.debug("Zip Binding Started!")
upload_file_path = os.path.join(
Expand All @@ -88,6 +95,9 @@ def zip_binding(
),
}

if geojson_stats:
additional_files["stats.json"] = geojson_stats

for name, content in additional_files.items():
temp_path = os.path.join(working_dir, name)
with open(temp_path, "w") as f:
Expand Down Expand Up @@ -209,11 +219,60 @@ def process_raw_data(self, params, user=None):
file_parts,
)

geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts)
inside_file_size = 0
# Post-processing: Generate GeoJSON/HTML stats and transliterations
polygon_stats = None
geojson_stats_html = None
geojson_stats_json = None
download_html_url = None
if "include_stats" or "include_translit" in params.dict():
post_processor = PostProcessor(
{
"include_stats": params.include_stats,
"include_translit": params.include_translit,
}
)

if params.include_stats:
post_processor.filters = params.filters

post_processor.init()

geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts, post_processor.post_process_line)

if params.include_stats:
geojson_stats_json = json.dumps(post_processor.geoJSONStats.dict())

# Create an HTML summary of stats
if params.include_stats_html:
tpl = "stats"
if "waterway" in post_processor.geoJSONStats.config.keys:
tpl = "stats_waterway"
elif "highway" in post_processor.geoJSONStats.config.keys:
tpl = "stats_highway"
elif "building" in post_processor.geoJSONStats.config.keys:
tpl = "stats_building"
project_root = pathlib.Path(__file__).resolve().parent
tpl_path = os.path.join(
project_root,
"../src/post_processing/{tpl}_tpl.html".format(tpl=tpl),
)
geojson_stats_html = post_processor.geoJSONStats.html(
tpl_path
).build()
upload_html_path = os.path.join(
working_dir, os.pardir, f"{exportname_parts[-1]}.html"
)
with open(upload_html_path, "w") as f:
f.write(geojson_stats_html)

else:
geom_area, geom_dump, working_dir = RawData(
params, str(self.request.id)
).extract_current_data(file_parts)

inside_file_size = 0
if "include_stats" in params.dict():
if params.include_stats:
feature = {
Expand All @@ -222,12 +281,14 @@ def process_raw_data(self, params, user=None):
"properties": {},
}
polygon_stats = PolygonStats(feature).get_summary_stats()

if bind_zip:
upload_file_path, inside_file_size = zip_binding(
working_dir=working_dir,
exportname_parts=exportname_parts,
geom_dump=geom_dump,
polygon_stats=polygon_stats,
geojson_stats=geojson_stats_json,
default_readme=DEFAULT_README_TEXT,
)

Expand All @@ -240,6 +301,7 @@ def process_raw_data(self, params, user=None):
upload_file_path = file_path
inside_file_size += os.path.getsize(file_path)
break # only take one file inside dir , if contains many it should be inside zip

# check if download url will be generated from s3 or not from config
if use_s3_to_upload:
file_transfer_obj = S3FileTransfer()
Expand All @@ -253,7 +315,6 @@ def process_raw_data(self, params, user=None):
pattern = r"(hotosm_project_)(\d+)"
match = re.match(pattern, exportname)
if match:
prefix = match.group(1)
project_number = match.group(2)
if project_number:
upload_name = f"TM/{project_number}/{exportname}"
Expand All @@ -272,6 +333,15 @@ def process_raw_data(self, params, user=None):
upload_name,
file_suffix="zip" if bind_zip else params.output_type.lower(),
)

# If there's an HTML file, upload it too
if geojson_stats_html:
download_html_url = file_transfer_obj.upload(
upload_html_path,
upload_name,
file_suffix="html",
)

else:
# give the static file download url back to user served from fastapi static export path
download_url = str(upload_file_path)
Expand All @@ -297,6 +367,9 @@ def process_raw_data(self, params, user=None):
}
if polygon_stats:
final_response["stats"] = polygon_stats
if download_html_url:
final_response["download_html_url"] = download_html_url

return final_response

except Exception as ex:
Expand Down
1 change: 0 additions & 1 deletion backend/field_update
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ class Database:
try:
self.cursor.execute(query)
self.conn.commit()
# print(query)
try:
result = self.cursor.fetchall()

Expand Down
2 changes: 1 addition & 1 deletion backend/raw_backend
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ if __name__ == "__main__":

if not args.replication:
osm2pgsql.append("--drop")
print(osm2pgsql)

run_subprocess_cmd(osm2pgsql)

basic_index_cmd = [
Expand Down
7 changes: 7 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,10 @@ psutil==5.9.8

## logging
tqdm==4.66.2

# stats for geojson data
geojson-stats==0.2.4

# transliterations
transliterate==1.10.2

33 changes: 29 additions & 4 deletions src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from psycopg2.extras import DictCursor
from slugify import slugify
from tqdm import tqdm
from .post_processing.processor import PostProcessor

# Reader imports
from src.config import (
Expand Down Expand Up @@ -640,7 +641,7 @@ def ogr_export(query, outputtype, working_dir, dump_temp_path, params):
os.remove(query_path)

@staticmethod
def query2geojson(con, extraction_query, dump_temp_file_path):
def query2geojson(con, extraction_query, dump_temp_file_path, plugin_fn=None):
"""Function written from scratch without being dependent on any library, Provides better performance for geojson binding"""
# creating geojson file
pre_geojson = """{"type": "FeatureCollection","features": ["""
Expand All @@ -660,10 +661,12 @@ def query2geojson(con, extraction_query, dump_temp_file_path):
for row in cursor:
if first:
first = False
f.write(row[0])
else:
f.write(",")
f.write(row[0])
if plugin_fn:
f.write(plugin_fn(row[0]))
else:
f.write((row[0]))
cursor.close() # closing connection to avoid memory issues
# close the writing geojson with last part
f.write(post_geojson)
Expand Down Expand Up @@ -711,7 +714,7 @@ def get_grid_id(geom, cur):
country_export,
)

def extract_current_data(self, exportname):
def extract_current_data(self, exportname, plugin_fn=None):
"""Responsible for Extracting rawdata current snapshot, Initially it creates a geojson file , Generates query , run it with 1000 chunk size and writes it directly to the geojson file and closes the file after dump
Args:
exportname: takes filename as argument to create geojson file passed from routers
Expand Down Expand Up @@ -777,6 +780,7 @@ def extract_current_data(self, exportname):
country_export=country_export,
),
dump_temp_file_path,
plugin_fn,
) # uses own conversion class
if output_type == RawDataOutputType.SHAPEFILE.value:
(
Expand Down Expand Up @@ -1488,8 +1492,29 @@ def process_export_format(export_format):
layer_creation_options=layer_creation_options_str,
query_dump_path=export_format_path,
)

run_ogr2ogr_cmd(ogr2ogr_cmd)

# Post-processing GeoJSON files
# Adds: stats, HTML stats summary and transliterations
if export_format.driver_name == "GeoJSON" and (
self.params.include_stats or self.params.include_translit
):
post_processor = PostProcessor(
{
"include_stats": self.params.include_stats,
"include_translit": self.params.include_translit,
"include_stats_html": self.params.include_stats_html,
}
)
post_processor.init()
post_processor.custom(
categories=self.params.categories,
export_format_path=export_format_path,
export_filename=export_filename,
file_export_path=file_export_path,
)

zip_file_path = os.path.join(file_export_path, f"{export_filename}.zip")
zip_path = self.file_to_zip(export_format_path, zip_file_path)

Expand Down
1 change: 1 addition & 0 deletions src/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,7 @@ def not_raises(func, *args, **kwargs):
logging.error(
"Error creating HDX configuration: %s, Disabling the hdx exports feature", e
)

ENABLE_HDX_EXPORTS = False

if ENABLE_HDX_EXPORTS:
Expand Down
Empty file added src/post_processing/__init__.py
Empty file.
61 changes: 61 additions & 0 deletions src/post_processing/geojson_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from geojson_stats.stats import Stats
from geojson_stats.html import Html

CONFIG_AREA = ["building"]
CONFIG_LENGTH = ["highway", "waterway"]


class GeoJSONStats(Stats):
    """Collects statistics while GeoJSON features are processed line by line.

    Extends ``geojson_stats.stats.Stats``: based on the export's tag filters,
    it registers which tag keys to track and enables area stats (for polygon
    tags such as ``building``) and/or length stats (for line tags such as
    ``highway`` and ``waterway``).
    """

    def __init__(self, filters, *args, **kwargs):
        """
        Args:
            filters: raw-data request filters (may be None). Only
                ``filters.tags`` is inspected, to decide which keys to track.
            *args, **kwargs: forwarded unchanged to ``Stats``.
        """
        super().__init__(*args, **kwargs)

        self.config.clean = True
        # Raw-data exports nest OSM tags under properties.tags
        self.config.properties_prop = "properties.tags"

        if filters and filters.tags:
            # Polygon-style tags -> area statistics
            self._register_tags(filters.tags, CONFIG_AREA, "area")
            # Line-style tags -> length statistics
            self._register_tags(filters.tags, CONFIG_LENGTH, "length")

    def _register_tags(self, tags, candidates, measure):
        """Register each candidate tag present in the filters and switch on
        the given measure ("area" or "length") in the stats config."""
        for tag in candidates:
            if self.check_filter(tags, tag):
                self.config.keys.append(tag)
                self.config.value_keys.append(tag)
                setattr(self.config, measure, True)

    def check_filter(self, tags, tag):
        """Return True if *tag* appears in any tag filter, else False.

        Checks the ``join_or`` and ``join_and`` lists of the
        ``all_geometry``, ``polygon`` and ``line`` filter groups.
        (Fixed: previously fell through with an implicit ``None``.)
        """
        for tag_filter in (tags.all_geometry, tags.polygon, tags.line):
            if not tag_filter:
                continue
            if tag_filter.join_or and tag in tag_filter.join_or:
                return True
            if tag_filter.join_and and tag in tag_filter.join_and:
                return True
        return False

    def raw_data_line_stats(self, json_object: dict):
        """Accumulate stats for a single GeoJSON feature.

        Note: this only updates the running stats; it does not return the
        feature (callers must keep their own reference to the line).
        """
        self.get_object_stats(json_object)

    def html(self, tpl):
        """Return an ``Html`` report object built from this stats instance
        using the template file at path *tpl*."""
        return Html(tpl, self)
Loading
Loading