Improving Scouting - Python AI in StarCraft II p.14

Welcome to part 14 of the AI in StarCraft II series with Python. In this tutorial, we're going to be working on our scouting methods and logic.

The problem up to now is our scouts and logic were pretty silly. What I'd like to do now is the following:

1. Scout with a worker (probe), until we can make actual observer units.
2. Use multiple scouts
3. Place scouts along expansion locations

Another choice in the future will be to place scouts along the main attacking route the enemy might take to give ourselves an early warning. For now, I am going to leave this out.

To begin, we want to keep some track of where our current scouts are, so we know where to send new ones. Beginning with our __init__ method:

def __init__(self, use_model=False):
...
self.scouts_and_spots = {}

Now, we want to completely re-write our scout method.

The way I plan to logically scout is that we want to choose scouting locations in order of their distance from the enemy's starting location. So the starting location is the first place we want to send a scout. Then, with our next scout, we want to check out the expansion that is next-closest to this point, as this is highly likely where they went. Let's begin!

async def scout(self):
# {DISTANCE_TO_ENEMY_START:EXPANSIONLOC}
self.expand_dis_dir = {}

We will begin by logging the expansion distances with a dictionary, where the key is the distance and the value is the expansion. We can populate this with:

for el in self.expansion_locations:
distance_to_enemy_start = el.distance_to(self.enemy_start_locations)
#print(distance_to_enemy_start)
self.expand_dis_dir[distance_to_enemy_start] = el

Now sort the distances:

self.ordered_exp_distances = sorted(k for k in self.expand_dis_dir)

Next, what we want to do is assign scouts to certain expansion areas. Unfortunately, our scouts get discovered and killed from time to time. Thus, we need to cosntantly be checking to see if those scouts still exist. This way, we wont just scout a place once, and instead we will replace scouts in that area when they are killed:

existing_ids = [unit.tag for unit in self.units]
# removing of scouts that are actually dead now.
to_be_removed = []
for noted_scout in self.scouts_and_spots:
if noted_scout not in existing_ids:
to_be_removed.append(noted_scout)

for scout in to_be_removed:
del self.scouts_and_spots[scout]

Okay, great, now we're ready to begin assignments! Before this, let's add in one final check:

if len(self.units(ROBOTICSFACILITY).ready) == 0:
unit_type = PROBE
unit_limit = 1
else:
unit_type = OBSERVER
unit_limit = 15

We want to do this because we have to treat our probes slightly different. If we send a probe out as a scout, we need to keep it moving, otherwise it will be idle and charged with finding minerals again, which we obviously don't want!

Also, we want ALL of our observers...observing. We only want ONE probe scouting.

assign_scout = True

if unit_type == PROBE:
for unit in self.units(PROBE):
if unit.tag in self.scouts_and_spots:
assign_scout = False

If we have a scout to assign, let's do it!

if assign_scout:
if len(self.units(unit_type).idle) > 0:
for obs in self.units(unit_type).idle[:unit_limit]:
if obs.tag not in self.scouts_and_spots:
for dist in self.ordered_exp_distances:
try:
location = next(value for key, value in self.expand_dis_dir.items() if key == dist)
# DICT {UNIT_ID:LOCATION}
active_locations = [self.scouts_and_spots[k] for k in self.scouts_and_spots]

if location not in active_locations:
if unit_type == PROBE:
for unit in self.units(PROBE):
if unit.tag in self.scouts_and_spots:
continue

await self.do(obs.move(location))
self.scouts_and_spots[obs.tag] = location
break
except Exception as e:
pass

So, if we want to assign a scout, we need at least one of our unit types to be idle. Then, we want to iterate over the limit of this type of unit. Then, we want to check to see if this unit's tag is already in our self.scouts_and_spots dict that tracks active scouts and what they're up to. Finally, we iterate over the distances from closest to furthest from the enemy's start location.

For each distance, we look up that location, then we check to see if that location is currently one of the spots we're actively scouting. If that location isn't already there, then we want to send a scout there. Once we've done that, we're done, so we continue or break.

Lastly, we need to keep those probes moving so they don't get assigned to minerals:

for obs in self.units(unit_type):
if obs.tag in self.scouts_and_spots:
if obs in [probe for probe in self.units(PROBE)]:
await self.do(obs.move(self.random_location_variance(self.scouts_and_spots[obs.tag])))

The above uses random_location_variance, which is:

def random_location_variance(self, location):
x = location
y = location

#  FIXED THIS
x += random.randrange(-5,5)
y += random.randrange(-5,5)

if x < 0:
print("x below")
x = 0
if y < 0:
print("y below")
y = 0
if x > self.game_info.map_size:
print("x above")
x = self.game_info.map_size
if y > self.game_info.map_size:
print("y above")
y = self.game_info.map_size

