import math
import multiprocessing
import os
import pickle
import random

import gpxpy
import gpxpy.gpx
import matplotlib.pyplot as plt
import neat
import numpy
from geopy.distance import geodesic

# NEAT config object; populated under __main__ (module level so forked workers see it).
config = None


def cbrt(x):
    """Real cube root that handles negative inputs correctly."""
    return math.copysign(abs(x) ** (1 / 3), x)


# Constants for steady-state velocity calculation (rider + bike system).
W = 73       # total mass (kg)
C_dA = 0.3   # drag area (m^2)
Rho = 1.225  # air density (kg/m^3)


def get_velocity(P, G):
    """Return steady-state speed (m/s) for power P (watts) on grade G (percent).

    The power balance P = a*v^3 + c*v (aero drag plus gravity; rolling
    resistance ignored) is a cubic in v, solved with Cardano's method.
    """
    # Cubic coefficients: a*v^3 + b*v^2 + c*v + d = 0
    a = 0.5 * C_dA * Rho
    b = 0.0
    c = 9.8067 * W * math.sin(math.atan(G / 100))
    d = -P

    # Depressed cubic t^3 + p*t + q = 0 (substitute v = t - b/(3a)).
    p = (3 * a * c - b * b) / (3 * a * a)
    q = (2 * b ** 3 - 9 * a * b * c + 27 * a * a * d) / (27 * a * a * a)

    # Discriminant determines how many real roots the cubic has.
    disc = (q * q) / 4 + (p * p * p) / 27

    if disc >= 0:
        # One real root.
        S = cbrt(-q / 2 + math.sqrt(disc))
        T = cbrt(-q / 2 - math.sqrt(disc))
        return S + T - b / (3 * a)

    # Three real roots - trigonometric method. Return the largest; the other
    # two are negative and not physically meaningful speeds.
    r = math.sqrt(-(p ** 3) / 27)
    phi = math.acos(-q / (2 * r))
    r = (-p / 3) ** 0.5  # reused: now the root-amplitude factor sqrt(-p/3)
    x1 = 2 * r * math.cos(phi / 3) - b / (3 * a)
    x2 = 2 * r * math.cos((phi + 2 * math.pi) / 3) - b / (3 * a)
    x3 = 2 * r * math.cos((phi + 4 * math.pi) / 3) - b / (3 * a)
    return max(x1, x2, x3)


# Constants and default state for the W'bal fatigue model. The module-level
# state variables are defaults only; eval_genome keeps its own per-course
# copies because globals are not shared across multiprocessing workers.
cp = 250.0        # critical power (watts)
Wprime = 20000.0  # W' anaerobic work capacity (joules)
t = 0.0
S = 0.0
Wbal = Wprime
rec_time = 0.0
rec_total_power = 0.0


def get_Wbal(P, t, S, rec_time, rec_total_power):
    """Advance the W'bal fatigue model by one 1-second step at power P.

    All state is passed in and returned explicitly (can't use globals with
    multiprocessing).

    Returns:
        (Wbal, S, rec_time, rec_total_power) - updated state to be stored by
        the caller and passed back on the next step.
    """
    if P >= cp:
        # Above critical power: expending W'.
        Wexp = P - cp
    else:
        # Below CP: no expenditure; accumulate recovery statistics.
        rec_total_power += P
        Wexp = 0
        rec_time += 1
    # Recovery time constant from the average power of recovery periods.
    # BUGFIX: the original divided by rec_time unconditionally, raising
    # ZeroDivisionError on any step taken at/above CP before the first
    # sub-CP second. Until recovery data exists, treat average recovery
    # power as 0 (Dcp = cp).
    if rec_time > 0:
        Dcp = cp - (rec_total_power / rec_time)
    else:
        Dcp = cp
    tau = 546 * math.exp(-0.01 * Dcp) + 316
    # Running sum is kept so Wbal can be evaluated at each time step without
    # re-integrating the whole expenditure history.
    S += Wexp * math.exp(t / tau)
    # NOTE(review): the 1/2 scaling differs from the textbook Skiba integral
    # formula - kept as-is; presumably deliberate model tuning.
    Wbal = Wprime - 1 / 2 * S * math.exp(-t / tau)
    return Wbal, S, rec_time, rec_total_power


