# Source code for demo.pipeline_utils

# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Berkay Bozkurt <resitberkaybozkurt@gmail.com>

import json
import os

from logger import get_logger

log = get_logger()

from bdc.steps import (
    AnalyzeEmails,
    GooglePlaces,
    GooglePlacesDetailed,
    GPTReviewSentimentAnalyzer,
    GPTSummarizer,
    HashGenerator,
    PreprocessPhonenumbers,
    RegionalAtlas,
    SearchOffeneRegister,
    SmartReviewInsightsEnhancer,
)

# Default directory holding the pipeline JSON configs: the "pipeline_configs/"
# folder located next to this module file.
DEFAULT_PIPELINE_PATH = os.path.join(os.path.dirname(__file__), "pipeline_configs/")

# Lookup table from a step's name string to the step class imported from
# bdc.steps. get_pipeline_config_from_json() uses it to resolve the "name"
# field of each step entry in a JSON config, so every key here must match the
# spelling used in those config files.
STEP_STR_TO_CLASS = {
    "HashGenerator": HashGenerator,
    "AnalyzeEmails": AnalyzeEmails,
    "GooglePlaces": GooglePlaces,
    "GooglePlacesDetailed": GooglePlacesDetailed,
    "GPTReviewSentimentAnalyzer": GPTReviewSentimentAnalyzer,
    "GPTSummarizer": GPTSummarizer,
    "PreprocessPhonenumbers": PreprocessPhonenumbers,
    "RegionalAtlas": RegionalAtlas,
    "SearchOffeneRegister": SearchOffeneRegister,
    "SmartReviewInsightsEnhancer": SmartReviewInsightsEnhancer,
}

# Please do not modify the following lists directly! Use the accessor
# functions below instead; they hand out copies so these module-level lists
# stay untouched.
#
# Each entry is a 3-tuple: (step class, label string, note string).
# NOTE(review): the label and note look like user-facing display text (a
# readable step name plus a cost/duration warning) - confirm against callers.
# The note is an empty string for steps without such a warning.
_additional_pipeline_steps = [
    (SearchOffeneRegister, "Search OffeneRegister", "(will take a long time)"),
    (PreprocessPhonenumbers, "Phone Number Validation", ""),
    (
        GooglePlaces,
        "Google API",
        "(will use token and generate cost!)",
    ),
    (
        GooglePlacesDetailed,
        "Google API Detailed",
        "(will use token and generate cost!)",
    ),
    (
        GPTReviewSentimentAnalyzer,
        "openAI GPT Sentiment Analyzer",
        "(will use token and generate cost!)",
    ),
    (
        GPTSummarizer,
        "openAI GPT Summarizer",
        "(will use token and generate cost!)",
    ),
    (
        SmartReviewInsightsEnhancer,
        "Smart Review Insights",
        "(will take looong time!)",
    ),
    (RegionalAtlas, "Regionalatlas", ""),
]

# Steps listed first by get_pipeline_steps(), ahead of the additional steps.
# Same (step class, label string, note string) layout as above.
_initial_pipeline_steps = [
    (HashGenerator, "Hash Generator", ""),
    (AnalyzeEmails, "Analyze Emails", ""),
]
# Please do not modify the lists above directly! Use the functions below instead.


def get_pipeline_steps() -> list:
    """
    Build the complete list of pipeline steps.

    The initial pipeline steps come first, followed by the additional
    pipeline steps.

    Returns:
        list: A new list containing every entry of both step lists. Changing
        the returned list does not affect the module-level lists.
    """
    # Unpacking into a list literal already creates a fresh list object.
    return [*_initial_pipeline_steps, *_additional_pipeline_steps]
def get_pipeline_initial_steps() -> list:
    """
    Provide the initial pipeline steps.

    Returns:
        list: A shallow copy of the initial pipeline steps, so callers cannot
        alter the module-level list by mutating the result.
    """
    return list(_initial_pipeline_steps)
def get_pipeline_additional_steps() -> list:
    """
    Provide the additional pipeline steps.

    Returns:
        list: A shallow copy of the additional pipeline steps, so callers
        cannot alter the module-level list by mutating the result.
    """
    # A full slice yields a new list with the same elements in the same order.
    return _additional_pipeline_steps[:]
def get_all_available_pipeline_json_configs(
    config_path: str = DEFAULT_PIPELINE_PATH,
) -> list:
    """
    List the names of the pipeline json configs found in a directory.

    :param config_path: Directory to scan for pipeline json configs
    :return: Names (not full paths) of all directory entries whose name
        ends with ".json", in the order reported by os.listdir
    """
    json_config_names = []
    for entry_name in os.listdir(config_path):
        if entry_name.endswith(".json"):
            json_config_names.append(entry_name)
    return json_config_names
def get_pipeline_config_from_json(
    config_name: str, config_path: str = DEFAULT_PIPELINE_PATH
) -> list:
    """
    Retrieves the pipeline configuration from a JSON file.

    The file must contain an object with a "config" -> "steps" list, where
    every step entry provides a "name" (a key of STEP_STR_TO_CLASS) and a
    "force_refresh" value that is passed to the step's constructor.

    Args:
        config_name (str): The name of the configuration file.
        config_path (str, optional): The path to the directory containing the
            configuration file. Defaults to DEFAULT_PIPELINE_PATH.

    Returns:
        list: A list of instantiated pipeline steps, in the order given by
        the configuration file.

    Raises:
        OSError: If the configuration file cannot be opened.
        json.JSONDecodeError: If the file does not contain valid JSON.
        KeyError: If a required key ("config", "steps", "name",
            "force_refresh") is missing, or if a step name is not a key of
            STEP_STR_TO_CLASS.
    """
    # JSON text is UTF-8 by specification; do not depend on the platform's
    # locale encoding (which is not UTF-8 on e.g. many Windows setups).
    with open(os.path.join(config_path, config_name), "r", encoding="utf-8") as f:
        steps_json = json.load(f)

    steps = []
    for step in steps_json["config"]["steps"]:
        log.info(f"Adding step {step}")
        step_name = step["name"]
        try:
            step_class = STEP_STR_TO_CLASS[step_name]
        except KeyError:
            # Same exception type as before, but with enough context to find
            # and fix the offending entry in the config file.
            raise KeyError(
                f"Unknown pipeline step {step_name!r} in config {config_name!r}; "
                f"available steps: {sorted(STEP_STR_TO_CLASS)}"
            ) from None
        steps.append(step_class(force_refresh=step["force_refresh"]))

    return steps