Source code for bdc.pipeline

# SPDX-License-Identifier: MIT
# SPDX-FileCopyrightText: 2023 Lucca Baumgärtner <lucca.baumgaertner@fau.de>
# SPDX-FileCopyrightText: 2023 Sophie Heasman <sophieheasmann@gmail.com>
from datetime import datetime

import numpy as np

from bdc.steps.step import Step, StepError
from database import get_database
from logger import get_logger

log = get_logger()


class Pipeline:
    def __init__(
        self,
        steps: list[Step],
        limit: int | None = None,
    ):
        self.steps: list[Step] = steps
        self.limit: int | None = limit
        self.df = get_database().get_dataframe()
        if limit is not None:
            self.df = self.df[:limit]
    def run(self):
        run_id = datetime.now().strftime("%Y/%m/%d/%H%M%S/")
        error_occurred = False

        if self.df is None:
            log.error(
                "Error: DataFrame of pipeline has not been initialized, aborting pipeline run!"
            )
            return

        # Pass the dataframe and/or input location from the previous step to the next
        for step in self.steps:
            log.info(f"Processing step {step.name}")

            # Load dataframe and/or input location for this step
            if step.df is None:
                step.df = self.df.copy()

            try:
                step.load_data()
                verified = step.verify()
                log.info(f"Verification for step {step.name}: {verified}")
                data_present = step.check_data_presence()
                if verified and not data_present:
                    step_df = step.run()
                    self.df = step_df

                # Cleanup
                step.finish()
            except (StepError, Exception) as e:
                error_occurred = True
                log.error(f"Step {step.name} failed! {e}")
            finally:
                # Create snapshots to avoid data loss
                get_database().create_snapshot(step.df, prefix=run_id, name=step.name)

        self.df = self.df.replace(np.nan, None)

        # Set dataframe in DAL
        get_database().set_dataframe(self.df)
        # Upload DAL dataframe to chosen database
        get_database().save_dataframe()

        # Delete snapshots
        if not error_occurred:
            get_database().clean_snapshots(run_id)

        log.info(f"Pipeline finished running {len(self.steps)} steps!")
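
A minimal usage sketch follows. The concrete step (NormalizeEmails) and the column it touches are hypothetical, invented here for illustration; real pipelines would use the Step subclasses shipped in bdc.steps. The sketch also assumes a Step subclass can be instantiated without constructor arguments and exposes name as a class attribute, since Pipeline.run reads step.name and calls load_data, verify, check_data_presence, run, and finish in that order.

from bdc.pipeline import Pipeline
from bdc.steps.step import Step


class NormalizeEmails(Step):
    # Hypothetical step for illustration: lower-cases an assumed "Email" column.
    name = "Normalize-Emails"

    def load_data(self):
        # Nothing external to fetch in this sketch.
        pass

    def verify(self) -> bool:
        # Pipeline.run only executes the step if verify() returns True.
        return "Email" in self.df.columns

    def check_data_presence(self) -> bool:
        # Returning False tells the pipeline the data still needs computing.
        return False

    def run(self):
        self.df["Email"] = self.df["Email"].str.lower()
        return self.df

    def finish(self):
        # No per-step cleanup needed here.
        pass


# Process at most 100 rows of the database-backed dataframe.
pipeline = Pipeline(steps=[NormalizeEmails()], limit=100)
pipeline.run()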