Here in the fourth part of our self-driving cars with Carla, Python, TensorFlow, and reinforcement learning project, we're going to be working on coding our actual agent. In the previous tutorial, we worked on the environment class, which our agent will interact with.
As we consider how to create our agent, we have to also consider the model itself. Assuming you've been through the reinforcement learning tutorials (which you had better, otherwise things are about to get very confusing for you), you know that every step an agent takes comes not only with a prediction (depending on exploration/epsilon), but also with a fitment! This means that we're training and predicting at the same time, but it's essential that our agent gets the most FPS (frames per second) possible, and we'd also like our trainer to run as quickly as possible. To achieve this, we can use either multiprocessing or threading. Threading lets us keep things fairly simple. I will likely open-source some multiprocessing code for this task at some point, but, for now, threading it is.
Code up to this point:
import glob
import os
import sys
import random
import time
import numpy as np
import cv2
import math

try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass
import carla

SHOW_PREVIEW = False
IM_WIDTH = 640
IM_HEIGHT = 480
SECONDS_PER_EPISODE = 10


class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IM_WIDTH
    im_height = IM_HEIGHT
    front_camera = None

    def __init__(self):
        self.client = carla.Client("localhost", 2000)
        self.client.set_timeout(2.0)
        self.world = self.client.get_world()
        self.blueprint_library = self.world.get_blueprint_library()
        self.model_3 = self.blueprint_library.filter("model3")[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.blueprint_library.find('sensor.camera.rgb')
        self.rgb_cam.set_attribute("image_size_x", f"{self.im_width}")
        self.rgb_cam.set_attribute("image_size_y", f"{self.im_height}")
        self.rgb_cam.set_attribute("fov", f"110")

        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
        time.sleep(4)

        colsensor = self.blueprint_library.find("sensor.other.collision")
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)

        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))

        return self.front_camera

    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        #print(i.shape)
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("", i3)
            cv2.waitKey(1)
        self.front_camera = i3

    def step(self, action):
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        elif action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        elif action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))

        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done = True

        return self.front_camera, reward, done, None
Now, we're going to create a new class, the DQNAgent class. We'll start with:
class DQNAgent:
    def __init__(self):
        self.model = self.create_model()
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())
This is the same concept as from the reinforcement learning tutorials, where we have a main network, which is constantly evolving, and then the target network, which we update every n things, where n is whatever you want and things is something like steps or episodes.
As we train, we train from randomly selected data from our replay memory:
self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)
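The deque comes from Python's collections module (from collections import deque, which will be in our imports by the end). With maxlen set, it silently drops its oldest entries once it's full, which is exactly the behavior we want for replay memory. A quick illustrative sketch (the values here are made up, just to show the eviction):

from collections import deque

demo = deque(maxlen=3)
for transition_id in range(5):
    demo.append(transition_id)  # once full, the oldest entry gets dropped
print(demo)                     # deque([2, 3, 4], maxlen=3)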
For the same reasons as before (the RL tutorial), we will be modifying TensorBoard:
self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")
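The ModifiedTensorBoard class itself comes from the earlier reinforcement learning tutorials; it's a Keras TensorBoard callback tweaked to keep one log writer and one step counter across all of our .fit() calls (since we fit constantly, the stock callback would otherwise spawn a new log file every time). If you don't have it handy, it looked roughly like the following for the Keras/TF 1.x versions used here; use your own copy from the RL series if it differs:

from keras.callbacks import TensorBoard
import tensorflow as tf


# Own TensorBoard class from the earlier RL tutorials (roughly; check your own copy)
class ModifiedTensorBoard(TensorBoard):

    # Override init to set initial step and writer (we want one log file for all .fit() calls)
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.step = 1
        self.writer = tf.summary.FileWriter(self.log_dir)

    # Override to stop creation of the default log writer
    def set_model(self, model):
        pass

    # Override to save logs at our own step counter (otherwise every .fit() starts at step 0)
    def on_epoch_end(self, epoch, logs=None):
        self.update_stats(**logs)

    # Override; we train on one batch only, nothing to save at batch end
    def on_batch_end(self, batch, logs=None):
        pass

    # Override so the writer isn't closed after every .fit()
    def on_train_end(self, _):
        pass

    # Custom method for saving our own metrics at self.step
    def update_stats(self, **stats):
        self._write_logs(stats, self.step)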
Now we can wrap up our __init__ method by setting some final values:
self.target_update_counter = 0  # will track when it's time to update the target model
self.graph = tf.get_default_graph()

self.terminate = False  # Should we quit?
self.last_logged_episode = 0
self.training_initialized = False  # waiting for TF to get rolling
We're going to use self.training_initialized to track when TensorFlow is ready to get going. The first predictions/fitments when a model begins take extra long, so we're going to just pass some nonsense information initially to prime our model to actually get moving.
Now let's create our model:
def create_model(self):
    base_model = Xception(weights=None, include_top=False, input_shape=(IM_HEIGHT, IM_WIDTH, 3))

    x = base_model.output
    x = GlobalAveragePooling2D()(x)

    predictions = Dense(3, activation="linear")(x)
    model = Model(inputs=base_model.input, outputs=predictions)
    model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])
    return model
Here, we're just going to make use of the premade Xception model, but you could make some other model, or import a different one. Note that we're adding GlobalAveragePooling2D to our output layer, as well as obviously adding the 3-neuron output, one for each possible action the agent can take. Let's make some required imports for this method:
from keras.applications.xception import Xception
from keras.layers import Dense, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.models import Model
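Xception with no pretrained weights is a fairly big model and can be heavy on GPU memory, so you could absolutely substitute something smaller. Purely as an illustration (this is not part of the tutorial's code, and the layer sizes are arbitrary assumptions), a lightweight from-scratch CNN with the same 3-neuron linear output might look like:

from keras.layers import Conv2D, MaxPooling2D, Flatten, Dense
from keras.models import Sequential
from keras.optimizers import Adam

def create_small_model():
    # Hypothetical lighter alternative to Xception; layer sizes are arbitrary
    model = Sequential([
        Conv2D(32, (5, 5), strides=(2, 2), activation="relu",
               input_shape=(IM_HEIGHT, IM_WIDTH, 3)),
        MaxPooling2D((3, 3)),
        Conv2D(64, (3, 3), activation="relu"),
        MaxPooling2D((3, 3)),
        Flatten(),
        Dense(64, activation="relu"),
        Dense(3, activation="linear"),  # one Q value per action, same as before
    ])
    model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])
    return model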
We need a quick method in our DQNAgent for updating replay memory:
def update_replay_memory(self, transition):
    # transition = (current_state, action, reward, new_state, done)
    self.replay_memory.append(transition)
With the method this simple, we arguably don't even need one for this at all, but I'll leave it for now.
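For context, these transitions will come from the episode loop we write in the next part. Roughly, it will be fed something along these lines (this is just a sketch with a placeholder random policy, not the final loop; env and agent are instances of CarEnv and DQNAgent):

import numpy as np

# Sketch only: feed transitions into replay memory as an episode runs
current_state = env.reset()
done = False

while not done:
    action = np.random.randint(0, 3)               # placeholder policy for this sketch
    new_state, reward, done, _ = env.step(action)
    agent.update_replay_memory((current_state, action, reward, new_state, done))
    current_state = new_state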
Now to the train method. To begin, we only want to train if we have a bare minimum of samples in replay memory:
def train(self):
    if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
        return
Since we have a lot of constants like this, let's go ahead and just slap all of them in at once before I forget:
REPLAY_MEMORY_SIZE = 5_000
MIN_REPLAY_MEMORY_SIZE = 1_000
MINIBATCH_SIZE = 16
PREDICTION_BATCH_SIZE = 1
TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4
UPDATE_TARGET_EVERY = 5
MODEL_NAME = "Xception"

MEMORY_FRACTION = 0.8
MIN_REWARD = -200

EPISODES = 100

DISCOUNT = 0.99
epsilon = 1
EPSILON_DECAY = 0.95 ## 0.9975 99975
MIN_EPSILON = 0.001

AGGREGATE_STATS_EVERY = 10
Put those at the top of the script, obviously, outside of all classes/functions/methods...etc. If at any point you're confused about where things go, you can check the end of the tutorial to see code up to that point.
Back to our train method:
def train(self):
    if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
        return
If we don't have enough samples, we'll just return and be done. If we do, then we will begin our training. First, we need to grab a random minibatch:
minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)
Once we have our minibatch, we want to grab our current and future Q values:
current_states = np.array([transition[0] for transition in minibatch])/255
with self.graph.as_default():
    current_qs_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)

new_current_states = np.array([transition[3] for transition in minibatch])/255
with self.graph.as_default():
    future_qs_list = self.target_model.predict(new_current_states, PREDICTION_BATCH_SIZE)
Recall a transition is: transition = (current_state, action, reward, new_state, done)
Now, we create our inputs (X) and outputs (y):
X = []
y = []

for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):
    if not done:
        max_future_q = np.max(future_qs_list[index])
        new_q = reward + DISCOUNT * max_future_q
    else:
        new_q = reward

    current_qs = current_qs_list[index]
    current_qs[action] = new_q

    X.append(current_state)
    y.append(current_qs)
Nothing new or fancy here, pretty much identical to our reinforcement learning tutorials (DQN).
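Just to make the update above concrete, here's a tiny worked example with made-up numbers:

# Made-up numbers, purely to illustrate the update above
reward = 1                    # car was at or above 50 km/h this step
DISCOUNT = 0.99
future_qs = [0.4, 2.0, -0.3]  # target model's Q values for the new state

max_future_q = max(future_qs)              # 2.0
new_q = reward + DISCOUNT * max_future_q   # 1 + 0.99 * 2.0 = 2.98
print(new_q)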
log_this_step = False
if self.tensorboard.step > self.last_logged_episode:
    log_this_step = True
    self.last_logged_episode = self.tensorboard.step
We only want to log once per episode, not per actual training step, so we're using the above to keep track.
Next, we'll fit:
with self.graph.as_default():
    self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)
Notice that we're setting the TensorBoard callback only if log_this_step is true. If it's false, then we'll still fit; we just won't log to TensorBoard.
Next, if we did log this step (meaning a new episode has started), we also increment the counter that decides when to update the target network:
if log_this_step:
    self.target_update_counter += 1
Finally, we'll check to see if it's time to update our target_model:
if self.target_update_counter > UPDATE_TARGET_EVERY:
    self.target_model.set_weights(self.model.get_weights())
    self.target_update_counter = 0
Now we just need two more methods and our agent class is complete. First, we need a method to get Q values (basically to make a prediction):
def get_qs(self, state):
    return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]
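We'll call get_qs from the episode loop in the next part. As a rough preview (the helper below is hypothetical and just follows the epsilon-greedy pattern from the earlier RL tutorials, it is not the tutorial's final code):

import numpy as np

def choose_action(agent, current_state, epsilon):
    # Sketch of the epsilon-greedy choice we'll use in the episode loop (not final code)
    if np.random.random() > epsilon:
        return int(np.argmax(agent.get_qs(current_state)))  # exploit: best predicted Q value
    return np.random.randint(0, 3)                          # explore: random action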
Finally, we just need to actually do training:
def train_in_loop(self):
    X = np.random.uniform(size=(1, IM_HEIGHT, IM_WIDTH, 3)).astype(np.float32)
    y = np.random.uniform(size=(1, 3)).astype(np.float32)
    with self.graph.as_default():
        self.model.fit(X, y, verbose=False, batch_size=1)

    self.training_initialized = True
To start, we use some random data like above to initialize, then we begin our infinite loop:
while True:
    if self.terminate:
        return
    self.train()
    time.sleep(0.01)
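This train_in_loop method is what we'll hand off to a Python thread in the final part, so training runs in the background while the main loop keeps stepping the environment and predicting. A rough sketch of that wiring, assuming the episode loop from the next tutorial (not final code):

from threading import Thread
import time

# Sketch only: run the trainer in a background thread (the real version comes next tutorial)
agent = DQNAgent()
trainer_thread = Thread(target=agent.train_in_loop, daemon=True)
trainer_thread.start()

# Wait for the warm-up fit so the first real prediction isn't painfully slow
while not agent.training_initialized:
    time.sleep(0.01)

# ...run episodes here, calling agent.update_replay_memory() and agent.get_qs()...

agent.terminate = True   # tell train_in_loop to exit
trainer_thread.join()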
Alright, that's our DQNAgent class. Full code up to this point:
import glob
import os
import sys
import random
import time
import numpy as np
import cv2
import math
from collections import deque
from keras.applications.xception import Xception
from keras.layers import Dense, GlobalAveragePooling2D
from keras.optimizers import Adam
from keras.models import Model
import tensorflow as tf

try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass
import carla

SHOW_PREVIEW = False
IM_WIDTH = 640
IM_HEIGHT = 480
SECONDS_PER_EPISODE = 10
REPLAY_MEMORY_SIZE = 5_000
MIN_REPLAY_MEMORY_SIZE = 1_000
MINIBATCH_SIZE = 16
PREDICTION_BATCH_SIZE = 1
TRAINING_BATCH_SIZE = MINIBATCH_SIZE // 4
UPDATE_TARGET_EVERY = 5
MODEL_NAME = "Xception"

MEMORY_FRACTION = 0.8
MIN_REWARD = -200

EPISODES = 100

DISCOUNT = 0.99
epsilon = 1
EPSILON_DECAY = 0.95 ## 0.9975 99975
MIN_EPSILON = 0.001

AGGREGATE_STATS_EVERY = 10

# ModifiedTensorBoard class from the earlier reinforcement learning tutorials goes here
# (see the snippet above, or import it from wherever you keep your copy)


class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IM_WIDTH
    im_height = IM_HEIGHT
    front_camera = None

    def __init__(self):
        self.client = carla.Client("localhost", 2000)
        self.client.set_timeout(2.0)
        self.world = self.client.get_world()
        self.blueprint_library = self.world.get_blueprint_library()
        self.model_3 = self.blueprint_library.filter("model3")[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.blueprint_library.find('sensor.camera.rgb')
        self.rgb_cam.set_attribute("image_size_x", f"{self.im_width}")
        self.rgb_cam.set_attribute("image_size_y", f"{self.im_height}")
        self.rgb_cam.set_attribute("fov", f"110")

        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
        time.sleep(4)

        colsensor = self.blueprint_library.find("sensor.other.collision")
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)

        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))

        return self.front_camera

    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        #print(i.shape)
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("", i3)
            cv2.waitKey(1)
        self.front_camera = i3

    def step(self, action):
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        elif action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        elif action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))

        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done = True

        return self.front_camera, reward, done, None


class DQNAgent:
    def __init__(self):
        self.model = self.create_model()
        self.target_model = self.create_model()
        self.target_model.set_weights(self.model.get_weights())

        self.replay_memory = deque(maxlen=REPLAY_MEMORY_SIZE)

        self.tensorboard = ModifiedTensorBoard(log_dir=f"logs/{MODEL_NAME}-{int(time.time())}")
        self.target_update_counter = 0
        self.graph = tf.get_default_graph()

        self.terminate = False
        self.last_logged_episode = 0
        self.training_initialized = False

    def create_model(self):
        base_model = Xception(weights=None, include_top=False, input_shape=(IM_HEIGHT, IM_WIDTH, 3))

        x = base_model.output
        x = GlobalAveragePooling2D()(x)

        predictions = Dense(3, activation="linear")(x)
        model = Model(inputs=base_model.input, outputs=predictions)
        model.compile(loss="mse", optimizer=Adam(lr=0.001), metrics=["accuracy"])
        return model

    def update_replay_memory(self, transition):
        # transition = (current_state, action, reward, new_state, done)
        self.replay_memory.append(transition)

    def train(self):
        if len(self.replay_memory) < MIN_REPLAY_MEMORY_SIZE:
            return

        minibatch = random.sample(self.replay_memory, MINIBATCH_SIZE)

        current_states = np.array([transition[0] for transition in minibatch])/255
        with self.graph.as_default():
            current_qs_list = self.model.predict(current_states, PREDICTION_BATCH_SIZE)

        new_current_states = np.array([transition[3] for transition in minibatch])/255
        with self.graph.as_default():
            future_qs_list = self.target_model.predict(new_current_states, PREDICTION_BATCH_SIZE)

        X = []
        y = []

        for index, (current_state, action, reward, new_state, done) in enumerate(minibatch):
            if not done:
                max_future_q = np.max(future_qs_list[index])
                new_q = reward + DISCOUNT * max_future_q
            else:
                new_q = reward

            current_qs = current_qs_list[index]
            current_qs[action] = new_q

            X.append(current_state)
            y.append(current_qs)

        log_this_step = False
        if self.tensorboard.step > self.last_logged_episode:
            log_this_step = True
            self.last_logged_episode = self.tensorboard.step

        with self.graph.as_default():
            self.model.fit(np.array(X)/255, np.array(y), batch_size=TRAINING_BATCH_SIZE, verbose=0, shuffle=False, callbacks=[self.tensorboard] if log_this_step else None)

        if log_this_step:
            self.target_update_counter += 1

        if self.target_update_counter > UPDATE_TARGET_EVERY:
            self.target_model.set_weights(self.model.get_weights())
            self.target_update_counter = 0

    def get_qs(self, state):
        return self.model.predict(np.array(state).reshape(-1, *state.shape)/255)[0]

    def train_in_loop(self):
        X = np.random.uniform(size=(1, IM_HEIGHT, IM_WIDTH, 3)).astype(np.float32)
        y = np.random.uniform(size=(1, 3)).astype(np.float32)
        with self.graph.as_default():
            self.model.fit(X, y, verbose=False, batch_size=1)

        self.training_initialized = True

        while True:
            if self.terminate:
                return
            self.train()
            time.sleep(0.01)
Now, in the next tutorial, we'll put the finishing touches on things and actually run to see what we've got!