Welcome to part 2 of the reinforcement learning tutorial series, specifically with Q-Learning. We've built our Q-Table which contains all of our possible discrete states. Next we need a way to update the Q-Values (value per possible action per unique state), which brought us to:
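The update rule in question is the standard Q-Learning formula, written here with the same terms the code below uses:

Q_{new}(s_t, a_t) = (1 - \alpha) \cdot Q(s_t, a_t) + \alpha \cdot \left( r_t + \gamma \cdot \max_{a} Q(s_{t+1}, a) \right)

Here, \alpha is the learning rate, \gamma is the discount, r_t is the reward we just received, and \max_{a} Q(s_{t+1}, a) is the best Q value available from the new state.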
If you're like me, mathematical formulas like that make your head spin. Here's the same formula in code:
new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
That's a little more legible to me! The only values here that we haven't covered yet are:
DISCOUNT
max_future_q
The DISCOUNT
is a measure of how much we want to care about FUTURE reward rather than immediate reward. Typically, this value will be fairly high, between 0 and 1. We want it high because the whole purpose of Q-Learning is to learn a chain of events that ends with a positive outcome, so it's only natural that we put greater importance on long-term gains than on short-term ones.
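To get a feel for what that weighting actually means, here's a quick throwaway snippet (not part of the tutorial code) showing how much a reward n steps in the future counts for, using this tutorial's DISCOUNT of 0.95:

# Illustration only: the weight applied to a reward that is n steps away.
DISCOUNT = 0.95
for n in (1, 10, 50, 100):
    print(n, round(DISCOUNT ** n, 3))  # 0.95, ~0.6, ~0.08, ~0.006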
The max_future_q
is grabbed after we've already performed our action, and then we update our previous values based partially on the next step's best Q value. Over time, once we've reached the objective once, this "reward" value gets slowly back-propagated, one step at a time, per episode. Super basic concept, but pretty neat how it works!
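To watch that back-propagation happen, here's a tiny standalone toy (not part of the MountainCar code, and using a bigger learning rate so the effect shows up within a few passes): a three-step chain where only the final step is rewarded.

import numpy as np

# Toy illustration only: a 3-step chain where a reward of 1 arrives on the last step.
# The value creeps backwards roughly one state per pass, as described above.
LEARNING_RATE, DISCOUNT = 0.5, 0.95
q = np.zeros(3)  # q[i] = estimated value of moving forward from state i

for episode in range(5):
    for s in range(3):
        reward = 1.0 if s == 2 else 0.0              # goal is only reached on the final step
        max_future_q = q[s + 1] if s < 2 else 0.0    # the terminal state has no future value
        q[s] = (1 - LEARNING_RATE) * q[s] + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
    print(episode, q.round(3))  # watch q[1], then q[0], become nonzero as episodes go by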
Okay, so now we know everything we need to know to make this work. There's actually not much "algorithmic" code at all; we just need to write the surrounding logic.
We'll start with the following script:
import gym
import numpy as np

env = gym.make("MountainCar-v0")
env.reset()

DISCRETE_OS_SIZE = [20, 20]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))

done = False
while not done:
    action = 2
    new_state, reward, done, _ = env.step(action)
    print(reward, new_state)
    env.render()

#new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
Now, let's add some more constants:
# Q-Learning settings
LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 25000
The LEARNING_RATE
is between 0 and 1, the same as the discount. The EPISODES
is how many iterations of the game we'd like to run.
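If it helps to see the LEARNING_RATE in action, here's a single update with toy numbers (purely illustrative): the new Q value is 90% of the old estimate plus 10% of the new target, so each individual update only nudges the table a little.

# Toy numbers only: what one update does with LEARNING_RATE = 0.1
current_q = -1.0                 # what the table currently says
target = -0.5                    # reward + DISCOUNT * max_future_q for this step
new_q = (1 - 0.1) * current_q + 0.1 * target
print(new_q)  # -0.95, i.e. we moved 10% of the way from -1.0 toward -0.5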
Next, we need a quick helper function that converts our environment "state" (which currently contains continuous values that would make our Q-Table absolutely gigantic and take forever to learn) into a "discrete" state instead:
def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(np.int))
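To sanity-check the helper, you can print the discrete version of the starting state. The exact buckets vary from run to run, since the cart starts at a slightly random position around -0.5:

# Quick check (illustrative): the starting state lands near the middle buckets.
print(get_discrete_state(env.reset()))  # e.g. (7, 10) with our 20x20 table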
Full code up to this point:
import gym
import numpy as np

env = gym.make("MountainCar-v0")
env.reset()

LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 25000

DISCRETE_OS_SIZE = [20, 20]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))


def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(np.int))  # we use this tuple to look up the 3 Q values for the available actions in the q-table


done = False
while not done:
    action = 2
    new_state, reward, done, _ = env.step(action)
    print(reward, new_state)
    env.render()

#new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
Now, we're ready to actually step through this environment. First, let's move env.reset()
to right before the while loop and also grab the initial state values:
discrete_state = get_discrete_state(env.reset())

done = False
while not done:
    ...
Next, we replace action = 2
with:
action = np.argmax(q_table[discrete_state])
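For clarity on what that indexing does: q_table has shape (20, 20, 3), so indexing it with our two-element discrete state tuple leaves just the three action values, and np.argmax picks the index of the largest one (in MountainCar, 0 = push left, 1 = do nothing, 2 = push right). The printed values below are just illustrative; yours will differ since the table starts out random:

print(q_table[discrete_state])             # e.g. [-1.23 -0.41 -1.87] -- one Q value per action
print(np.argmax(q_table[discrete_state]))  # e.g. 1 -- index of the highest Q value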
Then, we want to grab the new discrete state:
new_discrete_state = get_discrete_state(new_state)
Now, we want to update the Q-value. Note that we're updating the Q-value for the action that we *already took*:
    # If simulation did not end yet after last step - update Q table
    if not done:

        # Maximum possible Q value in next step (for new state)
        max_future_q = np.max(q_table[new_discrete_state])

        # Current Q value (for current state and performed action)
        current_q = q_table[discrete_state + (action,)]

        # And here's our equation for a new Q value for current state and action
        new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # Update Q table with new Q value
        q_table[discrete_state + (action,)] = new_q
If the simulation ended, we want to check whether it's because we reached the objective. If so, we set that state-action pair's Q value directly to 0 (the best possible value here, since every other step earns a -1):
    # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
    elif new_state[0] >= env.goal_position:
        #q_table[discrete_state + (action,)] = reward
        q_table[discrete_state + (action,)] = 0
Now, we need to update the discrete_state
variable:
discrete_state = new_discrete_state
Finally, we'll end our code with:
env.close()
Full code up to this point:
import gym
import numpy as np

env = gym.make("MountainCar-v0")

LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 25000

DISCRETE_OS_SIZE = [20, 20]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))


def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(np.int))  # we use this tuple to look up the 3 Q values for the available actions in the q-table


discrete_state = get_discrete_state(env.reset())

done = False
while not done:
    action = np.argmax(q_table[discrete_state])
    new_state, reward, done, _ = env.step(action)

    new_discrete_state = get_discrete_state(new_state)

    env.render()
    #new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

    # If simulation did not end yet after last step - update Q table
    if not done:

        # Maximum possible Q value in next step (for new state)
        max_future_q = np.max(q_table[new_discrete_state])

        # Current Q value (for current state and performed action)
        current_q = q_table[discrete_state + (action,)]

        # And here's our equation for a new Q value for current state and action
        new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # Update Q table with new Q value
        q_table[discrete_state + (action,)] = new_q

    # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
    elif new_state[0] >= env.goal_position:
        #q_table[discrete_state + (action,)] = reward
        q_table[discrete_state + (action,)] = 0

    discrete_state = new_discrete_state


env.close()
Okay, running that, you should see one episode play out. Nothing too special. We need a lot of episodes to train, so now let's add the loop for the episodes:
for episode in range(EPISODES):
    discrete_state = get_discrete_state(env.reset())
    done = False
    while not done:
        action = np.argmax(q_table[discrete_state])
        new_state, reward, done, _ = env.step(action)
        ...
Then, we'll want to stop rendering the environment for now. We need to run many thousands of iterations, and rendering every episode would make that take far longer. Instead, we can check in on the environment every n
steps, or once the model is learning something. For example, let's add the following constant:
SHOW_EVERY = 1000
Then, in our looping code:
for episode in range(EPISODES):
    discrete_state = get_discrete_state(env.reset())
    done = False

    if episode % SHOW_EVERY == 0:
        render = True
        print(episode)
    else:
        render = False

    while not done:
        action = np.argmax(q_table[discrete_state])
        new_state, reward, done, _ = env.step(action)
        new_discrete_state = get_discrete_state(new_state)

        if episode % SHOW_EVERY == 0:
            env.render()
Okay, great, but it doesn't appear that the model is learning. Why not?
So, the Q-Table is initialized randomly. Then, the agent gets rewarded with a -1 at every single step. The only time the agent gets rewarded with, well, nothing (a 0) is if it reaches the objective. We need the agent to reach the objective at some point. If it reaches it just once, it will be more likely to reach it again as the reward back-propagates. As it becomes more likely to reach it, it will reach it again and again... and boom, it will have learned. But how do we get it to reach the objective the first time?!
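If you want to verify that reward structure for yourself, a couple of throwaway lines will do it (env.goal_position should be about 0.5 for MountainCar-v0):

# Quick check: every step returns -1 until the cart's x position reaches env.goal_position.
env.reset()
new_state, reward, done, _ = env.step(2)
print(reward, env.goal_position)  # -1.0 0.5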
Epsilon!
Or, as regular people might call it: random moves.
As an agent learns an environment, it moves from "exploration" to "exploitation." Right now, our model is greedy, always exploiting the max Q value... but those Q values are worthless right now. We need the agent to explore!
For this, we'll add the following values:
# Exploration settings
epsilon = 1  # not a constant, going to be decayed
START_EPSILON_DECAYING = 1
END_EPSILON_DECAYING = EPISODES//2
epsilon_decay_value = epsilon/(END_EPSILON_DECAYING - START_EPSILON_DECAYING)
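To put rough numbers on that schedule (assuming EPISODES = 25000 as above): epsilon_decay_value comes out to 1/12499, about 8e-05 per episode, so epsilon falls from 1 to roughly 0 over the first half of training, and the agent is essentially fully greedy for the second half.

# Rough sanity check on the decay schedule (values assume EPISODES = 25000)
print(END_EPSILON_DECAYING)    # 12500 -- the last episode that still decays epsilon
print(epsilon_decay_value)     # ~8e-05 -- how much epsilon drops per episode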
Now, we want to decay the epsilon value every episode until we're done decaying it. We'll do this at the end of each episode (basically at the very bottom):
    ...

    # Decaying is being done every episode if episode number is within decaying range
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value


env.close()
Now we just need to use epsilon. We'll use np.random.random()
to randomly pick a number between 0 and 1. If np.random.random() is greater than the epsilon value, then we'll go based off the max Q value as usual. Otherwise, we will just move randomly:
    while not done:

        if np.random.random() > epsilon:
            # Get action from Q table
            action = np.argmax(q_table[discrete_state])
        else:
            # Get random action
            action = np.random.randint(0, env.action_space.n)
Full code up to this point:
# objective is to get the cart to the flag.
import gym
import numpy as np

env = gym.make("MountainCar-v0")

LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 25000
SHOW_EVERY = 3000

DISCRETE_OS_SIZE = [20, 20]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE

# Exploration settings
epsilon = 1  # not a constant, going to be decayed
START_EPSILON_DECAYING = 1
END_EPSILON_DECAYING = EPISODES//2
epsilon_decay_value = epsilon/(END_EPSILON_DECAYING - START_EPSILON_DECAYING)

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))


def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(np.int))  # we use this tuple to look up the 3 Q values for the available actions in the q-table


for episode in range(EPISODES):
    discrete_state = get_discrete_state(env.reset())
    done = False

    if episode % SHOW_EVERY == 0:
        render = True
        print(episode)
    else:
        render = False

    while not done:

        if np.random.random() > epsilon:
            # Get action from Q table
            action = np.argmax(q_table[discrete_state])
        else:
            # Get random action
            action = np.random.randint(0, env.action_space.n)

        new_state, reward, done, _ = env.step(action)

        new_discrete_state = get_discrete_state(new_state)

        if episode % SHOW_EVERY == 0:
            env.render()
        #new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # If simulation did not end yet after last step - update Q table
        if not done:

            # Maximum possible Q value in next step (for new state)
            max_future_q = np.max(q_table[new_discrete_state])

            # Current Q value (for current state and performed action)
            current_q = q_table[discrete_state + (action,)]

            # And here's our equation for a new Q value for current state and action
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

            # Update Q table with new Q value
            q_table[discrete_state + (action,)] = new_q

        # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
        elif new_state[0] >= env.goal_position:
            #q_table[discrete_state + (action,)] = reward
            q_table[discrete_state + (action,)] = 0

        discrete_state = new_discrete_state

    # Decaying is being done every episode if episode number is within decaying range
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value


env.close()