Source code for demo.demos

# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Lucca Baumgärtner <lucca.baumgaertner@fau.de>
# SPDX-FileCopyrightText: 2023 Sophie Heasman <sophieheasmann@gmail.com>
# SPDX-FileCopyrightText: 2023 Felix Zailskas <felixzailskas@gmail.com>
# SPDX-FileCopyrightText: 2023 Fabian-Paul Utech  <f.utech@gmx.net>
# SPDX-FileCopyrightText: 2023 Ruchita Nathani <Ruchita.nathani@fau.de>
# SPDX-FileCopyrightText: 2023 Ahmed Sheta <ahmed.sheta@fau.de>


import re
import warnings

import pandas as pd
import xgboost as xgb
from sklearn.metrics import classification_report

from bdc.pipeline import Pipeline
from config import DATABASE_TYPE
from database import get_database
from demo.console_utils import (
    get_int_input,
    get_multiple_choice,
    get_string_input,
    get_yes_no_input,
)
from demo.pipeline_utils import (
    get_all_available_pipeline_json_configs,
    get_pipeline_additional_steps,
    get_pipeline_config_from_json,
    get_pipeline_initial_steps,
)
from evp import EstimatedValuePredictor
from evp.predictors import MerchantSizeByDPV, Predictors
from logger import get_logger
from preprocessing import Preprocessing

warnings.simplefilter(action="ignore", category=pd.errors.PerformanceWarning)
warnings.simplefilter(action="ignore", category=FutureWarning)


log = get_logger()

# Constants and configurations
LEADS_TRAIN_FILE = "data/leads_train.csv"
LEADS_TEST_FILE = "data/leads_test.csv"
INPUT_FILE_BDC = "../data/sumup_leads_email.csv"
OUTPUT_FILE_BDC = "../data/collected_data.json"


# evp demo
def evp_demo():
    data = get_database().load_preprocessed_data()

    model_type_choices = [e for e in Predictors]
    print("Which model type do you want to load?")
    for i, p in enumerate(Predictors):
        print(f"({i}) : {p.value}")
    choice = get_int_input("", range(0, len(model_type_choices)))
    model_type = model_type_choices[choice]

    model_name = None
    if get_yes_no_input("Load model from file? (y/N)\n"):
        model_name = get_string_input("Provide model file name\n")

    limit_classes = False
    if get_yes_no_input(
        "Use 3 classes ({XS}, {S, M, L}, {XL}) instead of 5 classes ({XS}, {S}, {M}, {L}, {XL})? (y/N)\n"
    ):
        limit_classes = True

    feature_subsets = [
        ["Include all features"],
        [
            "google_places_rating",
            "google_places_user_ratings_total",
            "google_places_confidence",
            "regional_atlas_regional_score",
        ],
    ]
    print("Do you want to train on a subset of features?")
    for i, p in enumerate(feature_subsets):
        print(f"({i}) : {p}")
    feature_choice = get_int_input("", range(0, len(feature_subsets)))
    feature_choice = None if feature_choice == 0 else feature_subsets[feature_choice]

    evp = EstimatedValuePredictor(
        data=data,
        model_type=model_type,
        model_name=model_name,
        limit_classes=limit_classes,
        selected_features=feature_choice,
    )

    while True:
        choice = get_int_input(
            "(1) Train\n(2) Test\n(3) Predict on single lead\n(4) Save model\n(5) Exit\n",
            range(1, 6),
        )
        if choice == 1:
            evp.train()
        elif choice == 2:
            test_evp_model(evp)
        elif choice == 3:
            predict_single_lead(evp)
        elif choice == 4:
            evp.save_model()
        elif choice == 5:
            break
        else:
            print("Invalid choice")
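
# A minimal non-interactive sketch of the same flow, useful for scripting or
# tests. It relies only on the constructor arguments shown above; picking
# list(Predictors)[0] as the model type is arbitrary, not a recommendation.
#
#     data = get_database().load_preprocessed_data()
#     evp = EstimatedValuePredictor(
#         data=data,
#         model_type=list(Predictors)[0],
#         model_name=None,
#         limit_classes=False,
#         selected_features=None,
#     )
#     evp.train()
#     evp.save_model()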
def test_evp_model(evp: EstimatedValuePredictor):
    predictions = evp.predict(evp.X_test)
    if len(predictions) == 1 and predictions[0] == MerchantSizeByDPV.Invalid:
        log.info("Untrained model results in no displayable data")
        return
    true_labels = evp.y_test
    print(classification_report(true_labels, predictions))
def predict_single_lead(evp: EstimatedValuePredictor):
    leads = evp.X_test
    lead_id = get_int_input(
        f"Choose a lead_id in range [0, {len(leads) - 1}]\n", range(len(leads))
    )
    if 0 <= lead_id < len(leads):
        prediction = evp.predict([leads[lead_id]])
        if prediction[0] == MerchantSizeByDPV.Invalid:
            log.info("Untrained model results in no displayable data")
            return
        print(
            f"Lead has predicted value of {prediction} and true value of {evp.y_test[lead_id]}"
        )
    else:
        print("Invalid Choice")
def add_step_if_requested(steps, step_class, step_desc, step_warning_message: str = ""):
    if get_yes_no_input(f"Run {step_desc} {step_warning_message}(y/N)?\n"):
        force = get_yes_no_input("Force execution if data is present? (y/N)\n")
        steps.append(step_class(force_refresh=force))
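
# Usage sketch: add_step_if_requested is called once per optional step while
# assembling a pipeline. HypotheticalStep stands in for any step class that
# accepts the force_refresh keyword used above.
#
#     steps = []
#     add_step_if_requested(
#         steps, HypotheticalStep, "hypothetical step", "(this may take a while) "
#     )
#     # steps now contains a HypotheticalStep instance iff the user answered yes.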
# pipeline_demo
def pipeline_demo():
    """
    Demonstrates the execution of a pipeline.

    The function prompts the user to select a pipeline configuration or create a
    custom one. It then sets a limit for the number of data points to be processed,
    if specified. Finally, it runs the pipeline with the selected configuration
    and limit.

    Args:
        None

    Returns:
        None
    """
    continue_with_custom_config = True
    if get_yes_no_input("Do you want to list all available pipeline configs? (y/N)\n"):
        all_pipeline_configs = get_all_available_pipeline_json_configs()
        if len(all_pipeline_configs) > 0:
            prompt = "Please enter the index of requested pipeline config:\n"
            choices = all_pipeline_configs + ["Exit"]
            choice = get_multiple_choice(prompt, choices)
            if choice != "Exit":
                steps = get_pipeline_config_from_json(config_name=choice)
                continue_with_custom_config = False
            else:
                print("Exiting...\n")
        else:
            print("No pipeline configs found.\n")

    if continue_with_custom_config:
        print("Continuing with custom pipeline config...\n\n")
        steps = []
        # get default steps and optional steps attrs
        initial_steps_attr = get_pipeline_initial_steps()
        additional_steps_attr = get_pipeline_additional_steps()
        # create step instances from default steps attrs and add them to steps list
        for step_class, desc, warning_message in initial_steps_attr:
            steps.append(step_class(force_refresh=True))
        # add optional steps to steps list if requested
        for step_class, desc, warning_message in additional_steps_attr:
            add_step_if_requested(steps, step_class, desc, warning_message)

    limit = get_int_input("Set limit for data points to be processed (0=No limit)\n")
    limit = limit if limit > 0 else None

    if (
        limit is not None
        and get_database().DF_OUTPUT == "s3://amos--data--events/leads/enriched.csv"
    ):
        if get_yes_no_input(
            f"The output cannot be limited when uploading to {get_database().DF_OUTPUT}.\n"
            "The limit will be removed, and the pipeline will be executed on the full database.\n\n"
            "Would you like to continue? (y/n)\n"
        ):
            limit = None
        else:
            return

    steps_info = "\n".join([str(step) for step in steps])
    log.info(
        f"Running Pipeline with steps:\n{steps_info}\n"
        f"input_location={get_database().get_input_path()}\n"
        f"output_location={get_database().get_enriched_data_path()}"
    )

    pipeline = Pipeline(
        steps=steps,
        limit=limit,
    )

    pipeline.run()
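
# For reference, the non-interactive core of the demo boils down to the sketch
# below; it uses the (step_class, desc, warning_message) tuples returned by
# get_pipeline_initial_steps() as above, and the limit of 100 is an arbitrary
# illustrative value.
#
#     steps = [
#         step_class(force_refresh=True)
#         for step_class, _, _ in get_pipeline_initial_steps()
#     ]
#     Pipeline(steps=steps, limit=100).run()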
def preprocessing_demo():
    filter_bool = get_yes_no_input("Filter out the API-irrelevant data? (y/n)\n")
    historical_bool = get_yes_no_input(
        "Run on historical data? (y/n)\n'n' means it will run on lead data!\n"
    )
    preprocessor = Preprocessing(
        filter_null_data=filter_bool, historical_bool=historical_bool
    )
    preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
    df = preprocessor.implement_preprocessing_pipeline()
    preprocessor.save_preprocessed_data()
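
# The same preprocessing can be run without prompts; the argument values here
# are illustrative, e.g. historical data with null filtering enabled:
#
#     preprocessor = Preprocessing(filter_null_data=True, historical_bool=True)
#     preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
#     preprocessor.implement_preprocessing_pipeline()
#     preprocessor.save_preprocessed_data()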
def predict_MerchantSize_on_lead_data_demo():
    import os
    import sys

    import pandas as pd

    log.info(
        "Note: In case of running locally, enriched data must be located at src/data/leads_enriched.csv\n"
        "In case of running on S3, enriched data must be located at s3://amos--data--events/leads/enriched.csv"
    )

    ######################### preprocessing the leads ##################################
    S3_bool = DATABASE_TYPE == "S3"
    current_dir = os.path.dirname(__file__) if "__file__" in locals() else os.getcwd()
    parent_dir = os.path.join(current_dir, "..")
    sys.path.append(parent_dir)
    from database import get_database
    from preprocessing import Preprocessing

    db = get_database()
    log.info("Preprocessing the leads...")
    preprocessor = Preprocessing(filter_null_data=False, historical_bool=False)
    preprocessor.preprocessed_df = pd.read_csv(preprocessor.data_path)
    df = preprocessor.implement_preprocessing_pipeline()
    preprocessor.save_preprocessed_data()

    ############################## adapting the preprocessing files ###########################
    log.info("Adapting the leads' preprocessed data for the ML model...")
    # load the data from the CSV files
    historical_preprocessed_data = db.load_preprocessed_data(historical=True)
    unlabeled_preprocessed_data = db.load_preprocessed_data(historical=False)

    # add the columns missing from the lead data (filled with 0) ...
    historical_columns_order = historical_preprocessed_data.columns
    missing_columns = set(historical_columns_order) - set(
        unlabeled_preprocessed_data.columns
    )
    unlabeled_preprocessed_data[list(missing_columns)] = 0
    # ... and drop the columns the historical data does not have
    for column in unlabeled_preprocessed_data.columns:
        if column not in historical_columns_order:
            unlabeled_preprocessed_data = unlabeled_preprocessed_data.drop(
                column, axis=1
            )
    # reorder columns
    unlabeled_preprocessed_data = unlabeled_preprocessed_data[historical_columns_order]
    unlabeled_preprocessed_data.to_csv(
        preprocessor.preprocessed_data_output_path,
        index=False,
    )
    log.info(
        f"Saving the adapted preprocessed data at {preprocessor.preprocessed_data_output_path}"
    )

    # check that the columns in both dataframes match in number and order
    assert list(unlabeled_preprocessed_data.columns) == list(
        historical_preprocessed_data.columns
    ), "Column names are different"

    ####################### Applying ML model on lead data ####################################
    bucket_name = "amos--models"

    if S3_bool:
        model_name = get_string_input(
            "Provide model file name in amos--models/models S3 Bucket\n"
            "Input example: lightgbm_epochs(1)_f1(0.6375)_numclasses(5)_model.pkl\n"
        )
    else:
        model_name = get_string_input(
            "Provide model file name in data/models local directory\n"
            "Input example: lightgbm_epochs(1)_f1(0.6375)_numclasses(5)_model.pkl\n"
        )
    model_name = model_name.strip()
    xgb_bool = model_name.lower().startswith("xgb")

    def check_classification_task(string):
        # a "numclasses(3)" marker in the file name signals the 3-class model
        match = re.search(r"numclasses\((\d+)\)", string)
        if match:
            return int(match.group(1)) == 3
        return False

    classification_task_3 = check_classification_task(model_name)
    try:
        model = db.load_ml_model(model_name)
        log.info(f"Loaded the model {model_name}!")
    except Exception:
        log.error("No model found with the given name!")
        return

    df = pd.read_csv(preprocessor.preprocessed_data_output_path)
    input = df.drop("MerchantSizeByDPV", axis=1)
    if xgb_bool:
        input = xgb.DMatrix(input)

    predictions = model.predict(input)
    if classification_task_3:
        size_mapping = {0: "XS", 1: "{S, M, L}", 2: "XL"}
    else:
        size_mapping = {0: "XS", 1: "S", 2: "M", 3: "L", 4: "XL"}
    remapped_predictions = [size_mapping[prediction] for prediction in predictions]

    enriched_data = pd.read_csv(preprocessor.data_path)

    # first 5 columns: Last Name, First Name, Company / Account, Phone, Email
    raw_data = enriched_data.iloc[:, :5]
    raw_data["PredictedMerchantSize"] = remapped_predictions

    db.save_prediction(raw_data)
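
# Note on the column-alignment block above: adding the missing columns,
# dropping the extra ones, and reordering can also be expressed in a single
# pandas call, assuming 0 is the desired fill value for absent features:
#
#     unlabeled_preprocessed_data = unlabeled_preprocessed_data.reindex(
#         columns=historical_preprocessed_data.columns, fill_value=0
#     )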