Welcome to part 14 of the AI in StarCraft II series with Python. In this tutorial, we're going to be working on our scouting methods and logic.
The problem up to now is our scouts and logic were pretty silly. What I'd like to do now is the following:
Another choice in the future will be to place scouts along the main attacking route the enemy might take to give ourselves an early warning. For now, I am going to leave this out.
To begin, we want to keep some track of where our current scouts are, so we know where to send new ones. Beginning with our __init__
method:
def __init__(self, use_model=False): ... self.scouts_and_spots = {}
Now, we want to completely re-write our scout
method.
The way I plan to logically scout is that we want to choose scouting locations in order of their distance from the enemy's starting location. So the starting location is the first place we want to send a scout. Then, with our next scout, we want to check out the expansion that is next-closest to this point, as this is highly likely where they went. Let's begin!
async def scout(self): # {DISTANCE_TO_ENEMY_START:EXPANSIONLOC} self.expand_dis_dir = {}
We will begin by logging the expansion distances with a dictionary, where the key is the distance and the value is the expansion. We can populate this with:
for el in self.expansion_locations: distance_to_enemy_start = el.distance_to(self.enemy_start_locations[0]) #print(distance_to_enemy_start) self.expand_dis_dir[distance_to_enemy_start] = el
Now sort the distances:
self.ordered_exp_distances = sorted(k for k in self.expand_dis_dir)
Next, what we want to do is assign scouts to certain expansion areas. Unfortunately, our scouts get discovered and killed from time to time. Thus, we need to cosntantly be checking to see if those scouts still exist. This way, we wont just scout a place once, and instead we will replace scouts in that area when they are killed:
existing_ids = [unit.tag for unit in self.units] # removing of scouts that are actually dead now. to_be_removed = [] for noted_scout in self.scouts_and_spots: if noted_scout not in existing_ids: to_be_removed.append(noted_scout) for scout in to_be_removed: del self.scouts_and_spots[scout]
Okay, great, now we're ready to begin assignments! Before this, let's add in one final check:
if len(self.units(ROBOTICSFACILITY).ready) == 0: unit_type = PROBE unit_limit = 1 else: unit_type = OBSERVER unit_limit = 15
We want to do this because we have to treat our probes slightly different. If we send a probe out as a scout, we need to keep it moving, otherwise it will be idle and charged with finding minerals again, which we obviously don't want!
Also, we want ALL of our observers...observing. We only want ONE probe scouting.
assign_scout = True if unit_type == PROBE: for unit in self.units(PROBE): if unit.tag in self.scouts_and_spots: assign_scout = False
If we have a scout to assign, let's do it!
if assign_scout: if len(self.units(unit_type).idle) > 0: for obs in self.units(unit_type).idle[:unit_limit]: if obs.tag not in self.scouts_and_spots: for dist in self.ordered_exp_distances: try: location = next(value for key, value in self.expand_dis_dir.items() if key == dist) # DICT {UNIT_ID:LOCATION} active_locations = [self.scouts_and_spots[k] for k in self.scouts_and_spots] if location not in active_locations: if unit_type == PROBE: for unit in self.units(PROBE): if unit.tag in self.scouts_and_spots: continue await self.do(obs.move(location)) self.scouts_and_spots[obs.tag] = location break except Exception as e: pass
So, if we want to assign a scout, we need at least one of our unit types to be idle. Then, we want to iterate over the limit of this type of unit. Then, we want to check to see if this unit's tag is already in our self.scouts_and_spots
dict that tracks active scouts and what they're up to. Finally, we iterate over the distances from closest to furthest from the enemy's start location.
For each distance, we look up that location, then we check to see if that location is currently one of the spots we're actively scouting. If that location isn't already there, then we want to send a scout there. Once we've done that, we're done, so we continue or break.
Lastly, we need to keep those probes moving so they don't get assigned to minerals:
for obs in self.units(unit_type): if obs.tag in self.scouts_and_spots: if obs in [probe for probe in self.units(PROBE)]: await self.do(obs.move(self.random_location_variance(self.scouts_and_spots[obs.tag])))
The above uses random_location_variance
, which is:
def random_location_variance(self, location): x = location[0] y = location[1] # FIXED THIS x += random.randrange(-5,5) y += random.randrange(-5,5) if x < 0: print("x below") x = 0 if y < 0: print("y below") y = 0 if x > self.game_info.map_size[0]: print("x above") x = self.game_info.map_size[0] if y > self.game_info.map_size[1]: print("y above") y = self.game_info.map_size[1] go_to = position.Point2(position.Pointlike((x,y))) return go_to
Very similar to before, only this time we just hard code the random movement of 5.
While we're here, let's change the intel
method's main_base_names
var:
main_base_names = ['nexus', 'commandcenter', 'orbitalcommand', 'planetaryfortress', 'hatchery']
Finally, let's also add a final method: build_scout
:
async def build_scout(self): if len(self.units(OBSERVER)) < math.floor(self.time/3): for rf in self.units(ROBOTICSFACILITY).ready.noqueue: print(len(self.units(OBSERVER)), self.time/3) if self.can_afford(OBSERVER) and self.supply_left > 0: await self.do(rf.train(OBSERVER))
Basically, we just aim to build and have an observer unit active for every 3 minutes of game that has elapsed. So, 9 minutes in, we should have 3 observers.
Our current on_step
method:
async def on_step(self, iteration): self.time = (self.state.game_loop/22.4) / 60 print('Time:',self.time) await self.build_scout() await self.scout() await self.distribute_workers() await self.build_workers() await self.build_pylons() await self.build_assimilators() await self.expand() await self.offensive_force_buildings() await self.build_offensive_force() await self.intel() await self.attack()
Full code up to this point:
import sc2 from sc2 import run_game, maps, Race, Difficulty, Result from sc2.player import Bot, Computer from sc2 import position from sc2.constants import NEXUS, PROBE, PYLON, ASSIMILATOR, GATEWAY, \ CYBERNETICSCORE, STARGATE, VOIDRAY, SCV, DRONE, ROBOTICSFACILITY, OBSERVER import random import cv2 import numpy as np import os import time import math #import keras #os.environ["SC2PATH"] = '/starcraftstuff/StarCraftII/' HEADLESS = False class SentdeBot(sc2.BotAI): def __init__(self, use_model=False): self.MAX_WORKERS = 50 self.do_something_after = 0 self.use_model = use_model ############################### # DICT {UNIT_ID:LOCATION} # every iteration, make sure that unit id still exists! self.scouts_and_spots = {} self.train_data = [] if self.use_model: print("USING MODEL!") self.model = keras.models.load_model("BasicCNN-30-epochs-0.0001-LR-4.2") def on_end(self, game_result): print('--- on_end called ---') print(game_result, self.use_model) with open("gameout-random-vs-medium.txt","a") as f: if self.use_model: f.write("Model {}\n".format(game_result)) else: f.write("Random {}\n".format(game_result)) async def on_step(self, iteration): self.time = (self.state.game_loop/22.4) / 60 print('Time:',self.time) await self.build_scout() await self.scout() await self.distribute_workers() await self.build_workers() await self.build_pylons() await self.build_assimilators() await self.expand() await self.offensive_force_buildings() await self.build_offensive_force() await self.intel() await self.attack() def random_location_variance(self, location): x = location[0] y = location[1] # FIXED THIS x += random.randrange(-5,5) y += random.randrange(-5,5) if x < 0: print("x below") x = 0 if y < 0: print("y below") y = 0 if x > self.game_info.map_size[0]: print("x above") x = self.game_info.map_size[0] if y > self.game_info.map_size[1]: print("y above") y = self.game_info.map_size[1] go_to = position.Point2(position.Pointlike((x,y))) return go_to async def build_scout(self): if len(self.units(OBSERVER)) < math.floor(self.time/3): for rf in self.units(ROBOTICSFACILITY).ready.noqueue: print(len(self.units(OBSERVER)), self.time/3) if self.can_afford(OBSERVER) and self.supply_left > 0: await self.do(rf.train(OBSERVER)) async def scout(self): # {DISTANCE_TO_ENEMY_START:EXPANSIONLOC} self.expand_dis_dir = {} for el in self.expansion_locations: distance_to_enemy_start = el.distance_to(self.enemy_start_locations[0]) #print(distance_to_enemy_start) self.expand_dis_dir[distance_to_enemy_start] = el self.ordered_exp_distances = sorted(k for k in self.expand_dis_dir) existing_ids = [unit.tag for unit in self.units] # removing of scouts that are actually dead now. to_be_removed = [] for noted_scout in self.scouts_and_spots: if noted_scout not in existing_ids: to_be_removed.append(noted_scout) for scout in to_be_removed: del self.scouts_and_spots[scout] # end removing of scouts that are dead now. if len(self.units(ROBOTICSFACILITY).ready) == 0: unit_type = PROBE unit_limit = 1 else: unit_type = OBSERVER unit_limit = 15 assign_scout = True if unit_type == PROBE: for unit in self.units(PROBE): if unit.tag in self.scouts_and_spots: assign_scout = False if assign_scout: if len(self.units(unit_type).idle) > 0: for obs in self.units(unit_type).idle[:unit_limit]: if obs.tag not in self.scouts_and_spots: for dist in self.ordered_exp_distances: try: location = next(value for key, value in self.expand_dis_dir.items() if key == dist) # DICT {UNIT_ID:LOCATION} active_locations = [self.scouts_and_spots[k] for k in self.scouts_and_spots] if location not in active_locations: if unit_type == PROBE: for unit in self.units(PROBE): if unit.tag in self.scouts_and_spots: continue await self.do(obs.move(location)) self.scouts_and_spots[obs.tag] = location break except Exception as e: pass for obs in self.units(unit_type): if obs.tag in self.scouts_and_spots: if obs in [probe for probe in self.units(PROBE)]: await self.do(obs.move(self.random_location_variance(self.scouts_and_spots[obs.tag]))) async def intel(self): game_data = np.zeros((self.game_info.map_size[1], self.game_info.map_size[0], 3), np.uint8) draw_dict = { NEXUS: [15, (0, 255, 0)], PYLON: [3, (20, 235, 0)], PROBE: [1, (55, 200, 0)], ASSIMILATOR: [2, (55, 200, 0)], GATEWAY: [3, (200, 100, 0)], CYBERNETICSCORE: [3, (150, 150, 0)], STARGATE: [5, (255, 0, 0)], ROBOTICSFACILITY: [5, (215, 155, 0)], #VOIDRAY: [3, (255, 100, 0)], } for unit_type in draw_dict: for unit in self.units(unit_type).ready: pos = unit.position cv2.circle(game_data, (int(pos[0]), int(pos[1])), draw_dict[unit_type][0], draw_dict[unit_type][1], -1) # from Александр Тимофеев via YT main_base_names = ['nexus', 'commandcenter', 'orbitalcommand', 'planetaryfortress', 'hatchery'] for enemy_building in self.known_enemy_structures: pos = enemy_building.position if enemy_building.name.lower() not in main_base_names: cv2.circle(game_data, (int(pos[0]), int(pos[1])), 5, (200, 50, 212), -1) for enemy_building in self.known_enemy_structures: pos = enemy_building.position if enemy_building.name.lower() in main_base_names: cv2.circle(game_data, (int(pos[0]), int(pos[1])), 15, (0, 0, 255), -1) for enemy_unit in self.known_enemy_units: if not enemy_unit.is_structure: worker_names = ["probe", "scv", "drone"] # if that unit is a PROBE, SCV, or DRONE... it's a worker pos = enemy_unit.position if enemy_unit.name.lower() in worker_names: cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (55, 0, 155), -1) else: cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (50, 0, 215), -1) for obs in self.units(OBSERVER).ready: pos = obs.position cv2.circle(game_data, (int(pos[0]), int(pos[1])), 1, (255, 255, 255), -1) for vr in self.units(VOIDRAY).ready: pos = vr.position cv2.circle(game_data, (int(pos[0]), int(pos[1])), 3, (255, 100, 0), -1) line_max = 50 mineral_ratio = self.minerals / 1500 if mineral_ratio > 1.0: mineral_ratio = 1.0 vespene_ratio = self.vespene / 1500 if vespene_ratio > 1.0: vespene_ratio = 1.0 population_ratio = self.supply_left / self.supply_cap if population_ratio > 1.0: population_ratio = 1.0 plausible_supply = self.supply_cap / 200.0 military_weight = len(self.units(VOIDRAY)) / (self.supply_cap-self.supply_left) if military_weight > 1.0: military_weight = 1.0 cv2.line(game_data, (0, 19), (int(line_max*military_weight), 19), (250, 250, 200), 3) # worker/supply ratio cv2.line(game_data, (0, 15), (int(line_max*plausible_supply), 15), (220, 200, 200), 3) # plausible supply (supply/200.0) cv2.line(game_data, (0, 11), (int(line_max*population_ratio), 11), (150, 150, 150), 3) # population ratio (supply_left/supply) cv2.line(game_data, (0, 7), (int(line_max*vespene_ratio), 7), (210, 200, 0), 3) # gas / 1500 cv2.line(game_data, (0, 3), (int(line_max*mineral_ratio), 3), (0, 255, 25), 3) # minerals minerals/1500 # flip horizontally to make our final fix in visual representation: self.flipped = cv2.flip(game_data, 0) resized = cv2.resize(self.flipped, dsize=None, fx=2, fy=2) if not HEADLESS: if self.use_model: cv2.imshow('Model Intel', resized) cv2.waitKey(1) else: cv2.imshow('Random Intel', resized) cv2.waitKey(1) async def build_workers(self): if (len(self.units(NEXUS)) * 16) > len(self.units(PROBE)) and len(self.units(PROBE)) < self.MAX_WORKERS: for nexus in self.units(NEXUS).ready.noqueue: if self.can_afford(PROBE): await self.do(nexus.train(PROBE)) async def build_pylons(self): if self.supply_left < 5 and not self.already_pending(PYLON): nexuses = self.units(NEXUS).ready if nexuses.exists: if self.can_afford(PYLON): await self.build(PYLON, near=nexuses.first) async def build_assimilators(self): for nexus in self.units(NEXUS).ready: vaspenes = self.state.vespene_geyser.closer_than(15.0, nexus) for vaspene in vaspenes: if not self.can_afford(ASSIMILATOR): break worker = self.select_build_worker(vaspene.position) if worker is None: break if not self.units(ASSIMILATOR).closer_than(1.0, vaspene).exists: await self.do(worker.build(ASSIMILATOR, vaspene)) async def expand(self): try: if self.units(NEXUS).amount < self.time/2 and self.can_afford(NEXUS): await self.expand_now() except Exception as e: print(str(e)) async def offensive_force_buildings(self): if self.units(PYLON).ready.exists: pylon = self.units(PYLON).ready.random if self.units(GATEWAY).ready.exists and not self.units(CYBERNETICSCORE): if self.can_afford(CYBERNETICSCORE) and not self.already_pending(CYBERNETICSCORE): await self.build(CYBERNETICSCORE, near=pylon) elif len(self.units(GATEWAY)) < 1: if self.can_afford(GATEWAY) and not self.already_pending(GATEWAY): await self.build(GATEWAY, near=pylon) if self.units(CYBERNETICSCORE).ready.exists: if len(self.units(ROBOTICSFACILITY)) < 1: if self.can_afford(ROBOTICSFACILITY) and not self.already_pending(ROBOTICSFACILITY): await self.build(ROBOTICSFACILITY, near=pylon) if self.units(CYBERNETICSCORE).ready.exists: if len(self.units(STARGATE)) < self.time: if self.can_afford(STARGATE) and not self.already_pending(STARGATE): await self.build(STARGATE, near=pylon) async def build_offensive_force(self): for sg in self.units(STARGATE).ready.noqueue: if self.can_afford(VOIDRAY) and self.supply_left > 0: await self.do(sg.train(VOIDRAY)) def find_target(self, state): if len(self.known_enemy_units) > 0: return random.choice(self.known_enemy_units) elif len(self.known_enemy_structures) > 0: return random.choice(self.known_enemy_structures) else: return self.enemy_start_locations[0] async def attack(self): if len(self.units(VOIDRAY).idle) > 0: target = False if self.time > self.do_something_after: if self.use_model: prediction = self.model.predict([self.flipped.reshape([-1, 176, 200, 3])]) choice = np.argmax(prediction[0]) else: choice = random.randrange(0, 4) if choice == 0: # no attack wait = random.randrange(7, 100)/100 self.do_something_after = self.time + wait elif choice == 1: #attack_unit_closest_nexus if len(self.known_enemy_units) > 0: target = self.known_enemy_units.closest_to(random.choice(self.units(NEXUS))) elif choice == 2: #attack enemy structures if len(self.known_enemy_structures) > 0: target = random.choice(self.known_enemy_structures) elif choice == 3: #attack_enemy_start target = self.enemy_start_locations[0] if target: for vr in self.units(VOIDRAY).idle: await self.do(vr.attack(target)) y = np.zeros(4) y[choice] = 1 self.train_data.append([y, self.flipped]) run_game(maps.get("AbyssalReefLE"), [ Bot(Race.Protoss, SentdeBot(use_model=False)), Computer(Race.Protoss, Difficulty.Medium), ], realtime=False)