
Missing Value Imputation

MissingValueImputation

Bases: DataManipulationBaseInterface, InputValidator

Imputes missing values in a univariate time series to create a continuous curve of data points. To do this, the typical time interval of each individual source is determined, and empty records with NaN values are inserted at the missing timestamps. The NaN values are then filled in by spline interpolation, resulting in a consistent data set and improved data quality.
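
The core idea can be sketched outside Spark with pandas and SciPy. The helper below is a minimal, hypothetical illustration (not part of the SDK) and assumes the observed timestamps already lie on a regular grid: detect the most frequent sampling interval, reindex the series so missing timestamps appear as NaN, then fill the NaNs with an exact spline fitted through the observed points.

import numpy as np
import pandas as pd
from scipy.interpolate import UnivariateSpline

def impute_univariate(series: pd.Series) -> pd.Series:
    # series: float values indexed by timestamps, possibly with gaps
    # Take the most frequent time difference as the expected interval
    interval = series.index.to_series().diff().mode().iloc[0]
    # Reindex onto the regular grid; missing timestamps become NaN
    grid = pd.date_range(series.index[0], series.index[-1], freq=interval)
    regular = series.reindex(grid)
    # Fit an exact (s=0) spline through the known points, evaluate it at the gaps
    x = np.arange(len(regular), dtype=float)
    mask = regular.isna().to_numpy()
    spline = UnivariateSpline(x[~mask], regular.to_numpy()[~mask], s=0)
    regular[mask] = spline(x[mask])
    return regular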

Example

from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.types import StructType, StructField, StringType
from src.sdk.python.rtdip_sdk.pipelines.data_quality.data_manipulation.spark.missing_value_imputation import (
    MissingValueImputation,
)

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("TagName", StringType(), True),
    StructField("EventTime", StringType(), True),
    StructField("Status", StringType(), True),
    StructField("Value", StringType(), True)
])

data = [
    ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21.000", "Good", "1.0"),
    ("A2PS64V0J.:ZUX09R", "2024-01-01 07:32:55.000", "Good", "2.0"),
    ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29.000", "Good", "3.0"),
    ("A2PS64V0J.:ZUX09R", "2024-01-01 15:39:03.000", "Good", "4.0"),
    ("A2PS64V0J.:ZUX09R", "2024-01-01 19:42:37.000", "Good", "5.0"),
    #("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:11.000", "Good", "6.0"), # Test values
    #("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "7.0"),
    ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "8.0"),
]
df = spark.createDataFrame(data, schema=schema)

missing_value_imputation = MissingValueImputation(spark, df)
result = missing_value_imputation.filter()
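
In this example the two commented-out "Test values" rows are detected as missing timestamps and re-inserted by filter(); because the remaining points lie on an almost perfectly straight line, the spline imputes values very close to the original 6.0 and 7.0.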

Parameters:

    df (DataFrame): DataFrame containing the raw data. Required.
    tolerance_percentage (int): Percentage of the expected interval by which a timestamp may deviate from the regular grid before a record is treated as missing. Defaults to 5.
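
As a rough worked example of the tolerance (based on the interval-detection logic in the source below): with the sample data above, the most frequent gap between records is 14,614 seconds, so the default tolerance_percentage of 5 gives a tolerance of about 731 seconds; an expected timestamp is only inserted as a missing record if it lies more than roughly 731 seconds away from the next observed record.
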
Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/missing_value_imputation.py
class MissingValueImputation(DataManipulationBaseInterface, InputValidator):
    """
    Imputes missing values in a univariate time series creating a continuous curve of data points. For that, the
    time intervals of each individual source is calculated, to then insert empty records at the missing timestamps with
    NaN values. Through spline interpolation the missing NaN values are calculated resulting in a consistent data set
    and thus enhance your data quality.

    Example
    --------
    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql.dataframe import DataFrame
    from pyspark.sql.types import StructType, StructField, StringType
    from src.sdk.python.rtdip_sdk.pipelines.data_wranglers.spark.data_manipulation.missing_value_imputation import (
        MissingValueImputation,
    )

    spark = spark_session()

    schema = StructType([
        StructField("TagName", StringType(), True),
        StructField("EventTime", StringType(), True),
        StructField("Status", StringType(), True),
        StructField("Value", StringType(), True)
    ])

    data = [
        ("A2PS64V0J.:ZUX09R", "2024-01-01 03:29:21.000", "Good", "1.0"),
        ("A2PS64V0J.:ZUX09R", "2024-01-01 07:32:55.000", "Good", "2.0"),
        ("A2PS64V0J.:ZUX09R", "2024-01-01 11:36:29.000", "Good", "3.0"),
        ("A2PS64V0J.:ZUX09R", "2024-01-01 15:39:03.000", "Good", "4.0"),
        ("A2PS64V0J.:ZUX09R", "2024-01-01 19:42:37.000", "Good", "5.0"),
        #("A2PS64V0J.:ZUX09R", "2024-01-01 23:46:11.000", "Good", "6.0"), # Test values
        #("A2PS64V0J.:ZUX09R", "2024-01-02 03:49:45.000", "Good", "7.0"),
        ("A2PS64V0J.:ZUX09R", "2024-01-02 07:53:11.000", "Good", "8.0"),
    ]
    df = spark.createDataFrame(data, schema=schema)

    missing_value_imputation = MissingValueImputation(spark, df)
    result = missing_value_imputation.filter()
    ```

    Parameters:
        df (DataFrame): Dataframe containing the raw data.
        tolerance_percentage (int): Percentage value that indicates how much the time series data points may vary
            in each interval
    """

    df: PySparkDataFrame

    def __init__(
        self,
        spark: SparkSession,
        df: PySparkDataFrame,
        tolerance_percentage: int = 5,
    ) -> None:
        self.spark = spark
        self.df = df
        self.tolerance_percentage = tolerance_percentage

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    @staticmethod
    def _impute_missing_values_sp(df) -> PySparkDataFrame:
        """
        Imputes missing values by Spline Interpolation
        """
        data = np.array(
            df.select("Value").rdd.flatMap(lambda x: x).collect(), dtype=float
        )
        mask = np.isnan(data)

        x_data = np.arange(len(data))
        y_data = data[~mask]

        spline = UnivariateSpline(x_data[~mask], y_data, s=0)

        data_imputed = data.copy()
        data_imputed[mask] = spline(x_data[mask])
        data_imputed_list = data_imputed.tolist()

        imputed_rdd = df.rdd.zipWithIndex().map(
            lambda row: Row(
                TagName=row[0][0],
                EventTime=row[0][1],
                Status=row[0][2],
                Value=float(data_imputed_list[row[1]]),
            )
        )
        imputed_df = imputed_rdd.toDF(df.schema)

        return imputed_df

    @staticmethod
    def _flag_missing_values(df, tolerance_percentage) -> PySparkDataFrame:
        """
        Determines intervals of each respective source time series and inserts empty records at missing timestamps
        with NaN values
        """
        window_spec = Window.partitionBy("TagName").orderBy("EventTime")

        df = df.withColumn("prev_event_time", F.lag("EventTime").over(window_spec))
        df = df.withColumn(
            "time_diff_seconds",
            (F.unix_timestamp("EventTime") - F.unix_timestamp("prev_event_time")),
        )

        df_diff = df.filter(F.col("time_diff_seconds").isNotNull())
        interval_counts = df_diff.groupBy("time_diff_seconds").count()
        most_frequent_interval = interval_counts.orderBy(F.desc("count")).first()
        expected_interval = (
            most_frequent_interval["time_diff_seconds"]
            if most_frequent_interval
            else None
        )

        tolerance = (
            (expected_interval * tolerance_percentage) / 100 if expected_interval else 0
        )

        existing_timestamps = (
            df.select("TagName", "EventTime")
            .rdd.map(lambda row: (row["TagName"], row["EventTime"]))
            .groupByKey()
            .collectAsMap()
        )

        def generate_missing_timestamps(prev_event_time, event_time, tag_name):
            # Check for first row
            if (
                prev_event_time is None
                or event_time is None
                or expected_interval is None
            ):
                return []

            # Check against existing timestamps to avoid duplicates
            tag_timestamps = set(existing_timestamps.get(tag_name, []))
            missing_timestamps = []
            current_time = prev_event_time

            while current_time < event_time:
                next_expected_time = current_time + timedelta(seconds=expected_interval)
                time_diff = abs((next_expected_time - event_time).total_seconds())
                if time_diff <= tolerance:
                    break
                if next_expected_time not in tag_timestamps:
                    missing_timestamps.append(next_expected_time)
                current_time = next_expected_time

            return missing_timestamps

        generate_missing_timestamps_udf = udf(
            generate_missing_timestamps, ArrayType(TimestampType())
        )

        df_with_missing = df.withColumn(
            "missing_timestamps",
            generate_missing_timestamps_udf("prev_event_time", "EventTime", "TagName"),
        )

        df_missing_entries = df_with_missing.select(
            "TagName",
            F.explode("missing_timestamps").alias("EventTime"),
            F.lit("Good").alias("Status"),
            F.lit(float("nan")).cast(FloatType()).alias("Value"),
        )

        df_combined = (
            df.select("TagName", "EventTime", "Status", "Value")
            .union(df_missing_entries)
            .orderBy("EventTime")
        )

        return df_combined

    @staticmethod
    def _is_column_type(df, column_name, data_type):
        """
        Helper method for data type checking
        """
        type_ = df.schema[column_name]

        return isinstance(type_.dataType, data_type)

    def filter(self) -> PySparkDataFrame:
        """
        Imputate missing values based on [Spline Interpolation, ]
        """
        if not all(
            col_ in self.df.columns
            for col_ in ["TagName", "EventTime", "Value", "Status"]
        ):
            raise ValueError("Columns not as expected")

        if not self._is_column_type(self.df, "EventTime", TimestampType):
            if self._is_column_type(self.df, "EventTime", StringType):
                # Attempt to parse the first format, then fallback to the second
                self.df = self.df.withColumn(
                    "EventTime",
                    F.coalesce(
                        F.to_timestamp("EventTime", "yyyy-MM-dd HH:mm:ss.SSS"),
                        F.to_timestamp("EventTime", "dd.MM.yyyy HH:mm:ss"),
                    ),
                )
        if not self._is_column_type(self.df, "Value", FloatType):
            self.df = self.df.withColumn("Value", self.df["Value"].cast(FloatType()))

        dfs_by_source = self._split_by_source()

        imputed_dfs: List[PySparkDataFrame] = []

        for source, df in dfs_by_source.items():
            # Determine, insert and flag all the missing entries
            flagged_df = self._flag_missing_values(df, self.tolerance_percentage)

            # Impute the missing values of flagged entries
            try:
                imputed_df_sp = self._impute_missing_values_sp(flagged_df)
            except Exception as e:
                if flagged_df.count() != 1:  # Account for single entries
                    raise Exception(
                        "Something went wrong while imputing missing values"
                    )

            imputed_dfs.append(imputed_df_sp)

        result_df = imputed_dfs[0]
        for df in imputed_dfs[1:]:
            result_df = result_df.unionByName(df)

        return result_df

    def _split_by_source(self) -> dict:
        """
        Helper method to separate individual time series based on their source
        """
        tag_names = self.df.select("TagName").distinct().collect()
        tag_names = [row["TagName"] for row in tag_names]
        source_dict = {
            tag: self.df.filter(col("TagName") == tag).orderBy("EventTime")
            for tag in tag_names
        }

        return source_dict
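
Reading the source end to end: filter() first splits the input by TagName, _flag_missing_values then inserts NaN-valued rows at the expected-but-missing timestamps of each series, and _impute_missing_values_sp replaces those NaNs with values from an exact spline fitted over the observed points, before the per-source frames are unioned back together.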

system_type() staticmethod

Attributes:

    SystemType (Environment): Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/missing_value_imputation.py
@staticmethod
def system_type():
    """
    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    return SystemType.PYSPARK

filter()

Imputes missing values using spline interpolation. Expects the DataFrame to contain the columns TagName, EventTime, Status and Value.
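
A brief usage sketch (assuming a DataFrame df with the schema from the example above); the tolerance can also be adjusted through the constructor:

result = MissingValueImputation(spark, df, tolerance_percentage=10).filter()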

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/missing_value_imputation.py
def filter(self) -> PySparkDataFrame:
    """
    Imputate missing values based on [Spline Interpolation, ]
    """
    if not all(
        col_ in self.df.columns
        for col_ in ["TagName", "EventTime", "Value", "Status"]
    ):
        raise ValueError("Columns not as expected")

    if not self._is_column_type(self.df, "EventTime", TimestampType):
        if self._is_column_type(self.df, "EventTime", StringType):
            # Attempt to parse the first format, then fallback to the second
            self.df = self.df.withColumn(
                "EventTime",
                F.coalesce(
                    F.to_timestamp("EventTime", "yyyy-MM-dd HH:mm:ss.SSS"),
                    F.to_timestamp("EventTime", "dd.MM.yyyy HH:mm:ss"),
                ),
            )
    if not self._is_column_type(self.df, "Value", FloatType):
        self.df = self.df.withColumn("Value", self.df["Value"].cast(FloatType()))

    dfs_by_source = self._split_by_source()

    imputed_dfs: List[PySparkDataFrame] = []

    for source, df in dfs_by_source.items():
        # Determine, insert and flag all the missing entries
        flagged_df = self._flag_missing_values(df, self.tolerance_percentage)

        # Impute the missing values of flagged entries
        try:
            imputed_df_sp = self._impute_missing_values_sp(flagged_df)
        except Exception as e:
            if flagged_df.count() != 1:  # Account for single entries
                raise Exception(
                    "Something went wrong while imputing missing values"
                )

        imputed_dfs.append(imputed_df_sp)

    result_df = imputed_dfs[0]
    for df in imputed_dfs[1:]:
        result_df = result_df.unionByName(df)

    return result_df