go_to = position.Point2(position.Pointlike((x,y)))

return go_to

Very similar to before, only this time we just hard code the random movement of 5.

While we're here, let's change the intel method's main_base_names var:

main_base_names = ['nexus', 'commandcenter', 'orbitalcommand', 'planetaryfortress', 'hatchery']

Finally, let's also add a final method: build_scout:

async def build_scout(self):
if len(self.units(OBSERVER)) < math.floor(self.time/3):
for rf in self.units(ROBOTICSFACILITY).ready.noqueue:
print(len(self.units(OBSERVER)), self.time/3)
if self.can_afford(OBSERVER) and self.supply_left > 0:
await self.do(rf.train(OBSERVER))

Basically, we just aim to build and have an observer unit active for every 3 minutes of game that has elapsed. So, 9 minutes in, we should have 3 observers.

Our current on_step method:

async def on_step(self, iteration):

self.time = (self.state.game_loop/22.4) / 60
print('Time:',self.time)
await self.build_scout()
await self.scout()
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
await self.offensive_force_buildings()
await self.build_offensive_force()
await self.intel()
await self.attack()

Full code up to this point:

import sc2
from sc2 import run_game, maps, Race, Difficulty, Result
from sc2.player import Bot, Computer
from sc2 import position
from sc2.constants import NEXUS, PROBE, PYLON, ASSIMILATOR, GATEWAY, \
CYBERNETICSCORE, STARGATE, VOIDRAY, SCV, DRONE, ROBOTICSFACILITY, OBSERVER
import random
import cv2
import numpy as np
import os
import time
import math
#import keras

#os.environ["SC2PATH"] = '/starcraftstuff/StarCraftII/'

class SentdeBot(sc2.BotAI):
def __init__(self, use_model=False):
self.MAX_WORKERS = 50
self.do_something_after = 0
self.use_model = use_model

###############################
# DICT {UNIT_ID:LOCATION}
# every iteration, make sure that unit id still exists!
self.scouts_and_spots = {}
self.train_data = []
if self.use_model:
print("USING MODEL!")

def on_end(self, game_result):
print('--- on_end called ---')
print(game_result, self.use_model)

with open("gameout-random-vs-medium.txt","a") as f:
if self.use_model:
f.write("Model {}\n".format(game_result))
else:
f.write("Random {}\n".format(game_result))

async def on_step(self, iteration):

self.time = (self.state.game_loop/22.4) / 60
print('Time:',self.time)
await self.build_scout()
await self.scout()
await self.distribute_workers()
await self.build_workers()
await self.build_pylons()
await self.build_assimilators()
await self.expand()
await self.offensive_force_buildings()
await self.build_offensive_force()
await self.intel()
await self.attack()

def random_location_variance(self, location):
x = location
y = location

#  FIXED THIS
x += random.randrange(-5,5)
y += random.randrange(-5,5)

if x < 0:
print("x below")
x = 0
if y < 0:
print("y below")
y = 0
if x > self.game_info.map_size:
print("x above")
x = self.game_info.map_size
if y > self.game_info.map_size:
print("y above")
y = self.game_info.map_size

go_to = position.Point2(position.Pointlike((x,y)))

return go_to

async def build_scout(self):
if len(self.units(OBSERVER)) < math.floor(self.time/3):
for rf in self.units(ROBOTICSFACILITY).ready.noqueue:
print(len(self.units(OBSERVER)), self.time/3)
if self.can_afford(OBSERVER) and self.supply_left > 0:
await self.do(rf.train(OBSERVER))

async def scout(self):

# {DISTANCE_TO_ENEMY_START:EXPANSIONLOC}
self.expand_dis_dir = {}

for el in self.expansion_locations:
distance_to_enemy_start = el.distance_to(self.enemy_start_locations)
#print(distance_to_enemy_start)
self.expand_dis_dir[distance_to_enemy_start] = el

self.ordered_exp_distances = sorted(k for k in self.expand_dis_dir)

existing_ids = [unit.tag for unit in self.units]
# removing of scouts that are actually dead now.
to_be_removed = []
for noted_scout in self.scouts_and_spots:
if noted_scout not in existing_ids:
to_be_removed.append(noted_scout)

for scout in to_be_removed:
del self.scouts_and_spots[scout]
# end removing of scouts that are dead now.

if len(self.units(ROBOTICSFACILITY).ready) == 0:
unit_type = PROBE
unit_limit = 1
else:
unit_type = OBSERVER
unit_limit = 15

assign_scout = True

if unit_type == PROBE:
for unit in self.units(PROBE):
if unit.tag in self.scouts_and_spots:
assign_scout = False

