Q Algorithm and Agent (Q-Learning) - Reinforcement Learning w/ Python Tutorial p.2




Welcome to part 2 of the reinforcement learning tutorial series, specifically with Q-Learning. We've built our Q-Table, which contains all of our possible discrete states. Next, we need a way to update the Q-Values (one value per possible action per unique state), which brings us to the Q-Learning update formula:

Q_new(s_t, a_t) = (1 - α) * Q(s_t, a_t) + α * (r_t + γ * max_a Q(s_{t+1}, a))

If you're like me, mathematical formulas like that make your head spin. Here's the formula in code:

new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

That's a little more legible to me! The only things in there that we haven't covered yet are:

DISCOUNT

and

max_future_q

The DISCOUNT is a measure of how much we want to care about FUTURE reward rather than immediate reward. Typically, this value will be fairly high, and it sits between 0 and 1. We want it high because the purpose of Q-Learning is indeed to learn a chain of events that ends with a positive outcome, so it's only natural that we put greater importance on long-term gains than on short-term ones.
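
For a bit of intuition on what DISCOUNT = 0.95 means in practice, here's a tiny illustrative sketch (not part of the tutorial script): a reward that is n steps in the future only counts for DISCOUNT raised to the power n.

DISCOUNT = 0.95
for steps_away in (1, 10, 50, 100):
    # weight applied to a reward that is `steps_away` steps in the future
    print(steps_away, round(DISCOUNT ** steps_away, 4))
# 1 -> 0.95, 10 -> 0.5987, 50 -> 0.0769, 100 -> 0.0059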

The max_future_q is grabbed after we've already performed our action, and then we update our previous Q value based partially on the best Q value available from the next step. Over time, once we've reached the objective once, this "reward" value gets slowly back-propagated, one step at a time, per episode. Super basic concept, but pretty neat how it works!
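
To make that concrete, here's the update formula applied once by hand. The Q values below are made up purely for illustration; only the -1 reward comes from the MountainCar environment.

LEARNING_RATE = 0.1
DISCOUNT = 0.95

current_q = -1.2     # hypothetical Q value for the state/action we just took
max_future_q = -0.5  # hypothetical best Q value in the state we landed in
reward = -1          # MountainCar hands out -1 per step

new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)
print(new_q)  # 0.9*(-1.2) + 0.1*(-1 + 0.95*(-0.5)) = -1.2275

Note that only LEARNING_RATE (10%) of the update comes from the new information; the other 90% is the old value, which keeps the learning gradual.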

Okay, so now we literally know everything we need to know to make this work. It's actually not much "algorithmic" code at all; we just need to write the surrounding logic.

We'll start with the following script:

import gym
import numpy as np

env = gym.make("MountainCar-v0")
env.reset()

DISCRETE_OS_SIZE = [20, 20]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))

done = False

while not done:
    action = 2  # for now, always push right (action 2 in MountainCar)
    new_state, reward, done, _ = env.step(action)
    print(reward, new_state)
    env.render()
    #new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

Now, let's add some more constants:

# Q-Learning settings
LEARNING_RATE = 0.1
DISCOUNT = 0.95
EPISODES = 25000

The LEARNING_RATE is between 0 and 1, as is the DISCOUNT. EPISODES is how many iterations of the game we'd like to run.

Next, we need a quick helper function that will convert our environment "state," which currently contains continuous values that would make our Q-Table absolutely gigantic and take forever to learn, into a "discrete" state instead:

def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(int))
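
As a quick sanity check (not something we'll keep in the script, and the exact numbers vary since MountainCar's start position is random), running this after the code above should land the reset state somewhere near the middle of the 20x20 grid:

print(discrete_os_win_size)             # roughly [0.09  0.007] for MountainCar-v0
print(get_discrete_state(env.reset()))  # e.g. (7, 10): position bucket ~7, velocity bucket 10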

Full code up to this point:

import gym
import numpy as np

env = gym.make("MountainCar-v0")
env.reset()

LEARNING_RATE = 0.1

DISCOUNT = 0.95
EPISODES = 25000

DISCRETE_OS_SIZE = [20, 20]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE


q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))


def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(int))  # we use this tuple to look up the 3 Q values for the available actions in the q-table


done = False
while not done:
    action = 2  # for now, always push right (action 2 in MountainCar)
    new_state, reward, done, _ = env.step(action)
    print(reward, new_state)
    env.render()
    #new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

Now we're ready to actually step through this environment. First, let's move env.reset() to right before the while loop, and also grab the initial (discrete) state:

discrete_state = get_discrete_state(env.reset())
done = False
while not done:
    ...

Next, we replace action = 2 with:

action = np.argmax(q_table[discrete_state])
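
Since discrete_state is a tuple like (7, 10), q_table[discrete_state] is just that state's row of 3 Q values (one per action), and np.argmax returns the index of the largest one, which doubles as the action number. A quick illustration with made-up values:

import numpy as np

q_row = np.array([-1.3, -0.7, -1.9])  # hypothetical Q values for actions 0, 1, 2
print(np.argmax(q_row))  # 1 -> we'd take action 1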

Then, we want to grab the new discrete state:

new_discrete_state = get_discrete_state(new_state)

Now, we want to update the Q value. Note that we're updating the Q value for the action that we *already took*:

    # If simulation did not end yet after last step - update Q table
    if not done:

        # Maximum possible Q value in next step (for new state)
        max_future_q = np.max(q_table[new_discrete_state])

        # Current Q value (for current state and performed action)
        current_q = q_table[discrete_state + (action,)]

        # And here's our equation for a new Q value for current state and action
        new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # Update Q table with new Q value
        q_table[discrete_state + (action,)] = new_q

If the simulation ended, we want to check whether that's because we reached the objective. If it is, we set the Q value for the action we just took directly to 0, the best value possible here, since every other step is rewarded with a -1:

    # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
    elif new_state[0] >= env.goal_position:
        #q_table[discrete_state + (action,)] = reward
        q_table[discrete_state + (action,)] = 0

Then, at the end of each step, we update discrete_state to the new discrete state:

    discrete_state = new_discrete_state

Finally, we'll end our code with:

env.close()

Full code up to this point:

import gym
import numpy as np

env = gym.make("MountainCar-v0")

LEARNING_RATE = 0.1

DISCOUNT = 0.95
EPISODES = 25000

DISCRETE_OS_SIZE = [20, 20]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE

q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))


def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(int))  # we use this tuple to look up the 3 Q values for the available actions in the q-table


discrete_state = get_discrete_state(env.reset())
done = False
while not done:

    action = np.argmax(q_table[discrete_state])
    new_state, reward, done, _ = env.step(action)

    new_discrete_state = get_discrete_state(new_state)

    env.render()
    #new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

    # If simulation did not end yet after last step - update Q table
    if not done:

        # Maximum possible Q value in next step (for new state)
        max_future_q = np.max(q_table[new_discrete_state])

        # Current Q value (for current state and performed action)
        current_q = q_table[discrete_state + (action,)]

        # And here's our equation for a new Q value for current state and action
        new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # Update Q table with new Q value
        q_table[discrete_state + (action,)] = new_q


    # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
    elif new_state[0] >= env.goal_position:
        #q_table[discrete_state + (action,)] = reward
        q_table[discrete_state + (action,)] = 0

    discrete_state = new_discrete_state


env.close()

Okay, running that, you should see one episode play out. Nothing too special. We need a lot of episodes to train, so now let's add the loop for the episodes:

for episode in range(EPISODES):
    discrete_state = get_discrete_state(env.reset())
    done = False

    while not done:

        action = np.argmax(q_table[discrete_state])
        new_state, reward, done, _ = env.step(action)
        ...

Then, we want to stop rendering the environment on every episode for now. We need to run many thousands of episodes, and rendering the environment makes them take far longer. Instead, we can check in on the environment every n episodes, or once the model is learning something. For example, let's add the following constant:

SHOW_EVERY = 1000

Then, in our looping code:

for episode in range(EPISODES):
    discrete_state = get_discrete_state(env.reset())
    done = False

    if episode % SHOW_EVERY == 0:
        render = True
        print(episode)
    else:
        render = False

    while not done:

        action = np.argmax(q_table[discrete_state])
        new_state, reward, done, _ = env.step(action)

        new_discrete_state = get_discrete_state(new_state)

        if render:
            env.render()

Okay, great, but it doesn't appear that the model is learning. Why not?

So the Q-Table is initialized... randomly. Then, the agent gets rewarded with a -1 for every step it takes. The only time the agent gets rewarded with, well, nothing (a 0)... is if it reaches the objective. We need the agent to reach the objective at some point. If it reaches it just once, it will be more likely to reach it again as the reward back-propagates. As it gets more likely to reach it, it'll reach it again and again... and boom, it will have learned. But how do we get it to reach the objective the first time?!

Epsilon!

Or, as regular people might call it: random moves.

As an Agent learns an environment, it moves from "exploration" to "exploitation." Right now, our model is greedy and exploiting for max Q values always...but these Q values are worthless right now. We need the agent to explore!

For this, we'll add the following values:

# Exploration settings
epsilon = 1  # not a constant, going to be decayed
START_EPSILON_DECAYING = 1
END_EPSILON_DECAYING = EPISODES//2
epsilon_decay_value = epsilon/(END_EPSILON_DECAYING - START_EPSILON_DECAYING)
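
As a quick sketch of what that schedule works out to (using the constants above, so EPISODES = 25000): epsilon falls linearly and is back down to roughly 0 halfway through training.

decaying_episodes = END_EPSILON_DECAYING - START_EPSILON_DECAYING  # 12499 with the values above
print(epsilon_decay_value)                                  # ~0.00008 subtracted per episode
print(epsilon - decaying_episodes * epsilon_decay_value)    # ~0.0 by around episode EPISODES // 2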

Now, we want to decay the epsilon value every episode until we're done decaying it. We'll do this at the end of each episode (basically at the very bottom):

    ...
    # Decaying is being done every episode if episode number is within decaying range
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value


env.close()

Now we just need to use epsilon. We'll use np.random.random() to randomly pick a number between 0 and 1. If np.random.random() is greater than the epsilon value, then we'll act based off the max Q value as usual. Otherwise, we will just move randomly:

    while not done:

        if np.random.random() > epsilon:
            # Get action from Q table
            action = np.argmax(q_table[discrete_state])
        else:
            # Get random action
            action = np.random.randint(0, env.action_space.n)

Full code up to this point:

# objective is to get the cart to the flag.
# for now, let's just move randomly:

import gym
import numpy as np

env = gym.make("MountainCar-v0")

LEARNING_RATE = 0.1

DISCOUNT = 0.95
EPISODES = 25000
SHOW_EVERY = 1000

DISCRETE_OS_SIZE = [20, 20]
discrete_os_win_size = (env.observation_space.high - env.observation_space.low)/DISCRETE_OS_SIZE

# Exploration settings
epsilon = 1  # not a constant, going to be decayed
START_EPSILON_DECAYING = 1
END_EPSILON_DECAYING = EPISODES//2
epsilon_decay_value = epsilon/(END_EPSILON_DECAYING - START_EPSILON_DECAYING)


q_table = np.random.uniform(low=-2, high=0, size=(DISCRETE_OS_SIZE + [env.action_space.n]))


def get_discrete_state(state):
    discrete_state = (state - env.observation_space.low)/discrete_os_win_size
    return tuple(discrete_state.astype(int))  # we use this tuple to look up the 3 Q values for the available actions in the q-table


for episode in range(EPISODES):
    discrete_state = get_discrete_state(env.reset())
    done = False

    if episode % SHOW_EVERY == 0:
        render = True
        print(episode)
    else:
        render = False

    while not done:

        if np.random.random() > epsilon:
            # Get action from Q table
            action = np.argmax(q_table[discrete_state])
        else:
            # Get random action
            action = np.random.randint(0, env.action_space.n)


        new_state, reward, done, _ = env.step(action)

        new_discrete_state = get_discrete_state(new_state)

        if render:
            env.render()
        #new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

        # If simulation did not end yet after last step - update Q table
        if not done:

            # Maximum possible Q value in next step (for new state)
            max_future_q = np.max(q_table[new_discrete_state])

            # Current Q value (for current state and performed action)
            current_q = q_table[discrete_state + (action,)]

            # And here's our equation for a new Q value for current state and action
            new_q = (1 - LEARNING_RATE) * current_q + LEARNING_RATE * (reward + DISCOUNT * max_future_q)

            # Update Q table with new Q value
            q_table[discrete_state + (action,)] = new_q


        # Simulation ended (for any reason) - if goal position is achieved - update Q value with reward directly
        elif new_state[0] >= env.goal_position:
            #q_table[discrete_state + (action,)] = reward
            q_table[discrete_state + (action,)] = 0

        discrete_state = new_discrete_state

    # Decaying is being done every episode if episode number is within decaying range
    if END_EPSILON_DECAYING >= episode >= START_EPSILON_DECAYING:
        epsilon -= epsilon_decay_value


env.close()

The next tutorial: Q-Learning Analysis - Reinforcement Learning w/ Python Tutorial p.3