# Source code for bdc.steps.regionalatlas

# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Felix Zailskas <felixzailskas@gmail.com>
# SPDX-FileCopyrightText: 2023 Fabian-Paul Utech <f.utech@gmx.net>


import geopandas as gpd
import osmnx
import pandas as pd
from pandas import DataFrame
from tqdm import tqdm

from bdc.steps.helpers import get_lead_hash_generator
from bdc.steps.step import Step, StepError
from logger import get_logger

log = get_logger()


class RegionalAtlas(Step):
    """
    The RegionalAtlas step will query the RegionalAtlas database for location based geographic and
    demographic information, based on the address that was found for a business (currently through
    Google API) or the area provided by the phonenumber (preprocess_phonenumbers.py).

    Attributes:
        name: Name of this step, used for logging
        reagionalatlas_feature_keys: Dictionary to translate between the keys in the merged.geojson
            and the used column names in the df
        df_fields: the keys of the merged.geojson
        added_cols: List of fields that will be added to the main dataframe by executing this step
        required_cols: List of fields that are required in the input dataframe before performing this
            step
        regions_gdfs: dataframe that includes all keys/values from the merged.geojson
        empty_result: empty result that will be used in case there are problems with the data
        epsg_code_etrs: 25832 is the standard used by RegionAtlas

    Added Columns:
        pop_density (float): Population density of the searched city
        pop_development (float): Population development of the searched city
        age_0 (float): Population age group 0-18 of the searched city
        age_1 (float): Population age group 18-30 of the searched city
        age_2 (float): Population age group 30-45 of the searched city
        age_3 (float): Population age group 45-60 of the searched city
        age_4 (float): Population age group 60+ of the searched city
        pop_avg_age (float): Average age of the searched city
        per_service_sector (float): Percentage of the service sector of the searched city
        per_trade (float): Percentage of the trade sector of the searched city
        employment_rate (float): Employment rate of the searched city
        unemployment_rate (float): Unemployment rate of the searched city
        per_long_term_unemployment (float): Percentage of long term unemployment of the searched city
        investments_p_employee (float): Investments per employee of the searched city
        gross_salary_p_employee (float): Gross salary per employee of the searched city
        disp_income_p_inhabitant (float): Disposable income per inhabitant of the searched city
        tot_income_p_taxpayer (float): Total income per taxpayer of the searched city
        gdp_p_employee (float): GDP per employee of the searched city
        gdp_development (float): GDP development of the searched city
        gdp_p_inhabitant (float): GDP per inhabitant of the searched city
        gdp_p_workhours (float): GDP per workhour of the searched city
        pop_avg_age_zensus (float): Average age of the searched city (zensus)
        unemployment_rate_zensus (float): Unemployment rate of the searched city (zensus)
        regional_score (float): Regional score of the searched city
    """

    name: str = "Regional_Atlas"

    # Maps the df column suffixes to the RegionalAtlas indicator codes used in merged.geojson.
    reagionalatlas_feature_keys: dict = {
        "pop_density": "ai0201",
        "pop_development": "ai0202",
        "age_0": "ai0203",
        "age_1": "ai0204",
        "age_2": "ai0205",
        "age_3": "ai0206",
        "age_4": "ai0207",
        "pop_avg_age": "ai0218",
        "per_service_sector": "ai0706",
        "per_trade": "ai0707",
        "employment_rate": "ai0710",
        "unemployment_rate": "ai0801",
        "per_long_term_unemployment": "ai0808",
        "investments_p_employee": "ai1001",
        "gross_salary_p_employee": "ai1002",
        "disp_income_p_inhabitant": "ai1601",
        "tot_income_p_taxpayer": "ai1602",
        "gdp_p_employee": "ai1701",
        "gdp_development": "ai1702",
        "gdp_p_inhabitant": "ai1703",
        "gdp_p_workhours": "ai1704",
        "pop_avg_age_zensus": "ai_z01",
        # BUGFIX: this entry was previously a duplicate key "unemployment_rate", which
        # silently overwrote the "ai0801" mapping above and dropped that indicator.
        "unemployment_rate_zensus": "ai_z08",
    }

    # Materialized as a list; dict.values() is a live view, not a list[str].
    df_fields: list[str] = list(reagionalatlas_feature_keys.values())

    # Weirdly the expression [f"{name}_{field}" for field in df_fields] gives an error as
    # name is not in the scope of the iterator (class-body names are not visible inside
    # comprehensions), hence the zip() workaround below.
    added_cols = [
        name + field
        for (name, field) in zip(
            [f"{name.lower()}_"] * (len(df_fields)),
            ([f"{field}" for field in reagionalatlas_feature_keys.keys()]),
        )
    ] + [f"{name.lower()}_regional_score"]

    required_cols = ["google_places_formatted_address"]

    # Populated in verify() from data/merged_geo.geojson.
    regions_gdfs = gpd.GeoDataFrame()

    # Placeholder returned when a lead cannot be resolved to a German region.
    empty_result: dict = dict.fromkeys(reagionalatlas_feature_keys.values())

    # Adjust the EPSG code from the osmnx search query to the regionalatlas specific code
    # epsg_code 4326 [WGS 84 (used by osmnx)] => epsg_code_etrs = 25832
    # [ETRS89 / UTM zone 32N (used by regionalatlas)]
    epsg_code_etrs = 25832
