Duplicate Detection

DuplicateDetection

Bases: DataManipulationBaseInterface, InputValidator

Removes duplicate rows from a PySpark DataFrame.

Example

from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.duplicate_detection import DuplicateDetection

# df is an existing PySpark DataFrame that contains "TagName" and "EventTime" columns
duplicate_detection_monitor = DuplicateDetection(df, primary_key_columns=["TagName", "EventTime"])

result = duplicate_detection_monitor.filter()

Parameters:

    df (DataFrame): PySpark DataFrame to be cleansed. Required.
    primary_key_columns (list): List of column names that serve as the primary key for duplicate detection. Required.
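For a self-contained run, the following minimal sketch builds a small DataFrame with a duplicated key. The import path follows the source location shown below; the tag names, timestamps, and values are purely illustrative.

```python
from pyspark.sql import SparkSession

from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.duplicate_detection import (
    DuplicateDetection,
)

spark = SparkSession.builder.appName("duplicate-detection-example").getOrCreate()

# Illustrative data: the second row repeats the ("TagName", "EventTime") key.
df = spark.createDataFrame(
    [
        ("Tag_A", "2024-01-02 20:03:46", 1.0),
        ("Tag_A", "2024-01-02 20:03:46", 5.0),
        ("Tag_B", "2024-01-02 16:00:12", 2.0),
    ],
    ["TagName", "EventTime", "Value"],
)

result = DuplicateDetection(df, primary_key_columns=["TagName", "EventTime"]).filter()
result.show()  # two rows remain; one arbitrary Tag_A row is kept per key
```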
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/duplicate_detection.py
class DuplicateDetection(DataManipulationBaseInterface, InputValidator):
    """
    Removes duplicate rows from a PySpark DataFrame.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.duplicate_detection import DuplicateDetection

    # df is an existing PySpark DataFrame that contains "TagName" and "EventTime" columns
    duplicate_detection_monitor = DuplicateDetection(df, primary_key_columns=["TagName", "EventTime"])

    result = duplicate_detection_monitor.filter()
    ```

    Parameters:
        df (DataFrame): PySpark DataFrame to be cleansed.
        primary_key_columns (list): List of column names that serve as the primary key for duplicate detection.
    """

    df: PySparkDataFrame
    primary_key_columns: list

    def __init__(self, df: PySparkDataFrame, primary_key_columns: list) -> None:
        if not primary_key_columns or not isinstance(primary_key_columns, list):
            raise ValueError(
                "primary_key_columns must be a non-empty list of column names."
            )
        self.df = df
        self.primary_key_columns = primary_key_columns

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def filter(self) -> PySparkDataFrame:
        """
        Returns:
            DataFrame: A PySpark DataFrame with duplicate rows removed, based on the primary key columns.
        """
        cleansed_df = self.df.dropDuplicates(self.primary_key_columns)
        return cleansed_df
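Note that the constructor validates primary_key_columns up front, so a misconfigured pipeline fails at construction time rather than when filter() is called. A short sketch of the failure mode, assuming df is any PySpark DataFrame:

```python
# Raises ValueError: primary_key_columns must be a non-empty list of column names.
duplicate_detection_monitor = DuplicateDetection(df, primary_key_columns=[])
```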

system_type() staticmethod

Attributes:

    SystemType (Environment): Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/duplicate_detection.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

filter()

Returns:

    DataFrame: A PySpark DataFrame with duplicate rows removed, based on the primary key columns.

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/duplicate_detection.py
def filter(self) -> PySparkDataFrame:
    """
    Returns:
        DataFrame: A PySpark DataFrame with duplicate rows removed, based on the primary key columns.
    """
    cleansed_df = self.df.dropDuplicates(self.primary_key_columns)
    return cleansed_df
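As the source shows, filter() is a thin wrapper over PySpark's DataFrame.dropDuplicates, so the call below is equivalent for the example's key columns; when several rows share a key, which row survives is arbitrary rather than order-guaranteed.

```python
# Equivalent direct PySpark call: keeps one arbitrary row per
# ("TagName", "EventTime") key and leaves all other columns intact.
cleansed_df = df.dropDuplicates(["TagName", "EventTime"])
```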