Solving the Cambridge A14 Milton Road Roundabout Using Reinforcement Learning — Part 1
Introduction
Like most people, I find traffic one of life’s greatest frustrations; for me, it is currently the bane of my existence. The roadworks in Cambridge are at an all-time high and the traffic is awful. This, together with my growing awareness of the climate, has led me to cycle to work more often: even at -2 °C, I would rather deal with numb fingers than spend up to 40 minutes in stop-start traffic commuting through or around Cambridge.
One of my main grievances when driving to work is the roundabout on Milton Road leading towards the A14. In theory, this roundabout should cause me no trouble at all: it is a simple traffic-light-controlled roundabout, with traffic lights on each entrance and traffic lights in the middle before the exits. However, after driving through it for over a year and a half, I have consistently noticed one thing: no car can enter and exit the roundabout without stopping at a traffic light in the middle.
I only noticed this recently, but ever since, it has been driving me mad. I often wonder how such a poor system came to be, and how much time it wastes. So I did the maths. Say I wait 1 minute in the middle of this roundabout per day. I commute 4 days a week (on the fifth day I work from home or cycle in), which is 4 minutes a week. I work 48 weeks a year, so that is 192 minutes, or 3.2 hours, a year spent sitting at a roundabout for no reason. If this roundabout were efficient, not only could we ease congestion by keeping traffic flowing, we would also give everybody who drives through it 3 hours of their lives back every year!
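The back-of-the-envelope sum above, written out as a quick Python check (the 1 minute per day, 4 days per week and 48 weeks per year are my rough estimates, not measurements):

```python
# Rough estimates from the paragraph above, not measured data
minutes_waiting_per_day = 1
commuting_days_per_week = 4
working_weeks_per_year = 48

minutes_per_year = minutes_waiting_per_day * commuting_days_per_week * working_weeks_per_year
hours_per_year = minutes_per_year / 60

print(minutes_per_year, hours_per_year)  # 192 3.2
```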
Instead of just moaning about this problem (which I still do, by the way), I decided to try to solve it using one of my favourite deep learning algorithms: Deep Q-Learning (DQN).
As a side note before I start, I think this is one of the coolest parts of engineering: finding an annoying problem and using theoretical knowledge to solve it. A solution like this is what Tony Fadell (the creator of the iPod) called a painkiller: it eases an annoyance, rather than just being good for you (solutions that only do the latter are called vitamins). Take the iPod: it solved the pain of carrying around a stack of CDs by putting 1,000 songs in your pocket. Similarly, this solution eases the pain and frustration of waiting at a roundabout for no reason every day.
The solution
To aid my design, I have modelled the roundabout as shown in the figure below. A roundabout has a certain number of entrances and exits. Each entrance is governed by a traffic light, and there are as many traffic lights in the centre of the roundabout as there are entrances. To make sure cars do not crash, if a traffic light in the centre of the roundabout is on, the next clockwise entrance traffic light must be off, and vice versa.
The goal of the design: if a car enters the roundabout within a given time of its entrance traffic light turning green, it should be able to exit without waiting at a traffic light inside the roundabout.
Note that the code here is still very much in development, so it looks quite janky at the moment. As the project progresses, my aim is to clean it up.
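The no-crash rule can be stated as a simple check. This is a minimal sketch, assuming entrances and roundabout lights are both numbered 0 to n-1 going clockwise, so that roundabout light i conflicts with entrance (i + 1) mod n; `is_safe` is a hypothetical helper, not part of my codebase. Because each roundabout light conflicts with exactly one entrance, the constraints are independent and 3 of the 4 on/off combinations per pair are safe, giving 3^n safe configurations (81 for four entrances).

```python
from itertools import product
from typing import Sequence


def is_safe(entrance_on: Sequence[bool], roundabout_on: Sequence[bool]) -> bool:
    """True if no roundabout light is green at the same time as the next clockwise entrance.

    Assumes index (i + 1) % n is the entrance just clockwise of roundabout light i.
    """
    n = len(entrance_on)
    return all(not (roundabout_on[i] and entrance_on[(i + 1) % n]) for i in range(n))


# Brute-force count of safe light configurations for a four-entrance roundabout
n = 4
safe = sum(
    is_safe(states[:n], states[n:])
    for states in product([False, True], repeat=2 * n)
)
print(safe)  # 81
```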
Modelling the Objects
To start this project, we first model a traffic light using a simple Python class. Each traffic light is paired with another traffic light: if it is at an entrance, it is paired with the next clockwise traffic light in the roundabout, and if it is in the roundabout, it is paired with the next clockwise entrance traffic light.
We also define a blocking traffic light: a traffic light that must be turned off whenever the current traffic light is turned on. If a given entrance traffic light is turned on, the nearest anti-clockwise roundabout traffic light must be turned off.
We have variables denoting the paired traffic light, the blocking traffic light, the light's current state (on or off) and whether it is in the roundabout or not.
We have simple getter and setter methods for these variables, and methods to turn the light on and off. Note that when we turn a traffic light on, we must also turn its blocking traffic light off to prevent crashes.
class TrafficLight():
    def __init__(self, paired_traffic_light, blocking_traffic_light) -> None:
        self.isOn = False
        self.paired_traffic_light = paired_traffic_light      # next light a car reaches after passing this one
        self.blocking_traffic_light = blocking_traffic_light  # light that must be off while this one is on
        self.is_in_roundabout = False
    def setPairedTrafficLight(self, paired_traffic_light):
        self.paired_traffic_light = paired_traffic_light
    def setBlockingTrafficLight(self, blocking_traffic_light):
        self.blocking_traffic_light = blocking_traffic_light
    def getState(self):
        return self.isOn
    def turn_on(self):
        self.isOn = True
        # Switch off the conflicting light so crossing flows are never green together
        self.blocking_traffic_light.turn_off()
    def turn_off(self):
        self.isOn = False
We also model a car with a class. It has the following variables:
- is_waiting: Boolean denoting whether the car is waiting at a traffic light
- curr_traffic_light: the traffic light the car is currently at (a TrafficLight object)
- exit: integer denoting the exit the car is heading for
- exited: Boolean denoting whether the car has exited the roundabout
- has_stopped_in_roundabout: Boolean denoting whether the car has stopped at a traffic light inside the roundabout
We include getter and setter methods for these variables. The exit is randomised when we start an episode, as shown below.
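import random
class Car():
    def __init__(self) -> None:
        self.is_waiting = True
        self.curr_traffic_light = None
        self.exit = None
        self.exited = False
        self.has_stopped_in_roundabout = False
    def set_exit_to_take(self, num_exits, starting_traffic_light_num):
        exit_to_take = random.randint(0, num_exits-1)
        while exit_to_take == starting_traffic_light_num:
            exit_to_take = random.randint(0, num_exits-1)
        self.exit = exit_to_take
    # Accessors used by the simulation and environment code below
    def get_exit_to_take(self):
        return self.exit
    def set_current_traffic_light(self, traffic_light):
        self.curr_traffic_light = traffic_light
    def get_current_traffic_light(self):
        return self.curr_traffic_light
    def set_is_exited(self, exited):
        self.exited = exited
    def get_is_exited(self):
        return self.exited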
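The rejection loop in `set_exit_to_take` draws uniformly from the exits other than the starting one, but it never terminates if there is only one exit. A minimal alternative sketch (the name `choose_exit` and the injected `random.Random` are my own assumptions, chosen so the draw is reproducible) samples directly from the allowed exits and fails loudly instead:

```python
import random
from typing import List


def choose_exit(num_exits: int, start: int, rng: random.Random) -> int:
    """Pick an exit uniformly at random from every exit except the starting entrance."""
    candidates: List[int] = [e for e in range(num_exits) if e != start]
    # random.Random.choice raises IndexError on an empty list, e.g. when num_exits == 1
    return rng.choice(candidates)


rng = random.Random(0)
draws = [choose_exit(4, start=2, rng=rng) for _ in range(1000)]
print(sorted(set(draws)))  # [0, 1, 3]
```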
Lastly, we create a roundabout object. This creates the traffic light instances and assigns the blocking and paired traffic lights to each one. (I think this is the best class name I have ever created!)
class A14MiltonRoadRoundabout():
    def __init__(self, num_traffic_lights, max_exit_time) -> None:
        self.traffic_lights_enter = [TrafficLight(None, None) for _ in range(num_traffic_lights)]
        self.traffic_lights_roundabout = [TrafficLight(None, None) for _ in range(num_traffic_lights)]
        for trafficlight in self.traffic_lights_roundabout:
            trafficlight.is_in_roundabout = True
        # Entrance lights come first (indices 0..n-1), then roundabout lights (n..2n-1)
        self.traffic_lights = self.traffic_lights_enter + self.traffic_lights_roundabout
        # Entrance lights rotated by one: position i holds entrance i+1 (wrapping round)
        traffic_lights_enter_modified = self.traffic_lights_enter[1:]
        traffic_lights_enter_modified.append(self.traffic_lights_enter[0])
        # Roundabout lights rotated the other way: position i holds roundabout light i-1
        traffic_lights_roundabout_modified = [self.traffic_lights_roundabout[-1]] + self.traffic_lights_roundabout[:-1]
        for i in range(num_traffic_lights):
            self.traffic_lights_enter[i].setPairedTrafficLight(self.traffic_lights_roundabout[i])
            self.traffic_lights_enter[i].setBlockingTrafficLight(traffic_lights_roundabout_modified[i])
            self.traffic_lights_roundabout[i].setPairedTrafficLight(traffic_lights_enter_modified[i])
            self.traffic_lights_roundabout[i].setBlockingTrafficLight(traffic_lights_enter_modified[i])
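The two rotated lists are just modular index arithmetic. Assuming n entrances numbered clockwise, the wiring built above can be written as a small lookup table; `wiring` is a hypothetical helper for checking the logic, not code from the project. It also makes it easy to confirm that blocking is symmetric: entrance i blocks roundabout light i-1, which in turn blocks entrance i.

```python
from typing import Dict, Tuple

Light = Tuple[str, int]  # ("entrance" or "roundabout", index)


def wiring(n: int) -> Dict[Light, Dict[str, Light]]:
    """Paired and blocking light for every light, mirroring the rotated lists above."""
    table: Dict[Light, Dict[str, Light]] = {}
    for i in range(n):
        # traffic_lights_roundabout_modified[i] is roundabout light (i - 1) % n
        table[("entrance", i)] = {"paired": ("roundabout", i),
                                  "blocking": ("roundabout", (i - 1) % n)}
        # traffic_lights_enter_modified[i] is entrance (i + 1) % n
        table[("roundabout", i)] = {"paired": ("entrance", (i + 1) % n),
                                    "blocking": ("entrance", (i + 1) % n)}
    return table


table = wiring(4)
print(table[("entrance", 0)]["blocking"])  # ('roundabout', 3)
# Blocking is mutual: whatever a light blocks, blocks it back
assert all(table[table[light]["blocking"]]["blocking"] == light for light in table)
```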
Creating a simulation
We now turn our attention to creating a car simulation in which actions will be taken (again, as mentioned above, this code is janky and will be improved as the project goes on). We first create a reset_simulation method, which creates a fresh set of Car objects and assigns each car a random entrance traffic light and exit.
import random
import traffic_light  # module holding the TrafficLight, Car and A14MiltonRoadRoundabout classes
class CarSimulation():
    def __init__(self, num_cars, num_traffic_lights) -> None:
        self.num_cars = num_cars
        self.num_exits = num_traffic_lights
        self.num_traffic_lights = num_traffic_lights*2  # entrance lights + roundabout lights
        self.cars = None
        self.traffic_lights = None
    def reset_simulation(self, traffic_lights):
        self.cars = [traffic_light.Car() for _ in range(self.num_cars)]
        self.traffic_lights = traffic_lights
        for car in self.cars:
            # Start each car at a random entrance light (entrances occupy indices 0..num_exits-1)
            car.set_current_traffic_light(traffic_lights[random.randint(0, self.num_exits - 1)])
            car.set_exit_to_take(self.num_exits, self.traffic_lights.index(car.get_current_traffic_light()))
    def getCars(self):
        return self.cars
    def all_cars_exit(self):
        return all(car.get_is_exited() for car in self.cars)
In the main function for running the simulation, we loop through all the cars and check whether the light each car is waiting at has turned on. If so, we move the car to the next traffic light and update the relevant variables. If the car has stopped at a red light inside the roundabout, we record that too.
def run_simulation(self):
    while True:
        for car in self.cars:
            # Keep moving the car while the light it is waiting at is green
            while car.get_current_traffic_light().getState() and not car.get_is_exited():
                ...
                car.set_current_traffic_light(car.get_current_traffic_light().paired_traffic_light)
                # A red light inside the roundabout is exactly the stop we want to avoid
                if not car.get_current_traffic_light().getState() and car.get_current_traffic_light().is_in_roundabout:
                    car.has_stopped_in_roundabout = True
        if self.all_cars_exit():
            return True
We also check whether the car has reached its exit yet. If the car is at the roundabout traffic light next to its exit and that traffic light is on, we mark the car as exited by calling set_is_exited(True).
def run_simulation(self):
    while True:
        for car in self.cars:
            while car.get_current_traffic_light().getState() and not car.get_is_exited():
                ...
                if car.get_current_traffic_light().is_in_roundabout:
                    idx_current_car = self.traffic_lights.index(car.get_current_traffic_light().paired_traffic_light)
                    if car.get_exit_to_take() == idx_current_car and car.get_current_traffic_light().getState():
                        car.set_is_exited(True)
                        continue
                ...
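Because `run_simulation` spins in its own thread, how far a car gets between two agent actions depends on thread scheduling. One alternative, sketched here under my own assumptions (plain indices instead of `TrafficLight` objects, with entrances at 0..n-1 and roundabout lights at n..2n-1 as in `A14MiltonRoadRoundabout.traffic_lights`; `SimCar`, `next_light` and `advance` are hypothetical names), is to advance the cars synchronously once per action using the same movement and exit rules:

```python
from dataclasses import dataclass
from typing import List


@dataclass
class SimCar:
    position: int        # index into the 2n-long light list
    exit: int            # entrance index the car wants to leave by
    exited: bool = False
    stopped_in_roundabout: bool = False


def next_light(idx: int, n: int) -> int:
    """Light reached after passing light idx: entrance i -> roundabout i -> entrance i+1."""
    if idx < n:
        return idx + n
    return (idx - n + 1) % n


def advance(cars: List[SimCar], lights_on: List[bool], n: int) -> None:
    """Move every car as far as the current green lights allow."""
    for car in cars:
        while not car.exited and lights_on[car.position]:
            # A green roundabout light next to the car's exit lets it leave
            if car.position >= n and car.exit == next_light(car.position, n):
                car.exited = True
                break
            car.position = next_light(car.position, n)
            if car.position >= n and not lights_on[car.position]:
                car.stopped_in_roundabout = True


n = 4
lights = [False] * (2 * n)
lights[0] = True      # entrance 0 green
lights[n] = True      # roundabout light 0 green
cars = [SimCar(position=0, exit=1), SimCar(position=0, exit=2)]
advance(cars, lights, n)
print([(c.position, c.exited) for c in cars])  # [(4, True), (1, False)]
```

Calling `advance` once after every agent action makes each episode reproducible for a given sequence of actions, at the cost of no longer simulating continuous time.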
Lastly, we create a simple function to print out the current state of the cars and traffic lights so that we can monitor the environment as the agent takes actions.
def watch_simulation(self):
    while True:
        time.sleep(7)
        for idx, traffic_light in enumerate(self.traffic_lights):
            traffic_light_status = 'on' if traffic_light.getState() else 'off'
            traffic_light_roundabout = traffic_light.is_in_roundabout
            print('Traffic Light {} is {} and is in roundabout? {}'.format(idx, traffic_light_status, traffic_light_roundabout))
        for idx, car in enumerate(self.cars):
            traffic_light = self.traffic_lights.index(car.get_current_traffic_light())
            traffic_light_status = car.get_current_traffic_light().getState()
            traffic_light_status = 'on' if traffic_light_status else 'off'
            print('Car {} is currently at traffic light {}, has exit {}, which is {} and has exited? {}'.format(idx, traffic_light, car.get_exit_to_take(), traffic_light_status, car.get_is_exited()))
        if self.all_cars_exit():
            return True
Creating the environment
The environment does what you would expect: it sets up the roundabout, cars, traffic lights and simulation. The simulation runs in separate threads so that the agent can act on it and collect rewards without needing to pause it.
import random
import threading
import traffic_light  # module holding the TrafficLight, Car and A14MiltonRoadRoundabout classes
class Environment():
    def __init__(self, num_traffic_lights, max_num_cars, max_exit_time=None) -> None:
        self.num_traffic_lights = num_traffic_lights
        self.max_num_cars = max_num_cars
        self.max_exit_time = max_exit_time  # forwarded to A14MiltonRoadRoundabout in reset_env
        self.car_simulation = None
        self.threads = []
    def reset_env(self):
        # Wait for the previous episode's simulation threads to finish
        for thread in self.threads:
            thread.join()
        self.threads = []
        num_cars = random.randint(1, self.max_num_cars)
        self.car_simulation = CarSimulation(num_cars, self.num_traffic_lights)
        self.roundabout = traffic_light.A14MiltonRoadRoundabout(num_traffic_lights = self.num_traffic_lights, max_exit_time = self.max_exit_time)
        self.car_simulation.reset_simulation(self.roundabout.traffic_lights)
        # Start monitoring and running the simulation in separate threads
        thread = threading.Thread(target=self.car_simulation.watch_simulation, daemon=True)
        thread.start()
        self.threads.append(thread)
        thread = threading.Thread(target=self.car_simulation.run_simulation, daemon=True)
        thread.start()
        self.threads.append(thread)
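Two things are worth watching with this threading setup: `run_simulation` busy-waits in a `while True` loop, and the agent reads car and light state while that thread may be changing it. A common pattern, sketched here as an assumption rather than what the project currently does (`SteppingSimulation` and its tick counter are placeholders for the real simulation), is to guard shared state with a `threading.Lock` and use a `threading.Event` both to sleep between ticks and to stop the thread cleanly:

```python
import threading
import time


class SteppingSimulation:
    """Placeholder background simulation that can be stopped cleanly between episodes."""

    def __init__(self) -> None:
        self.lock = threading.Lock()         # guards state shared with the agent thread
        self.stop_event = threading.Event()  # set to ask the loop to finish
        self.ticks = 0                       # stand-in for real car and light state

    def run(self) -> None:
        while not self.stop_event.is_set():
            with self.lock:
                self.ticks += 1              # advance the simulation by one tick
            # Sleep briefly instead of spinning; wakes immediately if stop is requested
            self.stop_event.wait(0.01)

    def snapshot(self) -> int:
        """Read shared state under the lock, as the agent would when building a state vector."""
        with self.lock:
            return self.ticks


sim = SteppingSimulation()
thread = threading.Thread(target=sim.run, daemon=True)
thread.start()
time.sleep(0.1)
sim.stop_event.set()
thread.join()
print(thread.is_alive())  # False
```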
The state of the environment is a vector made up of the current status of each traffic light (1 for on, 0 for off), followed by the number of cars waiting at an entrance, the number of cars waiting in the roundabout and the number of cars that have exited.
def get_state(self):
    traffic_light_array = []
    for traffic_light in self.roundabout.traffic_lights:
        if traffic_light.getState():
            traffic_light_array.append(1)
        else:
            traffic_light_array.append(0)
    num_cars_waiting_at_start = 0
    num_cars_waiting_in_roundabout = 0
    num_cars_exited = 0
    for car in self.car_simulation.getCars():
        if car.get_is_exited():
            num_cars_exited += 1
        elif car.get_current_traffic_light().is_in_roundabout:
            num_cars_waiting_in_roundabout += 1
        else:
            num_cars_waiting_at_start += 1
    return traffic_light_array + [num_cars_waiting_at_start, num_cars_waiting_in_roundabout, num_cars_exited]
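With n entrances there are 2n lights plus three counts, so the state vector has 2n + 3 entries (11 for the four-entrance roundabout), which is the `input_dim` the network needs. A self-contained sketch of the same encoding, assuming each car's status has already been summarised as one of the hypothetical labels `"entrance"`, `"roundabout"` or `"exited"`:

```python
from typing import List, Sequence

STATUSES = ("entrance", "roundabout", "exited")  # hypothetical status labels


def encode_state(lights_on: Sequence[bool], car_status: Sequence[str]) -> List[int]:
    """Light states as 1/0, followed by the number of cars in each status."""
    lights = [1 if on else 0 for on in lights_on]
    counts = [sum(1 for s in car_status if s == label) for label in STATUSES]
    return lights + counts


state = encode_state([True, False] * 4, ["entrance", "exited", "exited"])
print(state)       # [1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 2]
print(len(state))  # 11
```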
An episode is done when all cars have exited.
def is_done(self):
    for car in self.car_simulation.getCars():
        if not car.get_is_exited():
            return False
    return True
For now, we use a very simple reward function, summed over all cars: a car that has exited without stopping at a roundabout light scores 10; a car that has exited after stopping at a roundabout light scores 5; a car waiting at an entrance scores 0; and a car stuck at a roundabout light scores -1.
def compute_reward(self):
    total_reward = 0
    for car in self.car_simulation.getCars():
        if car.get_is_exited() and not car.has_stopped_in_roundabout:
            total_reward += 10
        elif car.get_is_exited():
            total_reward += 5
        elif car.get_current_traffic_light().is_in_roundabout:
            total_reward += -1
        else:
            total_reward += 0
    return total_reward
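To make the scale concrete, here is the same per-car scoring as a pure function (`car_reward` is a hypothetical name), applied to one car in each situation. Note that, as written, `compute_reward` loops over every car on every call, so a car that exited earlier keeps adding its 10 or 5 until the episode ends.

```python
def car_reward(exited: bool, stopped_in_roundabout: bool, in_roundabout: bool) -> int:
    """Per-car reward mirroring compute_reward above."""
    if exited:
        return 5 if stopped_in_roundabout else 10
    return -1 if in_roundabout else 0


# One car in each situation: clean exit, exit after a stop, waiting at an entrance, stuck inside
cars = [
    (True, False, False),
    (True, True, False),
    (False, False, False),
    (False, True, True),
]
total = sum(car_reward(*car) for car in cars)
print(total)  # 14
```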
Creating the network
We now turn our attention to the network. This is a simple feed-forward network that takes in the state and outputs Q-values for the possible actions. What is interesting here is that each action is really two decisions: which traffic light to act on, and whether to turn it on or off. We therefore have two output layers (heads), one for each decision.
import torch
import torch.nn as nn
import torch.nn.functional as F
class DQN(torch.nn.Module):
    def __init__(self, input_dim, output_dim_traffic_light, output_dim_on_off, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.input_dim = input_dim
        self.layer1 = nn.Linear(input_dim,64)
        self.layer2 = nn.Linear(64, 128)
        self.layer3 = nn.Linear(128, 256)
        self.layer4 = nn.Linear(256, 512)
        self.layer5 = nn.Linear(512, 512)
        # Two heads on a shared trunk
        self.output_1 = nn.Linear(512, output_dim_traffic_light)  # Q-values: which traffic light to act on
        self.output_2 = nn.Linear(512, output_dim_on_off)         # Q-values: turn it on or off
    def forward(self, x):
        x = F.relu6(self.layer1(x))
        x = F.relu6(self.layer2(x))
        x = F.relu6(self.layer3(x))
        x = F.relu6(self.layer4(x))
        x = F.relu6(self.layer5(x))
        output_1 = self.output_1(x)
        output_2 = self.output_2(x)
        return output_1, output_2
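Picking an action then means choosing one index from each head. A minimal epsilon-greedy sketch in plain Python, assuming the traffic-light head has one output per light (8 for this roundabout) and the on/off head has two; in PyTorch the greedy part would be an `argmax` over each output tensor. `select_actions` is a hypothetical helper, and exploring both heads on the same coin flip is my own choice:

```python
import random
from typing import Sequence, Tuple


def select_actions(q_light: Sequence[float], q_on_off: Sequence[float],
                   epsilon: float, rng: random.Random) -> Tuple[int, int]:
    """Epsilon-greedy choice of (traffic light index, on/off action index)."""
    if rng.random() < epsilon:
        # Explore: random light and random on/off decision
        return rng.randrange(len(q_light)), rng.randrange(len(q_on_off))
    # Exploit: highest Q-value in each head
    light = max(range(len(q_light)), key=lambda i: q_light[i])
    on_off = max(range(len(q_on_off)), key=lambda i: q_on_off[i])
    return light, on_off


rng = random.Random(0)
print(select_actions([0.1, 0.9, 0.3], [0.2, -0.5], epsilon=0.0, rng=rng))  # (1, 0)
```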
Creating the agent
The code for the agent is largely the same as in my previous projects (see this blog for a detailed explanation and the agent code). The only difference is that the neural network now returns two sets of outputs, one per decision.
To handle this, we compute Q-values for each output and optimise them in the same way as before, using a separate loss function for each and summing the two losses.
def optimize_model(self):
    ...
    # Q(s, a) for the action actually taken, one value per head
    state_output_1, state_output_2 = self.policy_net(state)
    state_action_values_action_1 = state_output_1.gather(1, action_1)
    state_action_values_action_2 = state_output_2.gather(1, action_2)
    # Bootstrapped targets from the target network; no gradients should flow through them
    with torch.no_grad():
        next_state_output_1, next_state_output_2 = self.target_net(non_final_next_states)
        next_state_action_values_action_1[non_final_mask] = next_state_output_1.max(1)[0]
        next_state_action_values_action_2[non_final_mask] = next_state_output_2.max(1)[0]
    expected_state_action_values_action_1 = (next_state_action_values_action_1 * self.GAMMA) + reward
    expected_state_action_values_action_2 = (next_state_action_values_action_2 * self.GAMMA) + reward
    loss_fn_1 = torch.nn.SmoothL1Loss()
    loss_fn_2 = torch.nn.SmoothL1Loss()
    loss1 = loss_fn_1(state_action_values_action_1, expected_state_action_values_action_1.unsqueeze(1))
    loss2 = loss_fn_2(state_action_values_action_2, expected_state_action_values_action_2.unsqueeze(1))
    loss = loss1 + loss2
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()  # apply the gradient update to the policy network
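Written out, the update above computes a separate target for each head k ∈ {1, 2} from the same reward r and discount γ (`self.GAMMA`), assuming (as in the PyTorch DQN tutorial) that the elided code initialises the next-state values to zero for final states:

```latex
y^{(k)} =
\begin{cases}
r + \gamma \max_{a'} Q^{(k)}_{\text{target}}(s', a') & \text{if } s' \text{ is non-final},\\
r & \text{if } s' \text{ is final},
\end{cases}
\qquad k \in \{1, 2\}

\mathcal{L} = \operatorname{SmoothL1}\!\left(Q^{(1)}_{\text{policy}}(s, a_1),\, y^{(1)}\right)
            + \operatorname{SmoothL1}\!\left(Q^{(2)}_{\text{policy}}(s, a_2),\, y^{(2)}\right)
```

Because both targets use the same r, each head is credited with the full reward for the joint action.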
The Results
The results of the first run were mixed. We set up the first run with 8 traffic lights at the roundabout (4 entrance lights and 4 roundabout lights) and one car. Plotting durations (the number of actions taken to reach the exit state) against episodes, we see that durations increase as training goes on. This is not what we expect: as the number of episodes increases, epsilon decreases and the agent takes more and more greedy actions, so if the model were learning a good policy, durations should decrease.
However, taking a deeper look at the data, we can see that the number of times the reward after an action is -1 decreases sharply over the training run, while the number of times it is 0 increases sharply. This suggests the agent is learning something: it keeps the car at the entrance to the roundabout for more actions (steps), because it has learned that moving the car into the roundabout is likely to result in a negative reward.
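For reference, the decaying epsilon mentioned above is often an exponential schedule like the one in the PyTorch DQN tutorial. This sketch uses hypothetical constants rather than the values from my run:

```python
import math


def epsilon_at(step: int, eps_start: float = 0.9, eps_end: float = 0.05,
               eps_decay: float = 1000.0) -> float:
    """Exponentially decaying exploration rate (hypothetical constants)."""
    return eps_end + (eps_start - eps_end) * math.exp(-step / eps_decay)


for step in (0, 1000, 5000):
    print(step, round(epsilon_at(step), 3))
```

With these values, epsilon falls from 0.9 at the start to about 0.36 after 1,000 steps and approaches 0.05, so later episodes are dominated by greedy actions.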
To be Continued
The next steps of this project therefore must be to construct a better reward function that encourages the agent to bring the car into the roundabout. This will be detailed in an upcoming blog.
The current codebase can be found here: GitHub Link