Gaussian Smoothing

GaussianSmoothing

Bases: DataManipulationBaseInterface

Applies Gaussian smoothing to a PySpark DataFrame. This class smooths the values in a specified column using a Gaussian filter, which helps reduce noise and fluctuations in time-series or spatial data.

The smoothing can be performed in two modes:

- **Temporal mode**: Applies smoothing along the time axis within each unique ID.
- **Spatial mode**: Applies smoothing across different IDs at the same timestamp.
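
Under the hood, the smoothing is SciPy's one-dimensional Gaussian filter (`gaussian_filter1d`, which the class applies per window). The standalone sketch below, using synthetic illustrative values, shows how `sigma` controls the strength of the smoothing:

```python
import numpy as np
from scipy.ndimage import gaussian_filter1d

# Synthetic noisy signal: a ramp plus random jitter (illustrative values).
rng = np.random.default_rng(0)
signal = np.linspace(0.0, 10.0, 50) + rng.normal(0.0, 0.5, size=50)

# A larger sigma means a wider Gaussian kernel and stronger smoothing.
lightly_smoothed = gaussian_filter1d(signal, sigma=1.0)
heavily_smoothed = gaussian_filter1d(signal, sigma=4.0)

print(signal[:5])
print(lightly_smoothed[:5])
print(heavily_smoothed[:5])
```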

Example

```python
from pyspark.sql import SparkSession
from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.gaussian_smoothing import (
    GaussianSmoothing,
)

spark = SparkSession.builder.getOrCreate()
df = ...  # Load your PySpark DataFrame

smoothed_df = GaussianSmoothing(
    df=df,
    sigma=2.0,
    mode="temporal",
    id_col="sensor_id",
    timestamp_col="timestamp",
    value_col="measurement"
).filter()

smoothed_df.show()
```
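
Since `df` is elided above, a small synthetic DataFrame (hypothetical sensor readings; column names match the example) makes the snippet fully runnable:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical readings for two sensors at one-minute intervals.
df = spark.createDataFrame(
    [
        ("sensor_1", "2024-01-01 00:00:00", 1.0),
        ("sensor_1", "2024-01-01 00:01:00", 5.0),
        ("sensor_1", "2024-01-01 00:02:00", 2.0),
        ("sensor_2", "2024-01-01 00:00:00", 10.0),
        ("sensor_2", "2024-01-01 00:01:00", 12.0),
        ("sensor_2", "2024-01-01 00:02:00", 11.0),
    ],
    ["sensor_id", "timestamp", "measurement"],
)

smoothed_df = GaussianSmoothing(
    df=df,
    sigma=2.0,
    mode="temporal",
    id_col="sensor_id",
    timestamp_col="timestamp",
    value_col="measurement",
).filter()
smoothed_df.show()
```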

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `df` | `DataFrame` | The input PySpark DataFrame. | required |
| `sigma` | `float` | The standard deviation of the Gaussian kernel, which controls the amount of smoothing. | required |
| `mode` | `str` | The smoothing mode, either `"temporal"` or `"spatial"`. | `'temporal'` |
| `id_col` | `str` | The name of the column holding unique entity IDs. | `'id'` |
| `timestamp_col` | `str` | The name of the column holding timestamps. | `'timestamp'` |
| `value_col` | `str` | The name of the column holding the values to be smoothed. | `'value'` |
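
Because `mode` defaults to `"temporal"`, spatial smoothing has to be requested explicitly. A sketch reusing the synthetic `df` from above:

```python
# Smooth across sensors at each timestamp instead of along time per sensor.
spatial_df = GaussianSmoothing(
    df=df,
    sigma=2.0,
    mode="spatial",
    id_col="sensor_id",
    timestamp_col="timestamp",
    value_col="measurement",
).filter()
spatial_df.show()
```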

Raises:

| Type | Description |
| --- | --- |
| `TypeError` | If `df` is not a PySpark DataFrame. |
| `ValueError` | If `sigma` is not a positive number. |
| `ValueError` | If `mode` is not `"temporal"` or `"spatial"`. |
| `ValueError` | If `id_col`, `timestamp_col`, or `value_col` is not found in the DataFrame. |
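
All of these checks run in the constructor, so a misconfiguration fails fast, before any Spark job is launched. For instance, reusing the `df` from above:

```python
try:
    GaussianSmoothing(df=df, sigma=-1.0)
except ValueError as exc:
    print(exc)  # sigma must be a positive number
```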

Source code in src/sdk/python/rtdip_sdk/pipelines/data_quality/data_manipulation/spark/gaussian_smoothing.py
# NOTE: external imports reconstructed for readability; DataManipulationBaseInterface,
# Libraries, and SystemType are imported from elsewhere in the SDK.
import numpy as np
from pyspark.sql import DataFrame as PySparkDataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType
from pyspark.sql.window import Window
from scipy.ndimage import gaussian_filter1d


class GaussianSmoothing(DataManipulationBaseInterface):
    """
    Applies Gaussian smoothing to a PySpark DataFrame. This class smooths the values in a specified column
    using a Gaussian filter, which helps reduce noise and fluctuations in time-series or spatial data.

    The smoothing can be performed in two modes:
    - **Temporal mode**: Applies smoothing along the time axis within each unique ID.
    - **Spatial mode**: Applies smoothing across different IDs for the same timestamp.

    Example
    --------
    ```python
    from pyspark.sql import SparkSession
    from rtdip_sdk.pipelines.data_quality.data_manipulation.spark.gaussian_smoothing import (
        GaussianSmoothing,
    )

    spark = SparkSession.builder.getOrCreate()
    df = ...  # Load your PySpark DataFrame

    smoothed_df = GaussianSmoothing(
        df=df,
        sigma=2.0,
        mode="temporal",
        id_col="sensor_id",
        timestamp_col="timestamp",
        value_col="measurement"
    ).filter()

    smoothed_df.show()
    ```

    Parameters:
        df (PySparkDataFrame): The input PySpark DataFrame.
        sigma (float): The standard deviation for the Gaussian kernel, controlling the amount of smoothing.
        mode (str, optional): The smoothing mode, either `"temporal"` (default) or `"spatial"`.
        id_col (str, optional): The name of the column representing unique entity IDs (default: `"id"`).
        timestamp_col (str, optional): The name of the column representing timestamps (default: `"timestamp"`).
        value_col (str, optional): The name of the column containing the values to be smoothed (default: `"value"`).

    Raises:
        TypeError: If `df` is not a PySpark DataFrame.
        ValueError: If `sigma` is not a positive number.
        ValueError: If `mode` is not `"temporal"` or `"spatial"`.
        ValueError: If `id_col`, `timestamp_col`, or `value_col` are not found in the DataFrame.
    """

    def __init__(
        self,
        df: PySparkDataFrame,
        sigma: float,
        mode: str = "temporal",
        id_col: str = "id",
        timestamp_col: str = "timestamp",
        value_col: str = "value",
    ) -> None:
        if not isinstance(df, PySparkDataFrame):
            raise TypeError("df must be a PySpark DataFrame")
        if not isinstance(sigma, (int, float)) or sigma <= 0:
            raise ValueError("sigma must be a positive number")
        if mode not in ["temporal", "spatial"]:
            raise ValueError("mode must be either 'temporal' or 'spatial'")

        if id_col not in df.columns:
            raise ValueError(f"Column {id_col} not found in DataFrame")
        if timestamp_col not in df.columns:
            raise ValueError(f"Column {timestamp_col} not found in DataFrame")
        if value_col not in df.columns:
            raise ValueError(f"Column {value_col} not found in DataFrame")

        self.df = df
        self.sigma = sigma
        self.mode = mode
        self.id_col = id_col
        self.timestamp_col = timestamp_col
        self.value_col = value_col

    @staticmethod
    def system_type():
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        return {}

    @staticmethod
    def create_gaussian_smoother(sigma_value):
        # Build the function that filter() wraps as a UDF: it smooths a
        # collected window of values and returns the smoothed value at the
        # window's last position.
        def apply_gaussian(values):
            if not values:
                return None
            values_array = np.array([float(v) for v in values])
            smoothed = gaussian_filter1d(values_array, sigma=sigma_value)
            return float(smoothed[-1])

        return apply_gaussian

    def filter(self) -> PySparkDataFrame:
        smooth_udf = F.udf(self.create_gaussian_smoother(self.sigma), FloatType())

        if self.mode == "temporal":
            # Temporal mode: one window per entity, ordered by time.
            window = (
                Window.partitionBy(self.id_col)
                .orderBy(self.timestamp_col)
                .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
            )
        else:  # spatial mode: one window per timestamp, ordered by entity ID
            window = (
                Window.partitionBy(self.timestamp_col)
                .orderBy(self.id_col)
                .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
            )

        # Collect the full (unbounded) window per row, then smooth it in the UDF.
        collect_list_expr = F.collect_list(F.col(self.value_col)).over(window)

        return self.df.withColumn(self.value_col, smooth_udf(collect_list_expr))
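
Because the frame is unbounded in both directions, every row in a partition collects the same full list of values, and the UDF returns the smoothed value at that list's last position. A minimal standalone reproduction of the per-partition computation (pure NumPy/SciPy, hypothetical values):

```python
import numpy as np
from scipy.ndimage import gaussian_filter1d

# Values of one partition (e.g. one sensor_id in temporal mode), in window order.
partition_values = [1.0, 5.0, 2.0, 4.0, 3.0]

smoothed = gaussian_filter1d(np.array(partition_values), sigma=2.0)
# Mirrors apply_gaussian: the smoothed value at the last position of the window.
print(float(smoothed[-1]))
```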