Commit 2cbeead

add argument multiprocessing_chunk_size to control memory use

chenchenplus committed Sep 29, 2024
1 parent 794c23f commit 2cbeead
Showing 12 changed files with 120 additions and 55 deletions.
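Across the twelve files the change follows one pattern: hard-coded `pool.map` chunk sizes (200, 500, or 1000) become a module-level `MAX_CHUNK_SIZE` that each entry point sets from the new `multiprocessing_chunk_size` argument. A smaller chunk bounds how many inputs and results sit in each worker at once, at the cost of more inter-process hand-offs. A minimal, self-contained sketch of the pattern (`process_items` and `_unit` are illustrative names, not from the repository):

```python
from math import ceil
from multiprocessing import Pool


def _unit(x):
    # Placeholder for per-item work such as _find_aoi_parent_unit.
    return x * x


def process_items(items, workers: int = 32, multiprocessing_chunk_size: int = 500):
    # Each worker receives `chunksize` items at a time; larger chunks mean
    # fewer IPC round-trips but more data buffered in memory at once.
    # Capping the chunk at the configured size bounds peak memory use, and
    # the outer max(..., 1) keeps the value legal for very small inputs.
    chunk = max(min(ceil(len(items) / workers), multiprocessing_chunk_size), 1)
    with Pool(workers) as pool:
        return pool.map(_unit, items, chunksize=chunk)


if __name__ == "__main__":
    print(process_items(list(range(10_000)), workers=4)[:5])
```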
41 changes: 19 additions & 22 deletions mosstool/map/_map_util/aois/append_aois_matcher.py
@@ -11,25 +11,14 @@
 import shapely.ops as ops
 from scipy.spatial import KDTree
 from shapely.affinity import scale
-from shapely.geometry import (
-    LineString,
-    MultiLineString,
-    MultiPoint,
-    MultiPolygon,
-    Point,
-    Polygon,
-)
+from shapely.geometry import (LineString, MultiLineString, MultiPoint,
+                              MultiPolygon, Point, Polygon)
 from shapely.strtree import STRtree
 
 from ....type import AoiType
 from ..._util.angle import abs_delta_angle, delta_angle
-from ..._util.line import (
-    connect_line_string,
-    get_line_angle,
-    get_start_vector,
-    line_extend,
-    offset_lane,
-)
+from ..._util.line import (connect_line_string, get_line_angle,
+                           get_start_vector, line_extend, offset_lane)
 from .utils import geo_coords
 
 # ATTENTION: In order to achieve longer distance POI merging, the maximum recursion depth needs to be modified.
@@ -266,7 +255,11 @@ def find_neighbor(pid):
                 _split_merged_poi_unit,
                 list(merged_poi.values()),
                 chunksize=max(
-                    min(ceil(len(list(merged_poi.values())) / workers), 500), 1
+                    min(
+                        ceil(len(list(merged_poi.values())) / workers),
+                        MAX_CHUNK_SIZE,
+                    ),
+                    1,
                 ),
             )
         for res_aoi, res_poi in post_res:
@@ -1661,7 +1654,7 @@ def _merge_covered_aoi(aois, workers):
            aois_result += pool.map(
                _find_aoi_parent_unit,
                aois_batch,
-               chunksize=min(ceil(len(aois_batch) / workers), 1000),
+               chunksize=min(ceil(len(aois_batch) / workers), MAX_CHUNK_SIZE),
            )
    aois = aois_result
    parent2children = defaultdict(list)
@@ -1701,7 +1694,7 @@ def _merge_covered_aoi(aois, workers):
                _find_aoi_overlap_unit,
                aois_batch,
                chunksize=max(
-                   min(ceil(len(aois_batch) / workers), 1000),
+                   min(ceil(len(aois_batch) / workers), MAX_CHUNK_SIZE),
                    1,
                ),
            )
@@ -1845,7 +1838,7 @@ def _add_aoi(aois, stops, workers, merge_aoi: bool = False):
            results_stop += pool.map(
                _add_aoi_stop_unit,
                args_batch,
-               chunksize=min(ceil(len(args_batch) / workers), 500),
+               chunksize=min(ceil(len(args_batch) / workers), MAX_CHUNK_SIZE),
            )
    results_stop = [r for r in results_stop if r]
    logging.info(f"matched aois_stop: {len(results_stop)}")
@@ -1857,7 +1850,7 @@ def _add_aoi(aois, stops, workers, merge_aoi: bool = False):
            results_poly += pool.map(
                _add_poly_aoi_unit,
                args_batch,
-               chunksize=min(ceil(len(args_batch) / workers), 500),
+               chunksize=min(ceil(len(args_batch) / workers), MAX_CHUNK_SIZE),
            )
    results_poly = [r for r in results_poly if r]
    logging.info(f"matched aois_poly: {len(results_poly)}")
@@ -1879,14 +1872,16 @@ def add_aoi_to_map(
    bbox,
    merge_aoi: bool,
    dis_gate: float = 30.0,
+   multiprocessing_chunk_size: int = 500,
    projstr: Optional[str] = None,
    shp_path: Optional[str] = None,
    workers: int = 32,
 ):
    """match AOIs to lanes"""
    global aoi_uid, d_matcher, w_matcher, road_lane_matcher
-   global D_DIS_GATE, D_HUGE_GATE, W_DIS_GATE, W_HUGE_GATE, LENGTH_PER_DOOR, MAX_DOOR_NUM, AOI_GATE_OFFSET
+   global D_DIS_GATE, D_HUGE_GATE, W_DIS_GATE, W_HUGE_GATE, LENGTH_PER_DOOR, MAX_DOOR_NUM, AOI_GATE_OFFSET, MAX_CHUNK_SIZE
    W_DIS_GATE = dis_gate
+   MAX_CHUNK_SIZE = multiprocessing_chunk_size
    D_DIS_GATE = W_DIS_GATE + EXTRA_DIS_GATE
    d_matcher, w_matcher, road_lane_matcher = (
        matchers["drive"],
@@ -1922,6 +1917,7 @@ def add_sumo_aoi_to_map(
    projstr: str,
    merge_aoi: bool,
    dis_gate: float = 30.0,
+   multiprocessing_chunk_size: int = 500,
    workers: int = 32,
 ):
    """for SUMO converter, match AOI to lanes"""
@@ -1931,9 +1927,10 @@ def add_sumo_aoi_to_map(
        matchers["walk"],
        matchers["road_lane"],
    )
-   global D_DIS_GATE, D_HUGE_GATE, W_DIS_GATE, W_HUGE_GATE, LENGTH_PER_DOOR, MAX_DOOR_NUM, AOI_GATE_OFFSET
+   global D_DIS_GATE, D_HUGE_GATE, W_DIS_GATE, W_HUGE_GATE, LENGTH_PER_DOOR, MAX_DOOR_NUM, AOI_GATE_OFFSET, MAX_CHUNK_SIZE
    global aoi_uid
    W_DIS_GATE = dis_gate
+   MAX_CHUNK_SIZE = multiprocessing_chunk_size
    D_DIS_GATE = W_DIS_GATE + EXTRA_DIS_GATE
    # AOI UID
    aoi_uid = AOI_START_ID
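Note how both `add_aoi_to_map` and `add_sumo_aoi_to_map` publish the new argument by rebinding a module-level global rather than threading it through every helper. A stripped-down sketch of that idiom (a hypothetical module with illustrative names, shown only to make the `global MAX_CHUNK_SIZE` mechanism explicit):

```python
# Hypothetical module illustrating the global-override idiom used above.
MAX_CHUNK_SIZE = 500  # module default, matching the repository's default


def _helper(items):
    # Helpers read the module global instead of taking the cap as a parameter.
    return [items[i : i + MAX_CHUNK_SIZE] for i in range(0, len(items), MAX_CHUNK_SIZE)]


def entry_point(items, multiprocessing_chunk_size: int = 500):
    # The public entry point rebinds the global, so every helper called
    # afterwards (and every chunksize computation) sees the new cap.
    global MAX_CHUNK_SIZE
    MAX_CHUNK_SIZE = multiprocessing_chunk_size
    return _helper(items)


if __name__ == "__main__":
    print(len(entry_point(list(range(1200)), multiprocessing_chunk_size=100)))  # 12
```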
12 changes: 7 additions & 5 deletions mosstool/map/_map_util/aois/match_aoi_pop.py
@@ -225,7 +225,7 @@ def _get_aoi_pop(aois_point, aois_poly, workers):
            aois_point_result += pool.map(
                _get_aoi_point_pop_unit,
                aois_point_batch,
-               chunksize=min(ceil(len(aois_point_batch) / workers), 1000),
+               chunksize=min(ceil(len(aois_point_batch) / workers), MAX_CHUNK_SIZE),
            )
    aois_poly_result = []
    for i in range(0, len(aois_poly), MAX_BATCH_SIZE):
@@ -234,7 +234,7 @@ def _get_aoi_pop(aois_point, aois_poly, workers):
            aois_poly_result += pool.map(
                _get_aoi_poly_pop_unit,
                aois_poly_batch,
-               chunksize=min(ceil(len(aois_poly_batch) / workers), 200),
+               chunksize=min(ceil(len(aois_poly_batch) / workers), MAX_CHUNK_SIZE),
            )
    aois_point, aois_poly = aois_point_result, aois_poly_result
    aois_poly, flags = zip(*aois_poly)
@@ -268,6 +268,7 @@ def add_aoi_pop(
    max_latitude: float,
    min_latitude: float,
    proj_str: str,
+   multiprocessing_chunk_size: int = 500,
    upsample_factor: int = 4,
    workers: int = 32,
    tif_path: Optional[str] = None,
@@ -284,7 +285,8 @@ def add_aoi_pop(
    )
    bbox = (min_lon, max_lon, min_lat, max_lat)
    lon_cen, lat_cen = (min_lon + max_lon) / 2, (min_lat + max_lat) / 2
-   global pixel_idx2point_pop, aois_poly_global, x_left, y_upper, x_step, y_step, pixel_area, aoi_point_area, xy_gps_scale2, n_upsample
+   global pixel_idx2point_pop, aois_poly_global, x_left, y_upper, x_step, y_step, pixel_area, aoi_point_area, xy_gps_scale2, n_upsample, MAX_CHUNK_SIZE
+   MAX_CHUNK_SIZE = multiprocessing_chunk_size
    # Preprocess AOI data
    logging.info("Pre-processing aois")
    has_pop = False
@@ -369,7 +371,7 @@ def add_aoi_pop(
                    list_pixel2pop_batch,
                    chunksize=min(
                        ceil(len(list_pixel2pop_batch) / workers),
-                       1000,
+                       MAX_CHUNK_SIZE,
                    ),
                )
        pixel_idx2point_pop = {k: v for x in results for k, v in x}
@@ -389,7 +391,7 @@ def add_aoi_pop(
                    list_pixel2pop_batch,
                    chunksize=min(
                        ceil(len(list_pixel2pop_batch) / workers),
-                       1000,
+                       MAX_CHUNK_SIZE,
                    ),
                )
        pixel_idx2point_pop = {k: v for x in results for k, v in x}
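Memory is controlled at two levels here: the outer loops already cap work-in-flight at `MAX_BATCH_SIZE` items (`15_0000`, i.e. 150,000), and this commit makes the inner `chunksize` cap configurable as well. A sketch of the combined loop under assumed names (`_unit` stands in for workers such as `_get_aoi_point_pop_unit`; pool lifetime is simplified):

```python
from math import ceil
from multiprocessing import Pool

MAX_BATCH_SIZE = 150_000  # outer batching bound (written 15_0000 in const.py)
MAX_CHUNK_SIZE = 500      # per-worker chunk cap set by the new argument


def _unit(x):
    return x + 1  # stand-in for the real per-AOI computation


def run(items, workers: int = 32):
    results = []
    with Pool(workers) as pool:
        # The outer loop bounds how many inputs are submitted at once;
        # the inner chunksize bounds how many each worker holds at a time.
        for i in range(0, len(items), MAX_BATCH_SIZE):
            batch = items[i : i + MAX_BATCH_SIZE]
            results += pool.map(
                _unit,
                batch,
                chunksize=max(min(ceil(len(batch) / workers), MAX_CHUNK_SIZE), 1),
            )
    return results
```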
18 changes: 14 additions & 4 deletions mosstool/map/_map_util/aois/reuse_aois_matchers.py
@@ -10,8 +10,9 @@
 
 from ....type import Map
 from ....util.format_converter import pb2dict
-from .append_aois_matcher import _process_matched_result, _str_tree_matcher_unit
 from ..const import *
+from .append_aois_matcher import (_process_matched_result,
+                                  _str_tree_matcher_unit)
 
 
 def _map_aoi2geo(aoi: dict) -> Polygon:
@@ -36,9 +37,18 @@ def _add_aoi_unit(aoi):
    )
 
 
-def match_map_aois(net: Map, matchers: dict, workers: int):
-   global d_matcher, w_matcher
+def match_map_aois(
+   net: Map,
+   matchers: dict,
+   workers: int,
+   dis_gate: float = 30.0,
+   multiprocessing_chunk_size: int = 500,
+):
+   global d_matcher, w_matcher, D_DIS_GATE, W_DIS_GATE, MAX_CHUNK_SIZE
    global d_tree, w_tree
+   W_DIS_GATE = dis_gate
+   MAX_CHUNK_SIZE = multiprocessing_chunk_size
+   D_DIS_GATE = W_DIS_GATE + EXTRA_DIS_GATE
    net_dict = pb2dict(net)
    orig_aois = net_dict["aois"]
    orig_pois = net_dict["pois"]
@@ -55,7 +65,7 @@ def match_map_aois(net: Map, matchers: dict, workers: int):
            results_aois += pool.map(
                _add_aoi_unit,
                args_batch,
-               chunksize=min(ceil(len(args_batch) / workers), 500),
+               chunksize=min(ceil(len(args_batch) / workers), MAX_CHUNK_SIZE),
            )
    aois = [r for r in results_aois if r]
    # filter pois
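`match_map_aois` previously exposed only `net`, `matchers`, and `workers`; callers can now also tune the match distance and the chunk cap. A hedged usage sketch (the `net` Map and the `matchers` dict of "drive"/"walk" lane matchers are assumed to be prepared elsewhere, as the Builder does internally):

```python
# Sketch only: `net` is a Map protobuf and `matchers` holds the lane
# matchers keyed "drive" and "walk", both built by the caller.
reuse_aois, reuse_pois = match_map_aois(
    net=net,
    matchers=matchers,
    workers=8,
    dis_gate=30.0,                   # walk matching distance; drive adds EXTRA_DIS_GATE
    multiprocessing_chunk_size=200,  # smaller chunks -> lower peak memory
)
```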
27 changes: 21 additions & 6 deletions mosstool/map/_map_util/aois/utils.py
@@ -201,7 +201,7 @@ def _merge_aoi(input_aois: list, merge_aoi: bool = False, workers: int = 32):
            aois = pool.map(
                _find_aoi_parent_unit,
                aois,
-               chunksize=min(ceil(len(aois) / workers), 1000),
+               chunksize=min(ceil(len(aois) / workers), MAX_CHUNK_SIZE),
            )
 
    aois_ancestor = []
@@ -257,7 +257,7 @@ def _connect_aoi(input_aois: list, merge_aoi: bool = False, workers: int = 32):
            results = pool.map(
                _connect_aoi_unit1,
                args,
-               chunksize=max(min(ceil(len(args) / workers), 200), 1),
+               chunksize=max(min(ceil(len(args) / workers), MAX_CHUNK_SIZE), 1),
            )
    results = [x for x in results if x]
    clusters = [x[:2] for x in results]
@@ -270,7 +270,7 @@ def _connect_aoi(input_aois: list, merge_aoi: bool = False, workers: int = 32):
            results = pool.map(
                _connect_aoi_unit2,
                clusters,
-               chunksize=max(min(ceil(len(clusters) / workers), 200), 1),
+               chunksize=max(min(ceil(len(clusters) / workers), MAX_CHUNK_SIZE), 1),
            )
    aois_connect = [x for x in results if isinstance(x, dict)]
    aois_other += sum([x for x in results if isinstance(x, list)], [])
@@ -364,7 +364,7 @@ def _match_poi_to_aoi(aois, pois, workers):
                _match_poi_unit,
                pois_batch,
                chunksize=max(
-                   min(ceil(len(pois_batch) / workers), 1000),
+                   min(ceil(len(pois_batch) / workers), MAX_CHUNK_SIZE),
                    1,
                ),
            )
@@ -413,7 +413,15 @@ def _post_compute_aoi_poi(aois, pois_isolate):
    return aois
 
 
-def generate_aoi_poi(input_aois, input_pois, input_stops, workers: int = 32):
+def generate_aoi_poi(
+   input_aois,
+   input_pois,
+   input_stops,
+   workers: int = 32,
+   multiprocessing_chunk_size: int = 500,
+):
+   global MAX_CHUNK_SIZE
+   MAX_CHUNK_SIZE = multiprocessing_chunk_size
    merge_aoi = False
    input_aois = _fix_aois_poly(input_aois)
    # Process covered AOI
@@ -431,8 +439,15 @@ def generate_aoi_poi(input_aois, input_pois, input_stops, workers: int = 32):
 
 
 def generate_sumo_aoi_poi(
-   input_aois, input_pois, input_stops, workers: int = 32, merge_aoi: bool = False
+   input_aois,
+   input_pois,
+   input_stops,
+   workers: int = 32,
+   merge_aoi: bool = False,
+   multiprocessing_chunk_size: int = 500,
 ):
+   global MAX_CHUNK_SIZE
+   MAX_CHUNK_SIZE = multiprocessing_chunk_size
    input_aois = _fix_aois_poly(input_aois)
    input_aois = _merge_aoi(input_aois, merge_aoi, workers)
    input_aois = _connect_aoi(input_aois, merge_aoi, workers)
2 changes: 1 addition & 1 deletion mosstool/map/_map_util/const.py
@@ -45,6 +45,7 @@
    "footway",
 ]  # TO be converted SUMO road levels
 
+MAX_BATCH_SIZE = 15_0000
 MAX_JUNC_LANE_LENGTH = 500
 MIN_WALK_CONNECTED_COMPONENT = 5  # Minimum length of connected component of sidewalk
 DEFAULT_ROAD_SPLIT_LENGTH = 20  # Default road shortening length
@@ -103,7 +104,6 @@
    "BUS": 100,
    "SUBWAY": 180,
 }
-MAX_BATCH_SIZE = 15_0000
 MIN_HAS_WALK_LANE_LENGTH = 5
 LARGE_LANE_NUM_THRESHOLD = 4
 SMALL_LANE_NUM_THRESHOLD = 2
15 changes: 12 additions & 3 deletions mosstool/map/builder/builder.py
@@ -64,6 +64,7 @@ def __init__(
            Literal["green_yellow_red"],
            Literal["green_yellow_clear_red"],
        ] = "green_yellow_clear_red",
+       multiprocessing_chunk_size: int = 500,
        green_time: float = 30.0,
        yellow_time: float = 5.0,
        strict_mode: bool = False,
@@ -88,6 +89,7 @@ def __init__(
        - road_expand_mode (str): road expand mode
        - aoi_mode (str): aoi appending mode. `append` takes effect when the input `net` is Map, incrementally adding the input AOIs; `overwrite` only adds the input AOIs, ignoring existing ones.
        - traffic_light_mode (str): fixed time traffic-light generation mode. `green_red` means only green and red light will be generated, `green_yellow_red` means there will be yellow light between green and red light, `green_yellow_clear_red` add extra pedestrian clear red light.
+       - multiprocessing_chunk_size (int): the maximum size of each multiprocessing chunk
        - green_time (float): green time
        - strict_mode (bool): when enabled, causes the program to exit whenever a warning occurs
        - merge_aoi (bool): merge nearby aois
@@ -119,7 +121,7 @@ def __init__(
        self.strict_mode = strict_mode
        self.merge_aoi = merge_aoi
        self.aoi_matching_distance_threshold = aoi_matching_distance_threshold
-       # TODO:加入阈值
+       self.multiprocessing_chunk_size = multiprocessing_chunk_size
        self.output_lane_length_check = output_lane_length_check
        self.workers = workers
        self.traffic_light_mode: Union[
@@ -4042,7 +4044,10 @@ def _add_reuse_aoi(self):
        }
        if type(self.net) == Map:
            (reuse_aois, reuse_pois) = match_map_aois(
-               net=self.net, matchers=matchers, workers=self.workers
+               net=self.net,
+               matchers=matchers,
+               workers=self.workers,
+               multiprocessing_chunk_size=self.multiprocessing_chunk_size,
            )
        return (reuse_aois, reuse_pois)
 
@@ -4053,7 +4058,9 @@ def _add_input_aoi(self):
            self.raw_aois, self.public_transport_data, self.proj_str
        )
        pois = convert_poi(self.raw_pois, self.proj_str)
-       aois, stops, pois = generate_aoi_poi(aois, pois, stops, self.workers)
+       aois, stops, pois = generate_aoi_poi(
+           aois, pois, stops, self.workers, self.multiprocessing_chunk_size
+       )
        d_right_lanes = [
            w["lanes"][-1]
            for w in self.map_roads.values()
@@ -4117,6 +4124,7 @@ def _add_input_aoi(self):
            dis_gate=self.aoi_matching_distance_threshold,
            projstr=self.proj_str,
            shp_path=self.landuse_shp_path,
+           multiprocessing_chunk_size=self.multiprocessing_chunk_size,
        )
        # added_aois = add_aoi_pop(
        #     aois=added_aois,
@@ -4128,6 +4136,7 @@ def _add_input_aoi(self):
        #     upsample_factor=UPSAMPLE_FACTOR,
        #     workers=self.workers,
        #     tif_path=self.pop_tif_path,
+       #     multiprocessing_chunk_size=self.multiprocessing_chunk_size,
        # )
        return (added_aois, added_pois)
 
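On the `Builder` side the new constructor argument is stored once and forwarded to `match_map_aois`, `generate_aoi_poi`, and `add_aoi_to_map`. A hedged construction sketch (the other arguments and the `build(...)` call follow the repository's existing API and are only assumed here):

```python
from mosstool.map.builder import Builder

# Sketch: `net` and `proj_str` would come from the usual map inputs;
# only the arguments relevant to this commit are spelled out.
builder = Builder(
    net=net,
    proj_str=proj_str,
    workers=16,
    multiprocessing_chunk_size=200,  # lower the 500 default to cut peak memory
)
m = builder.build(name="example")  # assumed entry point for producing the map
```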
11 changes: 9 additions & 2 deletions mosstool/map/public_transport/public_transport_post.py
@@ -433,7 +433,7 @@ def aoi2driving_lane_id(aoi: dict, lane_id: int, road_id: int):
        taz_results = pool.map(
            _get_taz_cost_unit,
            taz_cost_args,
-           chunksize=max((len(taz_cost_args) // workers), 500),
+           chunksize=max((len(taz_cost_args) // workers), MAX_CHUNK_SIZE),
        )
    subline_id2taz_costs = {r[0]: r[1] for r in taz_results}
    for subline in sublines_data:
@@ -442,7 +442,14 @@ def aoi2driving_lane_id(aoi: dict, lane_id: int, road_id: int):
    return m
 
 
-def public_transport_process(m: dict, server_address: str, workers: int = cpu_count()):
+def public_transport_process(
+   m: dict,
+   server_address: str,
+   workers: int = cpu_count(),
+   multiprocessing_chunk_size: int = 500,
+):
+   global MAX_CHUNK_SIZE
+   MAX_CHUNK_SIZE = multiprocessing_chunk_size
    m = asyncio.run(_fill_public_lines(m, server_address))
    m = _post_compute(m, workers)
    return m
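The public-transport post-processing entry point gains the same knob. A minimal usage sketch (the map dict `m` and the routing `server_address` are assumptions from context; the address shown is hypothetical):

```python
from multiprocessing import cpu_count

# Sketch: `m` is the map as a dict; `server_address` points at the routing
# service consumed by _fill_public_lines.
m = public_transport_process(
    m,
    server_address="http://localhost:52101",  # hypothetical routing endpoint
    workers=cpu_count(),
    multiprocessing_chunk_size=500,  # passed through to the TAZ-cost pool.map
)
```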