Skip to content

K-Sigma Anomaly Detection

KSigmaAnomalyDetection

Bases: DataManipulationBaseInterface, InputValidator

Anomaly detection with the k-sigma method. This method either computes the mean and standard deviation, or the median and the median absolute deviation (MAD) of the data. The k-sigma method then filters out all data points that are k times the standard deviation away from the mean, or k times the MAD away from the median. Assuming a normal distribution, this method keeps around 99.7% of the data points when k=3 and use_median=False.

Example

from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_manipulation.k_sigma_anomaly_detection import KSigmaAnomalyDetection

spark = ... # SparkSession
df = ... # Get a PySpark DataFrame

filtered_df = KSigmaAnomalyDetection(
    spark, df, ["<column to filter>"]
).filter()

filtered_df.show()

Parameters:

Name Type Description Default
spark SparkSession

A SparkSession object.

required
df DataFrame

Dataframe containing the raw data.

required
column_names list[str]

The names of the columns to be filtered (currently only one column is supported).

required
k_value float

The number of deviations to build the threshold.

3.0
use_median bool

If True the median and the median absolute deviation (MAD) are used, instead of the mean and standard deviation.

False
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/k_sigma_anomaly_detection.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
class KSigmaAnomalyDetection(DataManipulationBaseInterface, InputValidator):
    """
    Anomaly detection with the k-sigma method. This method either computes the mean and standard deviation, or the median and the median absolute deviation (MAD) of the data.
    The k-sigma method then filters out all data points that are k times the standard deviation away from the mean, or k times the MAD away from the median.
    Assuming a normal distribution, this method keeps around 99.7% of the data points when k=3 and use_median=False.

    Example
    --------
    ```python
    from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_manipulation.k_sigma_anomaly_detection import KSigmaAnomalyDetection

    spark = ... # SparkSession
    df = ... # Get a PySpark DataFrame

    filtered_df = KSigmaAnomalyDetection(
        spark, df, ["<column to filter>"]
    ).filter()

    filtered_df.show()
    ```

    Parameters:
        spark (SparkSession): A SparkSession object.
        df (DataFrame): Dataframe containing the raw data.
        column_names (list[str]): The names of the columns to be filtered (currently only one column is supported).
        k_value (float): The number of deviations to build the threshold.
        use_median (bool): If True the median and the median absolute deviation (MAD) are used, instead of the mean and standard deviation.
    """

    def __init__(
        self,
        spark: SparkSession,
        df: DataFrame,
        column_names: list[str],
        k_value: float = 3.0,
        use_median: bool = False,
    ) -> None:
        if len(column_names) == 0:
            # ValueError is a subclass of Exception, so existing callers
            # catching Exception are unaffected.
            raise ValueError("You must provide at least one column name")
        if len(column_names) > 1:
            # BUG FIX: the original raised `NotImplemented(...)`, which is a
            # non-callable singleton and produced a TypeError instead of the
            # intended exception. NotImplementedError is the correct type.
            raise NotImplementedError("Multiple columns are not supported yet")

        self.column_names = column_names
        self.use_median = use_median
        self.spark = spark
        self.df = df
        self.k_value = k_value

        # Ensure every target column exists and is a nullable double.
        self.validate(
            StructType(
                [StructField(column, DoubleType(), True) for column in column_names]
            )
        )

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def filter(self) -> DataFrame:
        """
        Filter anomalies based on the k-sigma rule.

        Returns:
            DataFrame: The input dataframe restricted to rows whose value in the
                target column lies within k deviations of the center.

        Raises:
            Exception: If the center or deviation statistic cannot be computed
                (e.g. the dataframe is empty).
        """

        column_name = self.column_names[0]
        mean_value, deviation = 0, 0

        if self.use_median:
            # Median via approxQuantile with relativeError=0.0 (exact median).
            mean_value = self.df.approxQuantile(column_name, [0.5], 0.0)[0]
            if mean_value is None:
                raise Exception("Failed to calculate the mean value")

            # MAD: median of absolute deviations from the median.
            df_with_deviation = self.df.withColumn(
                "absolute_deviation", abs(col(column_name) - mean_value)
            )
            deviation = df_with_deviation.approxQuantile(
                "absolute_deviation", [0.5], 0.0
            )[0]
            if deviation is None:
                raise Exception("Failed to calculate the deviation value")
        else:
            # Consistency fix: use `column_name` for both aggregates instead of
            # re-indexing self.column_names[0] for stddev.
            stats = self.df.select(
                mean(column_name), stddev(column_name)
            ).first()
            if stats is None:
                raise Exception(
                    "Failed to calculate the mean value and the standard deviation value"
                )

            mean_value = stats[0]
            deviation = stats[1]

        # Keep rows inside [center - k*deviation, center + k*deviation].
        shift = self.k_value * deviation
        lower_bound = mean_value - shift
        upper_bound = mean_value + shift

        return self.df.filter(
            (self.df[column_name] >= lower_bound)
            & (self.df[column_name] <= upper_bound)
        )

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/k_sigma_anomaly_detection.py
84
85
86
87
88
89
90
@staticmethod
def system_type():
    """
    Report the execution environment this component requires.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_system = SystemType.PYSPARK
    return required_system

filter()

Filter anomalies based on the k-sigma rule

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/k_sigma_anomaly_detection.py
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
def filter(self) -> DataFrame:
    """
    Filter anomalies based on the k-sigma rule
    """

    target = self.column_names[0]

    if self.use_median:
        # Center on the exact median (relativeError=0.0).
        center = self.df.approxQuantile(target, [0.5], 0.0)[0]
        if center is None:
            raise Exception("Failed to calculate the mean value")

        # Spread is the median absolute deviation from that center.
        abs_dev_df = self.df.withColumn(
            "absolute_deviation", abs(col(target) - center)
        )
        spread = abs_dev_df.approxQuantile("absolute_deviation", [0.5], 0.0)[0]
        if spread is None:
            raise Exception("Failed to calculate the deviation value")
    else:
        # Center/spread are the mean and standard deviation.
        row = self.df.select(mean(target), stddev(self.column_names[0])).first()
        if row is None:
            raise Exception(
                "Failed to calculate the mean value and the standard deviation value"
            )
        center, spread = row[0], row[1]

    # Retain rows within k deviations of the center.
    half_width = self.k_value * spread
    low = center - half_width
    high = center + half_width

    return self.df.filter(
        (self.df[target] >= low) & (self.df[target] <= high)
    )