From b0c9bf9c3677dcd4e1debb131cc03f3079d1a920 Mon Sep 17 00:00:00 2001 From: xyluo25 Date: Fri, 23 Aug 2024 05:06:45 -0700 Subject: [PATCH] enable dataclass functions --- pyufunc/util_data_processing/_dataclass.py | 61 +- pyufunc/util_geo/_gmns.py | 645 ++++++++++++++------- 2 files changed, 477 insertions(+), 229 deletions(-) diff --git a/pyufunc/util_data_processing/_dataclass.py b/pyufunc/util_data_processing/_dataclass.py index 1fcfee4..dcbbde2 100644 --- a/pyufunc/util_data_processing/_dataclass.py +++ b/pyufunc/util_data_processing/_dataclass.py @@ -6,8 +6,9 @@ ############################################################## ''' -from dataclasses import dataclass, field, fields, make_dataclass, MISSING, is_dataclass +from dataclasses import dataclass, field, fields, make_dataclass, MISSING, is_dataclass, asdict from typing import Any, List, Tuple, Type, Union, Dict +import copy def create_dataclass_from_dict(name: str, data: Dict[str, Any]) -> Type: @@ -37,16 +38,27 @@ def __setitem__(self, key, value): else: raise KeyError(f"Key {key} not found in {self.__class__.__name__}") + # Define a method to convert the dataclass to a dictionary + def as_dict(self): + return asdict(self) + # Extract fields and their types from the dictionary - fields = [(key, type(value), field(default=value)) - for key, value in data.items()] + dataclass_fields = [] + for key, value in data.items(): + if isinstance(value, (list, dict, set)): # For mutable types + dataclass_fields.append( + (key, type(value), field(default_factory=lambda v=value: v))) + else: # For immutable types + dataclass_fields.append((key, type(value), field(default=value))) # Create the dataclass dynamically DataClass = make_dataclass( cls_name=name, - fields=fields, + fields=dataclass_fields, bases=(), - namespace={'__getitem__': __getitem__, '__setitem__': __setitem__} + namespace={'__getitem__': __getitem__, + '__setitem__': __setitem__, + 'as_dict': as_dict} ) # Instantiate the dataclass with the values from the dictionary @@ -95,6 +107,10 @@ def __setitem__(self, key, value): setattr(self, key, value) else: raise KeyError(f"Key {key} not found in {self.__class__.__name__}") + + def as_dict(self): + return asdict(self) + processed_attributes = [] for attr in attributes: @@ -106,7 +122,9 @@ def __setitem__(self, key, value): processed_attributes.append((attr[0], attr[1], attr[2])) return make_dataclass(class_name, processed_attributes, - namespace={'__getitem__': __getitem__, '__setitem__': __setitem__}) + namespace={'__getitem__': __getitem__, + '__setitem__': __setitem__, + 'as_dict': as_dict}) def merge_dataclass(dataclass_one: Type[Any], dataclass_two: Type[Any], @@ -185,6 +203,21 @@ def extend_dataclass( additional_attributes (list): A list of tuples in the form (name, type, default_value). or (name, default_value) to add to the base dataclass. + Example: + >>> from dataclasses import dataclass + >>> from typing import List + >>> from pyufunc import extend_dataclass + >>> @dataclass + ... class BaseDataclass: + ... name: str = 'base' + + >>> ExtendedDataclass = extend_dataclass( + ... base_dataclass=BaseDataclass, + ... additional_attributes=[('new_attr', List[int], [1, 2, 3])]) + >>> ExtendedDataclass + + + Returns: dataclass: A new dataclass that includes fields from base_dataclass and additional_attributes. 
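The mutable-default handling above is the key change in create_dataclass_from_dict: the dataclass machinery rejects plain list/dict/set defaults, so each such value is wrapped in a default_factory. A minimal sketch of that pattern, using a hypothetical Demo class that is not part of the patch:

from dataclasses import field, make_dataclass

data = {"name": "n1", "tags": ["a", "b"]}
dataclass_fields = []
for key, value in data.items():
    if isinstance(value, (list, dict, set)):
        # mutable defaults must go through default_factory; the lambda's default
        # argument binds the current value at definition time
        dataclass_fields.append((key, type(value), field(default_factory=lambda v=value: v)))
    else:
        dataclass_fields.append((key, type(value), field(default=value)))

Demo = make_dataclass("Demo", dataclass_fields)
print(Demo())  # Demo(name='n1', tags=['a', 'b'])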
""" @@ -198,9 +231,13 @@ def extend_dataclass( raise ValueError('additional_attributes must be a list of tuples' ' in the form (name, default_value) or (name, type, default_value)') + # deepcopy the base dataclass + base_dataclass_ = copy.deepcopy(base_dataclass) + # base_dataclass_ = base_dataclass + # Extract existing fields from the base dataclass base_fields = [] - for f in fields(base_dataclass): + for f in fields(base_dataclass_): if f.default is not MISSING: base_fields.append((f.name, f.type, f.default)) elif f.default_factory is not MISSING: @@ -220,12 +257,18 @@ def extend_dataclass( # Combine base fields with additional attributes all_fields = base_fields + additional_attributes - return make_dataclass( - cls_name=f'{base_dataclass.__name__}', + new_dataclass = make_dataclass( + cls_name=f'{base_dataclass_.__name__}', fields=all_fields, bases=(base_dataclass,), ) + # Register the new dataclass in the global scope to allow pickling + globals()[new_dataclass.__name__] = new_dataclass + + # new_dataclass.__module__ = base_dataclass_.__module__ + return new_dataclass + def dataclass_dict_access(dataclass_instance: Any) -> Any: """Wrap a dataclass instance to provide dictionary-like access. diff --git a/pyufunc/util_geo/_gmns.py b/pyufunc/util_geo/_gmns.py index b62a43a..330b33e 100644 --- a/pyufunc/util_geo/_gmns.py +++ b/pyufunc/util_geo/_gmns.py @@ -7,20 +7,24 @@ # GMNS: General Modeling Network Specification ############################################################## from __future__ import annotations -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, Any import os from dataclasses import dataclass, field, asdict, fields from multiprocessing import Pool + from pyufunc.util_common._func_time_decorator import func_time from pyufunc.util_pathio._path import path2linux from pyufunc.util_common._dependency_requires_decorator import requires from pyufunc.util_common._import_package import import_package from pyufunc.pkg_configs import config_gmns +from pyufunc.util_data_processing._dataclass import extend_dataclass, create_dataclass_from_dict import pandas as pd if TYPE_CHECKING: import shapely + import tqdm + from pyproj import Transformer __all__ = ['Node', 'Link', 'POI', 'Zone', 'Agent', 'read_node', 'read_poi', 'read_link', 'read_zone'] @@ -101,7 +105,8 @@ class Link: link_type_name: The name of the link type. geometry: The geometry of the link. based on wkt format. as_dict: The method to convert the link to a dictionary. - to_networkx: The method to convert the link to a networkx edge tuple format. (from_node_id, to_node_id, attr_dict) + to_networkx: The method to convert the link to a networkx edge tuple format. 
+ (from_node_id, to_node_id, attr_dict) """ @@ -307,8 +312,6 @@ def __setitem__(self, key, value): def to_dict(self): return {f.name: getattr(self, f.name) for f in fields(self)} -# todo: read node, link and zone - @requires("shapely") def _create_node_from_dataframe(df_node: pd.DataFrame) -> dict[int, Node]: @@ -321,31 +324,37 @@ def _create_node_from_dataframe(df_node: pd.DataFrame) -> dict[int, Node]: dict[int, Node]: a dict of nodes.{node_id: Node} """ - import_package("shapely", verbose=False) + import_package("shapely") import shapely # Reset index to avoid index error df_node = df_node.reset_index(drop=True) + col_names = df_node.columns.tolist() + + if "node_id" in col_names: + col_names.remove("node_id") - print(df_node.head()) + # get node dataclass fields + # Get the list of attribute names + node_attr_names = [f.name for f in fields(Node)] + + # check difference between node_attr_names and col_names + diff = list(set(col_names) - set(node_attr_names)) + + # create attributes for node class if diff is not empty + if diff: + diff_attr = [(val, str, "") for val in diff] + Node_ext = extend_dataclass(Node, diff_attr) + else: + Node_ext = Node node_dict = {} for i in range(len(df_node)): try: - # check activity location tab - activity_type = df_node.loc[i, 'activity_type'] - boundary_flag = df_node.loc[i, 'is_boundary'] - if activity_type in ["residential", "poi"]: - activity_location_tab = activity_type - elif boundary_flag == 1: - activity_location_tab = "boundary" - else: - activity_location_tab = '' - # check whether zone_id field in node.csv or not - # if zone_id field exists and is not empty, assign it to __zone_id + # if zone_id field exists and is not empty, assign it to _zone_id try: - _zone_id = df_node.loc[i, 'zone_id'] + _zone_id = int(df_node.loc[i, 'zone_id']) # check if _zone is none or empty, assign -1 if pd.isna(_zone_id) or not _zone_id: @@ -354,25 +363,41 @@ def _create_node_from_dataframe(df_node: pd.DataFrame) -> dict[int, Node]: except Exception: _zone_id = -1 - node = Node( - id=df_node.loc[i, 'node_id'], - activity_type=activity_type, - activity_location_tab=activity_location_tab, - ctrl_type=df_node.loc[i, 'ctrl_type'], - x_coord=df_node.loc[i, 'x_coord'], - y_coord=df_node.loc[i, 'y_coord'], - poi_id=df_node.loc[i, 'poi_id'], - boundary_flag=boundary_flag, - geometry=shapely.Point(df_node.loc[i, 'x_coord'], df_node.loc[i, 'y_coord']), - _zone_id=_zone_id - ) - node_dict[df_node.loc[i, 'node_id']] = node + # get node id + node_id = int(df_node.loc[i, 'node_id']) + x_coord = float(df_node.loc[i, 'x_coord']) + y_coord = float(df_node.loc[i, 'y_coord']) + + node = Node_ext() + + for col in col_names: + setattr(node, col, df_node.loc[i, col]) + + node.id = node_id + node._zone_id = _zone_id + node.geometry = shapely.Point(x_coord, y_coord) + + # node = Node( + # id=node_id, + # # activity_type=activity_type, + # # ctrl_type=df_node.loc[i, 'ctrl_type'], + # x_coord=x_coord, + # y_coord=y_coord, + # # poi_id=df_node.loc[i, 'poi_id'], + # # is_boundary=is_boundary, + # geometry=shapely.Point(x_coord, y_coord), + # _zone_id=_zone_id + # ) + + node_dict[node_id] = asdict(node) + except Exception as e: - print(f" : Unable to create node: {df_node.loc[i, 'node_id']}, error: {e}", flush=True) + print(f" : Unable to create node: {node_id}, error: {e}") + return node_dict -@requires("shapely") +@requires("shapely", "pyproj") def _create_poi_from_dataframe(df_poi: pd.DataFrame) -> dict[int, POI]: """Create POI from df_poi. 
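The node-creation helper above follows a pattern reused for POIs and zones below: any CSV column that is not already a field of the dataclass is added on the fly with extend_dataclass (which also registers the generated class in the module globals so it stays picklable), row values are assigned with setattr, and the instance is stored as a plain dict. A rough sketch under assumed names (MiniNode is a simplified stand-in; the pyufunc import path follows the docstring example above):

from dataclasses import dataclass, fields, asdict
from pyufunc import extend_dataclass  # assumed public import path, as in the docstring example

@dataclass
class MiniNode:                       # simplified stand-in for the GMNS Node dataclass
    id: int = 0
    x_coord: float = 0.0
    y_coord: float = 0.0

row = {"node_id": 7, "x_coord": 1.0, "y_coord": 2.0, "osm_highway": "primary"}
col_names = [c for c in row if c != "node_id"]

# columns not covered by the dataclass become new string fields
diff = list(set(col_names) - {f.name for f in fields(MiniNode)})
NodeExt = extend_dataclass(MiniNode, [(name, str, "") for name in diff]) if diff else MiniNode

node = NodeExt()
for col in col_names:
    setattr(node, col, row[col])
node.id = int(row["node_id"])

print(asdict(node))                   # plain dict, ready to return from a worker process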
@@ -382,72 +407,90 @@ def _create_poi_from_dataframe(df_poi: pd.DataFrame) -> dict[int, POI]: Returns: dict[int, POI]: a dict of POIs.{poi_id: POI} """ - - import_package("shapely", verbose=False) - import shapely + # import_package("shapely") + # import_package("pyproj") + # import shapely df_poi = df_poi.reset_index(drop=True) + col_names = df_poi.columns.tolist() + + if "poi_id" in col_names: + col_names.remove("poi_id") + + # get node dataclass fields + # Get the list of attribute names + poi_attr_names = [f.name for f in fields(POI)] + + # check difference between node_attr_names and col_names + diff = list(set(col_names) - set(poi_attr_names)) + + # create attributes for node class if diff is not empty + if diff: + diff_attr = [(val, Any, "") for val in diff] + POI_ext = extend_dataclass(POI, diff_attr) + else: + POI_ext = POI + poi_dict = {} for i in range(len(df_poi)): try: centroid = shapely.from_wkt(df_poi.loc[i, 'centroid']) + + # check if area is empty or not area = df_poi.loc[i, 'area'] - if area > 90000: - area = 0 - poi = POI( - id=df_poi.loc[i, 'poi_id'], - x_coord=centroid.x, - y_coord=centroid.y, - area=[area, area * 10.7639104], # square meter and square feet - poi_type=df_poi.loc[i, 'building'] or "", - geometry=df_poi.loc[i, "geometry"] - ) - poi_dict[df_poi.loc[i, 'poi_id']] = poi - except Exception as e: - print(f" : Unable to create poi: {df_poi.loc[i, 'poi_id']}, error: {e}", flush=True) - return poi_dict + if pd.isna(area) or not area: + geometry_shapely = shapely.from_wkt(df_poi.loc[i, 'geometry']) + # Set up a Transformer to convert from WGS 84 to UTM zone 18N (EPSG:32618) + transformer = Transformer.from_crs( + "EPSG:4326", "EPSG:32618", always_xy=True) -def _create_link_from_dataframe(df_link: pd.DataFrame) -> dict[int, Zone]: - """Create Link from df_link. 
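When poi.csv has no usable area value, the code above projects the WGS 84 polygon to a metric CRS before measuring it, since shapely's .area on lon/lat coordinates would be in squared degrees. A standalone sketch of that computation; the EPSG:32618 (UTM zone 18N) target CRS is the one hard-coded in the patch, and other datasets would need their own zone:

import shapely
from pyproj import Transformer

polygon_wgs84 = shapely.from_wkt(
    "POLYGON ((-77.04 38.90, -77.03 38.90, -77.03 38.91, -77.04 38.91, -77.04 38.90))")

# EPSG:4326 (lon/lat) -> EPSG:32618 (UTM 18N, meters); always_xy keeps (x, y) order
transformer = Transformer.from_crs("EPSG:4326", "EPSG:32618", always_xy=True)
utm_coords = [transformer.transform(x, y) for x, y in polygon_wgs84.exterior.coords]

area_sqm = shapely.Polygon(utm_coords).area   # square meters
area_sqft = area_sqm * 10.7639104             # square feet, the conversion noted in the comments
print(round(area_sqm), round(area_sqft))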
+ # Transform the polygon's coordinates to UTM + transformed_coords = [transformer.transform( + x, y) for x, y in geometry_shapely.exterior.coords] + transformed_polygon = shapely.Polygon(transformed_coords) - Args: - df_link (pd.DataFrame): dataframe of link from link.csv + # square meters + area_sqm = transformed_polygon.area - Returns: - dict[int, Zone]: a dict of Link.{link_id: Link} - """ + # square feet + # area = area_sqm * 10.7639104 - try: - df_link = df_link.reset_index(drop=True) - link_dict = {} + area = area_sqm - for i in range(len(df_link)): - try: - link = Link( - id=df_link.loc[i, 'link_id'], - name=df_link.loc[i, 'name'], - from_node_id=df_link.loc[i, 'from_node_id'], - to_node_id=df_link.loc[i, 'to_node_id'], - length=df_link.loc[i, 'length'], - lanes=df_link.loc[i, 'lanes'], - dir_flag=df_link.loc[i, 'dir_flag'], - free_speed=df_link.loc[i, 'free_speed'], - capacity=df_link.loc[i, 'capacity'], - link_type=df_link.loc[i, 'link_type'], - link_type_name=df_link.loc[i, 'link_type_name'], - geometry=df_link.loc[i, 'geometry'], - allowed_uses=df_link.loc[i, 'allowed_uses'], - from_biway=df_link.loc[i, 'from_biway'], - ) - link_dict[df_link.loc[i, 'link_id']] = link - except Exception as e: - print(f" : Unable to create link: {df_link.loc[i, 'link_id']}, error: {e}", flush=True) - return link_dict - except Exception as e: - print(f" : Unable to create link: {e}", flush=True) - return {} + elif area > 90000: + area = 0 + else: + pass + + # get poi id + poi_id = int(df_poi.loc[i, 'poi_id']) + + poi = POI_ext() + + for col in col_names: + setattr(poi, col, df_poi.loc[i, col]) + + poi.id = poi_id + poi.x_coord = centroid.x + poi.y_coord = centroid.y + poi.area = area + + # poi = POI( + # id=df_poi.loc[i, 'poi_id'], + # x_coord=centroid.x, + # y_coord=centroid.y, + # area=area, # square feet: area * 10.7639104 + # building=df_poi.loc[i, 'building'] or "", + # amenity=df_poi.loc[i, 'amenity'] or "", + # centroid=df_poi.loc[i, 'centroid'], + # geometry=df_poi.loc[i, "geometry"] + # ) + poi_dict[poi_id] = asdict(poi) + except Exception as e: + print(f" : Unable to create poi: {poi_id}, error: {e}") + return poi_dict @requires("shapely") @@ -461,10 +504,29 @@ def _create_zone_from_dataframe_by_geometry(df_zone: pd.DataFrame) -> dict[int, dict[int, Zone]: a dict of Zones.{zone_id: Zone} """ - import_package("shapely", verbose=False) - import shapely + # import_package("shapely") + # import shapely df_zone = df_zone.reset_index(drop=True) + col_names = df_zone.columns.tolist() + + if "zone_id" in col_names: + col_names.remove("zone_id") + + # get node dataclass fields + # Get the list of attribute names + zone_attr_names = [f.name for f in fields(Zone)] + + # check difference between node_attr_names and col_names + diff = list(set(col_names) - set(zone_attr_names)) + + # create attributes for node class if diff is not empty + if diff: + diff_attr = [(val, Any, "") for val in diff] + Zone_ext = extend_dataclass(Zone, diff_attr) + else: + Zone_ext = Zone + zone_dict = {} for i in range(len(df_zone)): @@ -474,30 +536,46 @@ def _create_zone_from_dataframe_by_geometry(df_zone: pd.DataFrame) -> dict[int, zone_geometry_shapely = shapely.from_wkt(zone_geometry) centroid_wkt = zone_geometry_shapely.centroid.wkt - centroid_x = zone_geometry_shapely.centroid.x - centroid_y = zone_geometry_shapely.centroid.y - zone = Zone( - id=zone_id, - name=zone_id, - centroid_x=centroid_x, - centroid_y=centroid_y, - centroid=centroid_wkt, - x_max=zone_geometry_shapely.bounds[2], - 
x_min=zone_geometry_shapely.bounds[0], - y_max=zone_geometry_shapely.bounds[3], - y_min=zone_geometry_shapely.bounds[1], - node_id_list=[], - poi_id_list=[], - production=0, - attraction=0, - production_fixed=0, - attraction_fixed=0, - geometry=zone_geometry - ) - - zone_dict[zone_id] = zone + x_coord = zone_geometry_shapely.centroid.x + y_coord = zone_geometry_shapely.centroid.y + + zone = Zone_ext() + + for col in col_names: + setattr(zone, col, df_zone.loc[i, col]) + + zone.id = zone_id + zone.name = zone_id + zone.x_coord = x_coord + zone.y_coord = y_coord + zone.centroid = centroid_wkt + zone.x_min = zone_geometry_shapely.bounds[0] + zone.y_min = zone_geometry_shapely.bounds[1] + zone.x_max = zone_geometry_shapely.bounds[2] + zone.y_max = zone_geometry_shapely.bounds[3] + + # zone = Zone( + # id=zone_id, + # name=zone_id, + # x_coord=x_coord, + # y_coord=y_coord, + # centroid=centroid_wkt, + # x_max=zone_geometry_shapely.bounds[2], + # x_min=zone_geometry_shapely.bounds[0], + # y_max=zone_geometry_shapely.bounds[3], + # y_min=zone_geometry_shapely.bounds[1], + # node_id_list=[], + # poi_id_list=[], + # production=0, + # attraction=0, + # production_fixed=0, + # attraction_fixed=0, + # geometry=zone_geometry + # ) + + zone_dict[zone_id] = asdict(zone) except Exception as e: - print(f" : Unable to create zone: {zone_id}, error: {e}", flush=True) + print(f" : Unable to create zone: {zone_id}, error: {e}") return zone_dict @@ -512,50 +590,130 @@ def _create_zone_from_dataframe_by_centroid(df_zone: pd.DataFrame) -> dict[int, dict[int, Zone]: a dict of Zones.{zone_id: Zone} """ - import_package("shapely", verbose=False) - import shapely + # import_package("shapely") + # import shapely df_zone = df_zone.reset_index(drop=True) + col_names = df_zone.columns.tolist() + + if "zone_id" in col_names: + col_names.remove("zone_id") + + # get node dataclass fields + # Get the list of attribute names + zone_attr_names = [f.name for f in fields(Zone)] + + # check difference between node_attr_names and col_names + diff = list(set(col_names) - set(zone_attr_names)) + + # create attributes for node class if diff is not empty + if diff: + diff_attr = [(val, Any, "") for val in diff] + Zone_ext = extend_dataclass(Zone, diff_attr) + else: + Zone_ext = Zone + zone_dict = {} for i in range(len(df_zone)): try: zone_id = df_zone.loc[i, 'zone_id'] - centroid_x = df_zone.loc[i, 'x_coord'] - centroid_y = df_zone.loc[i, 'y_coord'] + x_coord = df_zone.loc[i, 'x_coord'] + y_coord = df_zone.loc[i, 'y_coord'] - zone_centroid_shapely = shapely.Point(centroid_x, centroid_y) + # load zone geometry + try: + zone_geometry = df_zone.loc[i, 'geometry'] + except Exception: + zone_geometry = "" + + zone_centroid_shapely = shapely.Point(x_coord, y_coord) centroid_wkt = zone_centroid_shapely.wkt - zone = Zone( - id=zone_id, - name=zone_id, - centroid_x=centroid_x, - centroid_y=centroid_y, - centroid=centroid_wkt, - node_id_list=[], - poi_id_list=[], - production=0, - attraction=0, - production_fixed=0, - attraction_fixed=0, - ) - - zone_dict[zone_id] = zone + zone = Zone_ext() + + for col in col_names: + setattr(zone, col, df_zone.loc[i, col]) + + zone.id = zone_id + zone.name = zone_id + zone.centroid = centroid_wkt + zone.geometry = zone_geometry + + # zone = Zone( + # id=zone_id, + # name=zone_id, + # x_coord=x_coord, + # y_coord=y_coord, + # centroid=centroid_wkt, + # node_id_list=[], + # poi_id_list=[], + # production=0, + # attraction=0, + # production_fixed=0, + # attraction_fixed=0, + # geometry=zone_geometry + # ) + + 
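The geometry-based zone builder above derives the stored centroid and bounding box straight from the WKT polygon; a small standalone sketch of those shapely calls:

import shapely

zone_geometry = "POLYGON ((0 0, 2 0, 2 1, 0 1, 0 0))"
geom = shapely.from_wkt(zone_geometry)

centroid_wkt = geom.centroid.wkt                    # stored in zone.centroid
x_coord, y_coord = geom.centroid.x, geom.centroid.y
x_min, y_min, x_max, y_max = geom.bounds            # stored in zone.x_min ... zone.y_max

print(centroid_wkt, (x_min, y_min, x_max, y_max))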
zone_dict[zone_id] = asdict(zone) except Exception as e: - print(f" : Unable to create zone: {zone_id}, error: {e}", flush=True) + print(f" : Unable to create zone: {zone_id}, error: {e}") return zone_dict +def _create_link_from_dataframe(df_link: pd.DataFrame) -> dict[int, Zone]: + """Create Link from df_link. + + Args: + df_link (pd.DataFrame): dataframe of link from link.csv + + Returns: + dict[int, Zone]: a dict of Link.{link_id: Link} + """ + + try: + df_link = df_link.reset_index(drop=True) + link_dict = {} + + for i in range(len(df_link)): + try: + link = Link( + id=df_link.loc[i, 'link_id'], + name=df_link.loc[i, 'name'], + from_node_id=df_link.loc[i, 'from_node_id'], + to_node_id=df_link.loc[i, 'to_node_id'], + length=df_link.loc[i, 'length'], + lanes=df_link.loc[i, 'lanes'], + dir_flag=df_link.loc[i, 'dir_flag'], + free_speed=df_link.loc[i, 'free_speed'], + capacity=df_link.loc[i, 'capacity'], + link_type=df_link.loc[i, 'link_type'], + link_type_name=df_link.loc[i, 'link_type_name'], + geometry=df_link.loc[i, 'geometry'], + allowed_uses=df_link.loc[i, 'allowed_uses'], + from_biway=df_link.loc[i, 'from_biway'], + ) + link_dict[df_link.loc[i, 'link_id']] = link + except Exception as e: + print(f" : Unable to create link: { + df_link.loc[i, 'link_id']}, error: {e}", flush=True) + return link_dict + except Exception as e: + print(f" : Unable to create link: {e}", flush=True) + return {} + + # main functions for reading node, poi, zone files and network + @func_time -def read_node(node_file: str = "", cpu_cores: int = -1, verbose: bool = False) -> dict[int: Node]: +@requires("tqdm") +def read_node(node_file: str = "", cpu_cores: int = 1, verbose: bool = False) -> dict[int: Node]: """Read node.csv file and return a dict of nodes. Args: node_file (str, optional): node file path. Defaults to "". - cpu_cores (int, optional): number of cpu cores for parallel processing. Defaults to -1. + cpu_cores (int, optional): number of cpu cores for parallel processing. Defaults to 1. verbose (bool, optional): print processing information. Defaults to False. Raises: @@ -567,16 +725,13 @@ def read_node(node_file: str = "", cpu_cores: int = -1, verbose: bool = False) - Examples: >>> node_dict = read_node(node_file = r"../dataset/ASU/node.csv") >>> node_dict[1] - Node(id=1, zone_id=0, x_coord=0.0, y_coord=0.0, boundary_flag=0, geometry='POINT (0 0)',...) + Node(id=1, zone_id=0, x_coord=0.0, y_coord=0.0, is_boundary=0, geometry='POINT (0 0)',...) # if node_file does not exist, raise error >>> node_dict = read_node(node_file = r"../dataset/ASU/node.csv") FileNotFoundError: File: ../dataset/ASU/node.csv does not exist. 
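The node, POI and zone helpers above end with asdict(...) rather than returning the dataclass instances themselves; plain dicts are always picklable, so the chunk results can cross the multiprocessing boundary even though the extended classes are created dynamically inside the worker. A toy illustration, where DemoNode and _worker are hypothetical names:

from dataclasses import dataclass, asdict

@dataclass
class DemoNode:                       # stand-in for the (possibly extended) Node class
    id: int = 0
    x_coord: float = 0.0
    y_coord: float = 0.0

def _worker(rows):
    out = {}
    for row in rows:                  # each row is already a plain dict here
        node = DemoNode(**row)
        out[node.id] = asdict(node)   # plain dict: safe to return from a Pool worker
    return out

print(_worker([{"id": 1, "x_coord": 0.5, "y_coord": 1.5}]))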
""" - if verbose: - print(" :Running on parallel processing, make sure you are running under if __name__ == '__main__': \n") - # convert path to linux path node_file = path2linux(node_file) @@ -592,35 +747,51 @@ def read_node(node_file: str = "", cpu_cores: int = -1, verbose: bool = False) - df_node_2rows = pd.read_csv(node_file, nrows=2) col_names = df_node_2rows.columns.tolist() - if "zone_id" in col_names: + if "zone_id" in col_names and "zone_id" not in node_required_cols: node_required_cols.append("zone_id") if verbose: print(f" : Reading node.csv with specified columns: {node_required_cols} \ \n and chunksize {chunk_size} for iterations...") - df_node_chunk = pd.read_csv(node_file, usecols=node_required_cols, chunksize=chunk_size) + try: + # Get total rows in poi.csv and calculate total chunks + total_rows = sum(1 for _ in open(node_file)) - 1 # Exclude header row + total_chunks = total_rows // chunk_size + 1 + df_node_chunk = pd.read_csv( + node_file, usecols=node_required_cols, chunksize=chunk_size) + except Exception as e: + raise Exception(f"Error: Unable to read node.csv file for: {e}") if verbose: print(f" : Parallel creating Nodes using Pool with {cpu_cores} CPUs. Please wait...") + node_dict_final = {} - results = [] # Parallel processing using Pool with Pool(cpu_cores) as pool: - results = pool.map(_create_node_from_dataframe, df_node_chunk, chunksize=chunk_size) - # for df_node in df_node_chunk: - # results.append(_create_node_from_dataframe(df_node)) + # results = pool.map(_create_node_from_dataframe, df_node_chunk) + results = list( + tqdm(pool.imap(_create_node_from_dataframe, df_node_chunk), total=total_chunks)) + pool.close() + pool.join() + + # results = process_map(_create_node_from_dataframe, df_node_chunk, max_workers=cpu_cores) for node_dict in results: node_dict_final.update(node_dict) if verbose: print(f" : Successfully loaded node.csv: {len(node_dict_final)} Nodes loaded.") + + node_dict_final = {k: create_dataclass_from_dict( + "Node", v) for k, v in node_dict_final.items()} + return node_dict_final @func_time +@requires("tqdm") def read_poi(poi_file: str = "", cpu_cores: int = 1, verbose: bool = False) -> dict[int: POI]: """Read poi.csv file and return a dict of POIs. @@ -661,86 +832,30 @@ def read_poi(poi_file: str = "", cpu_cores: int = 1, verbose: bool = False) -> d print(f" : Reading poi.csv with specified columns: {poi_required_cols} \ \n and chunksize {chunk_size} for iterations...") try: - df_poi_chunk = pd.read_csv(poi_file, usecols=poi_required_cols, chunksize=chunk_size, encoding='utf-8') - except Exception: - df_poi_chunk = pd.read_csv(poi_file, usecols=poi_required_cols, chunksize=chunk_size, encoding='latin-1') - - # Parallel processing using Pool - if verbose: - print(f" : Parallel creating POIs using Pool with {cpu_cores} CPUs. Please wait...") - poi_dict_final = {} - - with Pool(cpu_cores) as pool: - results = pool.map(_create_poi_from_dataframe, df_poi_chunk) - - for poi_dict in results: - poi_dict_final.update(poi_dict) - - if verbose: - print(f" : Successfully loaded poi.csv: {len(poi_dict_final)} POIs loaded.") - - return poi_dict_final - - -@func_time -def read_link(link_file: str = "", cpu_cores: int = -1, verbose: bool = False) -> dict[int: Link]: - """Read link.csv file and return a dict of Links. - - Args: - link_file (str): The link.csv file path. default is "". - cpu_cores (int, optional): number of cpu cores for parallel processing. Defaults to -1. - verbose (bool, optional): print processing information. Defaults to False. 
- - Raises: - FileNotFoundError: File: {link_file} does not exist. - ValueError: cpu_cores should be integer, but got {type(cpu_cores)} + # Get total rows in poi.csv and calculate total chunks + total_rows = sum(1 for _ in open(poi_file)) - 1 # Exclude header row + total_chunks = total_rows // chunk_size + 1 - Returns: - dict: A dict of Links. - - Examples: - >>> from pyufunc import gmns_read_link - >>> link_dict = gmns_read_link(link_file = r"../dataset/ASU/link.csv") - >>> link_dict[1] - Link(id=1, name='A', from_node_id=1, to_node_id=2, length=0.0, lanes=1, dir_flag=1, free_speed=0.0, capacity=0.0, link_type=1, link_type_name='motorway', geometry='LINESTRING (0 0, 1 1)') - """ - - # convert path to linux path - link_file = path2linux(link_file) - - # check link file - if not os.path.exists(link_file): - raise FileNotFoundError(f"File: {link_file} does not exist.") - - # check cpu_cores - if not isinstance(cpu_cores, int): - raise ValueError(f"cpu_cores should be integer, but got {type(cpu_cores)}") - - if cpu_cores == -1: - cpu_cores = config_gmns["set_cpu_cores"] - - # Read link.csv with specified columns and chunksize for iterations - link_required_cols = config_gmns["link_fields"] - chunk_size = config_gmns["data_chunk_size"] - - print("columns: ", link_required_cols) - if verbose: - print(f" : Reading link.csv with specified columns: {link_required_cols} \ - \n and chunksize {chunk_size} for iterations...") - try: - df_link_chunk = pd.read_csv( - link_file, usecols=link_required_cols, chunksize=chunk_size, encoding='utf-8') + df_poi_chunk = pd.read_csv( + poi_file, usecols=poi_required_cols, chunksize=chunk_size, encoding='utf-8') except Exception: - df_link_chunk = pd.read_csv( - link_file, usecols=link_required_cols, chunksize=chunk_size, encoding='latin-1') + df_poi_chunk = pd.read_csv( + poi_file, usecols=poi_required_cols, chunksize=chunk_size, encoding='latin-1') # Parallel processing using Pool if verbose: print(f" : Parallel creating POIs using Pool with {cpu_cores} CPUs. Please wait...") + poi_dict_final = {} with Pool(cpu_cores) as pool: - results = pool.map(_create_link_from_dataframe, df_link_chunk) + # results = pool.map(_create_poi_from_dataframe, df_poi_chunk) + results = list( + tqdm(pool.imap(_create_poi_from_dataframe, df_poi_chunk), total=total_chunks)) + pool.close() + pool.join() + + # results = process_map(_create_poi_from_dataframe, df_poi_chunk, max_workers=cpu_cores) for poi_dict in results: poi_dict_final.update(poi_dict) @@ -748,10 +863,14 @@ def read_link(link_file: str = "", cpu_cores: int = -1, verbose: bool = False) - if verbose: print(f" : Successfully loaded poi.csv: {len(poi_dict_final)} POIs loaded.") + poi_dict_final = {k: create_dataclass_from_dict( + "POI", v) for k, v in poi_dict_final.items()} + return poi_dict_final -# @func_time +@func_time +@requires("tqdm") def read_zone_by_geometry(zone_file: str = "", cpu_cores: int = 1, verbose: bool = False) -> dict[int: Zone]: """Read zone.csv file and return a dict of Zones. @@ -792,15 +911,20 @@ def read_zone_by_geometry(zone_file: str = "", cpu_cores: int = 1, verbose: bool Please make sure you have {zone_required_cols} in zone.csv.") # load zone.csv with specified columns and chunksize for iterations - df_zone_chunk = pd.read_csv(zone_file, usecols=zone_required_cols, chunksize=chunk_size) + df_zone_chunk = pd.read_csv( + zone_file, usecols=zone_required_cols, chunksize=chunk_size) # Parallel processing using Pool if verbose: print(f" : Parallel creating Zones using Pool with {cpu_cores} CPUs. 
Please wait...") + zone_dict_final = {} with Pool(cpu_cores) as pool: - results = pool.map(_create_zone_from_dataframe_by_geometry, df_zone_chunk) + results = pool.map( + _create_zone_from_dataframe_by_geometry, df_zone_chunk) + pool.close() + pool.join() for zone_dict in results: zone_dict_final.update(zone_dict) @@ -808,10 +932,14 @@ def read_zone_by_geometry(zone_file: str = "", cpu_cores: int = 1, verbose: bool if verbose: print(f" : Successfully loaded zone.csv: {len(zone_dict_final)} Zones loaded.") + zone_dict_final = {k: create_dataclass_from_dict( + "POI", v) for k, v in zone_dict_final.items()} + return zone_dict_final -# @func_time +@func_time +@requires("tqdm") def read_zone_by_centroid(zone_file: str = "", cpu_cores: int = 1, verbose: bool = False) -> dict[int: Zone]: """Read zone.csv file and return a dict of Zones. @@ -852,7 +980,8 @@ def read_zone_by_centroid(zone_file: str = "", cpu_cores: int = 1, verbose: bool Please make sure you have {zone_required_cols} in zone.csv.") # load zone.csv with specified columns and chunksize for iterations - df_zone_chunk = pd.read_csv(zone_file, usecols=zone_required_cols, chunksize=chunk_size) + df_zone_chunk = pd.read_csv( + zone_file, usecols=zone_required_cols, chunksize=chunk_size) # Parallel processing using Pool if verbose: @@ -861,7 +990,10 @@ def read_zone_by_centroid(zone_file: str = "", cpu_cores: int = 1, verbose: bool zone_dict_final = {} with Pool(cpu_cores) as pool: - results = pool.map(_create_zone_from_dataframe_by_centroid, df_zone_chunk) + results = pool.map( + _create_zone_from_dataframe_by_centroid, df_zone_chunk) + pool.close() + pool.join() for zone_dict in results: zone_dict_final.update(zone_dict) @@ -869,9 +1001,82 @@ def read_zone_by_centroid(zone_file: str = "", cpu_cores: int = 1, verbose: bool if verbose: print(f" : Successfully loaded zone.csv: {len(zone_dict_final)} Zones loaded.") + zone_dict_final = {k: create_dataclass_from_dict( + "POI", v) for k, v in zone_dict_final.items()} + return zone_dict_final +@func_time +def read_link(link_file: str = "", cpu_cores: int = -1, verbose: bool = False) -> dict[int: Link]: + """Read link.csv file and return a dict of Links. + + Args: + link_file (str): The link.csv file path. default is "". + cpu_cores (int, optional): number of cpu cores for parallel processing. Defaults to -1. + verbose (bool, optional): print processing information. Defaults to False. + + Raises: + FileNotFoundError: File: {link_file} does not exist. + ValueError: cpu_cores should be integer, but got {type(cpu_cores)} + + Returns: + dict: A dict of Links. 
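The final step in the zone readers above turns the per-row dicts back into dataclass-style objects via create_dataclass_from_dict, which also exposes dict-style access through __getitem__/__setitem__ and an as_dict() helper. A short usage sketch; the import path is the one used by this module, and the first argument is only the name given to the generated class:

from pyufunc.util_data_processing._dataclass import create_dataclass_from_dict

raw = {1: {"id": 1, "x_coord": 0.5, "y_coord": 1.5}}
zone_dict_final = {k: create_dataclass_from_dict("Zone", v) for k, v in raw.items()}

print(zone_dict_final[1].x_coord)      # attribute access
print(zone_dict_final[1]["y_coord"])   # dict-style access via __getitem__
print(zone_dict_final[1].as_dict())    # back to a plain dict via as_dict()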
+ + Examples: + >>> from pyufunc import gmns_read_link + >>> link_dict = gmns_read_link(link_file = r"../dataset/ASU/link.csv") + >>> link_dict[1] + Link(id=1, name='A', from_node_id=1, to_node_id=2, length=0.0, lanes=1, dir_flag=1, free_speed=0.0, + capacity=0.0, link_type=1, link_type_name='motorway', geometry='LINESTRING (0 0, 1 1)') + """ + + # convert path to linux path + link_file = path2linux(link_file) + + # check link file + if not os.path.exists(link_file): + raise FileNotFoundError(f"File: {link_file} does not exist.") + + # check cpu_cores + if not isinstance(cpu_cores, int): + raise ValueError(f"cpu_cores should be integer, but got {type(cpu_cores)}") + + if cpu_cores == -1: + cpu_cores = config_gmns["set_cpu_cores"] + + # Read link.csv with specified columns and chunksize for iterations + link_required_cols = config_gmns["link_fields"] + chunk_size = config_gmns["data_chunk_size"] + + print("columns: ", link_required_cols) + if verbose: + print(f" : Reading link.csv with specified columns: {link_required_cols} \ + \n and chunksize {chunk_size} for iterations...") + try: + df_link_chunk = pd.read_csv( + link_file, usecols=link_required_cols, chunksize=chunk_size, encoding='utf-8') + except Exception: + df_link_chunk = pd.read_csv( + link_file, usecols=link_required_cols, chunksize=chunk_size, encoding='latin-1') + + # Parallel processing using Pool + if verbose: + print(f" : Parallel creating POIs using Pool with {cpu_cores} CPUs. Please wait...") + poi_dict_final = {} + + with Pool(cpu_cores) as pool: + results = pool.map(_create_link_from_dataframe, df_link_chunk) + + for poi_dict in results: + poi_dict_final.update(poi_dict) + + if verbose: + print(f" : Successfully loaded poi.csv: {len(poi_dict_final)} POIs loaded.") + + return poi_dict_final + + @func_time def read_zone(zone_file: str = "", cpu_cores: int = -1, verbose: bool = False) -> dict[int: Zone]: """Read zone.csv file and return a dict of Zones.