Telecom Subscription Cancellation Prediction using PySpark and Neural Networks
This notebook builds a machine learning pipeline in PySpark to predict whether a telecom customer will cancel their subscription. The data is loaded from two CSV files — one for training and one for testing — and stored in a Spark DataFrame.
The preprocessing steps clean the data by encoding Yes/No columns as 1/0, dropping billing charge columns that are linear combinations of other features, and renaming the target column to label. Since the dataset is imbalanced (far fewer cancellations than non-cancellations), the minority class is oversampled using random resampling with replacement so both classes are equally represented during training.
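To see why resampling with replacement balances the classes, here is a minimal plain-Python sketch of the same idea (the notebook itself does this with Spark's `DataFrame.sample`; the `oversample_minority` helper and the toy 90/10 data below are illustrative, not part of the pipeline):

```python
import random

def oversample_minority(rows, label_key="label", seed=42):
    """Resample the minority class with replacement until both classes
    have the same number of rows (plain-Python sketch of the balancing step)."""
    random.seed(seed)
    majority = [r for r in rows if r[label_key] == 0]
    minority = [r for r in rows if r[label_key] == 1]
    if len(minority) > len(majority):
        majority, minority = minority, majority
    # random.choices samples with replacement, so minority rows repeat
    resampled = random.choices(minority, k=len(majority))
    return majority + resampled

# Toy imbalanced dataset: 90 non-cancellations, 10 cancellations
rows = [{"label": 0}] * 90 + [{"label": 1}] * 10
balanced = oversample_minority(rows)
counts = {0: 0, 1: 0}
for r in balanced:
    counts[r["label"]] += 1
print(counts)  # both classes equally represented: {0: 90, 1: 90}
```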
The features are then assembled into a single vector and normalized using a StandardScaler. A Multilayer Perceptron (neural network) classifier is trained inside a Pipeline that chains the assembler, scaler, and classifier together. A cross-validated grid search tries four different hidden-layer architectures and selects the one with the best F1 score, which balances precision and recall, making it a more meaningful metric than accuracy on an imbalanced classification problem. The best model's architecture, weights, and test set F1 score are printed, and the final model is saved to disk.
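A quick numeric example makes the accuracy-versus-F1 point concrete. On a hypothetical test set of 1000 customers with only 100 cancellations, a model that predicts "nobody cancels" scores 90% accuracy but an F1 of zero on the cancellation class (the figures below are illustrative, not from the notebook's data):

```python
def f1(tp, fp, fn):
    """F1 score from true positives, false positives, and false negatives."""
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return 2 * precision * recall / (precision + recall) if precision + recall else 0.0

# "Predict nobody cancels" on 1000 customers, 100 of whom actually cancel:
accuracy = 900 / 1000          # looks strong at 0.90...
f1_naive = f1(tp=0, fp=0, fn=100)  # ...but F1 is 0.0: no cancellations caught
print(accuracy, f1_naive)  # 0.9 0.0

# A model that catches 80 cancellations with 30 false alarms:
print(round(f1(tp=80, fp=30, fn=20), 4))
```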
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName('SubscriptionCancellation').getOrCreate()

train_df = spark.read.csv('subscription_train.csv', header=True, inferSchema=True)
test_df = spark.read.csv('subscription_test.csv', header=True, inferSchema=True)

bool_cols = ['Roaming enabled', 'Voicemail enabled', 'Cancelled']
cols_to_drop = ['Region', 'District code', 'Daily billing total',
                'Evening billing total', 'Night billing total', 'Roaming billing total']

def encode_bool_cols(df):
    # Encode Yes/True as 1 and everything else as 0, then drop the
    # billing columns that are linear combinations of other features
    for c in bool_cols:
        df = df.withColumn(c, F.when(F.col(c).isin('Yes', 'True'), 1).otherwise(0))
    return df.drop(*cols_to_drop)

train_clean = encode_bool_cols(train_df)
test_clean = encode_bool_cols(test_df)

train_clean = train_clean.withColumnRenamed('Cancelled', 'label')
test_clean = test_clean.withColumnRenamed('Cancelled', 'label')
train_clean.show(5)

# Oversample the minority class (cancellations) with replacement
majority_df = train_clean.filter(F.col('label') == 0)
minority_df = train_clean.filter(F.col('label') == 1)
majority_count = majority_df.count()
minority_count = minority_df.count()
print("Majority count:", majority_count, "Minority count:", minority_count)

# sample() takes an approximate fraction, so request slightly more than
# needed and trim to exactly the majority count
fraction = (majority_count / minority_count) + 0.1
minority_resampled = minority_df.sample(withReplacement=True, fraction=fraction, seed=42) \
    .limit(majority_count)
train_balanced = majority_df.union(minority_resampled)

# Verify that both classes now make up ~50% of the training data
total_balanced = train_balanced.count()
train_balanced.groupBy('label') \
    .count() \
    .withColumn('proportion', F.round(F.col('count') / total_balanced, 4)) \
    .orderBy('label') \
    .show()

from pyspark.ml.feature import VectorAssembler

feature_cols = [c for c in train_balanced.columns if c != 'label']
print("Feature columns:", feature_cols)
assembler = VectorAssembler(inputCols=feature_cols, outputCol='features_raw')

from pyspark.ml.feature import StandardScaler

scaler = StandardScaler(inputCol='features_raw', outputCol='features',
                        withMean=True, withStd=True)

from pyspark.ml.classification import MultilayerPerceptronClassifier

num_features = len(feature_cols)
layers = [num_features, 16, 2]  # input layer, one hidden layer, binary output
mlp = MultilayerPerceptronClassifier(
    featuresCol='features',
    labelCol='label',
    maxIter=100,
    layers=layers,
    blockSize=128,
    seed=42
)
from pyspark.ml import Pipeline
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

pipeline = Pipeline(stages=[assembler, scaler, mlp])

# Four candidate hidden-layer architectures for the grid search
param_grid = ParamGridBuilder() \
    .addGrid(mlp.layers, [
        [num_features, 8, 2],
        [num_features, 16, 2],
        [num_features, 32, 2],
        [num_features, 16, 8, 2]
    ]).build()

evaluator = MulticlassClassificationEvaluator(
    labelCol='label', predictionCol='prediction', metricName='f1')

cv = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    seed=42)

cv_model = cv.fit(train_balanced)

best_mlp = cv_model.bestModel.stages[-1]
print("Optimal layers (nodes per layer):", best_mlp.getLayers())
print("Number of layers:", len(best_mlp.getLayers()))
print("Weights:", best_mlp.weights)

test_predictions = cv_model.transform(test_clean)
f1_score = evaluator.evaluate(test_predictions)
print(f"Test set F1 score: {f1_score:.4f}")

cv_model.bestModel.save('subscription_mlp_model')
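To get a feel for how large each candidate network is, the length of the MLP's weight vector can be computed from the layer spec. Spark's MultilayerPerceptronClassifier uses affine layers with a bias per output unit, so each consecutive layer pair contributes (n_in + 1) * n_out parameters (this is a sketch based on Spark's documented weight layout; the `num_features = 14` value is assumed for illustration, and you can verify against `best_mlp.weights.size` on your data):

```python
def mlp_weight_count(layers):
    """Trainable parameter count for a layer spec like [n_in, 16, 2]:
    each affine layer contributes (n_in + 1) * n_out (weights + biases)."""
    return sum((n_in + 1) * n_out for n_in, n_out in zip(layers, layers[1:]))

num_features = 14  # assumed feature count, for illustration only
for layers in ([num_features, 8, 2], [num_features, 16, 2],
               [num_features, 32, 2], [num_features, 16, 8, 2]):
    print(layers, "->", mlp_weight_count(layers), "parameters")
```

Note that the deeper [14, 16, 8, 2] network is not automatically the largest; the wide [14, 32, 2] network has more parameters, which is one reason to let cross-validated F1, not intuition, pick the architecture.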