if assign_scout:
if len(self.units(unit_type).idle) > 0:
for obs in self.units(unit_type).idle[:unit_limit]:
if obs.tag not in self.scouts_and_spots:
for dist in self.ordered_exp_distances:
try:
location = next(value for key, value in self.expand_dis_dir.items() if key == dist)
# DICT {UNIT_ID:LOCATION}
active_locations = [self.scouts_and_spots[k] for k in self.scouts_and_spots]

if location not in active_locations:
if unit_type == PROBE:
for unit in self.units(PROBE):
if unit.tag in self.scouts_and_spots:
continue

await self.do(obs.move(location))
self.scouts_and_spots[obs.tag] = location
break
except Exception as e:
pass

for obs in self.units(unit_type):
if obs.tag in self.scouts_and_spots:
if obs in [probe for probe in self.units(PROBE)]:
await self.do(obs.move(self.random_location_variance(self.scouts_and_spots[obs.tag])))

async def intel(self):

game_data = np.zeros((self.game_info.map_size, self.game_info.map_size, 3), np.uint8)

draw_dict = {
NEXUS: [15, (0, 255, 0)],
PYLON: [3, (20, 235, 0)],
PROBE: [1, (55, 200, 0)],
ASSIMILATOR: [2, (55, 200, 0)],
GATEWAY: [3, (200, 100, 0)],
CYBERNETICSCORE: [3, (150, 150, 0)],
STARGATE: [5, (255, 0, 0)],
ROBOTICSFACILITY: [5, (215, 155, 0)],
#VOIDRAY: [3, (255, 100, 0)],
}

for unit_type in draw_dict:
for unit in self.units(unit_type).ready:
pos = unit.position
cv2.circle(game_data, (int(pos), int(pos)), draw_dict[unit_type], draw_dict[unit_type], -1)

# from Александр Тимофеев via YT
main_base_names = ['nexus', 'commandcenter', 'orbitalcommand', 'planetaryfortress', 'hatchery']
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() not in main_base_names:
cv2.circle(game_data, (int(pos), int(pos)), 5, (200, 50, 212), -1)
for enemy_building in self.known_enemy_structures:
pos = enemy_building.position
if enemy_building.name.lower() in main_base_names:
cv2.circle(game_data, (int(pos), int(pos)), 15, (0, 0, 255), -1)

for enemy_unit in self.known_enemy_units:

if not enemy_unit.is_structure:
worker_names = ["probe",
"scv",
"drone"]
# if that unit is a PROBE, SCV, or DRONE... it's a worker
pos = enemy_unit.position
if enemy_unit.name.lower() in worker_names:
cv2.circle(game_data, (int(pos), int(pos)), 1, (55, 0, 155), -1)
else:
cv2.circle(game_data, (int(pos), int(pos)), 3, (50, 0, 215), -1)

for obs in self.units(OBSERVER).ready:
pos = obs.position
cv2.circle(game_data, (int(pos), int(pos)), 1, (255, 255, 255), -1)

for vr in self.units(VOIDRAY).ready:
pos = vr.position
cv2.circle(game_data, (int(pos), int(pos)), 3, (255, 100, 0), -1)

line_max = 50
mineral_ratio = self.minerals / 1500
if mineral_ratio > 1.0:
mineral_ratio = 1.0

vespene_ratio = self.vespene / 1500
if vespene_ratio > 1.0:
vespene_ratio = 1.0

population_ratio = self.supply_left / self.supply_cap
if population_ratio > 1.0:
population_ratio = 1.0

plausible_supply = self.supply_cap / 200.0

military_weight = len(self.units(VOIDRAY)) / (self.supply_cap-self.supply_left)
if military_weight > 1.0:
military_weight = 1.0

cv2.line(game_data, (0, 19), (int(line_max*military_weight), 19), (250, 250, 200), 3)  # worker/supply ratio
cv2.line(game_data, (0, 15), (int(line_max*plausible_supply), 15), (220, 200, 200), 3)  # plausible supply (supply/200.0)
cv2.line(game_data, (0, 11), (int(line_max*population_ratio), 11), (150, 150, 150), 3)  # population ratio (supply_left/supply)
cv2.line(game_data, (0, 7), (int(line_max*vespene_ratio), 7), (210, 200, 0), 3)  # gas / 1500
cv2.line(game_data, (0, 3), (int(line_max*mineral_ratio), 3), (0, 255, 25), 3)  # minerals minerals/1500

# flip horizontally to make our final fix in visual representation:
self.flipped = cv2.flip(game_data, 0)
resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2)

if self.use_model:
cv2.imshow('Model Intel', resized)
cv2.waitKey(1)
else:
cv2.imshow('Random Intel', resized)
cv2.waitKey(1)

