# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Ahmed Sheta <ahmed.sheta@fau.de>
import os
import sys
from ast import literal_eval
import pandas as pd
from scipy import stats
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import (
MinMaxScaler,
MultiLabelBinarizer,
Normalizer,
OneHotEncoder,
RobustScaler,
StandardScaler,
)
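# make the sibling database and logger modules importable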
current_dir = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
parent_dir = os.path.join(current_dir, "..")
sys.path.append(parent_dir)
from database import get_database
from logger import get_logger
sys.path.append(current_dir)
log = get_logger()
class Preprocessing:
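    """Preprocessing pipeline for the enriched lead data.

    Fills missing values, scales numerical columns, one-hot encodes
    categorical columns, and maps the MerchantSizeByDPV labels to integers
    before the result is written out for model training.
    """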
def __init__(self, filter_null_data=True, historical_bool=True):
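        """Load the enriched data and configure which columns get which treatment."""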
        data_repo = get_database()
        self.data_path = data_repo.get_enriched_data_path(historical=historical_bool)
        # load the enriched data (assumed to be CSV, mirroring save_preprocessed_data)
        self.preprocessed_df = pd.read_csv(self.data_path)
self.preprocessed_data_output_path = data_repo.get_preprocessed_data_path(
historical_bool
)
self.filter_bool = filter_null_data
        # names of the feature columns added later by the one-hot encoders
        self.added_features = []
self.numerical_data = [
"google_places_rating",
"google_places_user_ratings_total",
"google_places_confidence",
"reviews_sentiment_score",
"review_avg_grammatical_score",
"review_polarization_score",
"review_highest_rating_ratio",
"review_lowest_rating_ratio",
"review_rating_trend",
"regional_atlas_pop_density",
"regional_atlas_pop_development",
"regional_atlas_age_0",
"regional_atlas_age_1",
"regional_atlas_age_2",
"regional_atlas_age_3",
"regional_atlas_age_4",
"regional_atlas_pop_avg_age",
"regional_atlas_per_service_sector",
"regional_atlas_per_trade",
"regional_atlas_employment_rate",
"regional_atlas_unemployment_rate",
"regional_atlas_per_long_term_unemployment",
"regional_atlas_investments_p_employee",
"regional_atlas_gross_salary_p_employee",
"regional_atlas_disp_income_p_inhabitant",
"regional_atlas_tot_income_p_taxpayer",
"regional_atlas_gdp_p_employee",
"regional_atlas_gdp_development",
"regional_atlas_gdp_p_inhabitant",
"regional_atlas_gdp_p_workhours",
"regional_atlas_pop_avg_age_zensus",
"regional_atlas_regional_score",
]
        # numerical columns that should additionally be scaled (opt-in, empty by default)
        self.data_to_scale = []
# categorical data that needs one-hot encoding
self.categorical_data = [
# "number_country",
# "number_area",
"google_places_detailed_type",
"review_polarization_type",
]
self.class_labels = "MerchantSizeByDPV"
def filter_out_null_data(self):
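        """Keep only rows where google_places_rating is present."""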
self.preprocessed_df = self.preprocessed_df[
self.preprocessed_df["google_places_rating"].notnull()
]
def fill_missing_values(self, column, strategy="constant"):
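        """Impute missing values in `column`; the default "constant" strategy fills numeric columns with 0."""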
if (
column in self.preprocessed_df.columns
and not self.preprocessed_df[column].empty
):
imputer = SimpleImputer(strategy=strategy)
self.preprocessed_df[column] = imputer.fit_transform(
self.preprocessed_df[[column]]
)
else:
log.info(f"The column '{column}' does not exist in the DataFrame.")
return self.preprocessed_df
def standard_scaling(self, column):
        # scale the data so that its mean becomes 0 and its standard deviation becomes 1
if column in self.preprocessed_df.columns:
scaler = StandardScaler()
self.preprocessed_df[column] = scaler.fit_transform(
self.preprocessed_df[[column]]
)
return self.preprocessed_df
def min_max_scaling(self, column):
        # scale the data to a fixed range, [0, 1] by default
if column in self.preprocessed_df.columns:
scaler = MinMaxScaler()
self.preprocessed_df[column] = scaler.fit_transform(
self.preprocessed_df[[column]]
)
return self.preprocessed_df
def robust_scaling(self, column):
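        # scale using the median and interquartile range, which is robust to outliers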
if column in self.preprocessed_df.columns:
scaler = RobustScaler()
self.preprocessed_df[column] = scaler.fit_transform(
self.preprocessed_df[[column]]
)
return self.preprocessed_df
def normalization(self, column):
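        # note: Normalizer scales each row to unit norm, so on a single column
        # every nonzero value becomes +/-1; it is meant for several columns at once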
if column in self.preprocessed_df.columns:
scaler = Normalizer()
self.preprocessed_df[column] = scaler.fit_transform(
self.preprocessed_df[[column]]
)
return self.preprocessed_df
def remove_outliers_zscore(self, column):
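        """Drop rows whose `column` value is more than three standard deviations from the mean."""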
        THRESHOLD = 3
        # nan_policy="omit" keeps NaNs from poisoning the mean and std
        z_scores = stats.zscore(self.preprocessed_df[column], nan_policy="omit")
        # filter the rows, not the column: keep values within +/- THRESHOLD
        self.preprocessed_df = self.preprocessed_df[
            (z_scores < THRESHOLD) & (z_scores > -THRESHOLD)
        ]
return self.preprocessed_df
def class_label_encoding(self, column):
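        """Map the ordinal merchant-size labels XS-XL onto the integers 0-4."""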
size_mapping = {"XS": 0, "S": 1, "M": 2, "L": 3, "XL": 4}
if column in self.preprocessed_df.columns:
self.preprocessed_df[column] = self.preprocessed_df[column].map(
size_mapping
)
else:
log.info(f"Class labels {column} does not exist in the dataframe!")
return self.preprocessed_df
def single_one_hot_encoding(self, column):
        # one-hot encode the categorical column, adding one feature column per class
        if column in self.preprocessed_df.columns:
            data_to_encode = self.preprocessed_df[[column]].fillna("").astype(str)
            # sparse_output replaces the `sparse` argument removed in scikit-learn 1.4
            encoder = OneHotEncoder(sparse_output=False)
encoded_data = encoder.fit_transform(data_to_encode)
encoded_columns = encoder.get_feature_names_out([column])
self.added_features.extend(encoded_columns)
encoded_df = pd.DataFrame(
encoded_data, columns=encoded_columns, index=self.preprocessed_df.index
)
self.preprocessed_df = pd.concat([self.preprocessed_df, encoded_df], axis=1)
else:
log.info(f"The column '{column}' does not exist in the DataFrame.")
return self.preprocessed_df
def multiple_label_encoding(self, column):
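        """One-hot encode a column whose entries are lists of labels, via MultiLabelBinarizer."""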
if column in self.preprocessed_df.columns:
            # cells hold stringified lists of labels, e.g. "['bakery', 'cafe']"
            # (column-level fillna(..., inplace=True) is deprecated chained assignment)
            self.preprocessed_df[column] = self.preprocessed_df[column].fillna("")
            # parse each stringified list into a real Python list
            self.preprocessed_df[column] = self.preprocessed_df[column].apply(
                lambda x: literal_eval(x) if x != "" else []
            )
mlb = MultiLabelBinarizer()
encoded_data = mlb.fit_transform(self.preprocessed_df[column])
self.added_features.extend(mlb.classes_)
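            # when rows were filtered, reuse the frame's index so concat aligns row-wise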
if self.filter_bool:
encoded_df = pd.DataFrame(
encoded_data, columns=mlb.classes_, index=self.preprocessed_df.index
)
else:
encoded_df = pd.DataFrame(encoded_data, columns=mlb.classes_)
self.preprocessed_df = pd.concat([self.preprocessed_df, encoded_df], axis=1)
else:
log.info(f"The column '{column}' does not exist in the DataFrame.")
return self.preprocessed_df
def implement_preprocessing_pipeline(self):
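        """Run the full pipeline: filtering, imputation, scaling, and encoding."""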
if self.filter_bool:
self.filter_out_null_data()
for data_column in self.numerical_data:
self.preprocessed_df = self.fill_missing_values(data_column)
if data_column in self.data_to_scale:
self.preprocessed_df = self.robust_scaling(data_column)
for data_column in self.categorical_data:
            # this column holds lists of labels and is encoded separately below
            if data_column == "google_places_detailed_type":
                continue
try:
self.preprocessed_df = self.single_one_hot_encoding(data_column)
except ValueError as e:
log.error(
f"Failed to one-hot encode data type ({data_column})! Error: {e}"
)
try:
self.preprocessed_df = self.multiple_label_encoding(
"google_places_detailed_type"
)
except ValueError as e:
log.error(
f"Failed to one-hot encode data type 'google_places_detailed_type'! Error: {e}"
)
try:
self.preprocessed_df = self.class_label_encoding(self.class_labels)
except ValueError as e:
log.error(f"Failed to label the classes '{self.class_labels}'! Error: {e}")
log.info("Preprocessing complete!")
return self.preprocessed_df
def save_preprocessed_data(self):
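        """Write the selected numerical, encoded, and label columns to a CSV file."""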
columns_to_save = []
columns_to_save.extend(self.numerical_data)
columns_to_save.extend(self.added_features)
columns_to_save.append(self.class_labels)
selected_df = pd.DataFrame()
try:
for column in columns_to_save:
if column in self.preprocessed_df.columns:
selected_df[column] = self.preprocessed_df[column]
except ValueError as e:
log.error(f"Failed to save the selected columns for preprocessing! {e}")
try:
            selected_df.to_csv(self.preprocessed_data_output_path, index=False)
            log.info(
                f"Preprocessed dataframe of shape {selected_df.shape} is saved at {self.preprocessed_data_output_path}"
            )
        except (OSError, ValueError) as e:
            log.error(f"Failed to save preprocessed data file! {e}")