Skip to content

Data Binning

DataBinning

Bases: MachineLearningInterface

Data binning using clustering methods. This method partitions the data points into a specified number of clusters (bins) based on the specified column. Each data point is assigned to the nearest cluster center.

Example

from src.sdk.python.rtdip_sdk.pipelines.machine_learning.spark.data_binning import DataBinning

df = ... # Get a PySpark DataFrame with features column

binning = DataBinning(
    df=df,
    column_name="features",
    bins=3,
    output_column_name="bin",
    method="kmeans"
)
binned_df = binning.train().predict()
binned_df.show()

Parameters:

Name Type Description Default
df DataFrame

Dataframe containing the input data.

required
column_name str

The name of the input column to be binned (default: "features").

'features'
bins int

The number of bins/clusters to create (default: 2).

2
output_column_name str

The name of the output column containing bin assignments (default: "bin").

'bin'
method str

The binning method to use. Currently only supports "kmeans".

'kmeans'
Source code in src/sdk/python/rtdip_sdk/pipelines/machine_learning/spark/data_binning.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
class DataBinning(MachineLearningInterface):
    """
    Data binning using clustering methods. This method partitions the data points into a specified number of clusters (bins)
    based on the specified column. Each data point is assigned to the nearest cluster center.

    Example
    --------
    ```python
    from src.sdk.python.rtdip_sdk.pipelines.machine_learning.spark.data_binning import DataBinning

    df = ... # Get a PySpark DataFrame with features column

    binning = DataBinning(
        df=df,
        column_name="features",
        bins=3,
        output_column_name="bin",
        method="kmeans"
    )
    binned_df = binning.train().predict()
    binned_df.show()
    ```

    Parameters:
        df (DataFrame): Dataframe containing the input data.
        column_name (str): The name of the input column to be binned (default: "features").
        bins (int): The number of bins/clusters to create (default: 2). Passed to Spark's KMeans as `k`, which Spark documents as needing to be greater than 1.
        output_column_name (str): The name of the output column containing bin assignments (default: "bin").
        method (str): The binning method to use. Currently only supports "kmeans".

    Raises:
        ValueError: If `method` is not a supported binning method.
    """

    def __init__(
        self,
        df: DataFrame,
        column_name: str = "features",
        bins: int = 2,
        output_column_name: str = "bin",
        method: str = "kmeans",
    ) -> None:
        self.column_name = column_name
        self.output_column_name = output_column_name
        self.bins = bins

        self.df = df

        # Populated by train(); kept as None until then so predict() can
        # report a missing fit explicitly.
        self.model = None

        if method == "kmeans":
            self.method = clustering.KMeans(
                featuresCol=column_name, predictionCol=output_column_name, k=bins
            )
        else:
            # ValueError is a subclass of Exception, so callers that caught the
            # previous bare Exception keep working.
            raise ValueError("Unknown method")

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries() -> Libraries:
        """
        Returns:
            Libraries: A new Libraries instance with nothing added to it.
        """
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        """
        Returns:
            dict: An empty dictionary.
        """
        return {}

    def train(self) -> "DataBinning":
        """
        Fit the configured clustering method to the input DataFrame.

        The fitted model is stored on `self.model`.

        Returns:
            DataBinning: This instance, so that `train().predict()` can be chained.
        """
        self.model = self.method.fit(self.df)
        return self

    def predict(self) -> DataFrame:
        """
        Apply the fitted model to the input DataFrame.

        Returns:
            DataFrame: The result of transforming the input DataFrame with the fitted model;
                the bin assignments are written to the configured output column.

        Raises:
            AttributeError: If called before `train()`.
        """
        if self.model is None:
            # Same exception type as the implicit failure this replaces
            # (accessing a missing `model` attribute), but with a clear message.
            raise AttributeError(
                "DataBinning model has not been trained; call train() before predict()"
            )
        return self.model.transform(self.df)
system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/machine_learning/spark/data_binning.py
73
74
75
76
77
78
79
@staticmethod
def system_type():
    """
    Report the system type required by this component.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_environment = SystemType.PYSPARK
    return required_environment

train()

Fit the configured clustering model to the input DataFrame and return this instance so that calls can be chained.

Source code in src/sdk/python/rtdip_sdk/pipelines/machine_learning/spark/data_binning.py
90
91
92
93
94
95
def train(self):
    """
    Fit the configured clustering method to the stored DataFrame.

    The fitted model is kept on `self.model`, and the instance itself is
    returned so that calls can be chained.
    """
    fitted_model = self.method.fit(self.df)
    self.model = fitted_model
    return self