async def build_workers(self):
if (len(self.units(NEXUS)) * 16) > len(self.units(PROBE)) and len(self.units(PROBE)) < self.MAX_WORKERS:
for nexus in self.units(NEXUS).ready.noqueue:
if self.can_afford(PROBE):
await self.do(nexus.train(PROBE))

async def build_pylons(self):
if self.supply_left < 5 and not self.already_pending(PYLON):
if nexuses.exists:
if self.can_afford(PYLON):
await self.build(PYLON, near=nexuses.first)

async def build_assimilators(self):
for nexus in self.units(NEXUS).ready:
vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus)
for vaspene in vaspenes:
if not self.can_afford(ASSIMILATOR):
break
worker = self.select_build_worker(vaspene.position)
if worker is None:
break
if not self.units(ASSIMILATOR).closer_than(1.0, vaspene).exists:
await self.do(worker.build(ASSIMILATOR, vaspene))

async def expand(self):
try:
if self.units(NEXUS).amount < self.time/2 and self.can_afford(NEXUS):
await self.expand_now()
except Exception as e:
print(str(e))

async def offensive_force_buildings(self):

if self.units(GATEWAY).ready.exists and not self.units(CYBERNETICSCORE):
if self.can_afford(CYBERNETICSCORE) and not self.already_pending(CYBERNETICSCORE):
await self.build(CYBERNETICSCORE, near=pylon)

elif len(self.units(GATEWAY)) < 1:
if self.can_afford(GATEWAY) and not self.already_pending(GATEWAY):
await self.build(GATEWAY, near=pylon)

if len(self.units(ROBOTICSFACILITY)) < 1:
if self.can_afford(ROBOTICSFACILITY) and not self.already_pending(ROBOTICSFACILITY):
await self.build(ROBOTICSFACILITY, near=pylon)

if len(self.units(STARGATE)) < self.time:
if self.can_afford(STARGATE) and not self.already_pending(STARGATE):
await self.build(STARGATE, near=pylon)

async def build_offensive_force(self):
for sg in self.units(STARGATE).ready.noqueue:
if self.can_afford(VOIDRAY) and self.supply_left > 0:
await self.do(sg.train(VOIDRAY))

def find_target(self, state):
if len(self.known_enemy_units) > 0:
return random.choice(self.known_enemy_units)
elif len(self.known_enemy_structures) > 0:
return random.choice(self.known_enemy_structures)
else:
return self.enemy_start_locations

async def attack(self):

if len(self.units(VOIDRAY).idle) > 0:

target = False
if self.time > self.do_something_after:
if self.use_model:
prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])])
choice = np.argmax(prediction)
else:
choice = random.randrange(0, 4)

if choice == 0:
# no attack
wait = random.randrange(7, 100)/100
self.do_something_after = self.time + wait

elif choice == 1:
#attack_unit_closest_nexus
if len(self.known_enemy_units) > 0:
target = self.known_enemy_units.closest_to(random.choice(self.units(NEXUS)))

elif choice == 2:
#attack enemy structures
if len(self.known_enemy_structures) > 0:
target = random.choice(self.known_enemy_structures)

elif choice == 3:
#attack_enemy_start
target = self.enemy_start_locations

if target:
for vr in self.units(VOIDRAY).idle:
await self.do(vr.attack(target))

y = np.zeros(4)
y[choice] = 1
self.train_data.append([y, self.flipped])

run_game(maps.get("AbyssalReefLE"), [
Bot(Race.Protoss, SentdeBot(use_model=False)),
Computer(Race.Protoss, Difficulty.Medium),
], realtime=False)

The next tutorial: • Introduction and Collecting Minerals - Python AI in StarCraft II p.1

• Workers and Pylons - Python AI in StarCraft II p.2

• Geysers and Expanding - Python AI in StarCraft II p.3

• Building an AI Army - Python AI in StarCraft II p.4

• Commanding your AI Army - Python AI in StarCraft II p.5

• Defeating Hard AI - Python AI in StarCraft II p.6

• Deep Learning with SC2 Intro - Python AI in StarCraft II p.7

• Scouting and more Visual inputs - Python AI in StarCraft II p.8

• Building our training data - Python AI in StarCraft II p.9

• Building Neural Network Model - Python AI in StarCraft II p.10

• Training Neural Network Model - Python AI in StarCraft II p.11

• Using Neural Network Model - Python AI in StarCraft II p.12

• Version 2 Changes - Python AI in StarCraft II p.13

• Improving Scouting - Python AI in StarCraft II p.14
• Adding Choices - Python AI in StarCraft II p.15

• Visualization Changes - Python AI in StarCraft II p.16

• More Training and Findings - Python AI in StarCraft II p.17