Data Collection¶
eecbs_batchrunner.py¶
This module provides a generic batch runner for Multi-Agent Path Finding (MAPF) solvers, specifically EECBS and Python-based ML models.
The batch runner handles:
Parallel execution of MAPF solver runs across multiple scenarios and maps
Data collection and processing of results
Conversion of results to NPZ format for machine learning applications
Note
Some of the documentation below was written with the assistance of generative AI, so verify details against the source code before relying on them.
Module Constants¶
- mapsToMaxNumAgents¶
Dictionary mapping each map name to the maximum number of agents that map can handle
Tmux Session Functions¶
- createTmuxSession(i)¶
Create a new tmux session with a given index.
- Parameters:
i (int) – Index for the tmux session
- runCommandWithTmux(i, command)¶
Run a command in a tmux session with a given index.
- Parameters:
i (int) – Index of the tmux session
command (str) – Command to run in the tmux session
- killTmuxSession(i)¶
Kill a tmux session with a given index.
- Parameters:
i (int) – Index of the tmux session to kill
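As a rough illustration of what the three tmux helpers might do, the sketch below builds the underlying tmux invocations as argument lists. The `worker_{i}` session naming scheme and all function names here are assumptions made for the example, not the module's actual implementation.

```python
import subprocess


def session_name(i):
    # Hypothetical naming scheme; the module's real session names may differ.
    return f"worker_{i}"


def create_cmd(i):
    # -d starts the session detached, -s gives it a name.
    return ["tmux", "new-session", "-d", "-s", session_name(i)]


def send_cmd(i, command):
    # send-keys types the command into the session; "C-m" presses Enter.
    return ["tmux", "send-keys", "-t", session_name(i), command, "C-m"]


def kill_cmd(i):
    return ["tmux", "kill-session", "-t", session_name(i)]


def run_in_tmux(i, command):
    """Create session i and start the command in it (requires tmux installed)."""
    subprocess.run(create_cmd(i), check=True)
    subprocess.run(send_cmd(i, command), check=True)
```

Keeping the command construction separate from `subprocess.run` makes the invocations easy to inspect without a running tmux server.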
Command Generation¶
- getEECBSCommand(eecbsArgs, outputFolder, outputfile, mapfile, numAgents, scenfile)¶
Generate the command for running EECBS.
- Parameters:
eecbsArgs (dict) – Arguments for EECBS
outputFolder (str) – Folder for output files
outputfile (str) – File for EECBS output
mapfile (str) – Path to map file
numAgents (int) – Number of agents
scenfile (str) – Path to scenario file
- Returns:
Command for running EECBS
- Return type:
str
- getPyModelCommand(runnerArgs, outputFolder, outputfile, mapfile, numAgents, scenfile)¶
Generate the command for running the Python ML model.
- Parameters:
runnerArgs (dict) – Arguments for the Python model
outputFolder (str) – Folder for output files
outputfile (str) – File for model output
mapfile (str) – Path to map file
numAgents (int) – Number of agents
scenfile (str) – Path to scenario file
- Returns:
Command for running the Python model
- Return type:
str
- getCommandForSingleInstance(runnerArgs, outputFolder, outputfile, mapfile, numAgents, scenfile)¶
Get the command for running a single instance based on the runner type.
- Parameters:
runnerArgs (dict) – Arguments for the runner
outputFolder (str) – Folder for output files
outputfile (str) – File for output
mapfile (str) – Path to map file
numAgents (int) – Number of agents
scenfile (str) – Path to scenario file
- Returns:
Command for running the instance
- Return type:
str
- Raises:
ValueError if the command is unknown
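The dispatch described for getCommandForSingleInstance can be sketched as a lookup table keyed on the runner type. This is a minimal sketch under assumptions: the `"command"` key, the executable names `./eecbs` and `run_model.py`, and the `--key=value` flag layout are all placeholders, not the real command lines.

```python
def render_flags(options):
    """Render a dict as space-separated --key=value flags, sorted by key."""
    return " ".join(f"--{key}={value}" for key, value in sorted(options.items()))


def eecbs_command(solver_args, instance):
    # "./eecbs" and the flag names are placeholders, not the real EECBS CLI.
    return f"./eecbs {render_flags({**instance, **solver_args})}"


def pymodel_command(solver_args, instance):
    # "run_model.py" is a hypothetical script name.
    return f"python run_model.py {render_flags({**instance, **solver_args})}"


# Assumed runner types, taken from the "eecbs" and "pymodel" subcommands.
BUILDERS = {"eecbs": eecbs_command, "pymodel": pymodel_command}


def command_for_single_instance(runner_args, instance):
    """Pick a builder from runner_args["command"]; raise ValueError if unknown."""
    kind = runner_args.get("command")
    if kind not in BUILDERS:
        raise ValueError(f"Unknown command: {kind!r}")
    solver_args = {k: v for k, v in runner_args.items() if k != "command"}
    return BUILDERS[kind](solver_args, instance)
```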
Status Detection¶
- detectExistingStatus(runnerArgs, mapfile, aNum, scenfile, df)¶
Detect if the current configuration has already been run and if it was successful.
- Parameters:
runnerArgs (dict) – Arguments for the runner
mapfile (str) – Path to map file
aNum (int) – Number of agents
scenfile (str) – Path to scenario file
df (pandas.DataFrame or str) – DataFrame or path to CSV file with results
- Returns:
Tuple of (has_been_run, success_status)
- Return type:
tuple
- Raises:
KeyError if a key is not found in the dataframe or the command is unknown
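The lookup behind detectExistingStatus can be pictured as a scan over previously recorded results. The sketch below uses a plain list of dict rows in place of a pandas DataFrame to stay dependency-free, and the column names (`map`, `agents`, `scen`, `solved`) are hypothetical.

```python
def detect_existing_status(rows, map_file, num_agents, scen_file):
    """Return (has_been_run, success) for one (map, agents, scenario) setup.

    `rows` is a list of dicts standing in for the results DataFrame.
    A row lacking an expected column raises KeyError.
    """
    for row in rows:
        if (row["map"] == map_file
                and row["agents"] == num_agents
                and row["scen"] == scen_file):
            return True, bool(row["solved"])
    # No matching row: this configuration has not been run yet.
    return False, False
```

A caller could use the first element to skip finished runs and the second to decide whether larger agent counts are worth attempting.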
Multi-threaded Execution¶
- runSingleInstanceMT(queue, nameToNumRun, lock, worker_id, idToWorkerOutputFilepath, static_dict, runnerArgs, mapName, curAgentNum, scen)¶
Run a single instance of the MAPF solver in multi-threading mode.
- Parameters:
queue (multiprocessing.Queue) – Queue for communication between processes
nameToNumRun (dict) – Dictionary mapping map names to number of remaining runs
lock (multiprocessing.Lock) – Lock for thread safety
worker_id (int) – ID of the worker process
idToWorkerOutputFilepath (callable) – Function to get the output file path
static_dict (dict) – Dictionary with static information
runnerArgs (dict) – Arguments for the runner
mapName (str) – Name of the map
curAgentNum (int) – Number of agents
scen (str) – Path to scenario file
- checkIfRunNextAgents(queue, nameToNumRun, lock, num_workers, idToWorkerOutputFilepath, static_dict, eecbsArgs, mapName, curAgentNum)¶
Check if the next agent numbers should be run after completing all runs for the current agent number.
- Parameters:
queue (multiprocessing.Queue) – Queue for communication between processes
nameToNumRun (dict) – Dictionary mapping map names to number of remaining runs
lock (multiprocessing.Lock) – Lock for thread safety
num_workers (int) – Number of worker processes
idToWorkerOutputFilepath (callable) – Function to get the output file path
static_dict (dict) – Dictionary with static information
eecbsArgs (dict) – Arguments for EECBS
mapName (str) – Name of the map
curAgentNum (int) – Current number of agents
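One way to picture the bookkeeping is a shared counter of remaining runs per map: each finished run decrements it under the lock, and the last one enqueues the next agent count. The sketch below is a guess at that pattern, not the module's logic. The `step` and `max_agents` parameters, the task tuple layout, and the use of `threading` and `queue` in place of `multiprocessing` are all assumptions, and the real function may also consider success rates.

```python
import queue
import threading


def finish_run(task_queue, name_to_num_run, lock, map_name, cur_agents,
               scens, max_agents, step=1):
    """Record one finished run; enqueue the next agent count when all are done.

    Returns True if this call completed the last run for cur_agents.
    """
    with lock:
        name_to_num_run[map_name] -= 1
        all_done = name_to_num_run[map_name] == 0
        next_agents = cur_agents + step
        if all_done and next_agents <= max_agents:
            # Reset the counter before enqueueing so the count stays consistent.
            name_to_num_run[map_name] = len(scens)
            for scen in scens:
                task_queue.put((map_name, next_agents, scen))
    return all_done
```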
- worker(queue, nameToNumRun, lock, worker_id, num_workers, static_dict, idToWorkerOutputFilepath)¶
Worker process function that processes tasks from the queue.
- Parameters:
queue (multiprocessing.JoinableQueue) – Queue for communication between processes
nameToNumRun (dict) – Dictionary mapping map names to number of remaining runs
lock (multiprocessing.Lock) – Lock for thread safety
worker_id (int) – ID of the worker process
num_workers (int) – Number of worker processes
static_dict (dict) – Dictionary with static information
idToWorkerOutputFilepath (callable) – Function to get the output file path
- Raises:
ValueError if the function is unknown
- helperRun(command)¶
Helper function to run a command in a shell.
- Parameters:
command (str) – Command to run
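The worker and helperRun pair follows a common queue-consumer pattern, sketched below. This is an illustration only: the `(function_name, args)` task layout, the `None` shutdown sentinel, and the `handlers` table are assumptions, and a standard-library `queue.Queue` stands in for the `multiprocessing.JoinableQueue` so the example runs in a single process.

```python
import queue
import subprocess


def helper_run(command):
    """Run a command in a shell and return its exit code."""
    return subprocess.run(command, shell=True, check=False).returncode


def worker_loop(task_queue, handlers):
    """Process (function_name, args) tasks until a None sentinel arrives."""
    results = []
    while True:
        task = task_queue.get()
        try:
            if task is None:
                break  # sentinel: no more work
            func_name, args = task
            if func_name not in handlers:
                raise ValueError(f"Unknown function: {func_name}")
            results.append(handlers[func_name](*args))
        finally:
            # Always mark the task done so queue.join() cannot hang.
            task_queue.task_done()
    return results
```

Calling `task_done()` in a `finally` block matters with joinable queues: a task that raises would otherwise leave `join()` waiting forever.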
Setup and Configuration¶
- specificRunnerDictSetup(args)¶
Set up the runner arguments dictionary based on command type.
- Parameters:
args (argparse.Namespace) – Command line arguments
- Returns:
Runner arguments dictionary
- Return type:
dict
- Raises:
ValueError if the command is unknown
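The usage examples suggest shared options followed by an `eecbs` or `pymodel` subcommand with its own options. A minimal argparse sketch of that layout, and of turning the parsed namespace into a runner dictionary, might look like this. The `dest="command"` name, the defaults, and the reduced option set are assumptions for illustration.

```python
import argparse


def build_parser():
    parser = argparse.ArgumentParser(prog="eecbs_batchrunner")
    # Shared options (a subset of those shown in the usage examples).
    parser.add_argument("--mapFolder", required=True)
    parser.add_argument("--num_parallel_runs", type=int, default=1)
    # Runner-specific options live on subcommands.
    sub = parser.add_subparsers(dest="command", required=True)
    eecbs = sub.add_parser("eecbs")
    eecbs.add_argument("--cutoffTime", type=int, default=60)  # default is a guess
    pymodel = sub.add_parser("pymodel")
    pymodel.add_argument("--modelPath")
    pymodel.add_argument("--k", type=int)
    return parser


def specific_runner_dict(args):
    """Build the runner dictionary for the chosen subcommand."""
    if args.command == "eecbs":
        return {"command": "eecbs", "cutoffTime": args.cutoffTime}
    if args.command == "pymodel":
        return {"command": "pymodel", "modelPath": args.modelPath, "k": args.k}
    raise ValueError(f"Unknown command: {args.command!r}")
```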
- eecbs_runner_setup(args)¶
Set up the global variables and paths for EECBS runner.
- Parameters:
args (argparse.Namespace) – Command line arguments
- runDataManipulator(args, ct, mapsToScens, static_dict, outputPathNpzFolder, mapsInputFolder, num_workers)¶
Run the data manipulator to convert outputs to NPZ format.
- Parameters:
args (argparse.Namespace) – Command line arguments
ct (CustomTimer) – Timer object for measuring execution time
mapsToScens (dict) – Dictionary mapping map names to scenario files
static_dict (dict) – Dictionary with static information
outputPathNpzFolder (str) – Folder for output NPZ files
mapsInputFolder (str) – Folder with input map files
num_workers (int) – Number of worker processes
- generic_batch_runner(args)¶
Main function for the generic batch runner.
This function handles the overall execution flow, including:
Setting up the filesystem
Starting worker processes
Creating jobs
Processing results
Running the data manipulator
- Parameters:
args (argparse.Namespace) – Command line arguments
Usage Examples¶
Basic usage with EECBS:
python -m data_collection.eecbs_batchrunner \
--mapFolder=data_collection/data/benchmark_data/maps \
--scenFolder=data_collection/data/benchmark_data/scens \
--constantMapAndBDFolder=data_collection/data/benchmark_data/constant_npzs2 \
--outputFolder=data_collection/data/logs/EXP_Test_batch/iter0/eecbs_outputs \
--num_parallel_runs=50 \
"eecbs" \
--outputPathNpzFolder=data_collection/data/logs/EXP_Test_batch/iter0/eecbs_npzs \
--firstIter=false --cutoffTime=5
Basic usage with Python model:
python -m data_collection.eecbs_batchrunner \
--mapFolder=data_collection/data/benchmark_data/maps \
--scenFolder=data_collection/data/benchmark_data/scens \
--constantMapAndBDFolder=data_collection/data/benchmark_data/constant_npzs2 \
--outputFolder=data_collection/data/logs/EXP_Test_batch/iter0/pymodel_outputs \
--num_parallel_runs=50 \
"pymodel" \
--modelPath=data_collection/data/logs/EXP_Test2/iter0/models/max_test_acc.pt \
--k=4 --m=5 --maxSteps=100 --shieldType=CS-PIBT
data_manipulator.py¶
This file processes raw data from EECBS solver runs and converts it into NPZ format suitable for machine learning applications. It handles maps, backward Dijkstra (BD) values, and path data.
Key operations:
Parse map files (.map) to NumPy arrays
Process backward Dijkstra (BD) files to NumPy arrays
Convert agent path information to NumPy arrays
Save data in compressed NPZ format for efficient loading
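The final step, saving to compressed NPZ and loading it back, can be sketched with NumPy's `savez_compressed` and `load`. The helper names and the `empty_8_8` key are illustrative assumptions, not the module's own.

```python
import numpy as np


def save_npz(path, arrays):
    """Write a dict of name -> array to a compressed .npz archive."""
    np.savez_compressed(path, **arrays)


def load_npz(path):
    """Read every array in an .npz archive back into a plain dict."""
    with np.load(path) as data:
        return {key: data[key] for key in data.files}
```

Note that `np.savez_compressed` appends `.npz` to the filename if it is missing, so passing a path that already ends in `.npz` keeps the save and load paths identical.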
Classes¶
- class PipelineDataset(Dataset)¶
A PyTorch Dataset for loading EECBS instances for training ML models.
- __init__(self, mapFileNpz, bdFileNpz, pathFileNpz, k, size, max_agents, helper_bd_preprocess='middle')¶
- Parameters:
mapFileNpz (str) – Path to NPZ file containing map data
bdFileNpz (str) – Path to NPZ file containing backward Dijkstra data
pathFileNpz (str) – Path to NPZ file containing path data
k (int) – Window size for local observation
size (int) – Maximum size of dataset
max_agents (int) – Maximum number of agents
helper_bd_preprocess (str) – Method to center helper backward Dijkstras (‘middle’, ‘current’, or ‘subtraction’)
- __len__(self)¶
- Returns:
Number of instances in the dataset
- Return type:
int
- __getitem__(self, idx)¶
Retrieves an item from the dataset, providing the local observation window.
- Parameters:
idx (int) – Index of the instance to retrieve
- Returns:
Tuple of (current_locations, one_hot_labels, backward_dijkstra, grid_map, goal_locations)
- Return type:
tuple
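A local observation window is typically a square crop of the grid centered on the agent. The sketch below assumes the window spans `k` cells in each direction, giving a (2k+1) x (2k+1) crop, and that cells outside the map are treated as obstacles. Both are assumptions; the dataset's actual windowing may differ.

```python
import numpy as np


def local_window(grid, row, col, k, pad_value=1):
    """Return the (2k+1) x (2k+1) crop of `grid` centered on (row, col).

    Cells outside the grid are filled with pad_value (1 = obstacle here).
    """
    padded = np.pad(grid, k, mode="constant", constant_values=pad_value)
    # After padding by k, original cell (row, col) sits at (row + k, col + k),
    # so the window's top-left corner is (row, col) in padded coordinates.
    return padded[row:row + 2 * k + 1, col:col + 2 * k + 1]
```

Padding first avoids special-casing agents near the map border, at the cost of one extra array copy per call.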
- find_instance(self, idx)¶
Finds the specific instance based on the index.
- Parameters:
idx (int) – Index to find
- Returns:
Tuple of (backward_dijkstra, grid_map, paths, timestep, max_timesteps)
- Return type:
tuple
- parse_npz(self, loaded_paths, loaded_maps, loaded_bds)¶
Parses loaded NPZ data and prepares it for dataset access.
- Parameters:
loaded_paths (dict) – Dictionary of path data
loaded_maps (dict) – Dictionary of map data
loaded_bds (dict) – Dictionary of backward Dijkstra data
- parse_npz2(self)¶
Alternative parsing method that filters and validates data.
File Parsing Functions¶
- parse_map(mapfile)¶
Parses a map file into a NumPy array.
- Parameters:
mapfile (str) – Path to map file
- Returns:
2D array where 1 represents obstacles and 0 represents free space
- Return type:
numpy.ndarray
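A minimal sketch of such a parser follows, assuming the common grid-benchmark layout in which a short header ends with a line reading `map` and the grid rows follow. Treating `.` as free space and every other character as an obstacle is also an assumption; the function name is hypothetical.

```python
import numpy as np


def parse_map_sketch(mapfile):
    """Parse a grid map file into a 2D array (1 = obstacle, 0 = free)."""
    with open(mapfile) as f:
        lines = [line.strip() for line in f]
    # Grid rows start right after the "map" header line.
    start = lines.index("map") + 1
    rows = [line for line in lines[start:] if line]
    return np.array(
        [[0 if ch == "." else 1 for ch in row] for row in rows],
        dtype=np.int8,
    )
```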
- parse_path(pathfile)¶
Parses a path file containing agent movements over time.
- Parameters:
pathfile (str) – Path to path file
- Returns:
3D array of shape (timesteps, num_agents, 2) containing agent positions
- Return type:
numpy.ndarray
- parse_bd(bdfile)¶
Parses a backward Dijkstra file into a NumPy array.
- Parameters:
bdfile (str) – Path to backward Dijkstra file
- Returns:
3D array of shape (num_agents, height, width) containing distance values
- Return type:
numpy.ndarray
Batch Processing Functions¶
- batch_map(dir, num_parallel)¶
Processes multiple map files in parallel.
- Parameters:
dir (str) – Directory containing map files
num_parallel (int) – Number of parallel processes to use
- Returns:
Dictionary mapping filenames to map arrays
- Return type:
dict
- batch_bd(dir, num_parallel)¶
Processes multiple backward Dijkstra files in parallel.
- Parameters:
dir (str) – Directory containing BD files
num_parallel (int) – Number of parallel processes to use
- Returns:
Dictionary mapping scenario names to BD arrays
- Return type:
dict
- batch_path(dir)¶
Processes multiple path files.
- Parameters:
dir (str) – Directory containing path files
- Returns:
Dictionary mapping key strings to path arrays
- Return type:
dict
Command Line Interface¶
- main()¶
Entry point for the command-line interface. Parses arguments and orchestrates the processing of maps, backward Dijkstra values, and paths.
Usage Examples¶
Example usage from command line:
python -m data_collection.data_manipulator \
--pathsIn=data_collection/data/logs/EXP_Collect_BD/iter0/eecbs_outputs/empty_8_8/paths/ \
--pathOutFile=data_collection/data/logs/EXP_Collect_BD/iter0/eecbs_npzs/empty_8_8_paths.npz \
--bdIn=data_collection/data/logs/EXP_Collect_BD/iter0/eecbs_outputs/empty_8_8/bd \
--bdOutFile=data_collection/data/benchmark_data/constant_npzs2/empty_8_8_bds.npz \
--mapIn=data_collection/data/benchmark_data/maps \
--mapOutFile=data_collection/data/benchmark_data/constant_npzs2/empty_8_8_map.npz \
--num_parallel=1