def get_slope(lat1, long1, alt1, lat2, long2, alt2):
    """Return the grade in percent from point 1 to point 2 (positive = uphill)."""
    point1 = (lat1, long1)
    point2 = (lat2, long2)
    distance = geodesic(point1, point2).meters
    # Guard against GPS jitter between near-coincident points.
    if distance < 0.5:
        return 0
    # BUGFIX: the original used (alt1 - alt2), which made climbs negative and
    # descents positive, inverting the gravity term in get_velocity (which
    # expects positive grade = uphill resistance).
    d_alt = alt2 - alt1
    return round((d_alt / distance) * 100, 2)


def get_distance(lat1, long1, lat2, long2):
    """Return the geodesic distance in meters between two (lat, long) points."""
    point1 = (lat1, long1)
    point2 = (lat2, long2)
    return geodesic(point1, point2).meters


def make_course_segments(gpx_file_path):
    """Parse a GPX file into a list of (distance_m, slope_percent) tuples.

    Each consecutive pair of track points becomes one segment.
    """
    # BUGFIX: original opened the file without closing it; use a context manager.
    with open(gpx_file_path, 'r') as gpx_file:
        gpx = gpxpy.parse(gpx_file)  # Give GPX file to gpxpy library

    course_segments = []
    # Iterate through every point and save segments as distance/slope tuples.
    for track in gpx.tracks:
        for segment in track.segments:
            prev_point = None
            for i, point in enumerate(segment.points):
                if i == 0:
                    prev_point = point
                    continue
                distance = get_distance(prev_point.latitude, prev_point.longitude,
                                        point.latitude, point.longitude)
                slope = get_slope(prev_point.latitude, prev_point.longitude,
                                  prev_point.elevation,
                                  point.latitude, point.longitude,
                                  point.elevation)
                course_segments.append((distance, slope))
                prev_point = point
    return course_segments


# Unused module-level placeholder; __main__ assigns total_distance per course.
total_distance = None


def resample_segments(segments, N=100):
    """Resample segments = [(length, slope), ...] into N equal-length segments.

    Each output segment's slope is the average of 20 evenly spaced slope
    samples across its span.
    """
    # Expand input into a cumulative-distance array for binary search.
    cumulative = []
    slopes = []
    total = 0.0
    for length, slope in segments:
        total += length
        cumulative.append(total)
        slopes.append(slope)

    # Length of each new segment.
    target_len = total / N
    output = []
    current_pos = 0.0

    def slope_at(distance):
        """Slope of the input segment containing `distance` (binary search)."""
        lo, hi = 0, len(cumulative) - 1
        while lo < hi:
            mid = (lo + hi) // 2
            if cumulative[mid] < distance:
                lo = mid + 1
            else:
                hi = mid
        return slopes[lo]

    # Generate N new equal-length segments.
    for _ in range(N):
        start = current_pos
        end = min(total, current_pos + target_len)
        # Sample 20 evenly spaced points and average their slopes.
        samples = 20
        xs = [start + (end - start) * (j / (samples - 1)) for j in range(samples)]
        avg_slope = sum(slope_at(x) for x in xs) / samples
        output.append([target_len, avg_slope])
        current_pos = end
    return output


# Unused placeholder kept from development; never read.
standardized_segments = None


def make_all_courses(directory):
    """Load every .gpx file in `directory` and return the resampled courses."""
    courses = []
    for filename in os.listdir(directory):
        if filename.endswith(".gpx"):
            course = make_course_segments(os.path.join(directory, filename))
            course = resample_segments(course, N=100)
            courses.append(course)
    return courses