[docs] def load_data(self) -> None: pass
[docs] def verify(self) -> bool: # Load the data file try: self.regions_gdfs = gpd.read_file("data/merged_geo.geojson") except: raise StepError( "The path for the geojson for regional information (Regionalatlas) is not valid!" ) return super().verify()
    def run(self) -> DataFrame:
        """
        Add the regional features and the regional score to the main dataframe.

        First fills all feature columns (``added_cols[:-1]``) from ``get_data_from_address``,
        then the final ``*_regional_score`` column from ``calculate_regional_score``.
        Note: the per-lead results are routed through ``hash_check`` — presumably a
        per-lead cache keyed by the lead hash (defined in bdc.steps.helpers; confirm there).

        :return: DataFrame - the enriched dataframe
        """
        tqdm.pandas(desc="Getting social data")

        # Add the new fields to the df. The dict returned per lead is wrapped in a
        # pd.Series so progress_apply yields one column per feature.
        self.df[self.added_cols[:-1]] = self.df.progress_apply(
            lambda lead: pd.Series(
                get_lead_hash_generator().hash_check(
                    lead,
                    self.get_data_from_address,
                    self.name + "_Location-Data",
                    self.added_cols[:-1],
                    lead,
                )
            ),
            axis=1,
        )

        tqdm.pandas(desc="Computing Regional Score")

        # The regional score depends on the feature columns filled just above.
        self.df[self.added_cols[-1:]] = self.df.progress_apply(
            lambda lead: pd.Series(
                get_lead_hash_generator().hash_check(
                    lead,
                    self.calculate_regional_score,
                    self.name + "_Regional-Score",
                    self.added_cols[-1:],
                    lead,
                )
            ),
            axis=1,
        )

        return self.df
[docs] def finish(self) -> None: success_rate = ( 1 - self.df["regional_atlas_pop_density"].isna().sum() / len(self.df["regional_atlas_pop_density"]) ) * 100 log.info( "Percentage of regional information (germany): {:.2f}%".format( round(success_rate, 2) ) )
[docs] def get_data_from_address(self, row): """ Retrieve the regional features for every lead. Every column of reagionalatlas_feature_keys is added. Based on the google places address or the phonenumber area. Checks if the centroid of the searched city is in a RegionalAtlas region. Possible extensions could include: - More RegionalAtlas features :param row: Lead for which to retrieve the features :return: dict - The retrieved features if the necessary fields are present for the lead. Empty dictionary otherwise. """ # can only get an result if we know the region if ( row["google_places_formatted_address"] is None and row["number_area"] is None ): return self.empty_result country = "" # the phone number has secondary priority (because it can be a private number), therefore can be overwritten by the google places information if row["number_country"] is not None: country = row["number_country"] if row["google_places_formatted_address"] is not None: google_location = str(row["google_places_formatted_address"]).split(",")[ -2: ] google_location = [name.strip() for name in google_location] country = google_location[-1].lower() # the 'regionalatlas' data is specific to germany if country not in [ "deutschland", "germany", "allemagne", "tyskland", "germania", ]: return self.empty_result """#Alternative to the if 'if country not in ...' 
if not self.germany_gdf.intersects(row_gdf): return self.empty_result""" # Get the polygon of the city, to find the corresponding region try: if row["google_places_formatted_address"] is not None: search_gdf = osmnx.geocode_to_gdf(",".join(google_location)) else: # at this point we know, that either a google_places_address exists or a number_area search_gdf = osmnx.geocode_to_gdf(row["number_area"]) except: log.info("Google location not found!") return self.empty_result search_gdf_reprojected = search_gdf.to_crs("EPSG:" + str(self.epsg_code_etrs)) # Use the centroid of the city, to check if a region search_centroid = search_gdf_reprojected.centroid area_key = None return_values = {} # go through all regions of germany ... for idx, region in self.regions_gdfs.iterrows(): if area_key is not None: if region["schluessel"] != area_key: continue else: return_values.update( region[self.reagionalatlas_feature_keys.values()].to_dict() ) break else: region_polygon = region["geometry"] b_contains = region_polygon.contains(search_centroid).item() if b_contains: area_key = region["schluessel"] return_values.update( region[self.reagionalatlas_feature_keys.values()].to_dict() ) break return return_values
[docs] def calculate_regional_score(self, lead) -> float | None: """ Calculate a regional score for a lead based on information from the RegionalAtlas API. This function uses population density, employment rate, and average income to compute the buying power of potential customers in the area in millions of euro. The score is computed as: (population density * employment rate * average income) / 1,000,000 Possible extensions could include: - Population age groups :param lead: Lead for which to compute the score :return: float | None - The computed score if the necessary fields are present for the lead. None otherwise. """ pop_density_col = f"{self.name.lower()}_pop_density" employment_rate_col = f"{self.name.lower()}_employment_rate" income_col = f"{self.name.lower()}_disp_income_p_inhabitant" pop_density = lead[pop_density_col] employment_rate = lead[employment_rate_col] income_per_inhabitant = lead[income_col] pop_density = pop_density if pd.notnull(pop_density) else 0 employment_rate = employment_rate if pd.notnull(employment_rate) else 0 income_per_inhabitant = ( income_per_inhabitant if pd.notnull(income_per_inhabitant) else 0 ) regional_score = ( pop_density * employment_rate * income_per_inhabitant ) / 1000000 if pd.notnull(regional_score): return regional_score else: raise ValueError("Regional score is null")