# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Felix Zailskas <felixzailskas@gmail.com>
# SPDX-FileCopyrightText: 2023 Fabian-Paul Utech <f.utech@gmx.net>
import geopandas as gpd
import osmnx
import pandas as pd
from pandas import DataFrame
from tqdm import tqdm
from bdc.steps.helpers import get_lead_hash_generator
from bdc.steps.step import Step, StepError
from logger import get_logger
log = get_logger()
class RegionalAtlas(Step):
    """
    The RegionalAtlas step will query the RegionalAtlas database for location based geographic and demographic
    information, based on the address that was found for a business (currently through Google API) or the
    area provided by the phonenumber (preprocess_phonenumbers.py).

    Attributes:
        name: Name of this step, used for logging
        reagionalatlas_feature_keys: Dictionary to translate between the keys in the merged.geojson and the used column names in the df
        df_fields: the keys of the merged.geojson
        added_cols: List of fields that will be added to the main dataframe by executing this step
        required_cols: List of fields that are required in the input dataframe before performing this step
        regions_gdfs: dataframe that includes all keys/values from the merged.geojson
        empty_result: empty result that will be used in case there are problems with the data
        epsg_code_etrs: 25832 is the standard used by RegionAtlas

    Added Columns:
        pop_density (float): Population density of the searched city
        pop_development (float): Population development of the searched city
        age_0 (float): Population age group 0-18 of the searched city
        age_1 (float): Population age group 18-30 of the searched city
        age_2 (float): Population age group 30-45 of the searched city
        age_3 (float): Population age group 45-60 of the searched city
        age_4 (float): Population age group 60+ of the searched city
        pop_avg_age (float): Average age of the searched city
        per_service_sector (float): Percentage of the service sector of the searched city
        per_trade (float): Percentage of the trade sector of the searched city
        employment_rate (float): Employment rate of the searched city
        unemployment_rate (float): Unemployment rate of the searched city
        per_long_term_unemployment (float): Percentage of long term unemployment of the searched city
        investments_p_employee (float): Investments per employee of the searched city
        gross_salary_p_employee (float): Gross salary per employee of the searched city
        disp_income_p_inhabitant (float): Disposable income per inhabitant of the searched city
        tot_income_p_taxpayer (float): Total income per taxpayer of the searched city
        gdp_p_employee (float): GDP per employee of the searched city
        gdp_development (float): GDP development of the searched city
        gdp_p_inhabitant (float): GDP per inhabitant of the searched city
        gdp_p_workhours (float): GDP per workhour of the searched city
        pop_avg_age_zensus (float): Average age of the searched city (zensus)
        unemployment_rate_zensus (float): Unemployment rate of the searched city (zensus)
        regional_score (float): Regional score of the searched city
    """

    name: str = "Regional_Atlas"

    # Maps the df column suffix (key) to the RegionalAtlas feature code (value)
    # found in merged_geo.geojson.
    reagionalatlas_feature_keys: dict = {
        "pop_density": "ai0201",
        "pop_development": "ai0202",
        "age_0": "ai0203",
        "age_1": "ai0204",
        "age_2": "ai0205",
        "age_3": "ai0206",
        "age_4": "ai0207",
        "pop_avg_age": "ai0218",
        "per_service_sector": "ai0706",
        "per_trade": "ai0707",
        "employment_rate": "ai0710",
        "unemployment_rate": "ai0801",
        "per_long_term_unemployment": "ai0808",
        "investments_p_employee": "ai1001",
        "gross_salary_p_employee": "ai1002",
        "disp_income_p_inhabitant": "ai1601",
        "tot_income_p_taxpayer": "ai1602",
        "gdp_p_employee": "ai1701",
        "gdp_development": "ai1702",
        "gdp_p_inhabitant": "ai1703",
        "gdp_p_workhours": "ai1704",
        "pop_avg_age_zensus": "ai_z01",
        # BUGFIX: this entry was previously keyed "unemployment_rate", which is
        # a duplicate of the ai0801 entry above — the duplicate silently
        # overwrote it and dropped the ai0801 feature entirely. Renamed to the
        # *_zensus convention already used by pop_avg_age_zensus, restoring
        # both columns documented in the class docstring.
        "unemployment_rate_zensus": "ai_z08",
    }

    # BUGFIX: .values() returns a dict view, not the annotated list[str].
    df_fields: list[str] = list(reagionalatlas_feature_keys.values())

    # NOTE: a plain comprehension like [f"{name}_{k}" for k in ...] raises a
    # NameError here, because class-level names are not visible inside a
    # comprehension body (only its outermost iterable is evaluated in class
    # scope). map() evaluates all of its arguments in class scope, so the
    # prefix can be bound safely.
    added_cols: list[str] = list(
        map((name.lower() + "_").__add__, reagionalatlas_feature_keys)
    ) + [name.lower() + "_regional_score"]

    required_cols = ["google_places_formatted_address"]

    regions_gdfs = gpd.GeoDataFrame()

    # Placeholder result (all feature codes -> None) returned whenever the
    # lead's location cannot be resolved to a German RegionalAtlas region.
    empty_result: dict = dict.fromkeys(reagionalatlas_feature_keys.values())

    # Adjust the EPSG code from the osmnx search query to the regionalatlas
    # specific code: epsg_code 4326 [WGS 84 (used by osmnx)] =>
    # epsg_code_etrs = 25832 [ETRS89 / UTM zone 32N (used by regionalatlas)]
    epsg_code_etrs = 25832

    def load_data(self) -> None:
        """No-op: the geojson file is loaded in verify() instead."""
        pass

    def verify(self) -> bool:
        """
        Load the merged RegionalAtlas geojson into self.regions_gdfs and run
        the base-class verification.

        :return: bool - result of the superclass verification
        :raises StepError: if the geojson file cannot be read
        """
        try:
            self.regions_gdfs = gpd.read_file("data/merged_geo.geojson")
        except Exception as e:
            # BUGFIX: was a bare `except:`, which would also swallow
            # SystemExit/KeyboardInterrupt; chain the cause for debuggability.
            raise StepError(
                "The path for the geojson for regional information (Regionalatlas) is not valid!"
            ) from e
        return super().verify()

    def run(self) -> DataFrame:
        """
        Enrich every lead with the RegionalAtlas features and the derived
        regional score. Both computations go through the lead-hash cache so
        previously processed leads are not recomputed.

        :return: DataFrame - the main dataframe with the added columns filled in
        """
        tqdm.pandas(desc="Getting social data")
        # All feature columns (everything except the trailing regional score).
        self.df[self.added_cols[:-1]] = self.df.progress_apply(
            lambda lead: pd.Series(
                get_lead_hash_generator().hash_check(
                    lead,
                    self.get_data_from_address,
                    self.name + "_Location-Data",
                    self.added_cols[:-1],
                    lead,
                )
            ),
            axis=1,
        )

        tqdm.pandas(desc="Computing Regional Score")
        # The regional score is computed from the feature columns added above.
        self.df[self.added_cols[-1:]] = self.df.progress_apply(
            lambda lead: pd.Series(
                get_lead_hash_generator().hash_check(
                    lead,
                    self.calculate_regional_score,
                    self.name + "_Regional-Score",
                    self.added_cols[-1:],
                    lead,
                )
            ),
            axis=1,
        )
        return self.df

    def finish(self) -> None:
        """Log the percentage of leads for which regional data was found."""
        success_rate = (
            1
            - self.df["regional_atlas_pop_density"].isna().sum()
            / len(self.df["regional_atlas_pop_density"])
        ) * 100
        log.info(
            "Percentage of regional information (germany): {:.2f}%".format(
                round(success_rate, 2)
            )
        )

    def get_data_from_address(self, row):
        """
        Retrieve the regional features for every lead. Every column of reagionalatlas_feature_keys is added.

        Based on the google places address or the phonenumber area. Checks if the centroid of the
        searched city is in a RegionalAtlas region.

        Possible extensions could include:
        - More RegionalAtlas features

        :param row: Lead for which to retrieve the features
        :return: dict - The retrieved features if the necessary fields are present for the lead. Empty dictionary otherwise.
        """
        # Can only get a result if we know the region.
        if (
            row["google_places_formatted_address"] is None
            and row["number_area"] is None
        ):
            return self.empty_result

        country = ""

        # The phone number has secondary priority (because it can be a private
        # number), therefore it can be overwritten by the google places
        # information below.
        if row["number_country"] is not None:
            country = row["number_country"]

        if row["google_places_formatted_address"] is not None:
            # Last two address components are typically "<city>, <country>".
            google_location = str(row["google_places_formatted_address"]).split(",")[
                -2:
            ]
            google_location = [name.strip() for name in google_location]
            country = google_location[-1].lower()

        # The 'regionalatlas' data is specific to germany.
        if country not in [
            "deutschland",
            "germany",
            "allemagne",
            "tyskland",
            "germania",
        ]:
            return self.empty_result

        # Get the polygon of the city, to find the corresponding region.
        try:
            if row["google_places_formatted_address"] is not None:
                search_gdf = osmnx.geocode_to_gdf(",".join(google_location))
            else:
                # At this point we know that either a google_places_address
                # exists or a number_area.
                search_gdf = osmnx.geocode_to_gdf(row["number_area"])
        except Exception:
            # BUGFIX: was a bare `except:`; narrowed so ctrl-C still works.
            log.info("Google location not found!")
            return self.empty_result

        # Reproject into the RegionalAtlas CRS before geometric comparison.
        search_gdf_reprojected = search_gdf.to_crs("EPSG:" + str(self.epsg_code_etrs))
        # Use the centroid of the city to check which region contains it.
        search_centroid = search_gdf_reprojected.centroid

        return_values = {}
        feature_columns = list(self.reagionalatlas_feature_keys.values())

        # Go through all regions of germany until the containing one is found.
        # BUGFIX: the previous implementation carried an `area_key` variable
        # whose "already known key" branch was unreachable (the key was only
        # ever assigned immediately before a break), so that dead branch has
        # been removed.
        for _, region in self.regions_gdfs.iterrows():
            if region["geometry"].contains(search_centroid).item():
                return_values.update(region[feature_columns].to_dict())
                break

        return return_values

    def calculate_regional_score(self, lead) -> float | None:
        """
        Calculate a regional score for a lead based on information from the RegionalAtlas API.

        This function uses population density, employment rate, and average income to compute
        the buying power of potential customers in the area in millions of euro.

        The score is computed as:
            (population density * employment rate * average income) / 1,000,000

        Possible extensions could include:
        - Population age groups

        :param lead: Lead for which to compute the score
        :return: float | None - The computed score if the necessary fields are present for the lead. None otherwise.
        :raises ValueError: if the computed score is null
        """
        pop_density_col = f"{self.name.lower()}_pop_density"
        employment_rate_col = f"{self.name.lower()}_employment_rate"
        income_col = f"{self.name.lower()}_disp_income_p_inhabitant"

        pop_density = lead[pop_density_col]
        employment_rate = lead[employment_rate_col]
        income_per_inhabitant = lead[income_col]

        # Missing inputs contribute a factor of 0 rather than propagating NaN.
        pop_density = pop_density if pd.notnull(pop_density) else 0
        employment_rate = employment_rate if pd.notnull(employment_rate) else 0
        income_per_inhabitant = (
            income_per_inhabitant if pd.notnull(income_per_inhabitant) else 0
        )

        regional_score = (
            pop_density * employment_rate * income_per_inhabitant
        ) / 1000000

        if pd.notnull(regional_score):
            return regional_score
        else:
            raise ValueError("Regional score is null")