Data Collection

eecbs_batchrunner.py

This module provides a generic batch runner for Multi-Agent Path Finding (MAPF) solvers, specifically EECBS and Python-based ML models.

The batch runner handles:

  1. Parallel execution of MAPF solver runs across multiple scenarios and maps

  2. Data collection and processing of results

  3. Conversion of results to NPZ format for machine learning applications

Note

Some of the documentation below was created with the assistance of generative AI and should be checked against the source code before being relied on.

Module Constants

mapsToMaxNumAgents

Dictionary mapping each map name to the maximum number of agents that map can handle.

Tmux Session Functions

createTmuxSession(i)

Create a new tmux session with a given index.

Parameters:

i (int) – Index for the tmux session

runCommandWithTmux(i, command)

Run a command in a tmux session with a given index.

Parameters:
  • i (int) – Index of the tmux session

  • command (str) – Command to run in the tmux session

killTmuxSession(i)

Kill a tmux session with a given index.

Parameters:

i (int) – Index of the tmux session to kill
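As a rough illustration of how the three helpers above could map onto the tmux command line, the sketch below builds the argument lists for creating, driving, and killing a session. The `worker_` session prefix and the `*_argv` and `run_tmux` names are assumptions made for this example; the module's actual session naming is not documented here.

```python
import subprocess

SESSION_PREFIX = "worker_"  # hypothetical naming scheme, not taken from the module


def session_name(i: int) -> str:
    return f"{SESSION_PREFIX}{i}"


def create_argv(i: int) -> list[str]:
    # -d starts the session detached, -s gives it a name
    return ["tmux", "new-session", "-d", "-s", session_name(i)]


def run_argv(i: int, command: str) -> list[str]:
    # send-keys types the command into the session; "C-m" presses Enter
    return ["tmux", "send-keys", "-t", session_name(i), command, "C-m"]


def kill_argv(i: int) -> list[str]:
    return ["tmux", "kill-session", "-t", session_name(i)]


def run_tmux(argv: list[str]) -> int:
    # Requires tmux on PATH; returns tmux's exit code
    return subprocess.run(argv, check=False).returncode
```

Building the argument list separately from executing it keeps the command easy to log and to inspect without a running tmux server.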

Command Generation

getEECBSCommand(eecbsArgs, outputFolder, outputfile, mapfile, numAgents, scenfile)

Generate the command for running EECBS.

Parameters:
  • eecbsArgs (dict) – Arguments for EECBS

  • outputFolder (str) – Folder for output files

  • outputfile (str) – File for EECBS output

  • mapfile (str) – Path to map file

  • numAgents (int) – Number of agents

  • scenfile (str) – Path to scenario file

Returns:

Command for running EECBS

Return type:

str

getPyModelCommand(runnerArgs, outputFolder, outputfile, mapfile, numAgents, scenfile)

Generate the command for running the Python ML model.

Parameters:
  • runnerArgs (dict) – Arguments for the Python model

  • outputFolder (str) – Folder for output files

  • outputfile (str) – File for model output

  • mapfile (str) – Path to map file

  • numAgents (int) – Number of agents

  • scenfile (str) – Path to scenario file

Returns:

Command for running the Python model

Return type:

str

getCommandForSingleInstance(runnerArgs, outputFolder, outputfile, mapfile, numAgents, scenfile)

Get the command for running a single instance based on the runner type.

Parameters:
  • runnerArgs (dict) – Arguments for the runner

  • outputFolder (str) – Folder for output files

  • outputfile (str) – File for output

  • mapfile (str) – Path to map file

  • numAgents (int) – Number of agents

  • scenfile (str) – Path to scenario file

Returns:

Command for running the instance

Return type:

str

Raises:

ValueError if the command is unknown
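The dispatch described above can be sketched as a lookup from runner type to a command builder. This is only an illustration: the `"command"` key in `runnerArgs`, the `builders` mapping, and the placeholder builder are assumptions, and the placeholder string is not the real EECBS command line.

```python
def get_command_for_single_instance(runner_args, builders, **instance):
    """Pick a command builder by runner type and call it.

    `runner_args["command"]` is an assumed key; `builders` maps a runner
    type to a function that returns the shell command as a string.
    """
    kind = runner_args.get("command")
    try:
        build = builders[kind]
    except KeyError:
        raise ValueError(f"Unknown command: {kind!r}") from None
    return build(runner_args, **instance)


# Placeholder standing in for getEECBSCommand; the output is NOT a real command.
def fake_eecbs(runner_args, mapfile, numAgents, scenfile):
    return f"<eecbs> map={mapfile} agents={numAgents} scen={scenfile}"


builders = {"eecbs": fake_eecbs}
cmd = get_command_for_single_instance(
    {"command": "eecbs"}, builders,
    mapfile="a.map", numAgents=4, scenfile="a.scen",
)
```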

Status Detection

detectExistingStatus(runnerArgs, mapfile, aNum, scenfile, df)

Detect if the current configuration has already been run and if it was successful.

Parameters:
  • runnerArgs (dict) – Arguments for the runner

  • mapfile (str) – Path to map file

  • aNum (int) – Number of agents

  • scenfile (str) – Path to scenario file

  • df (pandas.DataFrame or str) – DataFrame or path to CSV file with results

Returns:

Tuple of (has_been_run, success_status)

Return type:

tuple

Raises:

KeyError if a key is not found in the dataframe or the command is unknown

Multi-threaded Execution

runSingleInstanceMT(queue, nameToNumRun, lock, worker_id, idToWorkerOutputFilepath, static_dict, runnerArgs, mapName, curAgentNum, scen)

Run a single instance of the MAPF solver in multi-threading mode.

Parameters:
  • queue (multiprocessing.Queue) – Queue for communication between processes

  • nameToNumRun (dict) – Dictionary mapping map names to number of remaining runs

  • lock (multiprocessing.Lock) – Lock for thread safety

  • worker_id (int) – ID of the worker process

  • idToWorkerOutputFilepath (callable) – Function to get the output file path

  • static_dict (dict) – Dictionary with static information

  • runnerArgs (dict) – Arguments for the runner

  • mapName (str) – Name of the map

  • curAgentNum (int) – Number of agents

  • scen (str) – Path to scenario file

checkIfRunNextAgents(queue, nameToNumRun, lock, num_workers, idToWorkerOutputFilepath, static_dict, eecbsArgs, mapName, curAgentNum)

Check whether the next agent count should be run once all runs for the current agent count have completed.

Parameters:
  • queue (multiprocessing.Queue) – Queue for communication between processes

  • nameToNumRun (dict) – Dictionary mapping map names to number of remaining runs

  • lock (multiprocessing.Lock) – Lock for thread safety

  • num_workers (int) – Number of worker processes

  • idToWorkerOutputFilepath (callable) – Function to get the output file path

  • static_dict (dict) – Dictionary with static information

  • eecbsArgs (dict) – Arguments for EECBS

  • mapName (str) – Name of the map

  • curAgentNum (int) – Current number of agents

worker(queue, nameToNumRun, lock, worker_id, num_workers, static_dict, idToWorkerOutputFilepath)

Worker process function that processes tasks from the queue.

Parameters:
  • queue (multiprocessing.JoinableQueue) – Queue for communication between processes

  • nameToNumRun (dict) – Dictionary mapping map names to number of remaining runs

  • lock (multiprocessing.Lock) – Lock for thread safety

  • worker_id (int) – ID of the worker process

  • num_workers (int) – Number of worker processes

  • static_dict (dict) – Dictionary with static information

  • idToWorkerOutputFilepath (callable) – Function to get the output file path

Raises:

ValueError if the function is unknown
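The worker loop follows the usual consumer pattern: take a task off the queue, dispatch on the function it names, and mark the task done. The sketch below shows that pattern with threads and `queue.Queue` standing in for processes and `multiprocessing.JoinableQueue`, so that it runs as a standalone script. The `(func_name, args)` task layout, the `None` sentinel, and the `handlers` mapping are assumptions for illustration.

```python
import queue
import threading


def worker(q, handlers, results, lock):
    while True:
        task = q.get()
        try:
            if task is None:           # sentinel: no more work for this worker
                return
            func_name, args = task
            if func_name not in handlers:
                raise ValueError(f"Unknown function: {func_name}")
            value = handlers[func_name](*args)
            with lock:                 # guard the shared results list
                results.append(value)
        finally:
            q.task_done()              # balance every get() so join() can return


q = queue.Queue()
results, lock = [], threading.Lock()
handlers = {"square": lambda x: x * x}

threads = [
    threading.Thread(target=worker, args=(q, handlers, results, lock))
    for _ in range(2)
]
for t in threads:
    t.start()
for n in range(5):
    q.put(("square", (n,)))
for _ in threads:
    q.put(None)                        # one sentinel per worker
q.join()                               # wait until every task is marked done
for t in threads:
    t.join()
```

`queue.Queue` and `multiprocessing.JoinableQueue` share the `get` / `task_done` / `join` protocol, so the loop body carries over to the multi-process case.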

helperRun(command)

Helper function to run a command in a shell.

Parameters:

command (str) – Command to run
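A helper of this kind is typically a thin wrapper around `subprocess.run`. The sketch below is one possible shape; returning the exit code is an assumption, since the documented function lists no return value.

```python
import subprocess


def helper_run(command: str) -> int:
    # shell=True hands the whole string to the shell, so pipes and
    # redirections inside `command` work; only pass trusted strings.
    completed = subprocess.run(command, shell=True, check=False)
    return completed.returncode
```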

Setup and Configuration

specificRunnerDictSetup(args)

Set up the runner arguments dictionary based on command type.

Parameters:

args (argparse.Namespace) – Command line arguments

Returns:

Runner arguments dictionary

Return type:

dict

Raises:

ValueError if the command is unknown
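One way to picture this step: the command line selects a runner type through a subcommand (`eecbs` or `pymodel`, as in the usage examples), and the setup function copies the options for that runner into a dictionary. The sketch below covers only a few of the flags; the `dest="command"` attribute name, the argument types, and the dictionary keys are assumptions.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser()
    parser.add_argument("--outputFolder")
    parser.add_argument("--num_parallel_runs", type=int)
    # The subcommand name is stored in args.command (assumed attribute name)
    sub = parser.add_subparsers(dest="command", required=True)
    eecbs = sub.add_parser("eecbs")
    eecbs.add_argument("--cutoffTime", type=int)   # type is an assumption
    pymodel = sub.add_parser("pymodel")
    pymodel.add_argument("--modelPath")
    pymodel.add_argument("--maxSteps", type=int)
    return parser


def specific_runner_dict_setup(args):
    if args.command == "eecbs":
        return {"command": "eecbs", "cutoffTime": args.cutoffTime}
    if args.command == "pymodel":
        return {
            "command": "pymodel",
            "modelPath": args.modelPath,
            "maxSteps": args.maxSteps,
        }
    raise ValueError(f"Unknown command: {args.command}")


args = build_parser().parse_args(
    ["--num_parallel_runs=50", "eecbs", "--cutoffTime=5"]
)
runner_args = specific_runner_dict_setup(args)
```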

eecbs_runner_setup(args)

Set up the global variables and paths for EECBS runner.

Parameters:

args (argparse.Namespace) – Command line arguments

runDataManipulator(args, ct, mapsToScens, static_dict, outputPathNpzFolder, mapsInputFolder, num_workers)

Run the data manipulator to convert outputs to NPZ format.

Parameters:
  • args (argparse.Namespace) – Command line arguments

  • ct (CustomTimer) – Timer object for measuring execution time

  • mapsToScens (dict) – Dictionary mapping map names to scenario files

  • static_dict (dict) – Dictionary with static information

  • outputPathNpzFolder (str) – Folder for output NPZ files

  • mapsInputFolder (str) – Folder with input map files

  • num_workers (int) – Number of worker processes

generic_batch_runner(args)

Main function for the generic batch runner.

This function handles the overall execution flow, including:

  • Setting up the filesystem

  • Starting worker processes

  • Creating jobs

  • Processing results

  • Running the data manipulator

Parameters:

args (argparse.Namespace) – Command line arguments

Usage Examples

Basic usage with EECBS:

python -m data_collection.eecbs_batchrunner \
  --mapFolder=data_collection/data/benchmark_data/maps \
  --scenFolder=data_collection/data/benchmark_data/scens \
  --constantMapAndBDFolder=data_collection/data/benchmark_data/constant_npzs2 \
  --outputFolder=data_collection/data/logs/EXP_Test_batch/iter0/eecbs_outputs \
  --num_parallel_runs=50 \
  "eecbs" \
  --outputPathNpzFolder=data_collection/data/logs/EXP_Test_batch/iter0/eecbs_npzs \
  --firstIter=false --cutoffTime=5

Basic usage with Python model:

python -m data_collection.eecbs_batchrunner \
  --mapFolder=data_collection/data/benchmark_data/maps \
  --scenFolder=data_collection/data/benchmark_data/scens \
  --constantMapAndBDFolder=data_collection/data/benchmark_data/constant_npzs2 \
  --outputFolder=data_collection/data/logs/EXP_Test_batch/iter0/pymodel_outputs \
  --num_parallel_runs=50 \
  "pymodel" \
  --modelPath=data_collection/data/logs/EXP_Test2/iter0/models/max_test_acc.pt \
  --k=4 --m=5 --maxSteps=100 --shieldType=CS-PIBT

data_manipulator.py

This file processes raw data from EECBS solver runs and converts them into NPZ format suitable for machine learning applications. It handles maps, backward Dijkstra (BD) values, and path data.

Key operations:

  • Parse map files (.map) to NumPy arrays

  • Process backward Dijkstra (BD) files to NumPy arrays

  • Convert agent path information to NumPy arrays

  • Save data in compressed NPZ format for efficient loading

Classes

class PipelineDataset(Dataset)

A PyTorch Dataset for loading EECBS instances for training ML models.

__init__(self, mapFileNpz, bdFileNpz, pathFileNpz, k, size, max_agents, helper_bd_preprocess='middle')

Parameters:
  • mapFileNpz (str) – Path to NPZ file containing map data

  • bdFileNpz (str) – Path to NPZ file containing backward Dijkstra data

  • pathFileNpz (str) – Path to NPZ file containing path data

  • k (int) – Window size for local observation

  • size (int) – Maximum size of dataset

  • max_agents (int) – Maximum number of agents

  • helper_bd_preprocess (str) – Method to center helper backward Dijkstras ('middle', 'current', or 'subtraction')

__len__(self)

Returns:

Number of instances in the dataset

Return type:

int

__getitem__(self, idx)

Retrieves an item from the dataset, providing the local observation window.

Parameters:

idx (int) – Index of the instance to retrieve

Returns:

Tuple of (current_locations, one_hot_labels, backward_dijkstra, grid_map, goal_locations)

Return type:

tuple
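To make the idea of a local observation window concrete, the sketch below crops a square patch around an agent from a grid. It assumes that `k` is a radius, giving a (2k+1) × (2k+1) window, and that cells outside the map are padded as obstacles; neither detail is confirmed by the documentation above. Plain nested lists are used so the example runs without NumPy.

```python
def local_window(grid, center, k, pad_value=1):
    """Return the (2k+1) x (2k+1) patch of `grid` centered on `center`.

    Cells that fall outside the grid are filled with `pad_value`
    (1 = obstacle here, which is an assumption).
    """
    r0, c0 = center
    height, width = len(grid), len(grid[0])
    window = []
    for r in range(r0 - k, r0 + k + 1):
        row = []
        for c in range(c0 - k, c0 + k + 1):
            inside = 0 <= r < height and 0 <= c < width
            row.append(grid[r][c] if inside else pad_value)
        window.append(row)
    return window


grid = [[0, 0],
        [0, 1]]
patch = local_window(grid, center=(0, 0), k=1)
```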

find_instance(self, idx)

Finds the specific instance based on the index.

Parameters:

idx (int) – Index to find

Returns:

Tuple of (backward_dijkstra, grid_map, paths, timestep, max_timesteps)

Return type:

tuple

parse_npz(self, loaded_paths, loaded_maps, loaded_bds)

Parses loaded NPZ data and prepares it for dataset access.

Parameters:
  • loaded_paths (dict) – Dictionary of path data

  • loaded_maps (dict) – Dictionary of map data

  • loaded_bds (dict) – Dictionary of backward Dijkstra data

parse_npz2(self)

Alternative parsing method that filters and validates data.

File Parsing Functions

parse_map(mapfile)

Parses a map file into a NumPy array.

Parameters:

mapfile (str) – Path to map file

Returns:

2D array where 1 represents obstacles and 0 represents free space

Return type:

numpy.ndarray
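For orientation, a minimal sketch of such a parser follows. It assumes the benchmark `.map` layout of a short header (`type`, `height`, `width`), a line reading `map`, and then one text row per grid row, and it treats every character other than `.` as an obstacle. It works on a string and returns nested lists so that it runs without NumPy; wrapping the result in `numpy.array` would give the documented return type.

```python
def parse_map_text(text):
    lines = [ln.strip() for ln in text.splitlines()]
    start = lines.index("map") + 1          # grid rows follow the "map" line
    # Header lines look like "height 8"; turn them into a small dict
    header = dict(ln.split() for ln in lines[:start - 1] if ln)
    height, width = int(header["height"]), int(header["width"])
    rows = lines[start:start + height]
    # 0 = free cell, 1 = obstacle (anything that is not ".")
    return [[0 if ch == "." else 1 for ch in row[:width]] for row in rows]


sample = "type octile\nheight 2\nwidth 3\nmap\n.@.\n..T\n"
grid = parse_map_text(sample)
```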

parse_path(pathfile)

Parses a path file containing agent movements over time.

Parameters:

pathfile (str) – Path to path file

Returns:

3D array of shape (timesteps, num_agents, 2) containing agent positions

Return type:

numpy.ndarray

parse_bd(bdfile)

Parses a backward Dijkstra file into a NumPy array.

Parameters:

bdfile (str) – Path to backward Dijkstra file

Returns:

3D array of shape (num_agents, height, width) containing distance values

Return type:

numpy.ndarray
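Each (height, width) slice of that array holds, for one agent, the shortest distance from every cell to the agent's goal. The sketch below computes one such slice on a 4-connected grid with unit move costs, where breadth-first search gives the same distances as Dijkstra's algorithm. The 4-connectivity and the use of -1 for unreachable cells are assumptions for illustration and may differ from the values stored in the BD files.

```python
from collections import deque


def backward_dijkstra(grid, goal):
    """Distance from every free cell to `goal`; grid uses 1 for obstacles."""
    height, width = len(grid), len(grid[0])
    dist = [[-1] * width for _ in range(height)]   # -1 marks unreachable cells
    gr, gc = goal
    dist[gr][gc] = 0
    frontier = deque([goal])
    while frontier:
        r, c = frontier.popleft()
        for dr, dc in ((1, 0), (-1, 0), (0, 1), (0, -1)):
            nr, nc = r + dr, c + dc
            if (0 <= nr < height and 0 <= nc < width
                    and grid[nr][nc] == 0 and dist[nr][nc] == -1):
                dist[nr][nc] = dist[r][c] + 1
                frontier.append((nr, nc))
    return dist


grid = [[0, 0, 0],
        [0, 1, 0],
        [0, 0, 0]]
bd = backward_dijkstra(grid, goal=(0, 0))
```

The search starts at the goal and expands outward, which is why the values are called backward: a single pass yields the distance-to-goal for every cell at once.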

Batch Processing Functions

batch_map(dir, num_parallel)

Processes multiple map files in parallel.

Parameters:
  • dir (str) – Directory containing map files

  • num_parallel (int) – Number of parallel processes to use

Returns:

Dictionary mapping filenames to map arrays

Return type:

dict

batch_bd(dir, num_parallel)

Processes multiple backward Dijkstra files in parallel.

Parameters:
  • dir (str) – Directory containing BD files

  • num_parallel (int) – Number of parallel processes to use

Returns:

Dictionary mapping scenario names to BD arrays

Return type:

dict

batch_path(dir)

Processes multiple path files.

Parameters:

dir (str) – Directory containing path files

Returns:

Dictionary mapping key strings to path arrays

Return type:

dict

Command Line Interface

main()

Entry point for the command-line interface. Parses arguments and orchestrates the processing of maps, backward Dijkstra values, and paths.

Usage Examples

Example usage from command line:

python -m data_collection.data_manipulator \
  --pathsIn=data_collection/data/logs/EXP_Collect_BD/iter0/eecbs_outputs/empty_8_8/paths/ \
  --pathOutFile=data_collection/data/logs/EXP_Collect_BD/iter0/eecbs_npzs/empty_8_8_paths.npz \
  --bdIn=data_collection/data/logs/EXP_Collect_BD/iter0/eecbs_outputs/empty_8_8/bd \
  --bdOutFile=data_collection/data/benchmark_data/constant_npzs2/empty_8_8_bds.npz \
  --mapIn=data_collection/data/benchmark_data/maps \
  --mapOutFile=data_collection/data/benchmark_data/constant_npzs2/empty_8_8_map.npz \
  --num_parallel=1