
Flatline Detection

FlatlineDetection

Bases: MonitoringBaseInterface, InputValidator

Detects flatlining in specified columns of a PySpark DataFrame and logs warnings.

Flatlining occurs when a column contains consecutive null or zero values exceeding a specified tolerance period. This class identifies such occurrences and logs the rows where flatlining is detected.

Parameters:

    df (pyspark.sql.DataFrame): The input DataFrame to monitor for flatlining. (required)
    watch_columns (list): List of column names to monitor for flatlining (null or zero values). (required)
    tolerance_timespan (int): Maximum allowed consecutive flatlining period. If exceeded, a warning is logged. (required)
Example
from rtdip_sdk.pipelines.data_quality.monitoring.spark.flatline_detection import FlatlineDetection
from pyspark.sql import SparkSession
from datetime import datetime

spark = SparkSession.builder.master("local[1]").appName("FlatlineDetectionExample").getOrCreate()

# Example DataFrame matching the expected schema
data = [
    ("Tag1", datetime(2024, 1, 1, 0, 0, 0), "Good", 1.0),
    ("Tag1", datetime(2024, 1, 1, 0, 1, 0), "Good", 0.0),
    ("Tag1", datetime(2024, 1, 1, 0, 2, 0), "Good", 0.0),
    ("Tag1", datetime(2024, 1, 1, 0, 3, 0), "Good", 0.0),
    ("Tag1", datetime(2024, 1, 1, 0, 4, 0), "Good", 5.0),
]
schema = "TagName STRING, EventTime TIMESTAMP, Status STRING, Value FLOAT"
df = spark.createDataFrame(data, schema)

# Initialize FlatlineDetection
flatline_detection = FlatlineDetection(
    df,
    watch_columns=["Value"],
    tolerance_timespan=2
)

# Detect flatlining
flatline_detection.check()
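
In this example, the three consecutive zero readings form a run longer than tolerance_timespan=2, so check() logs a warning for each of those rows. A minimal sketch (reusing the names from the example above) of retrieving the offending rows programmatically instead of reading the log output:

```python
# check_for_flatlining() returns only the rows that breached the tolerance,
# together with the helper flag/group columns added during detection.
flatlined = flatline_detection.check_for_flatlining()
flatlined.select("TagName", "EventTime", "Value").show(truncate=False)
```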
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/flatline_detection.py
class FlatlineDetection(MonitoringBaseInterface, InputValidator):
    """
    Detects flatlining in specified columns of a PySpark DataFrame and logs warnings.

    Flatlining occurs when a column contains consecutive null or zero values exceeding a specified tolerance period.
    This class identifies such occurrences and logs the rows where flatlining is detected.

    Args:
        df (pyspark.sql.DataFrame): The input DataFrame to monitor for flatlining.
        watch_columns (list): List of column names to monitor for flatlining (null or zero values).
        tolerance_timespan (int): Maximum allowed consecutive flatlining period. If exceeded, a warning is logged.

    Example:
        ```python
        from rtdip_sdk.pipelines.data_quality.monitoring.spark.flatline_detection import FlatlineDetection
        from pyspark.sql import SparkSession
        from datetime import datetime

        spark = SparkSession.builder.master("local[1]").appName("FlatlineDetectionExample").getOrCreate()

        # Example DataFrame matching the expected schema
        data = [
            ("Tag1", datetime(2024, 1, 1, 0, 0, 0), "Good", 1.0),
            ("Tag1", datetime(2024, 1, 1, 0, 1, 0), "Good", 0.0),
            ("Tag1", datetime(2024, 1, 1, 0, 2, 0), "Good", 0.0),
            ("Tag1", datetime(2024, 1, 1, 0, 3, 0), "Good", 0.0),
            ("Tag1", datetime(2024, 1, 1, 0, 4, 0), "Good", 5.0),
        ]
        schema = "TagName STRING, EventTime TIMESTAMP, Status STRING, Value FLOAT"
        df = spark.createDataFrame(data, schema)

        # Initialize FlatlineDetection
        flatline_detection = FlatlineDetection(
            df,
            watch_columns=["Value"],
            tolerance_timespan=2
        )

        # Detect flatlining
        flatline_detection.check()
        ```
    """

    df: PySparkDataFrame
    watch_columns: list
    tolerance_timespan: int
    EXPECTED_SCHEMA = StructType(
        [
            StructField("TagName", StringType(), True),
            StructField("EventTime", TimestampType(), True),
            StructField("Status", StringType(), True),
            StructField("Value", FloatType(), True),
        ]
    )

    def __init__(
        self, df: PySparkDataFrame, watch_columns: list, tolerance_timespan: int
    ) -> None:
        if not watch_columns or not isinstance(watch_columns, list):
            raise ValueError("watch_columns must be a non-empty list of column names.")
        if not isinstance(tolerance_timespan, int) or tolerance_timespan <= 0:
            raise ValueError("tolerance_timespan must be a positive integer.")

        self.df = df
        self.validate(self.EXPECTED_SCHEMA)
        self.watch_columns = watch_columns
        self.tolerance_timespan = tolerance_timespan

        self.logger = logging.getLogger(self.__class__.__name__)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def check(self) -> PySparkDataFrame:
        """
        Detects flatlining and logs relevant rows.

        Returns:
            pyspark.sql.DataFrame: The original DataFrame with additional flatline detection metadata.
        """
        flatlined_rows = self.check_for_flatlining()
        print("Flatlined Rows:")
        flatlined_rows.show(truncate=False)
        self.log_flatlining_rows(flatlined_rows)
        return self.df

    def check_for_flatlining(self) -> PySparkDataFrame:
        """
        Identifies rows with flatlining based on the specified columns and tolerance.

        Returns:
            pyspark.sql.DataFrame: A DataFrame containing rows with flatlining detected.
        """
        partition_column = "TagName"
        sort_column = "EventTime"
        window_spec = Window.partitionBy(partition_column).orderBy(sort_column)

        # Start with an empty DataFrame, ensure it has the required schema
        flatlined_rows = (
            self.df.withColumn("Value_flatline_flag", lit(None).cast("int"))
            .withColumn("Value_group", lit(None).cast("bigint"))
            .filter("1=0")
        )

        for column in self.watch_columns:
            flagged_column = f"{column}_flatline_flag"
            group_column = f"{column}_group"

            # Add flag and group columns
            df_with_flags = self.df.withColumn(
                flagged_column,
                when((col(column).isNull()) | (col(column) == 0.0), 1).otherwise(0),
            ).withColumn(
                group_column,
                sum(
                    when(
                        col(flagged_column)
                        != lag(col(flagged_column), 1, 0).over(window_spec),
                        1,
                    ).otherwise(0)
                ).over(window_spec),
            )

            # Identify flatlining groups
            group_counts = (
                df_with_flags.filter(col(flagged_column) == 1)
                .groupBy(group_column)
                .count()
            )
            large_groups = group_counts.filter(col("count") > self.tolerance_timespan)
            large_group_ids = [row[group_column] for row in large_groups.collect()]

            if large_group_ids:
                relevant_rows = df_with_flags.filter(
                    col(group_column).isin(large_group_ids)
                )

                # Ensure both DataFrames have the same columns
                for col_name in flatlined_rows.columns:
                    if col_name not in relevant_rows.columns:
                        relevant_rows = relevant_rows.withColumn(col_name, lit(None))

                flatlined_rows = flatlined_rows.union(relevant_rows)

        return flatlined_rows

    def log_flatlining_rows(self, flatlined_rows: PySparkDataFrame):
        """
        Logs flatlining rows for all monitored columns.

        Args:
            flatlined_rows (pyspark.sql.DataFrame): The DataFrame containing rows with flatlining detected.
        """
        if flatlined_rows.count() == 0:
            self.logger.info("No flatlining detected.")
            return

        for column in self.watch_columns:
            flagged_column = f"{column}_flatline_flag"

            if flagged_column not in flatlined_rows.columns:
                self.logger.warning(
                    f"Expected column '{flagged_column}' not found in DataFrame."
                )
                continue

            relevant_rows = flatlined_rows.filter(col(flagged_column) == 1).collect()

            if relevant_rows:
                for row in relevant_rows:
                    self.logger.warning(
                        f"Flatlining detected in column '{column}' at row: {row}."
                    )
            else:
                self.logger.info(f"No flatlining detected in column '{column}'.")

system_type() staticmethod

Attributes:

    SystemType (Environment): Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/flatline_detection.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

check()

Detects flatlining and logs relevant rows.

Returns:

    pyspark.sql.DataFrame: The original DataFrame with additional flatline detection metadata.

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/flatline_detection.py
def check(self) -> PySparkDataFrame:
    """
    Detects flatlining and logs relevant rows.

    Returns:
        pyspark.sql.DataFrame: The original DataFrame with additional flatline detection metadata.
    """
    flatlined_rows = self.check_for_flatlining()
    print("Flatlined Rows:")
    flatlined_rows.show(truncate=False)
    self.log_flatlining_rows(flatlined_rows)
    return self.df
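
check() hands back the DataFrame it was given, so it can sit inline in a pipeline; detection results appear in the console output and the log. A minimal sketch, reusing the flatline_detection instance from the class example above:

```python
# Run detection for its reporting side effect; the returned DataFrame is the
# same object that was passed in, so downstream steps are unaffected.
df_checked = flatline_detection.check()
```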

check_for_flatlining()

Identifies rows with flatlining based on the specified columns and tolerance.

Returns:

    pyspark.sql.DataFrame: A DataFrame containing rows with flatlining detected.

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/flatline_detection.py
def check_for_flatlining(self) -> PySparkDataFrame:
    """
    Identifies rows with flatlining based on the specified columns and tolerance.

    Returns:
        pyspark.sql.DataFrame: A DataFrame containing rows with flatlining detected.
    """
    partition_column = "TagName"
    sort_column = "EventTime"
    window_spec = Window.partitionBy(partition_column).orderBy(sort_column)

    # Start with an empty DataFrame, ensure it has the required schema
    flatlined_rows = (
        self.df.withColumn("Value_flatline_flag", lit(None).cast("int"))
        .withColumn("Value_group", lit(None).cast("bigint"))
        .filter("1=0")
    )

    for column in self.watch_columns:
        flagged_column = f"{column}_flatline_flag"
        group_column = f"{column}_group"

        # Add flag and group columns
        df_with_flags = self.df.withColumn(
            flagged_column,
            when((col(column).isNull()) | (col(column) == 0.0), 1).otherwise(0),
        ).withColumn(
            group_column,
            sum(
                when(
                    col(flagged_column)
                    != lag(col(flagged_column), 1, 0).over(window_spec),
                    1,
                ).otherwise(0)
            ).over(window_spec),
        )

        # Identify flatlining groups
        group_counts = (
            df_with_flags.filter(col(flagged_column) == 1)
            .groupBy(group_column)
            .count()
        )
        large_groups = group_counts.filter(col("count") > self.tolerance_timespan)
        large_group_ids = [row[group_column] for row in large_groups.collect()]

        if large_group_ids:
            relevant_rows = df_with_flags.filter(
                col(group_column).isin(large_group_ids)
            )

            # Ensure both DataFrames have the same columns
            for col_name in flatlined_rows.columns:
                if col_name not in relevant_rows.columns:
                    relevant_rows = relevant_rows.withColumn(col_name, lit(None))

            flatlined_rows = flatlined_rows.union(relevant_rows)

    return flatlined_rows
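
The grouping logic above can be isolated into a small sketch (the tag and column names here are illustrative, not part of the class): flag each null or zero reading, then take a running sum that increments whenever the flag changes, so every unbroken run of flagged rows shares one group id whose size can be compared against the tolerance.

```python
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, when, lag, sum as sum_, count

spark = SparkSession.builder.master("local[1]").getOrCreate()

df = spark.createDataFrame(
    [("Tag1", 1, 1.0), ("Tag1", 2, 0.0), ("Tag1", 3, 0.0),
     ("Tag1", 4, 0.0), ("Tag1", 5, 5.0)],
    ["TagName", "EventTime", "Value"],
)

w = Window.partitionBy("TagName").orderBy("EventTime")

flagged = (
    df
    # 1 when the value is null or zero, otherwise 0
    .withColumn("flag", when(col("Value").isNull() | (col("Value") == 0.0), 1).otherwise(0))
    # running count of flag changes: constant within each consecutive run
    .withColumn("group", sum_(when(col("flag") != lag("flag", 1, 0).over(w), 1).otherwise(0)).over(w))
)

# Runs of flagged rows longer than the tolerance (here 2) count as flatlining
tolerance = 2
(flagged.filter(col("flag") == 1)
        .groupBy("group").agg(count("*").alias("run_length"))
        .filter(col("run_length") > tolerance)
        .show())
```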

log_flatlining_rows(flatlined_rows)

Logs flatlining rows for all monitored columns.

Parameters:

    flatlined_rows (pyspark.sql.DataFrame): The DataFrame containing rows with flatlining detected. (required)
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/flatline_detection.py
def log_flatlining_rows(self, flatlined_rows: PySparkDataFrame):
    """
    Logs flatlining rows for all monitored columns.

    Args:
        flatlined_rows (pyspark.sql.DataFrame): The DataFrame containing rows with flatlining detected.
    """
    if flatlined_rows.count() == 0:
        self.logger.info("No flatlining detected.")
        return

    for column in self.watch_columns:
        flagged_column = f"{column}_flatline_flag"

        if flagged_column not in flatlined_rows.columns:
            self.logger.warning(
                f"Expected column '{flagged_column}' not found in DataFrame."
            )
            continue

        relevant_rows = flatlined_rows.filter(col(flagged_column) == 1).collect()

        if relevant_rows:
            for row in relevant_rows:
                self.logger.warning(
                    f"Flatlining detected in column '{column}' at row: {row}."
                )
        else:
            self.logger.info(f"No flatlining detected in column '{column}'.")
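
Detection results are reported through a standard Python logger named after the class, so verbosity can be tuned without touching the DataFrame API. A minimal sketch, assuming the default logger name derived from the class:

```python
import logging

# The class attaches a StreamHandler to a logger named "FlatlineDetection".
# Raising the level suppresses the INFO "No flatlining detected" messages
# and keeps only the WARNING entries for detected flatlining.
logging.getLogger("FlatlineDetection").setLevel(logging.WARNING)
```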