
Moving Average

MovingAverage

Bases: MonitoringBaseInterface, InputValidator

Computes and logs the moving average over a window of a specified size for a given PySpark DataFrame.

Parameters:

Name         Type       Description                     Default
df           DataFrame  The DataFrame to process.       required
window_size  int        The size of the moving window.  required
Example
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.data_quality.monitoring.spark.moving_average import MovingAverage

spark = SparkSession.builder.master("local[1]").appName("MovingAverageExample").getOrCreate()

data = [
    ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", 1.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", 2.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", 3.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", 4.0),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", 5.0),
]

columns = ["TagName", "EventTime", "Status", "Value"]

df = spark.createDataFrame(data, columns)

moving_avg = MovingAverage(
    df=df,
    window_size=3,
)

moving_avg.check()
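
For the data above with window_size=3, check() logs a trailing average per row: 1.0, 1.5, 2.0, 3.0, and 4.0. The first two rows average over fewer than three values, since only the current and preceding rows fall inside the window.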
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/moving_average.py
class MovingAverage(MonitoringBaseInterface, InputValidator):
    """
    Computes and logs the moving average over a window of a specified size for a given PySpark DataFrame.

    Args:
        df (pyspark.sql.DataFrame): The DataFrame to process.
        window_size (int): The size of the moving window.

    Example:
        ```python
        from pyspark.sql import SparkSession
        from rtdip_sdk.pipelines.data_quality.monitoring.spark.moving_average import MovingAverage

        spark = SparkSession.builder.master("local[1]").appName("MovingAverageExample").getOrCreate()

        data = [
            ("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", 1.0),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", 2.0),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 11:56:42.000", "Good", 3.0),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 16:00:12.000", "Good", 4.0),
            ("A2PS64V0J.:ZUX09R", "2024-01-02 20:03:46.000", "Good", 5.0),
        ]

        columns = ["TagName", "EventTime", "Status", "Value"]

        df = spark.createDataFrame(data, columns)

        moving_avg = MovingAverage(
            df=df,
            window_size=3,
        )

        moving_avg.check()
        ```
    """

    df: PySparkDataFrame
    window_size: int
    EXPECTED_SCHEMA = StructType(
        [
            StructField("TagName", StringType(), True),
            StructField("EventTime", TimestampType(), True),
            StructField("Status", StringType(), True),
            StructField("Value", FloatType(), True),
        ]
    )

    def __init__(
        self,
        df: PySparkDataFrame,
        window_size: int,
    ) -> None:
        if not isinstance(window_size, int) or window_size <= 0:
            raise ValueError("window_size must be a positive integer.")

        self.df = df
        self.validate(self.EXPECTED_SCHEMA)
        self.window_size = window_size

        self.logger = logging.getLogger(self.__class__.__name__)
        if not self.logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter(
                "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.INFO)

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    def check(self) -> None:
        """
        Computes and logs the moving average using a specified window size.
        """

        self._validate_inputs()

        window_spec = (
            Window.partitionBy("TagName")
            .orderBy("EventTime")
            .rowsBetween(-(self.window_size - 1), 0)
        )

        self.logger.info("Computing moving averages:")

        for row in (
            self.df.withColumn("MovingAverage", avg(col("Value")).over(window_spec))
            .select("TagName", "EventTime", "Value", "MovingAverage")
            .collect()
        ):
            self.logger.info(
                f"Tag: {row.TagName}, Time: {row.EventTime}, Value: {row.Value}, Moving Avg: {row.MovingAverage}"
            )

    def _validate_inputs(self):
        if not isinstance(self.window_size, int) or self.window_size <= 0:
            raise ValueError("window_size must be a positive integer.")
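
Before computing anything, the constructor rejects a non-positive or non-integer window_size and then validates the DataFrame against EXPECTED_SCHEMA. A minimal sketch of the failure mode, reusing the example df from above:

```python
# A non-positive window_size raises ValueError before any schema validation
# or computation runs (df here is the example DataFrame from above).
try:
    MovingAverage(df=df, window_size=0)
except ValueError as e:
    print(e)  # window_size must be a positive integer.
```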

system_type() staticmethod

Attributes:

Name        Type         Description
SystemType  Environment  Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/moving_average.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

check()

Computes and logs the moving average using a specified window size.

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/monitoring/spark/moving_average.py
def check(self) -> None:
    """
    Computes and logs the moving average using a specified window size.
    """

    self._validate_inputs()

    window_spec = (
        Window.partitionBy("TagName")
        .orderBy("EventTime")
        .rowsBetween(-(self.window_size - 1), 0)
    )

    self.logger.info("Computing moving averages:")

    for row in (
        self.df.withColumn("MovingAverage", avg(col("Value")).over(window_spec))
        .select("TagName", "EventTime", "Value", "MovingAverage")
        .collect()
    ):
        self.logger.info(
            f"Tag: {row.TagName}, Time: {row.EventTime}, Value: {row.Value}, Moving Avg: {row.MovingAverage}"
        )
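
The frame rowsBetween(-(window_size - 1), 0) spans the current row and the window_size - 1 rows preceding it within each TagName partition, so the average trails rather than centers, and the earliest rows of each tag average over fewer values. The same computation can be reproduced with plain PySpark (a minimal sketch; df is assumed to match the expected schema, as in the example above):

```python
from pyspark.sql import Window
from pyspark.sql.functions import avg, col

window_size = 3

# Current row plus the (window_size - 1) preceding rows, per tag, by event time.
window_spec = (
    Window.partitionBy("TagName")
    .orderBy("EventTime")
    .rowsBetween(-(window_size - 1), 0)
)

df.withColumn("MovingAverage", avg(col("Value")).over(window_spec)).show()
# For Value = 1.0..5.0 under one tag this yields 1.0, 1.5, 2.0, 3.0, 4.0.
```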