
Final Touches & Analysis Tools

In the previous tutorial, we created a complete game with autonomous agents. However, we have not yet addressed how to get custom, scenario-dependent information into the agent state. It would also be useful to store a simulation and replay it without redoing all the calculations. In this tutorial, we will address these issues and explore many of the GAMMS functionalities we have not used so far.

Custom Sensors

In the scenario we have constructed, the agents already know where the capturable nodes are and which nodes belong to each team's territory. To make it more realistic, the agents should instead sense this information and make decisions based on how much exploration they have done. We will create two sensors: one for capturable nodes and one for territory. Afterwards, we will see how to visualize the outputs of the new sensors in the game.

To create a custom sensor, we need to inherit from gamms.typing.ISensor, which defines the interface for sensors.

game.py
@ctx.sensor.custom(name="CAPTURE")
class CapturableSensor(gamms.typing.ISensor):
    def __init__(self, ctx, sensor_id, capturable_nodes, range: float = 160.0):
        self.ctx = ctx
        self.capturable_nodes = capturable_nodes
        self._data = {}
        self._sensor_id = sensor_id
        self._range = range
        self._owner = None

    def set_owner(self, owner):
        self._owner = owner

    @property
    def type(self):
        return gamms.sensor.SensorType.CUSTOM

    @property
    def data(self):
        return self._data

    @property
    def sensor_id(self):
        return self._sensor_id

    def sense(self, node_id):
        node = self.ctx.graph.graph.get_node(node_id)
        self._data.clear()
        for cnode_id in self.capturable_nodes:
            capturable_node = self.ctx.graph.graph.get_node(cnode_id)
            dist = (node.x - capturable_node.x)**2 + (node.y - capturable_node.y)**2
            if dist < self._range**2:
                self._data[cnode_id] = dist**0.5


    def update(self, data):
        return

All five methods defined above are required for a custom sensor to work; if any is missing, GAMMS will throw an error when the sensor is instantiated. In addition, the @ctx.sensor.custom decorator registers the sensor with the context. The wrapper does some things behind the scenes and requires a name parameter. There is a generic SensorType.CUSTOM for custom sensors, but ideally we would like to distinguish between different custom sensors: one of the things the decorator does for you is extend SensorType with a SensorType.CAPTURE member, and the type of the capturable sensor will be reported as SensorType.CAPTURE.

The sense method is called when data is to be collected. The update method can be used to create active sensors; we will not worry about it for now, as our sensor only collects environment data. The set_owner method is called by the agent when the sensor is registered to it, with the agent passing its own name as the argument. The data property holds the data collected by the sensor, and sensor_id is the sensor's unique id.
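Note that the range check in sense compares squared distances against the squared range, so the square root is only computed for nodes that pass the filter. A minimal standalone sketch of that pattern, using plain coordinate tuples in place of GAMMS node objects (all names here are hypothetical):

```python
# Standalone sketch of the range filter used in sense():
# compare squared distances first, take the square root only for matches.
def nodes_in_range(origin, candidates, sensing_range):
    """origin: (x, y); candidates: {node_id: (x, y)}; returns {node_id: distance}."""
    ox, oy = origin
    result = {}
    for node_id, (x, y) in candidates.items():
        dist_sq = (ox - x) ** 2 + (oy - y) ** 2
        if dist_sq < sensing_range ** 2:
            result[node_id] = dist_sq ** 0.5  # sqrt deferred to matches only
    return result

print(nodes_in_range((0.0, 0.0), {1: (30.0, 40.0), 2: (200.0, 0.0)}, 160.0))
# {1: 50.0}
```

Node 2 is 200 meters away and is filtered out without ever taking a square root, which is the same saving the sensor's sense method gets on large graphs.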

Let us remove the capturable_artist that we created previously and modify the draw_capturable_nodes function in place to define the visualization for the new sensor. The new function will look like this:

game.py
# Capturable nodes artist
def draw_capturable_nodes(ctx, data):
    width = data.get('width', 10)
    height = data.get('height', 10)
    sensor = ctx.sensor.get_sensor(data['sensor'])
    color = data.get('color', (0, 0, 0))
    if sensor is None:
        return
    for node_id in sensor.data:
        node = ctx.graph.graph.get_node(node_id)
        ctx.visual.render_rectangle(node.x, node.y, width, height, color=color)

The draw_capturable_nodes function now takes an additional parameter, sensor, which is the sensor_id of the sensor from which the data can be obtained. We need to create an artist for each copy of the CAPTURE sensor and register them to the agents.

game.py
# Create capture sensors
for name in red_team:
    sensor_id = f"capture_{name}"
    sensor = CapturableSensor(ctx, sensor_id, list(blue_start_dict.values()), range=160.0)
    ctx.sensor.add_sensor(sensor)
    ctx.agent.get_agent(name).register_sensor(sensor_id, sensor)
    artist = gamms.visual.Artist(
        ctx,
        drawer=draw_capturable_nodes,
        layer=39,
    )
    artist.data['sensor'] = sensor.sensor_id
    artist.data['width'] = 10.0
    artist.data['height'] = 10.0
    artist.data['color'] = (0, 0, 255)
    ctx.visual.add_artist(sensor_id, artist)

for name in blue_team:
    sensor_id = f"capture_{name}"
    sensor = CapturableSensor(ctx, sensor_id, list(red_start_dict.values()), range=160.0)
    ctx.sensor.add_sensor(sensor)
    ctx.agent.get_agent(name).register_sensor(sensor_id, sensor)
    artist = gamms.visual.Artist(
        ctx,
        drawer=draw_capturable_nodes,
        layer=39,
    )
    artist.data['sensor'] = sensor.sensor_id
    artist.data['width'] = 10.0
    artist.data['height'] = 10.0
    artist.data['color'] = (255, 0, 0)
    ctx.visual.add_artist(sensor_id, artist)

We are creating the sensors with a range of 160 meters and defining the opposite team's starting nodes as the capturable nodes. The draw_capturable_nodes function will be called by the artist to draw the capturable nodes. The color parameter is set to the color of the opposite team, so the red team will see blue rectangles and the blue team will see red rectangles. Let's also remove the artist grouping logic and the turn on/off logic. We call ctx.sensor.add_sensor to keep a reference to the sensor object in the context, and ctx.agent.get_agent(name).register_sensor(sensor_id, sensor) registers the sensor to the agent.

The ctx.sensor.get_sensor method used in the draw_capturable_nodes function will fail if the sensor is not registered in the context.

Let us now add the territory sensor, which will return the territory of both teams within the range of the sensor.

game.py
@ctx.sensor.custom(name="TERRITORY")
class TerritorySensor(gamms.typing.ISensor):
    def __init__(self, ctx, sensor_id, red_nodes, blue_nodes, range: float = 160.0):
        self.ctx = ctx
        self.red_nodes = red_nodes
        self.blue_nodes = blue_nodes
        self._data = {'red': {}, 'blue': {}}
        self._sensor_id = sensor_id
        self._range = range
        self._owner = None

    def set_owner(self, owner):
        self._owner = owner

    @property
    def type(self):
        return gamms.sensor.SensorType.CUSTOM

    @property
    def data(self):
        return self._data

    @property
    def sensor_id(self):
        return self._sensor_id

    def sense(self, node_id):
        node = self.ctx.graph.graph.get_node(node_id)
        self._data.clear()
        self._data['red'] = {}
        self._data['blue'] = {}
        for cnode_id in self.red_nodes:
            territory_node = self.ctx.graph.graph.get_node(cnode_id)
            dist = (node.x - territory_node.x)**2 + (node.y - territory_node.y)**2
            if dist < self._range**2:
                self._data['red'][cnode_id] = dist**0.5
        for cnode_id in self.blue_nodes:
            territory_node = self.ctx.graph.graph.get_node(cnode_id)
            dist = (node.x - territory_node.x)**2 + (node.y - territory_node.y)**2
            if dist < self._range**2:
                self._data['blue'][cnode_id] = dist**0.5

    def update(self, data):
        return

Let us also add the visualization for the territory sensor similar to what we did before.

game.py
# Territory nodes artist
def draw_territory_nodes(ctx, data):
    size = data.get('size', 10)
    sensor = ctx.sensor.get_sensor(data['sensor'])
    for node_id in sensor.data['red']:
        node = ctx.graph.graph.get_node(node_id)
        ctx.visual.render_circle(node.x, node.y, size, color=(255, 0, 0))
    for node_id in sensor.data['blue']:
        node = ctx.graph.graph.get_node(node_id)
        ctx.visual.render_circle(node.x, node.y, size, color=(0, 0, 255))

We are drawing circles for the territory nodes. The size parameter is the radius of the circle to be drawn. We will create the sensor and register it to the agents in a similar way as we did for the capturable sensor. After registering the sensors, we will see an overlay of red and blue nodes on the output of the map sensors showing the various territories.

game.py
# Create territory sensors
for name in red_team:
    sensor_id = f"territory_{name}"
    sensor = TerritorySensor(ctx, sensor_id, list(red_territory), list(blue_territory), range=160.0)
    ctx.sensor.add_sensor(sensor)
    ctx.agent.get_agent(name).register_sensor(sensor_id, sensor)
    artist = gamms.visual.Artist(
        ctx,
        drawer=draw_territory_nodes,
        layer=39,
    )
    artist.data['sensor'] = sensor.sensor_id
    artist.data['size'] = 10.0
    ctx.visual.add_artist(sensor_id, artist)

for name in blue_team:
    sensor_id = f"territory_{name}"
    sensor = TerritorySensor(ctx, sensor_id, list(blue_territory), list(red_territory), range=160.0)
    ctx.sensor.add_sensor(sensor)
    ctx.agent.get_agent(name).register_sensor(sensor_id, sensor)
    artist = gamms.visual.Artist(
        ctx,
        drawer=draw_territory_nodes,
        layer=39,
    )
    artist.data['sensor'] = sensor.sensor_id
    artist.data['size'] = 10.0
    ctx.visual.add_artist(sensor_id, artist)

Territory sensor visualization: territories detected by the territory sensor, displayed as colored circles.

Now that we have the input from these sensors, we can remove the hard-coded territory and capturable nodes from the agent state. The agent will now be able to derive the territory and capturable node information from the sensors. Let's look at the changes to the blue_strategy.py file.

blue_strategy.py
        capture_sensor_data = sensor_data[f'capture_{self.name}'][1]
        # Get the territory sensor data
        territory_sensor_data = sensor_data[f'territory_{self.name}'][1]

        for node_id in capture_sensor_data:
            capturable_nodes.add(node_id)
        for node_id in territory_sensor_data['blue']: # change according to the team
            self_territory.add(node_id)
        for node_id in territory_sensor_data['red']: # change according to the team
            opposite_territory.add(node_id)

The capture_sensor_data and territory_sensor_data variables hold the data collected by the sensors. The self_territory and opposite_territory sets are updated with the territory sensor data, and capturable_nodes is updated with the capture sensor data. We remove the arguments from the mapper function, as we no longer use them. The mapper function will look like this:
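The merging step above is just set updates keyed by team. A self-contained sketch of that bookkeeping for a blue agent, with toy payloads standing in for the real sensor data (all values here are made up for illustration):

```python
# Toy payloads standing in for capture/territory sensor data.
capture_sensor_data = {7: 42.5, 12: 101.3}             # node_id -> distance
territory_sensor_data = {'red': {3: 15.0}, 'blue': {5: 20.0}}

capturable_nodes, self_territory, opposite_territory = set(), set(), set()

# Accumulate newly sensed nodes; sets retain discoveries across steps.
capturable_nodes.update(capture_sensor_data)
self_territory.update(territory_sensor_data['blue'])     # blue agent's own team
opposite_territory.update(territory_sensor_data['red'])

print(sorted(capturable_nodes), sorted(self_territory), sorted(opposite_territory))
# [7, 12] [5] [3]
```

Because the accumulators are sets, re-sensing the same node on a later step is a no-op, so the agent's knowledge only grows as it explores.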

blue_strategy.py
def mapper(agent_names):
    ret = {}
    for name in agent_names:
        team_names.add(name)
        agent = Agent(name)
        ret[name] = agent.strategy

    return ret

We make similar changes to the red_strategy.py file where the only difference is that the self_territory and opposite_territory variables are swapped. We also remove the arguments passed to mapper in game.py.

Final changed files at snippets/custom_sensors

Recording System

Until now, we have relied on the visual engine to show us the output of the simulation, but visualizing everything carries quite an overhead. When there is human input it makes sense to visualize, but with autonomous agents we can run the simulation without any visualization and replay it later. We also need a way to record a simulation so that we can replay and analyze it. GAMMS provides a Recorder that lets you record a simulation and then replay it. To record a game, we only need to start the recording; nothing else in the game file needs to change. We add the following lines to the game.py file:

game.py
# Record
ctx.record.start(path="recording")

The start method takes a path parameter, which is the path where the recording will be saved. The recording is saved to recording.ggr as a binary file, and the recorded file differentially stores the data for each step. In the current version, the following events are recorded by default:

  1. Agent creation and deletion
  2. Agent position changes
  3. When a sensor is registered or deregistered from an agent
  4. Every time the control is given to the visual engine (Calls to ctx.visual.simulate())
  5. When the game is terminated

The recording will only start after the call to ctx.record.start() and will stop when the game is terminated; anything before is not recorded. There are also play, pause and stop methods to control the recording process. The method ctx.record.record() returns True or False depending on whether recording is in progress.
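Differential storage means only the changes since the previous step are written out. The actual .ggr binary format is internal to GAMMS; the following is just a conceptual sketch of delta recording over plain dicts (hypothetical state shapes):

```python
def diff_state(prev, curr):
    """Return only the keys whose values changed (or are new) since prev."""
    return {k: v for k, v in curr.items() if prev.get(k) != v}

# Three simulation steps of agent positions; only deltas get stored.
steps = [
    {'agent_0': 10, 'agent_1': 25},
    {'agent_0': 11, 'agent_1': 25},  # only agent_0 moved
    {'agent_0': 11, 'agent_1': 26},  # only agent_1 moved
]
recorded, prev = [], {}
for state in steps:
    recorded.append(diff_state(prev, state))
    prev = state

print(recorded)
# [{'agent_0': 10, 'agent_1': 25}, {'agent_0': 11}, {'agent_1': 26}]
```

The first step is stored in full (everything is new), and each later step only stores the agents that actually moved, which is why recordings stay small even for long simulations.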

To speed up the simulation, we can also turn off the visual engine. To do this, we will switch the visual engine to NO_VIS.

config.py
VIS_ENGINE = gamms.visual.Engine.NO_VIS # visual engine to use

The game will run, but since we are not visualizing anything, it is fine to remove all the artist-related setup from the game.py file. This will not change the game speed, but it will make the code cleaner and easier to read. Now we can create a file called replay.py which will contain the code to replay the simulation.

replay.py
import gamms, gamms.osm
import config

ctx = gamms.create_context(
    vis_engine=gamms.visual.Engine.PYGAME,
    vis_kwargs={'simulation_time_constant': 0.3},
    logger_config={'level': 'WARNING'},
) # create a context with PYGAME as the visual engine

G = gamms.osm.graph_from_xml(
    config.XML_FILE,
    resolution=config.RESOLUTION,
    tolerance=config.TOLERANCE,
    bidirectional=config.BIDIRECTIONAL
)
ctx.graph.attach_networkx_graph(G) # attach the graph to the context

# Create the graph visualization

graph_artist = ctx.visual.set_graph_visual(**config.graph_vis_config) # set the graph visualization with width 1980 and height 1080

# Create all agents visualization
for name, vis_config in config.agent_vis_config.items():
    artist = ctx.visual.set_agent_visual(name, **vis_config)

for _ in ctx.record.replay("recording"):
    continue

ctx.terminate() # terminate the context

We are setting up the visualizations for the graph and the agents and then playing the recording. The ctx.record.replay method takes the path to the recording file and returns a generator that yields the events in the recording. We are not doing anything with the events for now, but internally each event is yielded after it has been executed; we will look at the details of the replay system in the next section. We pass the simulation_time_constant parameter to the visual engine to control the speed of the simulation, speeding it up to 0.3 seconds instead of the default 2 seconds. The logger level is set to WARNING because the recorder logs all events at the default INFO level.
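The execute-then-yield pattern of the replay generator means the loop body always sees the state after the yielded event has been applied. A toy sketch of that pattern with made-up event and state shapes (not the real .ggr decoder):

```python
def replay(events, state):
    """Apply each event to state, then yield it to the caller."""
    for event in events:
        state[event['agent']] = event['node']  # execute the event first
        yield event                            # then hand control back

state = {}
events = [{'agent': 'red_0', 'node': 4}, {'agent': 'blue_0', 'node': 9}]
for event in replay(events, state):
    # State already reflects the yielded event at this point.
    print(event['agent'], state[event['agent']])
# red_0 4
# blue_0 9
```

This is why the `for _ in ctx.record.replay("recording"): continue` loop in replay.py is enough to drive the whole playback: the work happens inside the generator before each yield.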

Warnings will appear saying that sensors are being ignored, because no sensors are defined in replay.py. If the sensors are defined, their data will be populated during replay; similarly, if a visualization for the populated sensor data is defined, it will be drawn during replay.

In the scenario we have created, it would be good to keep track of the team scores and access them while replaying. To do this, we need to create a recorded component.

game.py
# Create recorded component
@ctx.record.component(struct={
    'step': int,
    'max_steps': int,
    'red_tag_score': int,
    'blue_tag_score': int,
    'red_capture_score': int,
    'blue_capture_score': int,
    }
)
class ReportCard:
    def __init__(self):
        self.step = 0
        self.max_steps = config.MIN_SIM_STEPS
        self.red_tag_score = 0
        self.blue_tag_score = 0
        self.red_capture_score = 0
        self.blue_capture_score = 0


report_card = ReportCard(name="report_card")

The ReportCard class is a recorded component that keeps track of the scores. The struct parameter is a dictionary defining the structure of the component. The step and max_steps fields track the current step and the maximum number of steps, and we split the scores into tag and capture scores for both teams. The ctx.record.component decorator registers the component type with the context. The name parameter is added by the decorator and must be passed when creating the component; it needs to be unique for each component.

The keyword name is reserved and should not be used as a field in the component. Other internal attributes can be created as on a normal object, but only the ones mentioned in the struct will be recorded. The type of every field in the struct should be JSON serializable -- Type definition: JsonType = Union[None, int, str, bool, List["JsonType"], Dict[str, "JsonType"]]
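Since only struct fields are recorded and each must round-trip through JSON, you can sanity-check a component's snapshot with the standard json module. A hedged sketch using a plain ReportCard-like class with no GAMMS dependency (the 1000 stands in for config.MIN_SIM_STEPS):

```python
import json

struct = {'step': int, 'max_steps': int, 'red_tag_score': int,
          'blue_tag_score': int, 'red_capture_score': int, 'blue_capture_score': int}

class ReportCard:
    def __init__(self):
        self.step = 0
        self.max_steps = 1000   # placeholder for config.MIN_SIM_STEPS
        self.red_tag_score = 0
        self.blue_tag_score = 0
        self.red_capture_score = 0
        self.blue_capture_score = 0

card = ReportCard()
# Only fields listed in struct are captured; all must survive a JSON round trip.
snapshot = {field: getattr(card, field) for field in struct}
assert json.loads(json.dumps(snapshot)) == snapshot
print(snapshot['step'], snapshot['max_steps'])
# 0 1000
```

Any attribute not listed in struct is simply absent from the snapshot, which mirrors the recorder's behavior of ignoring unlisted internal state.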

We can replace the individual score variables with references to the report_card object and update the scores in the capture_rule and tag_rule functions. We also need to update the counters in the main loop: the step_counter variable is replaced with report_card.step and the max_steps variable with report_card.max_steps.

Final changed files at snippets/recording_system

Replay Analysis

Under Construction