Skip to content

K Nearest Neighbors

KNearestNeighbors

Bases: MachineLearningInterface

Implements the K-Nearest Neighbors (KNN) algorithm to predict missing values in a dataset. This component is compatible with time series data and predicts the label that wins a distance-weighted or unweighted vote among the k nearest training rows.

Example:

from src.sdk.python.rtdip_sdk.pipelines.forecasting.spark.k_nearest_neighbors import KNearestNeighbors
from pyspark.ml.feature import StandardScaler, VectorAssembler
from pyspark.sql import SparkSession
spark = ... # SparkSession
raw_df = ... # Get a PySpark DataFrame
assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="assembled_features")
df = assembler.transform(raw_df)
scaler = StandardScaler(inputCol="assembled_features", outputCol="features", withStd=True, withMean=True)
scaled_df = scaler.fit(df).transform(df)
knn = KNearestNeighbors(
    df=scaled_df,
    features_col="features",
    label_col="label",
    timestamp_col="timestamp",
    k=3,
    weighted=True,
    distance_metric="combined",  # Options: "euclidean", "temporal", "combined"
    temporal_weight=0.3  # Weight for temporal distance when using combined metric
)
train_df, test_df = scaled_df.randomSplit([0.8, 0.2], seed=42)
knn.train(train_df)
predictions = knn.predict(test_df)
Parameters:


df (pyspark.sql.DataFrame): DataFrame containing the features and labels
features_col (str): Name of the column containing the features (the input). Default is 'features'
label_col (str): Name of the column containing the label (the target to predict). Default is 'label'
timestamp_col (str, optional): Name of the column containing timestamps
k (int): The number of neighbors to consider in the KNN algorithm. Default is 3
weighted (bool): Whether neighbor votes are weighted by inverse distance. Default is False (every neighbor counts equally)
distance_metric (str): Type of distance calculation ("euclidean", "temporal", or "combined")
temporal_weight (float): Weight for temporal distance in combined metric (0 to 1)
Source code in src/sdk/python/rtdip_sdk/pipelines/forecasting/spark/k_nearest_neighbors.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
class KNearestNeighbors(MachineLearningInterface):
    """
    Implements the K-Nearest Neighbors (KNN) algorithm to predict missing values in a dataset.
    This component is compatible with time series data. The prediction for a row is the label
    that wins a vote among its k nearest training rows; votes are either counted equally or
    weighted by inverse distance.

    Example:
    --------
    ```python
    from src.sdk.python.rtdip_sdk.pipelines.forecasting.spark.k_nearest_neighbors import KNearestNeighbors
    from pyspark.ml.feature import StandardScaler, VectorAssembler
    from pyspark.sql import SparkSession
    spark = ... # SparkSession
    raw_df = ... # Get a PySpark DataFrame
    assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="assembled_features")
    df = assembler.transform(raw_df)
    scaler = StandardScaler(inputCol="assembled_features", outputCol="features", withStd=True, withMean=True)
    scaled_df = scaler.fit(df).transform(df)
    knn = KNearestNeighbors(
        df=scaled_df,
        features_col="features",
        label_col="label",
        timestamp_col="timestamp",
        k=3,
        weighted=True,
        distance_metric="combined",  # Options: "euclidean", "temporal", "combined"
        temporal_weight=0.3  # Weight for temporal distance when using combined metric
    )
    train_df, test_df = scaled_df.randomSplit([0.8, 0.2], seed=42)
    knn.train(train_df)
    predictions = knn.predict(test_df)
    ```
    Parameters:
    --------
        df (pyspark.sql.DataFrame): DataFrame containing the features and labels
        features_col (str): Name of the column containing the features (the input). Default is 'features'
        label_col (str): Name of the column containing the label (the target to predict). Default is 'label'
        timestamp_col (str, optional): Name of the column containing timestamps
        k (int): The number of neighbors to consider in the KNN algorithm. Default is 3
        weighted (bool): Whether neighbor votes are weighted by inverse distance. Default is False (every neighbor counts equally)
        distance_metric (str): Type of distance calculation ("euclidean", "temporal", or "combined")
        temporal_weight (float): Weight for temporal distance in combined metric (0 to 1)

    Raises:
    --------
        ValueError: If distance_metric is unknown, if a temporal metric is requested without
            timestamp_col, if k is not a positive integer, or if temporal_weight is outside [0, 1].
    """

    def __init__(
        self,
        df: DataFrame,
        features_col="features",
        label_col="label",
        timestamp_col=None,
        k=3,
        weighted=False,
        distance_metric="euclidean",
        temporal_weight=0.5,
    ):
        if distance_metric not in ["euclidean", "temporal", "combined"]:
            raise ValueError(
                "distance_metric must be 'euclidean', 'temporal', or 'combined'"
            )

        if distance_metric in ["temporal", "combined"] and timestamp_col is None:
            raise ValueError(
                "timestamp_col must be provided when using temporal or combined distance metrics"
            )

        # Fail fast: k == 0 would only blow up inside the Spark UDF, and a
        # negative k would silently slice off the farthest neighbours instead.
        if isinstance(k, bool) or not isinstance(k, (int, np.integer)) or k < 1:
            raise ValueError("k must be a positive integer")

        # Outside [0, 1] one of the two blend factors in the combined metric
        # turns negative, which would reward distance instead of penalising it.
        if not 0 <= temporal_weight <= 1:
            raise ValueError("temporal_weight must be between 0 and 1")

        self.df = df
        self.features_col = features_col
        self.label_col = label_col
        self.timestamp_col = timestamp_col
        self.k = k
        self.weighted = weighted
        self.distance_metric = distance_metric
        self.temporal_weight = temporal_weight
        # Populated by train(); None means the model has not been trained yet.
        self.train_features = None
        self.train_labels = None
        self.train_timestamps = None

    @staticmethod
    def system_type():
        """Return the system type this component requires (PySpark)."""
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """Return an empty Libraries object; nothing is added to it here."""
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        """Return the component settings; this component defines none."""
        return {}

    def train(self, train_df: DataFrame):
        """
        Sets up the training data, including temporal information if specified.

        The selected columns of train_df are collected to the driver and stored
        as NumPy arrays on the instance, so train_df has to fit in driver memory.

        Parameters:
            train_df (DataFrame): Rows holding features_col, label_col and, when
                timestamp_col is set, timestamp_col.

        Returns:
            KNearestNeighbors: self, to allow call chaining.
        """
        selected = [self.features_col, self.label_col]
        has_timestamps = bool(self.timestamp_col)
        if has_timestamps:
            selected.append(self.timestamp_col)

        rows = train_df.select(*selected).collect()

        if has_timestamps:
            # Assumes the column yields datetime-like values exposing .timestamp().
            self.train_timestamps = np.array(
                [row[self.timestamp_col].timestamp() for row in rows]
            )

        self.train_features = np.array([row[self.features_col] for row in rows])
        self.train_labels = np.array([row[self.label_col] for row in rows])
        return self

    def predict(self, test_df: DataFrame) -> DataFrame:
        """
        Predicts labels using the specified distance metric.

        Parameters:
            test_df (DataFrame): Rows to score. Must contain features_col and,
                for the "temporal" and "combined" metrics, timestamp_col.

        Returns:
            DataFrame: test_df with an added double column named "prediction".

        Raises:
            ValueError: If train() has not been called, if the training set is
                empty, or if a temporal metric is used without training timestamps.
        """
        if self.train_features is None or self.train_labels is None:
            raise ValueError("train() must be called before predict()")
        if self.train_labels.size == 0:
            raise ValueError("cannot predict from an empty training set")

        uses_time = self.distance_metric in ["temporal", "combined"]
        if uses_time and self.train_timestamps is None:
            raise ValueError(
                "training timestamps are missing; train() must be called with timestamp_col set"
            )

        # Plain locals are used inside the UDF closures instead of `self`
        # (presumably so the instance and its DataFrame need not be serialized).
        train_features = self.train_features
        train_labels = self.train_labels
        train_timestamps = self.train_timestamps
        k = self.k
        weighted = self.weighted
        distance_metric = self.distance_metric
        temporal_weight = self.temporal_weight

        def euclidean(features):
            # Distance from one test point to every training row.
            return np.sqrt(np.sum((train_features - np.array(features)) ** 2, axis=1))

        def min_max_scale(values):
            # Rescale to [0, 1]; the epsilon avoids 0/0 when all values are equal.
            lowest = values.min()
            return (values - lowest) / (values.max() - lowest + 1e-10)

        def calculate_distances(features, timestamp=None):
            if distance_metric == "euclidean":
                return euclidean(features)
            if distance_metric == "temporal":
                return np.abs(train_timestamps - timestamp)

            # combined: both parts are normalised so neither scale dominates
            feature_distances = min_max_scale(euclidean(features))
            temporal_distances = min_max_scale(np.abs(train_timestamps - timestamp))
            return (
                1 - temporal_weight
            ) * feature_distances + temporal_weight * temporal_distances

        def knn_predict(features, timestamp=None):
            distances = calculate_distances(features, timestamp)
            # A stable sort makes equidistant neighbours resolve by training order.
            k_nearest_indices = np.argsort(distances, kind="stable")[:k]
            k_nearest_labels = train_labels[k_nearest_indices]

            if weighted:
                # Inverse-distance weights; the epsilon guards exact matches.
                weights = 1 / (distances[k_nearest_indices] + 1e-10)
                weights /= np.sum(weights)
                unique_labels = np.unique(k_nearest_labels)
                votes = np.array(
                    [
                        np.sum(weights[k_nearest_labels == label])
                        for label in unique_labels
                    ]
                )
            else:
                unique_labels, votes = np.unique(k_nearest_labels, return_counts=True)

            # np.unique sorts the labels and argmax returns the first maximum,
            # so ties resolve to the smallest label in both branches.
            return float(unique_labels[np.argmax(votes)])

        if uses_time:
            predict_udf = udf(
                lambda features, timestamp: knn_predict(
                    features, timestamp.timestamp()
                ),
                DoubleType(),
            )
            return test_df.withColumn(
                "prediction",
                predict_udf(col(self.features_col), col(self.timestamp_col)),
            )

        predict_udf = udf(lambda features: knn_predict(features), DoubleType())
        return test_df.withColumn("prediction", predict_udf(col(self.features_col)))

