Skip to content

Linear Regression

LinearRegression

Bases: MachineLearningInterface

This class uses pyspark.ml.regression.LinearRegression to train a linear regression model on time-series data, and then uses the model to predict the next values in the time series.

Parameters:

Name Type Description Default
df DataFrame

DataFrame containing the features and labels.

required
features_col str

Name of the column containing the features (the input). Default is 'features'.

'features'
label_col str

Name of the column containing the label (the target to predict). Default is 'label'.

'label'
prediction_col str

Name of the column to which the prediction will be written. Default is 'prediction'.

'prediction'
Source code in src/sdk/python/rtdip_sdk/pipelines/machine_learning/spark/linear_regression.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
class LinearRegression(MachineLearningInterface):
    """
    This class uses pyspark.ml.regression.LinearRegression to train a linear regression model on time data,
    and then uses the model to predict the next values in the time series.

    Typical usage: `split_data()` -> `train()` -> `predict()` -> `evaluate()`.

    Args:
        df (pyspark.sql.DataFrame): DataFrame containing the features and labels.
        features_col (str): Name of the column containing the features (the input). Default is 'features'.
        label_col (str): Name of the column containing the label (the target). Default is 'label'.
        prediction_col (str): Name of the column to which the prediction will be written. Default is 'prediction'.
    """

    def __init__(
        self,
        df: DataFrame,
        features_col: str = "features",
        label_col: str = "label",
        prediction_col: str = "prediction",
    ) -> None:
        self.df = df
        self.features_col = features_col
        self.label_col = label_col
        self.prediction_col = prediction_col

    @staticmethod
    def system_type():
        """
        Attributes:
            SystemType (Environment): Requires PYSPARK
        """
        return SystemType.PYSPARK

    @staticmethod
    def libraries():
        """
        Returns:
            Libraries: A `Libraries` instance with no entries added.
        """
        libraries = Libraries()
        return libraries

    @staticmethod
    def settings() -> dict:
        """
        Returns:
            dict: An empty settings dictionary.
        """
        return {}

    def split_data(self, train_ratio: float = 0.8) -> tuple[DataFrame, DataFrame]:
        """
        Splits the dataset into training and testing sets.

        Args:
            train_ratio (float): The ratio of the data to be used for training. Default is 0.8 (80% for training).

        Returns:
            tuple[DataFrame, DataFrame]: Returns the training and testing datasets.
        """
        # A fixed seed keeps the split reproducible across runs on identically partitioned input.
        train_df, test_df = self.df.randomSplit([train_ratio, 1 - train_ratio], seed=42)
        return train_df, test_df

    def train(self, train_df: DataFrame):
        """
        Trains a linear regression model on the provided data.

        Args:
            train_df (DataFrame): Training data containing the configured features and label columns.

        Returns:
            LinearRegression: This instance (with `self.model` set), so calls can be chained.
        """
        linear_regression = ml.regression.LinearRegression(
            featuresCol=self.features_col,
            labelCol=self.label_col,
            predictionCol=self.prediction_col,
        )

        self.model = linear_regression.fit(train_df)
        return self

    def predict(self, prediction_df: DataFrame):
        """
        Predicts the next values in the time series.

        `train()` must have been called first; it is what sets `self.model`.

        Args:
            prediction_df (DataFrame): Data containing the configured features column.

        Returns:
            DataFrame: The fitted model's `transform` output, including the configured prediction column.
        """

        return self.model.transform(
            prediction_df,
        )

    def evaluate(self, test_df: DataFrame) -> Optional[tuple[float, float]]:
        """
        Evaluates the trained model using RMSE and R².

        Args:
            test_df (DataFrame): The testing dataset to evaluate the model. It must already contain the
                prediction column (e.g. the output of `predict()`).

        Returns:
            Optional[tuple[float, float]]: A `(rmse, r2)` tuple holding the Root Mean Squared Error and the
            R² score of the model, or None if the prediction column doesn't exist.
        """
        import logging

        if self.prediction_col not in test_df.columns:
            # Report and return None (rather than raise) so callers that check for None keep working.
            logging.getLogger(__name__).error(
                "Error: '%s' column is missing in the test DataFrame.",
                self.prediction_col,
            )
            return None

        def _metric(metric_name: str) -> float:
            # Build one evaluator per metric, sharing the configured label/prediction columns.
            evaluator = RegressionEvaluator(
                labelCol=self.label_col,
                predictionCol=self.prediction_col,
                metricName=metric_name,
            )
            return evaluator.evaluate(test_df)

        rmse = _metric("rmse")
        r2 = _metric("r2")

        return rmse, r2

system_type() staticmethod

Attributes:

Name Type Description
SystemType Environment

Requires PYSPARK

Source code in src/sdk/python/rtdip_sdk/pipelines/machine_learning/spark/linear_regression.py
48
49
50
51
52
53
54
@staticmethod
def system_type():
    """
    Declares the execution environment this component needs.

    Attributes:
        SystemType (Environment): Requires PYSPARK
    """
    required_environment = SystemType.PYSPARK
    return required_environment

split_data(train_ratio=0.8)

Splits the dataset into training and testing sets.

Parameters:

Name Type Description Default
train_ratio float

The ratio of the data to be used for training. Default is 0.8 (80% for training).

0.8

Returns:

Type Description
tuple[DataFrame, DataFrame]

tuple[DataFrame, DataFrame]: Returns the training and testing datasets.

Source code in src/sdk/python/rtdip_sdk/pipelines/machine_learning/spark/linear_regression.py
65
66
67
68
69
70
71
72
73
74
75
76
def split_data(self, train_ratio: float = 0.8) -> tuple[DataFrame, DataFrame]:
    """
    Randomly partitions `self.df` into a training set and a testing set.

    Args:
        train_ratio (float): Fraction of rows assigned to the training set; the remainder
            goes to the testing set. Default is 0.8 (80% for training).

    Returns:
        tuple[DataFrame, DataFrame]: The training and testing datasets, in that order.
    """
    test_ratio = 1 - train_ratio
    # A fixed seed keeps the split reproducible across runs on identically partitioned input.
    training_set, testing_set = self.df.randomSplit([train_ratio, test_ratio], seed=42)
    return training_set, testing_set

train(train_df)

Trains a linear regression model on the provided data.

Source code in src/sdk/python/rtdip_sdk/pipelines/machine_learning/spark/linear_regression.py
78
79
80
81
82
83
84
85
86
87
88
89
def train(self, train_df: DataFrame):
    """
    Fits a Spark ML linear regression estimator and stores the fitted model on `self.model`.

    Args:
        train_df (DataFrame): Training data containing the configured features and label columns.

    Returns:
        The same instance, so calls can be chained.
    """
    estimator_params = {
        "featuresCol": self.features_col,
        "labelCol": self.label_col,
        "predictionCol": self.prediction_col,
    }
    estimator = ml.regression.LinearRegression(**estimator_params)
    self.model = estimator.fit(train_df)
    return self

predict(prediction_df)

Predicts the next values in the time series.

Source code in src/sdk/python/rtdip_sdk/pipelines/machine_learning/spark/linear_regression.py
91
92
93
94
95
96
97
98
def predict(self, prediction_df: DataFrame):
    """
    Applies the fitted model to `prediction_df` to predict the next values in the time series.

    Args:
        prediction_df (DataFrame): Data containing the configured features column.

    Returns:
        DataFrame: The fitted model's `transform` output.
    """
    # `self.model` is set by `train()`, which must be called before this method.
    fitted_model = self.model
    return fitted_model.transform(prediction_df)

evaluate(test_df)

Evaluates the trained model using RMSE and R².

Parameters:

Name Type Description Default
test_df DataFrame

The testing dataset to evaluate the model.

required

Returns:

Type Description
Optional[tuple[float, float]]

Optional[tuple[float, float]]: A (rmse, r2) tuple with the Root Mean Squared Error (RMSE) and the R² score of the model, or None if the prediction column doesn't exist.

Source code in src/sdk/python/rtdip_sdk/pipelines/machine_learning/spark/linear_regression.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def evaluate(self, test_df: DataFrame) -> Optional[tuple[float, float]]:
    """
    Evaluates the trained model using RMSE and R².

    Args:
        test_df (DataFrame): The testing dataset to evaluate the model. It must already contain the
            prediction column (e.g. the output of `predict()`).

    Returns:
        Optional[tuple[float, float]]: A `(rmse, r2)` tuple holding the Root Mean Squared Error and the
        R² score of the model, or None if the prediction column doesn't exist.
    """
    import logging

    if self.prediction_col not in test_df.columns:
        # Report and return None (rather than raise) so callers that check for None keep working.
        logging.getLogger(__name__).error(
            "Error: '%s' column is missing in the test DataFrame.",
            self.prediction_col,
        )
        return None

    def _metric(metric_name: str) -> float:
        # Build one evaluator per metric, sharing the configured label/prediction columns.
        evaluator = RegressionEvaluator(
            labelCol=self.label_col,
            predictionCol=self.prediction_col,
            metricName=metric_name,
        )
        return evaluator.evaluate(test_df)

    rmse = _metric("rmse")
    r2 = _metric("r2")

    return rmse, r2