import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader, Dataset

csv_path = '/home/iolson26/SCC_Rover/run_2026-03-19_17-18-55/data.csv'
image_folder = "/home/iolson26/SCC_Rover/run_2026-03-19_17-18-55/"
output_dir = "./model_outputs"
os.makedirs(output_dir, exist_ok=True)

# Model config
batch_size = 32
num_epochs = 50
learning_rate = 1e-3
random_seed = 42
torch.manual_seed(random_seed)  # seed PyTorch too, so weight init is reproducible

# Load the CSV and keep only rows whose image actually exists and loads
df = pd.read_csv(csv_path)
df.columns = df.columns.str.strip()

samples = []
for i, row in df.iterrows():
    image_name = str(row['image']).strip()
    image_path = os.path.join(image_folder, image_name)
    img = cv2.imread(image_path)
    # Skip unreadable or missing images
    if img is None:
        print(f"Skipping bad image [{i}]: {image_path}")
        continue
    angle = float(row['servo'])
    samples.append((image_path, angle))

print(f"Total valid samples: {len(samples)}")

# Train/val/test split: 70% train, 15% val, 15% test
train_samples, temp_samples = train_test_split(
    samples, test_size=0.30, random_state=random_seed, shuffle=True
)
val_samples, test_samples = train_test_split(
    temp_samples, test_size=0.50, random_state=random_seed, shuffle=True
)

print(f"Train samples: {len(train_samples)}")
print(f"Val samples: {len(val_samples)}")
print(f"Test samples: {len(test_samples)}")

# Preprocessing: crop the vertical band of interest (rows 60:400), resize to
# 200x66 (width x height), and scale pixels to [-1, 1]
def preprocess_image(img):
    img = img[60:400, :, :]
    img = cv2.resize(img, (200, 66))
    img = img.astype(np.float32)
    img = (img / 127.5) - 1.0
    return img

def denormalize_image(img):
    img = ((img + 1.0) * 127.5).clip(0, 255).astype(np.uint8)
    return img

# Dataset
class RoverDataset(Dataset):
    def __init__(self, samples):
        self.samples = samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, index):
        image_path, angle = self.samples[index]
        img = cv2.imread(image_path)
        img = preprocess_image(img)
        img_tensor = torch.tensor(img, dtype=torch.float32).permute(2, 0, 1)  # HWC -> CHW
        angle_tensor = torch.tensor(angle, dtype=torch.float32)
        return img_tensor, angle_tensor, image_path

# Load the data
train_dataset = RoverDataset(train_samples)
val_dataset = RoverDataset(val_samples)
test_dataset = RoverDataset(test_samples)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Define model: three strided 5x5 conv blocks, then a small regression head.
# For a 66x200 input, the conv stack produces a 64x5x22 feature map, hence the
# 64 * 5 * 22 input size of the first Linear layer.
class SteeringCNN(nn.Module):
    def __init__(self):
        super(SteeringCNN, self).__init__()
        self.conv_layers = nn.Sequential(
            nn.Conv2d(3, 16, kernel_size=5, stride=2),
            nn.ReLU(),
            nn.Conv2d(16, 32, kernel_size=5, stride=2),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=5, stride=2),
            nn.ReLU()
        )
        self.flatten = nn.Flatten()
        self.fc_layers = nn.Sequential(
            nn.Linear(64 * 5 * 22, 50),
            nn.ReLU(),
            nn.Linear(50, 10),
            nn.ReLU(),
            nn.Linear(10, 1)
        )

    def forward(self, x):
        x = self.conv_layers(x)
        x = self.flatten(x)
        x = self.fc_layers(x)
        return x

device = "cpu"
model = SteeringCNN().to(device)
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
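# Optional sanity check (a sketch added here, not part of the original
# pipeline): push one dummy batch through the conv stack to confirm the
# flattened size really is 64 * 5 * 22 for a 66x200 input, so the first
# Linear layer matches. Assumes the preprocessing above (crop, then resize
# to 200x66) is unchanged.
with torch.no_grad():
    _dummy = torch.zeros(1, 3, 66, 200, device=device)
    _conv_out = model.conv_layers(_dummy)
    assert _conv_out.shape[1:] == (64, 5, 22), f"Unexpected conv output shape: {tuple(_conv_out.shape)}"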
# Train
train_losses = []
val_losses = []
best_val_loss = float("inf")

for epoch in range(num_epochs):
    model.train()
    total_train_loss = 0.0

    for images_batch, angles_batch, _ in train_loader:
        images_batch = images_batch.to(device)
        angles_batch = angles_batch.to(device)

        preds = model(images_batch).squeeze(-1)
        loss = criterion(preds, angles_batch)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_train_loss += loss.item()

    avg_train_loss = total_train_loss / len(train_loader)

    # Validation
    model.eval()
    total_val_loss = 0.0
    with torch.no_grad():
        for images_batch, angles_batch, _ in val_loader:
            images_batch = images_batch.to(device)
            angles_batch = angles_batch.to(device)
            preds = model(images_batch).squeeze(-1)
            loss = criterion(preds, angles_batch)
            total_val_loss += loss.item()

    avg_val_loss = total_val_loss / len(val_loader)

    train_losses.append(avg_train_loss)
    val_losses.append(avg_val_loss)

    print(f"Epoch [{epoch+1}/{num_epochs}] - Train Loss: {avg_train_loss:.4f} - Val Loss: {avg_val_loss:.4f}")

    # Save best model (lowest validation loss so far)
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(model.state_dict(), "steering_cnn.pth")

# Save loss curve (log scale)
plt.figure(figsize=(8, 5))
plt.plot(train_losses, label="Train Loss", marker="o", markersize=3)
plt.plot(val_losses, label="Val Loss", marker="o", markersize=3)
plt.xlabel("Epoch")
plt.ylabel("Loss")
plt.title("Training and Validation Loss")
plt.yscale("log")  # log-scale y axis
plt.legend()
plt.grid(True, which="both", linestyle="--", alpha=0.6)
plt.tight_layout()
plt.savefig(os.path.join(output_dir, "loss_curve.png"))
plt.close()

# Load best model
model.load_state_dict(torch.load("steering_cnn.pth", map_location=device))
model.eval()

# Test evaluation
all_paths = []
all_targets = []
all_preds = []

with torch.no_grad():
    for images_batch, angles_batch, paths_batch in test_loader:
        images_batch = images_batch.to(device)
        angles_batch = angles_batch.to(device)
        preds = model(images_batch).squeeze(-1)
        all_paths.extend(paths_batch)
        all_targets.extend(angles_batch.cpu().numpy().tolist())
        all_preds.extend(preds.cpu().numpy().tolist())

all_targets = np.array(all_targets)
all_preds = np.array(all_preds)

mse = mean_squared_error(all_targets, all_preds)
rmse = np.sqrt(mse)
mae = mean_absolute_error(all_targets, all_preds)
r2 = r2_score(all_targets, all_preds)

print("\n=== TEST METRICS ===")
print(f"Test MSE: {mse:.4f}")
print(f"Test RMSE: {rmse:.4f}")
print(f"Test MAE: {mae:.4f}")
print(f"Test R^2: {r2:.4f}")

# Baseline: always predict the mean of the test targets
baseline_preds = np.full_like(all_targets, fill_value=np.mean(all_targets))
baseline_mse = mean_squared_error(all_targets, baseline_preds)
baseline_rmse = np.sqrt(baseline_mse)
baseline_mae = mean_absolute_error(all_targets, baseline_preds)
baseline_r2 = r2_score(all_targets, baseline_preds)

print("\n=== BASELINE (predict mean) ===")
print(f"Baseline MSE: {baseline_mse:.4f}")
print(f"Baseline RMSE: {baseline_rmse:.4f}")
print(f"Baseline MAE: {baseline_mae:.4f}")
print(f"Baseline R^2: {baseline_r2:.4f}")

# Save per-image predictions
results_df = pd.DataFrame({
    "image_path": all_paths,
    "true_servo": all_targets,
    "predicted_servo": all_preds,
    "error": all_preds - all_targets,
    "abs_error": np.abs(all_preds - all_targets)
})
results_df.to_csv(os.path.join(output_dir, "test_predictions.csv"), index=False)
print(f"\nSaved predictions to: {os.path.join(output_dir, 'test_predictions.csv')}")

# Scatter plot: true vs predicted
plt.figure(figsize=(6, 6))
plt.scatter(all_targets, all_preds, alpha=0.5)
plt.xlabel("True Servo")
plt.ylabel("Predicted Servo")
plt.title("True vs Predicted Servo")
min_val = min(all_targets.min(), all_preds.min())
max_val = max(all_targets.max(), all_preds.max())
plt.plot([min_val, max_val], [min_val, max_val], 'r--')  # ideal y = x line
plt.grid(True)
plt.tight_layout()
plt.savefig(os.path.join(output_dir, "true_vs_predicted.png"))
plt.close()
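# Extra diagnostic (an addition, not in the original script): surface the test
# frames the model gets most wrong, using the results_df built above. Useful
# for spotting systematic failures (sharp turns, glare, etc.). top_n is an
# illustrative parameter.
top_n = 10
worst = results_df.sort_values("abs_error", ascending=False).head(top_n)
worst.to_csv(os.path.join(output_dir, "worst_predictions.csv"), index=False)
print(f"\n=== {top_n} WORST TEST PREDICTIONS ===")
print(worst[["image_path", "true_servo", "predicted_servo", "abs_error"]].to_string(index=False))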
# Error histogram
errors = all_preds - all_targets
plt.figure(figsize=(8, 5))
plt.hist(errors, bins=30, edgecolor="black")
plt.xlabel("Prediction Error")
plt.ylabel("Count")
plt.title("Prediction Error Distribution")
plt.grid(True)
plt.tight_layout()
plt.savefig(os.path.join(output_dir, "error_histogram.png"))
plt.close()
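# Inference sketch (an addition, not part of the original script): how a single
# frame would be scored at drive time. predict_servo is an illustrative name;
# this assumes the best checkpoint loaded above and the same preprocess_image
# pipeline. The model is already in eval mode here.
def predict_servo(image_path):
    frame = cv2.imread(image_path)
    if frame is None:
        raise FileNotFoundError(image_path)
    x = torch.tensor(preprocess_image(frame), dtype=torch.float32).permute(2, 0, 1)
    with torch.no_grad():
        return model(x.unsqueeze(0).to(device)).item()  # add batch dim, return scalar angle

if test_samples:
    sample_path, sample_angle = test_samples[0]
    print(f"\nExample inference: predicted {predict_servo(sample_path):.2f}, true {sample_angle:.2f}")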