train(train_df)

Sets up the training DataFrame including temporal information if specified.

Source code in src/sdk/python/rtdip_sdk/pipelines/forecasting/spark/k_nearest_neighbors.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def train(self, train_df: DataFrame):
    """
    Sets up the training data, including temporal information if specified.

    The selected columns of train_df are collected and stored on the instance
    as NumPy arrays (train_features, train_labels and, when timestamp_col is
    set, train_timestamps). Returns self.
    """
    selected = [self.features_col, self.label_col]
    has_timestamps = bool(self.timestamp_col)
    if has_timestamps:
        selected.append(self.timestamp_col)

    rows = train_df.select(*selected).collect()

    if has_timestamps:
        # Assumes the column yields datetime-like values exposing .timestamp().
        self.train_timestamps = np.array(
            [row[self.timestamp_col].timestamp() for row in rows]
        )

    self.train_features = np.array([row[self.features_col] for row in rows])
    self.train_labels = np.array([row[self.label_col] for row in rows])
    return self

predict(test_df)

Predicts labels using the specified distance metric.

Source code in src/sdk/python/rtdip_sdk/pipelines/forecasting/spark/k_nearest_neighbors.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
def predict(self, test_df: DataFrame) -> DataFrame:
    """
    Predicts labels using the specified distance metric.

    Parameters:
        test_df (DataFrame): Rows to score. Must contain features_col and,
            for the "temporal" and "combined" metrics, timestamp_col.

    Returns:
        DataFrame: test_df with an added double column named "prediction".

    Raises:
        ValueError: If train() has not been called, if the training set is
            empty, or if a temporal metric is used without training timestamps.
    """
    if self.train_features is None or self.train_labels is None:
        raise ValueError("train() must be called before predict()")
    if self.train_labels.size == 0:
        raise ValueError("cannot predict from an empty training set")

    uses_time = self.distance_metric in ["temporal", "combined"]
    if uses_time and self.train_timestamps is None:
        raise ValueError(
            "training timestamps are missing; train() must be called with timestamp_col set"
        )

    # Plain locals are used inside the UDF closures instead of `self`
    # (presumably so the instance and its DataFrame need not be serialized).
    train_features = self.train_features
    train_labels = self.train_labels
    train_timestamps = self.train_timestamps
    k = self.k
    weighted = self.weighted
    distance_metric = self.distance_metric
    temporal_weight = self.temporal_weight

    def euclidean(features):
        # Distance from one test point to every training row.
        return np.sqrt(np.sum((train_features - np.array(features)) ** 2, axis=1))

    def min_max_scale(values):
        # Rescale to [0, 1]; the epsilon avoids 0/0 when all values are equal.
        lowest = values.min()
        return (values - lowest) / (values.max() - lowest + 1e-10)

    def calculate_distances(features, timestamp=None):
        if distance_metric == "euclidean":
            return euclidean(features)
        if distance_metric == "temporal":
            return np.abs(train_timestamps - timestamp)

        # combined: both parts are normalised so neither scale dominates
        feature_distances = min_max_scale(euclidean(features))
        temporal_distances = min_max_scale(np.abs(train_timestamps - timestamp))
        return (
            1 - temporal_weight
        ) * feature_distances + temporal_weight * temporal_distances

    def knn_predict(features, timestamp=None):
        distances = calculate_distances(features, timestamp)
        # A stable sort makes equidistant neighbours resolve by training order.
        k_nearest_indices = np.argsort(distances, kind="stable")[:k]
        k_nearest_labels = train_labels[k_nearest_indices]

        if weighted:
            # Inverse-distance weights; the epsilon guards exact matches.
            weights = 1 / (distances[k_nearest_indices] + 1e-10)
            weights /= np.sum(weights)
            unique_labels = np.unique(k_nearest_labels)
            votes = np.array(
                [
                    np.sum(weights[k_nearest_labels == label])
                    for label in unique_labels
                ]
            )
        else:
            unique_labels, votes = np.unique(k_nearest_labels, return_counts=True)

        # np.unique sorts the labels and argmax returns the first maximum,
        # so ties resolve to the smallest label in both branches.
        return float(unique_labels[np.argmax(votes)])

    if uses_time:
        predict_udf = udf(
            lambda features, timestamp: knn_predict(
                features, timestamp.timestamp()
            ),
            DoubleType(),
        )
        return test_df.withColumn(
            "prediction",
            predict_udf(col(self.features_col), col(self.timestamp_col)),
        )

    predict_udf = udf(lambda features: knn_predict(features), DoubleType())
    return test_df.withColumn("prediction", predict_udf(col(self.features_col)))