Interval Based

IdentifyMissingDataInterval

Bases: MonitoringBaseInterface, InputValidator

Detects missing data intervals in a DataFrame by identifying time differences between consecutive measurements that exceed a specified tolerance or a multiple of the Median Absolute Deviation (MAD). Logs the start and end times of missing intervals along with their durations.
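In short, the check computes the time difference between each pair of consecutive 'EventTime' values and flags a gap whenever that difference exceeds the expected interval plus the tolerance; when no tolerance is given, it is derived from the Median Absolute Deviation of the observed differences. The following is a minimal, illustrative sketch of that rule in plain Python (the function and variable names are hypothetical, not part of the RTDIP API):

```python
from statistics import median

def find_gaps(timestamps_ms, interval_ms=None, tolerance_ms=None,
              mad_multiplier=3, min_tolerance_ms=10):
    """Illustrative only: mirrors the interval/tolerance logic described above."""
    # Time differences between consecutive measurements
    deltas = [b - a for a, b in zip(timestamps_ms, timestamps_ms[1:])]
    # Expected interval defaults to the median of the time differences
    if interval_ms is None:
        interval_ms = median(deltas)
    # Tolerance defaults to mad_multiplier * MAD, floored at min_tolerance_ms
    if tolerance_ms is None:
        mad = median([abs(d - interval_ms) for d in deltas])
        tolerance_ms = max(mad_multiplier * mad, min_tolerance_ms)
    # A gap counts as missing data when it exceeds interval + tolerance
    limit = interval_ms + tolerance_ms
    return [(timestamps_ms[i], timestamps_ms[i + 1])
            for i, d in enumerate(deltas) if d > limit]
```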

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | DataFrame containing at least the 'EventTime' column. | required |
| `interval` | `str` | Expected interval between data points (e.g., '10ms', '500ms'). If not specified, the median of time differences is used. | `None` |
| `tolerance` | `str` | Tolerance time beyond which an interval is considered missing (e.g., '10ms'). If not specified, it defaults to `mad_multiplier` times the Median Absolute Deviation (MAD) of time differences. | `None` |
| `mad_multiplier` | `float` | Multiplier for MAD used to calculate the tolerance. | `3` |
| `min_tolerance` | `str` | Minimum tolerance for pattern-based detection (e.g., '100ms'). | `'10ms'` |

Returns:

| Name | Type | Description |
| --- | --- | --- |
| `df` | `DataFrame` | Returns the original PySpark DataFrame without changes. |

Example

```python
from rtdip_sdk.pipelines.monitoring.spark.data_manipulation import IdentifyMissingDataInterval
from pyspark.sql import SparkSession

missing_data_monitor = IdentifyMissingDataInterval(
    df=df,
    interval='100ms',
    tolerance='10ms',
)

df_result = missing_data_monitor.check()
```
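If `tolerance` is omitted, it is computed automatically as `mad_multiplier` times the MAD of the observed time differences, but never below `min_tolerance`. A variant of the example above under that assumption (reusing the same imports and `df`):

```python
missing_data_monitor = IdentifyMissingDataInterval(
    df=df,
    interval='100ms',      # expected spacing between points
    mad_multiplier=3,      # tolerance = 3 * MAD of time differences
    min_tolerance='10ms',  # lower bound for the derived tolerance
)

df_result = missing_data_monitor.check()
```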
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/identify_missing_data_interval.py
class IdentifyMissingDataInterval(MonitoringBaseInterface, InputValidator):
    """
    Detects missing data intervals in a DataFrame by identifying time differences between consecutive
    measurements that exceed a specified tolerance or a multiple of the Median Absolute Deviation (MAD).
    Logs the start and end times of missing intervals along with their durations.


    Args:
        df (pyspark.sql.DataFrame): DataFrame containing at least the 'EventTime' column.
        interval (str, optional): Expected interval between data points (e.g., '10ms', '500ms'). If not specified, the median of time differences is used.
        tolerance (str, optional): Tolerance time beyond which an interval is considered missing (e.g., '10ms'). If not specified, it defaults to 'mad_multiplier' times the Median Absolute Deviation (MAD) of time differences.
        mad_multiplier (float, optional): Multiplier for MAD to calculate tolerance. Default is 3.
        min_tolerance (str, optional): Minimum tolerance for pattern-based detection (e.g., '100ms'). Default is '10ms'.

    Returns:
        df (pyspark.sql.DataFrame): Returns the original PySpark DataFrame without changes.

    Example
    --------
    ```python
    from rtdip_sdk.pipelines.monitoring.spark.data_manipulation import IdentifyMissingDataInterval
    from pyspark.sql import SparkSession

    missing_data_monitor = IdentifyMissingDataInterval(
        df=df,
        interval='100ms',
        tolerance='10ms',
    )

    df_result = missing_data_monitor.check()
    ```

    """

    df: PySparkDataFrame
    EXPECTED_SCHEMA = StructType(
        [
            StructField("TagName", StringType(), True),
            StructField("EventTime", TimestampType(), True),
            StructField("Status", StringType(), True),
            StructField("Value", FloatType(), True),
        ]
    )

    def __init__(
        self,
        df: PySparkDataFrame,
        interval: str = None,
        tolerance: str = None,
        mad_multiplier: float = 3,
        min_tolerance: str = "10ms",
    ) -> None:

        self.df = df
        self.interval = interval
        self.tolerance = tolerance
        self.mad_multiplier = mad_multiplier
        self.min_tolerance = min_tolerance
        self.validate(self.EXPECTED_SCHEMA)

        # Use global pipeline logger
        self.logger_manager = LoggerManager()
        self.logger = self.logger_manager.create_logger("IdentifyMissingDataInterval")

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def check(self) -> PySparkDataFrame:
        """
        Executes the identify missing data logic.

        Returns:
            pyspark.sql.DataFrame:
                Returns the original PySpark DataFrame without changes.
        """
        if "EventTime" not in self.df.columns:
            self.logger.error("The DataFrame must contain an 'EventTime' column.")
            raise ValueError("The DataFrame must contain an 'EventTime' column.")

        df = self.df.withColumn("EventTime", F.to_timestamp("EventTime"))
        df_sorted = df.orderBy("EventTime")
        # Calculate time difference in milliseconds between consecutive rows
        df_with_diff = df_sorted.withColumn(
            "TimeDeltaMs",
            (
                F.col("EventTime").cast("double")
                - F.lag("EventTime").over(Window.orderBy("EventTime")).cast("double")
            )
            * 1000,
        ).withColumn(
            "StartMissing", F.lag("EventTime").over(Window.orderBy("EventTime"))
        )
        # Parse interval to milliseconds if given
        if self.interval is not None:
            try:
                interval_ms = parse_time_string_to_ms(self.interval)
                self.logger.info(f"Using provided expected interval: {interval_ms} ms")
            except ValueError as e:
                self.logger.error(e)
                raise
        else:
            # Calculate interval based on median of time differences
            median_expr = F.expr("percentile_approx(TimeDeltaMs, 0.5)")
            median_row = df_with_diff.select(median_expr.alias("median")).collect()[0]
            interval_ms = median_row["median"]
            self.logger.info(
                f"Using median of time differences as expected interval: {interval_ms} ms"
            )
        # Parse tolerance to milliseconds if given
        if self.tolerance is not None:
            try:
                tolerance_ms = parse_time_string_to_ms(self.tolerance)
                self.logger.info(f"Using provided tolerance: {tolerance_ms} ms")
            except ValueError as e:
                self.logger.error(e)
                raise
        else:
            # Calculate tolerance based on MAD
            mad_expr = F.expr(
                f"percentile_approx(abs(TimeDeltaMs - {interval_ms}), 0.5)"
            )
            mad_row = df_with_diff.select(mad_expr.alias("mad")).collect()[0]
            mad = mad_row["mad"]
            calculated_tolerance_ms = self.mad_multiplier * mad
            min_tolerance_ms = parse_time_string_to_ms(self.min_tolerance)
            tolerance_ms = max(calculated_tolerance_ms, min_tolerance_ms)
            self.logger.info(f"Calculated tolerance: {tolerance_ms} ms (MAD-based)")
        # Calculate the maximum acceptable interval with tolerance
        max_interval_with_tolerance_ms = interval_ms + tolerance_ms
        self.logger.info(
            f"Maximum acceptable interval with tolerance: {max_interval_with_tolerance_ms} ms"
        )

        # Identify missing intervals
        missing_intervals_df = df_with_diff.filter(
            (F.col("TimeDeltaMs") > max_interval_with_tolerance_ms)
            & (F.col("StartMissing").isNotNull())
        ).select(
            "TagName",
            "StartMissing",
            F.col("EventTime").alias("EndMissing"),
            "TimeDeltaMs",
        )
        # Convert time delta to readable format
        missing_intervals_df = missing_intervals_df.withColumn(
            "DurationMissing",
            F.concat(
                F.floor(F.col("TimeDeltaMs") / 3600000).cast("string"),
                F.lit("h "),
                F.floor((F.col("TimeDeltaMs") % 3600000) / 60000).cast("string"),
                F.lit("m "),
                F.floor(((F.col("TimeDeltaMs") % 3600000) % 60000) / 1000).cast(
                    "string"
                ),
                F.lit("s"),
            ),
        ).select("TagName", "StartMissing", "EndMissing", "DurationMissing")
        missing_intervals = missing_intervals_df.collect()
        if missing_intervals:
            self.logger.info("Detected Missing Intervals:")
            for row in missing_intervals:
                self.logger.info(
                    f"Tag: {row['TagName']} Missing Interval from {row['StartMissing']} to {row['EndMissing']} "
                    f"Duration: {row['DurationMissing']}"
                )
        else:
            self.logger.info("No missing intervals detected.")
        return self.df

system_type() staticmethod

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `SystemType` | `Environment` | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/identify_missing_data_interval.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

check()

Executes the identify missing data logic.

Returns:

| Type | Description |
| --- | --- |
| `DataFrame` | Returns the original PySpark DataFrame without changes. |

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/identify_missing_data_interval.py
def check(self) -> PySparkDataFrame:
    """
    Executes the identify missing data logic.

    Returns:
        pyspark.sql.DataFrame:
            Returns the original PySpark DataFrame without changes.
    """
    if "EventTime" not in self.df.columns:
        self.logger.error("The DataFrame must contain an 'EventTime' column.")
        raise ValueError("The DataFrame must contain an 'EventTime' column.")

    df = self.df.withColumn("EventTime", F.to_timestamp("EventTime"))
    df_sorted = df.orderBy("EventTime")
    # Calculate time difference in milliseconds between consecutive rows
    df_with_diff = df_sorted.withColumn(
        "TimeDeltaMs",
        (
            F.col("EventTime").cast("double")
            - F.lag("EventTime").over(Window.orderBy("EventTime")).cast("double")
        )
        * 1000,
    ).withColumn(
        "StartMissing", F.lag("EventTime").over(Window.orderBy("EventTime"))
    )
    # Parse interval to milliseconds if given
    if self.interval is not None:
        try:
            interval_ms = parse_time_string_to_ms(self.interval)
            self.logger.info(f"Using provided expected interval: {interval_ms} ms")
        except ValueError as e:
            self.logger.error(e)
            raise
    else:
        # Calculate interval based on median of time differences
        median_expr = F.expr("percentile_approx(TimeDeltaMs, 0.5)")
        median_row = df_with_diff.select(median_expr.alias("median")).collect()[0]
        interval_ms = median_row["median"]
        self.logger.info(
            f"Using median of time differences as expected interval: {interval_ms} ms"
        )
    # Parse tolerance to milliseconds if given
    if self.tolerance is not None:
        try:
            tolerance_ms = parse_time_string_to_ms(self.tolerance)
            self.logger.info(f"Using provided tolerance: {tolerance_ms} ms")
        except ValueError as e:
            self.logger.error(e)
            raise
    else:
        # Calculate tolerance based on MAD
        mad_expr = F.expr(
            f"percentile_approx(abs(TimeDeltaMs - {interval_ms}), 0.5)"
        )
        mad_row = df_with_diff.select(mad_expr.alias("mad")).collect()[0]
        mad = mad_row["mad"]
        calculated_tolerance_ms = self.mad_multiplier * mad
        min_tolerance_ms = parse_time_string_to_ms(self.min_tolerance)
        tolerance_ms = max(calculated_tolerance_ms, min_tolerance_ms)
        self.logger.info(f"Calculated tolerance: {tolerance_ms} ms (MAD-based)")
    # Calculate the maximum acceptable interval with tolerance
    max_interval_with_tolerance_ms = interval_ms + tolerance_ms
    self.logger.info(
        f"Maximum acceptable interval with tolerance: {max_interval_with_tolerance_ms} ms"
    )

    # Identify missing intervals
    missing_intervals_df = df_with_diff.filter(
        (F.col("TimeDeltaMs") > max_interval_with_tolerance_ms)
        & (F.col("StartMissing").isNotNull())
    ).select(
        "TagName",
        "StartMissing",
        F.col("EventTime").alias("EndMissing"),
        "TimeDeltaMs",
    )
    # Convert time delta to readable format
    missing_intervals_df = missing_intervals_df.withColumn(
        "DurationMissing",
        F.concat(
            F.floor(F.col("TimeDeltaMs") / 3600000).cast("string"),
            F.lit("h "),
            F.floor((F.col("TimeDeltaMs") % 3600000) / 60000).cast("string"),
            F.lit("m "),
            F.floor(((F.col("TimeDeltaMs") % 3600000) % 60000) / 1000).cast(
                "string"
            ),
            F.lit("s"),
        ),
    ).select("TagName", "StartMissing", "EndMissing", "DurationMissing")
    missing_intervals = missing_intervals_df.collect()
    if missing_intervals:
        self.logger.info("Detected Missing Intervals:")
        for row in missing_intervals:
            self.logger.info(
                f"Tag: {row['TagName']} Missing Interval from {row['StartMissing']} to {row['EndMissing']} "
                f"Duration: {row['DurationMissing']}"
            )
    else:
        self.logger.info("No missing intervals detected.")
    return self.df