Check Value Ranges

CheckValueRanges

Bases: MonitoringBaseInterface, InputValidator

Monitors data in a DataFrame by checking the 'Value' column against expected ranges for specified TagNames. Logs an event whenever 'Value' falls outside the defined range for its TagName.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| df | DataFrame | The DataFrame to monitor. | required |
| tag_ranges | dict | A dictionary where keys are TagNames and values are dictionaries specifying 'min' and/or 'max', and optionally 'inclusive_bounds'. When 'inclusive_bounds' is True (the default), values equal to 'min' or 'max' are treated as in range; when False, they are flagged as out of range. Example: { 'A2PS64V0J.:ZUX09R': {'min': 0, 'max': 100, 'inclusive_bounds': True}, 'B3TS64V0K.:ZUX09R': {'min': 10, 'max': 200, 'inclusive_bounds': False} } | required |

Example
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.data_quality.monitoring.spark.check_value_ranges import CheckValueRanges

spark = SparkSession.builder.master("local[1]").appName("CheckValueRangesExample").getOrCreate()

data = [
    ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", 25.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", -5.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", 50.0),
    ("B3TS64V0K.:ZUX09R", "2024-01-02 16:00:12.000", "Good", 80.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", 100.0),
]

columns = ["TagName", "EventTime", "Status", "Value"]

df = spark.createDataFrame(data, columns)

tag_ranges = {
    "A2PS64V0J.:ZUX09R": {"min": 0, "max": 50, "inclusive_bounds": True},
    "B3TS64V0K.:ZUX09R": {"min": 50, "max": 100, "inclusive_bounds": False},
}

check_value_ranges = CheckValueRanges(
    df=df,
    tag_ranges=tag_ranges,
)

result_df = check_value_ranges.check()
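
check() logs any out-of-range rows and returns the input DataFrame unchanged. If the flagged rows are needed as data rather than log entries, the check_for_out_of_range() method shown in the source below can be called directly; a minimal sketch, assuming the df and tag_ranges defined above:

# Collect the flagged rows as a DataFrame instead of only logging them.
out_of_range_df = check_value_ranges.check_for_out_of_range()

# With the ranges above, -5.0 (below min 0) and 100.0 (above max 50) for
# 'A2PS64V0J.:ZUX09R' are flagged; 80.0 for 'B3TS64V0K.:ZUX09R' lies inside (50, 100).
out_of_range_df.show(truncate=False)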
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/check_value_ranges.py
class CheckValueRanges(MonitoringBaseInterface, InputValidator):
    """
    Monitors data in a DataFrame by checking the 'Value' column against expected ranges for specified TagNames.
    Logs events when 'Value' exceeds the defined ranges for any TagName.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to monitor.
        tag_ranges (dict): A dictionary where keys are TagNames and values are dictionaries specifying 'min' and/or
            'max', and optionally 'inclusive_bounds' values.
            Example:
                {
                    'A2PS64V0J.:ZUX09R': {'min': 0, 'max': 100, 'inclusive_bounds': True},
                    'B3TS64V0K.:ZUX09R': {'min': 10, 'max': 200, 'inclusive_bounds': False},
                }

    Example:
        ```python
        from pyspark.sql import SparkSession
        from rtdip_sdk.pipelines.data_quality.monitoring.spark.check_value_ranges import CheckValueRanges

        spark = SparkSession.builder.master("local[1]").appName("CheckValueRangesExample").getOrCreate()

        data = [
            ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", 25.0),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", -5.0),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", 50.0),
            ("B3TS64V0K.:ZUX09R", "2024-01-02 16:00:12.000", "Good", 80.0),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", 100.0),
        ]

        columns = ["TagName", "EventTime", "Status", "Value"]

        df = spark.createDataFrame(data, columns)

        tag_ranges = {
            "A2PS64V0J.:ZUX09R": {"min": 0, "max": 50, "inclusive_bounds": True},
            "B3TS64V0K.:ZUX09R": {"min": 50, "max": 100, "inclusive_bounds": False},
        }

        check_value_ranges = CheckValueRanges(
            df=df,
            tag_ranges=tag_ranges,
        )

        result_df = check_value_ranges.check()
        ```
    """

    df: PySparkDataFrame
    tag_ranges: dict
    EXPECTED_SCHEMA = StructType(
        [
            StructField("TagName", StringType(), True),
            StructField("EventTime", TimestampType(), True),
            StructField("Status", StringType(), True),
            StructField("Value", FloatType(), True),
        ]
    )

    def __init__(
        self,
        df: PySparkDataFrame,
        tag_ranges: dict,
    ) -> None:
        self.df = df
        self.validate(self.EXPECTED_SCHEMA)
        self.tag_ranges = tag_ranges

        # Configure logging
        self.logger = logging.getLogger(self.__class__.__name__)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def check(self) -> PySparkDataFrame:
        """
        Executes the value range checking logic for the specified TagNames. Identifies and logs any rows
        where 'Value' exceeds the defined ranges for each TagName.

        Returns:
            pyspark.sql.DataFrame:
                Returns the original PySpark DataFrame without changes.
        """
        out_of_range_df = self.check_for_out_of_range()

        if out_of_range_df.count() > 0:
            self.log_out_of_range_values(out_of_range_df)
        else:
            self.logger.info(f"No out of range values found in 'Value' column.")

        return self.df

    def check_for_out_of_range(self) -> PySparkDataFrame:
        """
        Identifies rows where 'Value' exceeds defined ranges.

        Returns:
        pyspark.sql.DataFrame: A DataFrame containing rows with out-of-range values.
        """

        self._validate_inputs()

        out_of_range_df = self.df.filter("1=0")

        for tag_name, range_dict in self.tag_ranges.items():
            df = self.df.filter(col("TagName") == tag_name)

            if df.count() == 0:
                self.logger.warning(f"No data found for TagName '{tag_name}'.")
                continue

            min_value = range_dict.get("min", None)
            max_value = range_dict.get("max", None)
            inclusive_bounds = range_dict.get("inclusive_bounds", True)

            conditions = []

            # Build minimum value condition
            if min_value is not None:
                if inclusive_bounds:
                    min_condition = col("Value") < min_value
                else:
                    min_condition = col("Value") <= min_value
                conditions.append(min_condition)

            # Build maximum value condition
            if max_value is not None:
                if inclusive_bounds:
                    max_condition = col("Value") > max_value
                else:
                    max_condition = col("Value") >= max_value
                conditions.append(max_condition)

            if conditions:
                condition = reduce(or_, conditions)
                tag_out_of_range_df = df.filter(condition)
                out_of_range_df = out_of_range_df.union(tag_out_of_range_df)

        return out_of_range_df

    def log_out_of_range_values(self, out_of_range_df: PySparkDataFrame):
        """
        Logs out-of-range values for all TagNames.
        """
        for tag_name in (
            out_of_range_df.select("TagName")
            .distinct()
            .rdd.map(lambda row: row[0])
            .collect()
        ):
            tag_out_of_range_df = out_of_range_df.filter(col("TagName") == tag_name)
            count = tag_out_of_range_df.count()
            self.logger.info(
                f"Found {count} rows in 'Value' column for TagName '{tag_name}' out of range."
            )
            for row in tag_out_of_range_df.collect():
                self.logger.info(f"Out of range row for TagName '{tag_name}': {row}")

    def _validate_inputs(self):
        if not isinstance(self.tag_ranges, dict):
            raise TypeError("tag_ranges must be a dictionary.")

        available_tags = (
            self.df.select("TagName").distinct().rdd.map(lambda row: row[0]).collect()
        )

        for tag_name, range_dict in self.tag_ranges.items():
            if not isinstance(tag_name, str):
                raise ValueError(f"TagName '{tag_name}' must be a string.")

            if tag_name not in available_tags:
                raise ValueError(f"TagName '{tag_name}' not found in DataFrame.")

            if "min" not in range_dict and "max" not in range_dict:
                raise ValueError(
                    f"TagName '{tag_name}' must have at least 'min' or 'max' specified."
                )

            inclusive_bounds = range_dict.get("inclusive_bounds", True)
            if not isinstance(inclusive_bounds, bool):
                raise ValueError(
                    f"Inclusive_bounds for TagName '{tag_name}' must be a boolean."
                )

            min_value = range_dict.get("min", None)
            max_value = range_dict.get("max", None)
            if min_value is not None and not isinstance(min_value, (int, float)):
                raise ValueError(
                    f"Minimum value for TagName '{tag_name}' must be a number."
                )
            if max_value is not None and not isinstance(max_value, (int, float)):
                raise ValueError(
                    f"Maximum value for TagName '{tag_name}' must be a number."
                )

system_type() staticmethod

Attributes:

| Name | Type | Description |
| --- | --- | --- |
| SystemType | Environment | Requires PYSPARK |

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/check_value_ranges.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

check()

Executes the value range check for the specified TagNames, identifying and logging any rows where 'Value' falls outside the defined range for its TagName.

Returns:

| Type | Description |
| --- | --- |
| DataFrame | The original PySpark DataFrame, returned without changes. |
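
A minimal usage sketch, assuming df and tag_ranges are defined as in the class example above: check() logs any violations and passes the monitored DataFrame through, so the monitor can sit inline in a pipeline without affecting downstream steps.

# check() validates tag_ranges, logs any out-of-range rows, and returns the DataFrame.
monitored_df = CheckValueRanges(df=df, tag_ranges=tag_ranges).check()
monitored_df.show()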

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/check_value_ranges.py
def check(self) -> PySparkDataFrame:
    """
    Executes the value range checking logic for the specified TagNames. Identifies and logs any rows
    where 'Value' exceeds the defined ranges for each TagName.

    Returns:
        pyspark.sql.DataFrame:
            Returns the original PySpark DataFrame without changes.
    """
    out_of_range_df = self.check_for_out_of_range()

    if out_of_range_df.count() > 0:
        self.log_out_of_range_values(out_of_range_df)
    else:
        self.logger.info(f"No out of range values found in 'Value' column.")

    return self.df

check_for_out_of_range()

Identifies rows where 'Value' falls outside the defined ranges.

Returns:

| Type | Description |
| --- | --- |
| DataFrame | A DataFrame containing rows with out-of-range values. |
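
A short sketch, assuming check_value_ranges was constructed as in the class example: the returned DataFrame keeps the input schema, so it can be aggregated or persisted like any other DataFrame.

# Count the out-of-range rows per TagName.
out_of_range_df = check_value_ranges.check_for_out_of_range()
out_of_range_df.groupBy("TagName").count().show()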

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/check_value_ranges.py
def check_for_out_of_range(self) -> PySparkDataFrame:
    """
    Identifies rows where 'Value' exceeds defined ranges.

    Returns:
    pyspark.sql.DataFrame: A DataFrame containing rows with out-of-range values.
    """

    self._validate_inputs()

    out_of_range_df = self.df.filter("1=0")

    for tag_name, range_dict in self.tag_ranges.items():
        df = self.df.filter(col("TagName") == tag_name)

        if df.count() == 0:
            self.logger.warning(f"No data found for TagName '{tag_name}'.")
            continue

        min_value = range_dict.get("min", None)
        max_value = range_dict.get("max", None)
        inclusive_bounds = range_dict.get("inclusive_bounds", True)

        conditions = []

        # Build minimum value condition
        if min_value is not None:
            if inclusive_bounds:
                min_condition = col("Value") < min_value
            else:
                min_condition = col("Value") <= min_value
            conditions.append(min_condition)

        # Build maximum value condition
        if max_value is not None:
            if inclusive_bounds:
                max_condition = col("Value") > max_value
            else:
                max_condition = col("Value") >= max_value
            conditions.append(max_condition)

        if conditions:
            condition = reduce(or_, conditions)
            tag_out_of_range_df = df.filter(condition)
            out_of_range_df = out_of_range_df.union(tag_out_of_range_df)

    return out_of_range_df

log_out_of_range_values(out_of_range_df)

Logs out-of-range values for all TagNames.
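
This mirrors what check() does internally; a sketch, assuming check_value_ranges from the class example, that logs the flagged rows explicitly from a DataFrame produced by check_for_out_of_range():

# Log each out-of-range row, grouped by TagName, via the class logger.
out_of_range_df = check_value_ranges.check_for_out_of_range()
if out_of_range_df.count() > 0:
    check_value_ranges.log_out_of_range_values(out_of_range_df)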

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/check_value_ranges.py
def log_out_of_range_values(self, out_of_range_df: PySparkDataFrame):
    """
    Logs out-of-range values for all TagNames.
    """
    for tag_name in (
        out_of_range_df.select("TagName")
        .distinct()
        .rdd.map(lambda row: row[0])
        .collect()
    ):
        tag_out_of_range_df = out_of_range_df.filter(col("TagName") == tag_name)
        count = tag_out_of_range_df.count()
        self.logger.info(
            f"Found {count} rows in 'Value' column for TagName '{tag_name}' out of range."
        )
        for row in tag_out_of_range_df.collect():
            self.logger.info(f"Out of range row for TagName '{tag_name}': {row}")