def eval_genome(genome, config):
    """NEAT evaluation: average (distance travelled - time) over all courses.

    Reads module-level `courses`, `total_distances` and `plot_data`, which the
    worker processes inherit via the 'fork' start method.
    """
    global generation, plot_data
    # Create the neural network for this genome.
    net = neat.nn.FeedForwardNetwork.create(genome, config)
    course_fitness = 0

    for course_num, (course, total_distance) in enumerate(zip(courses, total_distances)):
        # Reset per-course simulation state.
        t = 0.0
        S = 0.0
        Wbal = Wprime
        rec_time = 0.0
        rec_total_power = 0.0
        segments = course.copy()
        distance_remaining = total_distance
        distance_travelled = 0        # distance into the current segment
        total_distance_travelled = 0
        power = cp
        d_powers = []                 # distance markers for plotting
        powers = []                   # power output at each step

        # Simulate at 1-second resolution until the course is done, time runs
        # out, or the segments are exhausted.
        while distance_remaining > 0 and t <= 2500 and segments:
            # Current grade and course-average slope of remaining segments.
            current_grade = segments[0][1]
            flat_segments = [slope for _, slope in segments]
            avg_slope = sum(flat_segments) / len(flat_segments) if flat_segments else 0

            # Run the network; inputs are roughly normalised.
            inputs = [distance_remaining / 10000, Wbal / 10000,
                      current_grade / 20, avg_slope / 20]
            output = net.activate(inputs)[0]
            power = 750 * output
            # Cap power by the remaining anaerobic capacity above CP.
            power = min(abs(power), (cp + Wbal))

            # Update simulation state.
            v = get_velocity(power, current_grade)
            distance_remaining -= v
            distance_travelled += v
            total_distance_travelled += v
            Wbal, S, rec_time, rec_total_power = get_Wbal(
                power, t, S, rec_time, rec_total_power)
            t += 1

            # Remove completed segments.
            if distance_travelled >= segments[0][0]:
                distance_travelled -= segments[0][0]
                segments.pop(0)
            d_powers.append(total_distance_travelled)
            powers.append(power)

        course_fitness += total_distance_travelled - t
        # Only the second course's traces are plotted.
        if course_num == 1:
            plot_data.append((d_powers, powers))

    return course_fitness / len(courses)


generation = 1


def eval_genomes(genomes, config):
    """Evaluate a whole generation in parallel; graph power traces periodically.

    Required wrapper for multiprocessing via neat.ParallelEvaluator.
    """
    global generation, plot_data
    # Manager list is needed to pass graph data back from worker processes.
    from multiprocessing import Manager
    manager = Manager()
    plot_data = manager.list()

    pe = neat.ParallelEvaluator(multiprocessing.cpu_count(), eval_genome)
    pe.evaluate(genomes, config)

    # Graph every 10 generations (and the first).
    if generation % 10 == 0 or generation == 1:
        for d_powers, powers in plot_data:
            prev_power = None
            real = False
            # Only graph strategies where power increases at some point
            # (filters out the "go hard then die" strategy).
            for power in powers:
                if prev_power is not None and power - prev_power > 1:
                    real = True
                prev_power = power
            if real:
                plt.plot(d_powers, powers, alpha=0.03, color='black')
        plt.xlim(0, total_distances[1])  # NOTE(review): assumes >= 2 courses loaded
        filepath = f'Graphs/LCloop_{generation}.png'
        plt.savefig(filepath)
        plt.clf()
    generation += 1


if __name__ == "__main__":
    # Initial setup needs to be here for multiprocessing to work.
    multiprocessing.set_start_method('fork')

    # NEAT config.
    local_dir = os.path.dirname(__file__)
    config_path = os.path.join(local_dir, 'config-pacing')
    config = neat.Config(
        neat.DefaultGenome,
        neat.DefaultReproduction,
        neat.DefaultSpeciesSet,
        neat.DefaultStagnation,
        config_path,
    )
    config.save('config-pacing')

    # Course setup.
    courses = make_all_courses('GPXs')
    total_distances = []
    for course in courses:
        total_distance = sum(seg[0] for seg in course)
        total_distances.append(total_distance)
    plot_data = None

    # NEAT setup.
    p = neat.Population(config)
    p.add_reporter(neat.StdOutReporter(True))

    # Used to seed population with winner of previous run - uncomment to use
    #with open("winner.pkl", "rb") as f:
    #    loaded_genome = pickle.load(f)
    #loaded_genome.key = 0
    #population = {0: loaded_genome}
    #for i in range(1, config.pop_size):
    #    # Create mutated clones of initial_genome - STAY COMMENTED
    #    g = loaded_genome.__class__(i)
    #    g.configure_crossover(loaded_genome, loaded_genome, config.genome_config)
    #    g.mutate(config.genome_config)
    #    population[i] = g
    #p.population = population
    #p.species.speciate(config, p.population, p.generation)

    winner = p.run(eval_genomes, 500)

    # Save the winner.
    print('\nBest genome:\n{!s}'.format(winner))
    with open("winner.pkl", "wb") as f:
        pickle.dump(winner, f)