database.leads package

Submodules

database.leads.local_repository module

class database.leads.local_repository.LocalRepository[source]

Bases: Repository

BASE_PATH = '/home/runner/work/amos2023ws06-sales-lead-qualifier/amos2023ws06-sales-lead-qualifier/src/database/leads'
CLASSIFICATION_REPORTS = '/home/runner/work/amos2023ws06-sales-lead-qualifier/amos2023ws06-sales-lead-qualifier/src/data/classification_reports'
DF_HISTORICAL_OUTPUT = '/home/runner/work/amos2023ws06-sales-lead-qualifier/amos2023ws06-sales-lead-qualifier/src/data/100k_historic_enriched.csv'
DF_INPUT = '/home/runner/work/amos2023ws06-sales-lead-qualifier/amos2023ws06-sales-lead-qualifier/src/data/sumup_leads_email.csv'
DF_OUTPUT = '/home/runner/work/amos2023ws06-sales-lead-qualifier/amos2023ws06-sales-lead-qualifier/src/data/leads_enriched.csv'
DF_PREDICTION_OUTPUT = '/home/runner/work/amos2023ws06-sales-lead-qualifier/amos2023ws06-sales-lead-qualifier/src/data/leads_predicted_size.csv'
DF_PREPROCESSED_INPUT = '/home/runner/work/amos2023ws06-sales-lead-qualifier/amos2023ws06-sales-lead-qualifier/src/data/preprocessed_data_files'
GPT_RESULTS = '/home/runner/work/amos2023ws06-sales-lead-qualifier/amos2023ws06-sales-lead-qualifier/src/data/gpt-results'
ML_MODELS = '/home/runner/work/amos2023ws06-sales-lead-qualifier/amos2023ws06-sales-lead-qualifier/src/data/models'
REVIEWS = '/home/runner/work/amos2023ws06-sales-lead-qualifier/amos2023ws06-sales-lead-qualifier/src/data/reviews'
SNAPSHOTS = '/home/runner/work/amos2023ws06-sales-lead-qualifier/amos2023ws06-sales-lead-qualifier/src/data/snapshots'
clean_snapshots(prefix)[source]

Clean up the snapshots after a pipeline ran successfully.

Parameters:

prefix – Prefix of the current pipeline run, used to identify all snapshots to delete

create_snapshot(df, prefix, name)[source]

Snapshot the current state of the dataframe.

Parameters:
  • df – Data to create a snapshot of

  • prefix – Prefix for a group of snapshots belonging to a single pipeline run, used to identify snapshots when cleaning up after a pipeline run

  • name – Name of the snapshot

Returns:

None
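The prefix-based snapshot lifecycle described above can be sketched with a hypothetical file-based implementation (the directory, filename scheme, and CSV handling here are illustrative assumptions, not the package's actual code):

```python
from pathlib import Path
import tempfile

# Illustrative stand-in for the SNAPSHOTS path constant.
SNAPSHOTS = Path(tempfile.mkdtemp())

def create_snapshot(rows, prefix, name):
    # Store the data under "<prefix>_<name>.csv" so every snapshot of one
    # pipeline run shares the prefix and can be cleaned up together later.
    (SNAPSHOTS / f"{prefix}_{name}.csv").write_text("\n".join(rows))

def clean_snapshots(prefix):
    # Delete every snapshot whose filename starts with the run's prefix.
    for path in SNAPSHOTS.glob(f"{prefix}_*.csv"):
        path.unlink()

create_snapshot(["lead_id", "1", "2"], "run42", "after_enrichment")
print(len(list(SNAPSHOTS.glob("run42_*.csv"))))  # 1
clean_snapshots("run42")
print(len(list(SNAPSHOTS.glob("run42_*.csv"))))  # 0
```

The shared prefix is what makes clean_snapshots safe to call after a successful run without touching snapshots from other, possibly concurrent, runs.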

fetch_gpt_result(file_id, operation_name)[source]

Fetches the GPT result for a given file ID and operation name.

Parameters:
  • file_id (str) – The ID of the file.

  • operation_name (str) – The name of the GPT operation.

Returns:

The GPT result for the specified file ID and operation name.

fetch_review(place_id)[source]

Fetch the review for the specified place_id.

Returns:

JSON contents of the desired review

get_preprocessed_data_path(historical: bool = True)[source]

Returns the path for a preprocessed data file (either historical or current)

insert_data(data)[source]

TODO: Insert new data into the specified dataframe.

Parameters:

data – Data to be inserted (desired format must be checked)

load_classification_report(model_name: str)[source]

Load the classification report from the file with the given name

Parameters:

model_name (str) – Model name that created the report

load_lookup_table(step_name: str) dict[source]

Create or load the lookup table of hashes for a given step.

Returns:

The lookup table as a dict
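A lookup table of hashes typically lets a step skip rows it has already processed. A hypothetical sketch of the idea (the hashing scheme and table contents are assumptions for illustration):

```python
import hashlib
import json

def row_hash(row):
    # Stable digest of a row's contents, usable as a lookup-table key:
    # sort_keys makes the same dict always serialize identically.
    return hashlib.sha256(json.dumps(row, sort_keys=True).encode()).hexdigest()

# In the real repository this dict would come from load_lookup_table(step_name)
# and be persisted again via save_lookup_table.
lookup_table = {}

row = {"lead_id": 1, "email": "a@example.com"}
h = row_hash(row)
print(h in lookup_table)   # False: row not processed yet
lookup_table[h] = "processed"
print(h in lookup_table)   # True: the step can skip this row next time
```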

load_ml_model(model_name: str)[source]

Load an ML model from the file with the given name

Parameters:

model_name (str) – File name

load_preprocessed_data(historical: bool = True)[source]

Load the preprocessed data from the given file

save_classification_report(report, model_name: str)[source]

Save a given classification report to a file with a given name

Parameters:
  • report – The classification report to save

  • model_name (str) – Model name that created the report

save_dataframe()[source]

Save the dataframe stored in the df attribute to the chosen output location

save_gpt_result(gpt_result, file_id, operation_name, force_refresh=False)[source]

Save the results of GPT operations to a specified path.

Parameters:
  • gpt_result – The GPT result to be saved

  • file_id – The ID of the file

  • operation_name – The name of the GPT operation

  • force_refresh – Whether to force a refresh of the saved result. Defaults to False.

save_lookup_table(lookup_table: dict, step_name: str) None[source]

Save the lookup table of hashes for a given step

save_ml_model(model, model_name: str)[source]

Save a given ML model to a file with a given name

Parameters:
  • model – Model to save

  • model_name (str) – File name

save_prediction(df)[source]

Save the dataframe passed in the df parameter to the chosen output location

save_review(review, place_id, force_refresh=False)[source]

Upload the review to the specified review path.

Parameters:

review – JSON contents of the review to be uploaded

database.leads.repository module

class database.leads.repository.Repository[source]

Bases: ABC

DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'
abstract property DF_HISTORICAL_OUTPUT

Define database path to store historical enriched dataframe (used for preprocessing input)

abstract property DF_INPUT

Define database path to load dataframe

abstract property DF_OUTPUT

Define database path to store dataframe

abstract property GPT_RESULTS

Define database path to store GPT operations

abstract property REVIEWS

Define database path to store reviews

abstract property SNAPSHOTS

Define database path to store snapshots

abstract clean_snapshots(prefix)[source]

Clean up the snapshots after a pipeline ran successfully.

Parameters:

prefix – Prefix of the current pipeline run, used to identify all snapshots to delete

abstract create_snapshot(df, prefix, name)[source]

Snapshot the current state of the dataframe.

Parameters:
  • df – Data to create a snapshot of

  • prefix – Prefix for a group of snapshots belonging to a single pipeline run, used to identify snapshots when cleaning up after a pipeline run

  • name – Name of the snapshot

Returns:

None

abstract fetch_gpt_result(file_id, operation_name)[source]

Fetches the GPT result for a given file ID and operation name.

Parameters:
  • file_id (str) – The ID of the file.

  • operation_name (str) – The name of the GPT operation.

Returns:

The GPT result for the specified file ID and operation name.

abstract fetch_review(place_id)[source]

Fetch the review for the specified place_id.

Returns:

JSON contents of the desired review

get_dataframe()[source]
get_enriched_data_path(historical=False)[source]
get_input_path()[source]
abstract get_preprocessed_data_path(historical: bool = True)[source]

Returns the path for a preprocessed data file (either historical or current)

abstract insert_data(data)[source]

Insert new data into the specified dataframe.

Parameters:

data – Data to be inserted (desired format must be checked)

abstract load_classification_report(model_name: str)[source]

Load the classification report from the file with the given name

Parameters:

model_name (str) – Model name that created the report

abstract load_lookup_table(step_name: str) dict[source]

Create or load the lookup table of hashes for a given step.

Returns:

The lookup table as a dict

abstract load_ml_model(model_name: str)[source]

Load an ML model from the file with the given name

Parameters:

model_name (str) – File name

abstract load_preprocessed_data(historical: bool = True)[source]

Load the preprocessed data from the given file

abstract save_classification_report(report, model_name: str)[source]

Save a given classification report to a file with a given name

Parameters:
  • report – The classification report to save

  • model_name (str) – Model name that created the report

abstract save_dataframe()[source]

Save the dataframe stored in the df attribute to the chosen output location

abstract save_gpt_result(gpt_result, file_id, operation_name, force_refresh=False)[source]

Saves the GPT result for a given file ID and operation name.

Parameters:
  • gpt_result (str) – The GPT result to be saved.

  • file_id (str) – The ID of the file.

  • operation_name (str) – The name of the operation.

  • force_refresh (bool, optional) – Whether to force a refresh of the saved result. Defaults to False.
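Together, fetch_gpt_result and save_gpt_result support a cache-aside pattern: expensive GPT results are stored once and reused unless force_refresh overrides the cached copy. A hypothetical in-memory sketch of that behavior (the real implementations persist to local disk or S3, not a dict):

```python
# Illustrative stand-in for the GPT_RESULTS storage location.
_cache = {}

def save_gpt_result(gpt_result, file_id, operation_name, force_refresh=False):
    # Keep the existing result unless the caller explicitly forces a refresh.
    key = (file_id, operation_name)
    if key in _cache and not force_refresh:
        return
    _cache[key] = gpt_result

def fetch_gpt_result(file_id, operation_name):
    # Return the cached result, or None when nothing has been saved yet.
    return _cache.get((file_id, operation_name))

save_gpt_result("summary v1", "lead_001", "summarize")
save_gpt_result("summary v2", "lead_001", "summarize")   # ignored: already cached
print(fetch_gpt_result("lead_001", "summarize"))         # summary v1
save_gpt_result("summary v2", "lead_001", "summarize", force_refresh=True)
print(fetch_gpt_result("lead_001", "summarize"))         # summary v2
```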

abstract save_lookup_table(lookup_table: dict, step_name: str) None[source]

Save the lookup table of hashes for a given step

abstract save_ml_model(model, model_name: str)[source]

Save a given ML model to a file with a given name

Parameters:
  • model – Model to save

  • model_name (str) – File name

abstract save_prediction(df)[source]

Save the dataframe passed in the df parameter to the chosen output location

abstract save_review(review, place_id, force_refresh=False)[source]

Upload the review to the specified review path.

Parameters:

review – JSON contents of the review to be uploaded

set_dataframe(df)[source]
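Repository is an abstract base class: a concrete backend supplies the path properties and storage methods, as LocalRepository and S3Repository do. A minimal, hypothetical sketch of that pattern (the interface is trimmed to two members for brevity; the in-memory backend is invented for illustration):

```python
from abc import ABC, abstractmethod

class Repository(ABC):
    # Trimmed illustration of the ABC pattern used here, not the full interface.
    @property
    @abstractmethod
    def DF_INPUT(self):
        """Define database path to load dataframe"""

    @abstractmethod
    def save_dataframe(self):
        """Save dataframe in df attribute in chosen output location"""

class InMemoryRepository(Repository):
    # Hypothetical backend that keeps everything in a dict; a class attribute
    # is enough to satisfy the abstract property.
    DF_INPUT = "memory://leads/input"

    def __init__(self):
        self._store = {}
        self.df = None

    def save_dataframe(self):
        self._store["output"] = self.df

repo = InMemoryRepository()
repo.df = [{"lead_id": 1}]
repo.save_dataframe()
print(repo.DF_INPUT)  # memory://leads/input
```

Because every abstract member must be overridden before instantiation, forgetting a path constant or storage method in a new backend fails fast with a TypeError rather than at first use.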

database.leads.s3_repository module

class database.leads.s3_repository.S3Repository[source]

Bases: Repository

CLASSIFICATION_REPORTS = 's3://amos--models/classification_reports/'
DF_HISTORICAL_OUTPUT = 's3://amos--data--events/historical_data/100k_historic_enriched.csv'
DF_INPUT = 's3://amos--data--events/leads/enriched.csv'
DF_OUTPUT = 's3://amos--data--events/leads/enriched.csv'
DF_PREDICTION_OUTPUT = 's3://amos--data--events/leads/leads_predicted_size.csv'
DF_PREPROCESSED_INPUT = 's3://amos--data--features/preprocessed_data_files/'
EVENTS_BUCKET = 'amos--data--events'
FEATURES_BUCKET = 'amos--data--features'
GPT_RESULTS = 's3://amos--data--events/gpt-results/'
LOOKUP_TABLES = 's3://amos--data--events/lookup_tables/'
ML_MODELS = 's3://amos--models/models/'
MODELS_BUCKET = 'amos--models'
REVIEWS = 's3://amos--data--events/reviews/'
SNAPSHOTS = 's3://amos--data--events/snapshots/'
clean_snapshots(prefix)[source]

Clean up the snapshots after a pipeline ran successfully.

Parameters:

prefix – Prefix of the current pipeline run, used to identify all snapshots to delete

create_snapshot(df, prefix, name)[source]

Snapshot the current state of the dataframe.

Parameters:
  • df – Data to create a snapshot of

  • prefix – Prefix for a group of snapshots belonging to a single pipeline run, used to identify snapshots when cleaning up after a pipeline run

  • name – Name of the snapshot

Returns:

None

fetch_gpt_result(file_id, operation_name)[source]

Fetches the GPT result for a given file ID and operation name from S3

fetch_review(place_id)[source]

Fetch the review for the specified place_id.

Returns:

JSON contents of the desired review

get_preprocessed_data_path(historical: bool = True)[source]

Returns the path for a preprocessed data file (either historical or current)

insert_data(data)[source]

TODO: Insert new data into the specified dataframe.

Parameters:

data – Data to be inserted (desired format must be checked)

load_classification_report(model_name: str)[source]

Load the classification report from the file with the given name

Parameters:

model_name (str) – Model name that created the report

load_lookup_table(step_name: str) dict[source]

Create or load the lookup table of hashes for a given step.

Returns:

The lookup table as a dict

load_ml_model(model_name: str)[source]

Load an ML model from the file with the given name

Parameters:

model_name (str) – File name

load_preprocessed_data(historical: bool = True)[source]

Load the preprocessed data from the given file

save_classification_report(report, model_name: str)[source]

Save a given classification report to a file with a given name

Parameters:
  • report – The classification report to save

  • model_name (str) – Model name that created the report

save_dataframe()[source]

Save the dataframe stored in the df attribute to the chosen output location

save_gpt_result(gpt_result, file_id, operation_name, force_refresh=False)[source]

Saves the GPT result for a given file ID and operation name on S3

save_lookup_table(lookup_table: dict, step_name: str) None[source]

Save the lookup table of hashes for a given step

save_ml_model(model, model_name: str)[source]

Save a given ML model to a file with a given name

Parameters:
  • model – Model to save

  • model_name (str) – File name

save_prediction(df)[source]

Save the dataframe passed in the df parameter to the chosen output location

save_review(review, place_id, force_refresh=False)[source]

Upload the review to the specified review path.

Parameters:

review – JSON contents of the review to be uploaded

database.leads.s3_repository.decode_s3_url(url)[source]

Retrieve the bucket and object key from an object URL.

Returns:

Bucket string, object key string
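Assuming decode_s3_url follows the usual s3://bucket/key layout (the URL below is built from the constants documented above, but this reimplementation is a sketch, not the package's actual code), the split can be illustrated with the standard library:

```python
from urllib.parse import urlparse

def decode_s3_url(url):
    # Split "s3://<bucket>/<key>" into its bucket and object-key parts:
    # the bucket is the network location, the key is the path minus its
    # leading slash.
    parsed = urlparse(url)
    return parsed.netloc, parsed.path.lstrip("/")

bucket, key = decode_s3_url("s3://amos--data--events/reviews/place_123.json")
print(bucket)  # amos--data--events
print(key)     # reviews/place_123.json
```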

Module contents