Welcome to part 3 of the Carla autonomous/self-driving car with Python programming tutorials. In this tutorial, we're going to take our knowledge of the Carla API, and try to convert this problem to a reinforcement learning problem.
After OpenAI pioneered the open sourcing of reinforcement learning environments and solutions, we wound up with a fairly standardized way of approaching reinforcement learning environments.
The idea here is that your environment will have a step method, which returns observation, reward, done, extra_info, as well as a reset method that will restart the environment based on some sort of done flag.
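To make that contract concrete, here's a minimal sketch of the interaction loop this interface enables. SomeEnv and choose_action are hypothetical placeholders here, not part of this tutorial's code:

env = SomeEnv()
obs = env.reset()
done = False
while not done:
    action = choose_action(obs)  # e.g. from a model, or chosen at random
    obs, reward, done, extra_info = env.step(action)
# once done is True, env.reset() starts a fresh episode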
Okay, so all we need to do is create the code to represent this. We'll start with our usual Carla import stuff:
import glob
import os
import sys

try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass

import carla
Next, we're going to create our environment class, which we'll call CarEnv. It will be convenient to have some constants at the top of our script for the model, agent, and environment, so the first thing I'll do is create our first constant, SHOW_PREVIEW:
SHOW_PREVIEW = False
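The class we're about to write also references image dimensions, and its step method will later reference an episode length, so it's worth defining those constants up top as well. The specific values below are my own defaults (a 640x480 camera and ten-second episodes) rather than anything mandated by Carla, so adjust to taste:

IMG_WIDTH = 640
IMG_HEIGHT = 480
SECONDS_PER_EPISODE = 10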
Now let's set some initial values for our environment class:
class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IMG_WIDTH
    im_height = IMG_HEIGHT
    actor_list = []
    front_camera = None
    collision_hist = []
I think these are fairly self-explanatory, but SHOW_CAM is whether or not we want to show a preview. Seeing one can be useful for debugging, but you wouldn't necessarily want it on all the time, since rendering a preview adds overhead.
STEER_AMT is how much steering to apply. At the moment, it's a full turn. Later we might find it's better to steer a little less forcefully, maybe doing something cumulative instead (a hypothetical sketch follows). For now, full steer!
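To make "cumulative" concrete, a hypothetical variant might nudge the current steering angle each step instead of slamming it to full lock. self.current_steer and direction are invented names for illustration; nothing below is used in this tutorial:

# Hypothetical cumulative steering (illustration only):
# direction is -1 for left, +1 for right; clamp to Carla's valid [-1, 1] steer range.
self.current_steer = max(-1.0, min(1.0, self.current_steer + direction * 0.1))
self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=self.current_steer))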
The collision_hist list is used because the collision sensor reports a history of incidents; basically, if there's anything in this list, we'll say we've crashed. Now for our __init__ method. It's all stuff you've seen in the previous tutorial:
    def __init__(self):
        self.client = carla.Client('localhost', 2000)
        self.client.set_timeout(2.0)

        # Once we have a client, we can retrieve the world that is currently running.
        self.world = self.client.get_world()

        # The world contains the blueprints that we can use for adding new
        # actors into the simulation.
        blueprint_library = self.world.get_blueprint_library()

        # Grab the Tesla Model 3 blueprint for our vehicle.
        self.model_3 = blueprint_library.filter('model3')[0]
Next, we're going to create our reset method.
    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.world.get_blueprint_library().find('sensor.camera.rgb')
        self.rgb_cam.set_attribute('image_size_x', f'{self.im_width}')
        self.rgb_cam.set_attribute('image_size_y', f'{self.im_height}')
        self.rgb_cam.set_attribute('fov', '110')

        # Mount the camera slightly forward of and above the car's origin.
        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        # Initially passing some commands seems to help with startup time. Not sure why.
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))
Everything here you've seen before, nothing new or fancy here, just turned into OOP is all. Next, let's add our collision sensor to this method:
        time.sleep(4)  # sleep to get things started and to not detect a collision when the car spawns/falls from the sky.

        colsensor = self.world.get_blueprint_library().find('sensor.other.collision')
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))
We're leading with a sleep for four seconds here, because the car literally "drops" into the simulator. Often, when the car hits the ground, we get a collision registered. Also, initially, it can take some time for these sensors to initialize and return values, so we'll just go with a nice safe solid four seconds. Just in case it takes any longer, we can do the following:
        while self.front_camera is None:
            time.sleep(0.01)
We can't rely on that wait alone, however, because we also need to be certain the car is done falling from the sky on spawn, which is what the four-second sleep guarantees. Finally, let's log the actual starting time for the episode, make sure the brake and throttle aren't being applied, and return our first observation:
        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(brake=0.0, throttle=0.0))

        return self.front_camera
The full code for our CarEnv so far is:
class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IMG_WIDTH
    im_height = IMG_HEIGHT
    actor_list = []
    front_camera = None
    collision_hist = []

    def __init__(self):
        self.client = carla.Client('localhost', 2000)
        self.client.set_timeout(2.0)

        # Once we have a client, we can retrieve the world that is currently running.
        self.world = self.client.get_world()

        # The world contains the blueprints that we can use for adding new
        # actors into the simulation.
        blueprint_library = self.world.get_blueprint_library()

        # Grab the Tesla Model 3 blueprint for our vehicle.
        self.model_3 = blueprint_library.filter('model3')[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.world.get_blueprint_library().find('sensor.camera.rgb')
        self.rgb_cam.set_attribute('image_size_x', f'{self.im_width}')
        self.rgb_cam.set_attribute('image_size_y', f'{self.im_height}')
        self.rgb_cam.set_attribute('fov', '110')

        # Mount the camera slightly forward of and above the car's origin.
        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        # Initially passing some commands seems to help with startup time. Not sure why.
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))

        time.sleep(4)  # sleep to get things started and to not detect a collision when the car spawns/falls from the sky.

        colsensor = self.world.get_blueprint_library().find('sensor.other.collision')
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)

        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(brake=0.0, throttle=0.0))

        return self.front_camera
Now, let's add our collision_data and process_img methods:
    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        # The raw data is a flat array of BGRA pixel values; reshape it to
        # height x width x 4, then drop the alpha channel.
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("", i3)
            cv2.waitKey(1)
        self.front_camera = i3
Now we need to write our step method. This method takes an action, and then returns the observation, reward, done, extra_info tuple, as per the usual reinforcement learning paradigm. To begin:
    def step(self, action):
        '''
        For now, the actions are: 0 = straight, 1 = steer left, 2 = steer right.
        '''
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        if action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        if action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))
The above shows how we apply controls based on the numerical action passed in; now we just need to handle the observation, a possible collision, and the reward:
        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done = True

        return self.front_camera, reward, done, None
We're grabbing the vehicle's velocity vector and converting its magnitude from m/s to km/h. I'm doing this to avoid the agent learning to just drive in a tight circle; if we require a certain speed to earn a positive reward, that should hopefully disincentivize circling in place.
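As a quick sanity check on the conversion (1 m/s equals 3.6 km/h), here's a hypothetical velocity reading that lands right at our 50 km/h threshold:

import math

v_x, v_y, v_z = 13.9, 0.0, 0.0  # hypothetical velocity components in m/s
kmh = 3.6 * math.sqrt(v_x**2 + v_y**2 + v_z**2)  # 1 m/s == 3.6 km/h
print(round(kmh))  # ~50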
Next, we check to see if we've exhausted our episode time, then we return everything. With that, we're done with our environment!
Full code up to this point:
import glob
import os
import sys
import random
import time
import math

import numpy as np
import cv2

try:
    sys.path.append(glob.glob('../carla/dist/carla-*%d.%d-%s.egg' % (
        sys.version_info.major,
        sys.version_info.minor,
        'win-amd64' if os.name == 'nt' else 'linux-x86_64'))[0])
except IndexError:
    pass

import carla

SHOW_PREVIEW = False
IMG_WIDTH = 640
IMG_HEIGHT = 480
SECONDS_PER_EPISODE = 10


class CarEnv:
    SHOW_CAM = SHOW_PREVIEW
    STEER_AMT = 1.0
    im_width = IMG_WIDTH
    im_height = IMG_HEIGHT
    actor_list = []
    front_camera = None
    collision_hist = []

    def __init__(self):
        self.client = carla.Client('localhost', 2000)
        self.client.set_timeout(2.0)

        # Once we have a client, we can retrieve the world that is currently running.
        self.world = self.client.get_world()

        # The world contains the blueprints that we can use for adding new
        # actors into the simulation.
        blueprint_library = self.world.get_blueprint_library()

        # Grab the Tesla Model 3 blueprint for our vehicle.
        self.model_3 = blueprint_library.filter('model3')[0]

    def reset(self):
        self.collision_hist = []
        self.actor_list = []

        self.transform = random.choice(self.world.get_map().get_spawn_points())
        self.vehicle = self.world.spawn_actor(self.model_3, self.transform)
        self.actor_list.append(self.vehicle)

        self.rgb_cam = self.world.get_blueprint_library().find('sensor.camera.rgb')
        self.rgb_cam.set_attribute('image_size_x', f'{self.im_width}')
        self.rgb_cam.set_attribute('image_size_y', f'{self.im_height}')
        self.rgb_cam.set_attribute('fov', '110')

        # Mount the camera slightly forward of and above the car's origin.
        transform = carla.Transform(carla.Location(x=2.5, z=0.7))
        self.sensor = self.world.spawn_actor(self.rgb_cam, transform, attach_to=self.vehicle)
        self.actor_list.append(self.sensor)
        self.sensor.listen(lambda data: self.process_img(data))

        # Initially passing some commands seems to help with startup time. Not sure why.
        self.vehicle.apply_control(carla.VehicleControl(throttle=0.0, brake=0.0))

        time.sleep(4)  # sleep to get things started and to not detect a collision when the car spawns/falls from the sky.

        colsensor = self.world.get_blueprint_library().find('sensor.other.collision')
        self.colsensor = self.world.spawn_actor(colsensor, transform, attach_to=self.vehicle)
        self.actor_list.append(self.colsensor)
        self.colsensor.listen(lambda event: self.collision_data(event))

        while self.front_camera is None:
            time.sleep(0.01)

        self.episode_start = time.time()
        self.vehicle.apply_control(carla.VehicleControl(brake=0.0, throttle=0.0))

        return self.front_camera

    def collision_data(self, event):
        self.collision_hist.append(event)

    def process_img(self, image):
        i = np.array(image.raw_data)
        # The raw data is a flat array of BGRA pixel values; reshape it to
        # height x width x 4, then drop the alpha channel.
        i2 = i.reshape((self.im_height, self.im_width, 4))
        i3 = i2[:, :, :3]
        if self.SHOW_CAM:
            cv2.imshow("", i3)
            cv2.waitKey(1)
        self.front_camera = i3

    def step(self, action):
        '''
        For now, the actions are: 0 = straight, 1 = steer left, 2 = steer right.
        '''
        if action == 0:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=0))
        if action == 1:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=-1*self.STEER_AMT))
        if action == 2:
            self.vehicle.apply_control(carla.VehicleControl(throttle=1.0, steer=1*self.STEER_AMT))

        v = self.vehicle.get_velocity()
        kmh = int(3.6 * math.sqrt(v.x**2 + v.y**2 + v.z**2))

        if len(self.collision_hist) != 0:
            done = True
            reward = -200
        elif kmh < 50:
            done = False
            reward = -1
        else:
            done = False
            reward = 1

        if self.episode_start + SECONDS_PER_EPISODE < time.time():
            done = True

        return self.front_camera, reward, done, None
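To sanity-check the environment before wiring up an agent, you can run a quick random-action loop like the one below. This is a minimal smoke test, assuming a Carla server is already running on localhost:2000; note the cleanup at the end, since spawned actors otherwise linger in the simulator between runs:

if __name__ == '__main__':
    env = CarEnv()
    try:
        obs = env.reset()
        done = False
        while not done:
            action = random.randint(0, 2)  # straight, left, or right at random
            obs, reward, done, _ = env.step(action)
            print(f'reward: {reward}, done: {done}')
    finally:
        # Spawned actors are not destroyed automatically; clean them up.
        for actor in env.actor_list:
            actor.destroy()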
In the next tutorial, we'll code our Agent class, which will interact with this environment and house our actual reinforcement learning model.