Help me understand the following code. It implements ACT (Action Chunking with Transformers), which is used to train a robot arm to accomplish tasks.
train.py
from config.config import POLICY_CONFIG, TASK_CONFIG, TRAIN_CONFIG # must import first
import os
import pickle
import argparse
from copy import deepcopy
import matplotlib.pyplot as plt
from training.utils import *

# parse the task name via command line
parser = argparse.ArgumentParser()
parser.add_argument('--task', type=str, default='task1')
args = parser.parse_args()
task = args.task

# configs
task_cfg = TASK_CONFIG
train_cfg = TRAIN_CONFIG
policy_config = POLICY_CONFIG
checkpoint_dir = os.path.join(train_cfg['checkpoint_dir'], task)

# device
device = os.environ['DEVICE']


def forward_pass(data, policy):
    image_data, qpos_data, action_data, is_pad = data
    image_data, qpos_data, action_data, is_pad = image_data.to(device), qpos_data.to(device), action_data.to(device), is_pad.to(device)
    return policy(qpos_data, image_data, action_data, is_pad) # TODO remove None


def plot_history(train_history, validation_history, num_epochs, ckpt_dir, seed):
    # save training curves
    for key in train_history[0]:
        plot_path = os.path.join(ckpt_dir, f'train_val_{key}_seed_{seed}.png')
        plt.figure()
        train_values = [summary[key].item() for summary in train_history]
        val_values = [summary[key].item() for summary in validation_history]
        plt.plot(np.linspace(0, num_epochs-1, len(train_history)), train_values, label='train')
        plt.plot(np.linspace(0, num_epochs-1, len(validation_history)), val_values, label='validation')
        # plt.ylim([-0.1, 1])
        plt.tight_layout()
        plt.legend()
        plt.title(key)
        plt.savefig(plot_path)
    print(f'Saved plots to {ckpt_dir}')


def train_bc(train_dataloader, val_dataloader, policy_config):
    # load policy
    policy = make_policy(policy_config['policy_class'], policy_config)
    policy.to(device)

    # load optimizer
    optimizer = make_optimizer(policy_config['policy_class'], policy)

    # create checkpoint dir if not exists
    os.makedirs(checkpoint_dir, exist_ok=True)

    train_history = []
    validation_history = []
    min_val_loss = np.inf
    best_ckpt_info = None
    for epoch in range(train_cfg['num_epochs']):
        print(f'\nEpoch {epoch}')
        # validation
        with torch.inference_mode():
            policy.eval()
            epoch_dicts = []
            for batch_idx, data in enumerate(val_dataloader):
                forward_dict = forward_pass(data, policy)
                epoch_dicts.append(forward_dict)
            epoch_summary = compute_dict_mean(epoch_dicts)
            validation_history.append(epoch_summary)

            epoch_val_loss = epoch_summary['loss']
            if epoch_val_loss < min_val_loss:
                min_val_loss = epoch_val_loss
                best_ckpt_info = (epoch, min_val_loss, deepcopy(policy.state_dict()))
        print(f'Val loss: {epoch_val_loss:.5f}')
        summary_string = ''
        for k, v in epoch_summary.items():
            summary_string += f'{k}: {v.item():.3f} '
        print(summary_string)

        # training
        policy.train()
        optimizer.zero_grad()
        for batch_idx, data in enumerate(train_dataloader):
            forward_dict = forward_pass(data, policy)
            # backward
            loss = forward_dict['loss']
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            train_history.append(detach_dict(forward_dict))
        epoch_summary = compute_dict_mean(train_history[(batch_idx+1)*epoch:(batch_idx+1)*(epoch+1)])
        epoch_train_loss = epoch_summary['loss']
        print(f'Train loss: {epoch_train_loss:.5f}')
        summary_string = ''
        for k, v in epoch_summary.items():
            summary_string += f'{k}: {v.item():.3f} '
        print(summary_string)

        if epoch % 200 == 0:
            ckpt_path = os.path.join(checkpoint_dir, f"policy_epoch_{epoch}_seed_{train_cfg['seed']}.ckpt")
            torch.save(policy.state_dict(), ckpt_path)
            plot_history(train_history, validation_history, epoch, checkpoint_dir, train_cfg['seed'])

    ckpt_path = os.path.join(checkpoint_dir, f'policy_last.ckpt')
    torch.save(policy.state_dict(), ckpt_path)


if __name__ == '__main__':
    # set seed
    set_seed(train_cfg['seed'])

    # create ckpt dir if not exists
    os.makedirs(checkpoint_dir, exist_ok=True)

    # number of training episodes
    data_dir = os.path.join(task_cfg['dataset_dir'], task)
    num_episodes = len(os.listdir(data_dir))

    # load data
    train_dataloader, val_dataloader, stats, _ = load_data(data_dir, num_episodes, task_cfg['camera_names'],
                                                           train_cfg['batch_size_train'], train_cfg['batch_size_val'])
    # save stats
    stats_path = os.path.join(checkpoint_dir, f'dataset_stats.pkl')
    with open(stats_path, 'wb') as f:
        pickle.dump(stats, f)

    # train
    train_bc(train_dataloader, val_dataloader, policy_config)
training/utils.py
import os
import h5py
import torch
import numpy as np
from einops import rearrange
from torch.utils.data import DataLoader

from training.policy import ACTPolicy, CNNMLPPolicy

import IPython
e = IPython.embed


class EpisodicDataset(torch.utils.data.Dataset):
    def __init__(self, episode_ids, dataset_dir, camera_names, norm_stats):
        super(EpisodicDataset).__init__()
        self.episode_ids = episode_ids
        self.dataset_dir = dataset_dir
        self.camera_names = camera_names
        self.norm_stats = norm_stats
        self.is_sim = None
        #self.__getitem__(0) # initialize self.is_sim

    def __len__(self):
        return len(self.episode_ids)

    def __getitem__(self, index):
        sample_full_episode = False # hardcode

        episode_id = self.episode_ids[index]
        dataset_path = os.path.join(self.dataset_dir, f'episode_{episode_id}.hdf5')
        with h5py.File(dataset_path, 'r') as root:
            is_sim = root.attrs['sim']
            original_action_shape = root['/action'].shape
            episode_len = original_action_shape[0]
            if sample_full_episode:
                start_ts = 0
            else:
                start_ts = np.random.choice(episode_len)
            # get observation at start_ts only
            qpos = root['/observations/qpos'][start_ts]
            qvel = root['/observations/qvel'][start_ts]
            image_dict = dict()
            for cam_name in self.camera_names:
                image_dict[cam_name] = root[f'/observations/images/{cam_name}'][start_ts]
            # get all actions after and including start_ts
            if is_sim:
                action = root['/action'][start_ts:]
                action_len = episode_len - start_ts
            else:
                action = root['/action'][max(0, start_ts - 1):] # hack, to make timesteps more aligned
                action_len = episode_len - max(0, start_ts - 1) # hack, to make timesteps more aligned

        self.is_sim = is_sim
        padded_action = np.zeros(original_action_shape, dtype=np.float32)
        padded_action[:action_len] = action
        is_pad = np.zeros(episode_len)
        is_pad[action_len:] = 1

        # new axis for different cameras
        all_cam_images = []
        for cam_name in self.camera_names:
            all_cam_images.append(image_dict[cam_name])
        all_cam_images = np.stack(all_cam_images, axis=0)

        # construct observations
        image_data = torch.from_numpy(all_cam_images)
        qpos_data = torch.from_numpy(qpos).float()
        action_data = torch.from_numpy(padded_action).float()
        is_pad = torch.from_numpy(is_pad).bool()

        # channel last
        image_data = torch.einsum('k h w c -> k c h w', image_data)

        # normalize image and change dtype to float
        image_data = image_data / 255.0
        action_data = (action_data - self.norm_stats["action_mean"]) / self.norm_stats["action_std"]
        qpos_data = (qpos_data - self.norm_stats["qpos_mean"]) / self.norm_stats["qpos_std"]

        return image_data, qpos_data, action_data, is_pad


def get_norm_stats(dataset_dir, num_episodes):
    all_qpos_data = []
    all_action_data = []
    for episode_idx in range(num_episodes):
        dataset_path = os.path.join(dataset_dir, f'episode_{episode_idx}.hdf5')
        with h5py.File(dataset_path, 'r') as root:
            qpos = root['/observations/qpos'][()]
            qvel = root['/observations/qvel'][()]
            action = root['/action'][()]
        all_qpos_data.append(torch.from_numpy(qpos))
        all_action_data.append(torch.from_numpy(action))
    all_qpos_data = torch.stack(all_qpos_data)
    all_action_data = torch.stack(all_action_data)

    # normalize action data
    action_mean = all_action_data.mean(dim=[0, 1], keepdim=True)
    action_std = all_action_data.std(dim=[0, 1], keepdim=True)
    action_std = torch.clip(action_std, 1e-2, np.inf) # clipping

    # normalize qpos data
    qpos_mean = all_qpos_data.mean(dim=[0, 1], keepdim=True)
    qpos_std = all_qpos_data.std(dim=[0, 1], keepdim=True)
    qpos_std = torch.clip(qpos_std, 1e-2, np.inf) # clipping

    stats = {"action_mean": action_mean.numpy().squeeze(), "action_std": action_std.numpy().squeeze(),
             "qpos_mean": qpos_mean.numpy().squeeze(), "qpos_std": qpos_std.numpy().squeeze(),
             "example_qpos": qpos}

    return stats


def load_data(dataset_dir, num_episodes, camera_names, batch_size_train, batch_size_val):
    print(f'\nData from: {dataset_dir}\n')
    # obtain train test split
    train_ratio = 0.8
    shuffled_indices = np.random.permutation(num_episodes)
    train_indices = shuffled_indices[:int(train_ratio * num_episodes)]
    val_indices = shuffled_indices[int(train_ratio * num_episodes):]

    # obtain normalization stats for qpos and action
    norm_stats = get_norm_stats(dataset_dir, num_episodes)

    # construct dataset and dataloader
    train_dataset = EpisodicDataset(train_indices, dataset_dir, camera_names, norm_stats)
    val_dataset = EpisodicDataset(val_indices, dataset_dir, camera_names, norm_stats)
    train_dataloader = DataLoader(train_dataset, batch_size=batch_size_train, shuffle=True, pin_memory=True, num_workers=1, prefetch_factor=1)
    val_dataloader = DataLoader(val_dataset, batch_size=batch_size_val, shuffle=True, pin_memory=True, num_workers=1, prefetch_factor=1)

    return train_dataloader, val_dataloader, norm_stats, train_dataset.is_sim


def make_policy(policy_class, policy_config):
    if policy_class == "ACT":
        policy = ACTPolicy(policy_config)
    elif policy_class == "CNNMLP":
        policy = CNNMLPPolicy(policy_config)
    else:
        raise ValueError(f"Unknown policy class: {policy_class}")
    return policy


def make_optimizer(policy_class, policy):
    if policy_class == 'ACT':
        optimizer = policy.configure_optimizers()
    elif policy_class == 'CNNMLP':
        optimizer = policy.configure_optimizers()
    else:
        raise ValueError(f"Unknown policy class: {policy_class}")
    return optimizer


### env utils

def sample_box_pose():
    x_range = [0.0, 0.2]
    y_range = [0.4, 0.6]
    z_range = [0.05, 0.05]

    ranges = np.vstack([x_range, y_range, z_range])
    cube_position = np.random.uniform(ranges[:, 0], ranges[:, 1])

    cube_quat = np.array([1, 0, 0, 0])
    return np.concatenate([cube_position, cube_quat])


def sample_insertion_pose():
    # Peg
    x_range = [0.1, 0.2]
    y_range = [0.4, 0.6]
    z_range = [0.05, 0.05]

    ranges = np.vstack([x_range, y_range, z_range])
    peg_position = np.random.uniform(ranges[:, 0], ranges[:, 1])

    peg_quat = np.array([1, 0, 0, 0])
    peg_pose = np.concatenate([peg_position, peg_quat])

    # Socket
    x_range = [-0.2, -0.1]
    y_range = [0.4, 0.6]
    z_range = [0.05, 0.05]

    ranges = np.vstack([x_range, y_range, z_range])
    socket_position = np.random.uniform(ranges[:, 0], ranges[:, 1])

    socket_quat = np.array([1, 0, 0, 0])
    socket_pose = np.concatenate([socket_position, socket_quat])

    return peg_pose, socket_pose


### helper functions

def get_image(images, camera_names, device='cpu'):
    curr_images = []
    for cam_name in camera_names:
        curr_image = rearrange(images[cam_name], 'h w c -> c h w')
        curr_images.append(curr_image)
    curr_image = np.stack(curr_images, axis=0)
    curr_image = torch.from_numpy(curr_image / 255.0).float().to(device).unsqueeze(0)
    return curr_image


def compute_dict_mean(epoch_dicts):
    result = {k: None for k in epoch_dicts[0]}
    num_items = len(epoch_dicts)
    for k in result:
        value_sum = 0
        for epoch_dict in epoch_dicts:
            value_sum += epoch_dict[k]
        result[k] = value_sum / num_items
    return result


def detach_dict(d):
    new_d = dict()
    for k, v in d.items():
        new_d[k] = v.detach()
    return new_d


def set_seed(seed):
    torch.manual_seed(seed)
    np.random.seed(seed)


def pos2pwm(pos: np.ndarray) -> np.ndarray:
    """
    :param pos: numpy array of joint positions in range [-pi, pi]
    :return: numpy array of pwm values in range [0, 4096]
    """
    return (pos / 3.14 + 1.) * 2048


def pwm2pos(pwm: np.ndarray) -> np.ndarray:
    """
    :param pwm: numpy array of pwm values in range [0, 4096]
    :return: numpy array of joint positions in range [-pi, pi]
    """
    return (pwm / 2048 - 1) * 3.14


def pwm2vel(pwm: np.ndarray) -> np.ndarray:
    """
    :param pwm: numpy array of pwm/s joint velocities
    :return: numpy array of rad/s joint velocities
    """
    return pwm * 3.14 / 2048


def vel2pwm(vel: np.ndarray) -> np.ndarray:
    """
    :param vel: numpy array of rad/s joint velocities
    :return: numpy array of pwm/s joint velocities
    """
    return vel * 2048 / 3.14


def pwm2norm(x: np.ndarray) -> np.ndarray:
    """
    :param x: numpy array of pwm values in range [0, 4096]
    :return: numpy array of values in range [0, 1]
    """
    return x / 4096


def norm2pwm(x: np.ndarray) -> np.ndarray:
    """
    :param x: numpy array of values in range [0, 1]
    :return: numpy array of pwm values in range [0, 4096]
    """
    return x * 4096
training/policy.py
import torch.nn as nn
from torch.nn import functional as F
import torchvision.transforms as transforms

from detr.main import build_ACT_model_and_optimizer, build_CNNMLP_model_and_optimizer

import IPython
e = IPython.embed


class ACTPolicy(nn.Module):
    def __init__(self, args_override):
        super().__init__()
        model, optimizer = build_ACT_model_and_optimizer(args_override)
        self.model = model # CVAE decoder
        self.optimizer = optimizer
        self.kl_weight = args_override['kl_weight']
        print(f'KL Weight {self.kl_weight}')

    def __call__(self, qpos, image, actions=None, is_pad=None):
        env_state = None
        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                         std=[0.229, 0.224, 0.225])
        image = normalize(image)
        if actions is not None: # training time
            actions = actions[:, :self.model.num_queries]
            is_pad = is_pad[:, :self.model.num_queries]

            a_hat, is_pad_hat, (mu, logvar) = self.model(qpos, image, env_state, actions, is_pad)
            total_kld, dim_wise_kld, mean_kld = kl_divergence(mu, logvar)
            loss_dict = dict()
            all_l1 = F.l1_loss(actions, a_hat, reduction='none')
            l1 = (all_l1 * ~is_pad.unsqueeze(-1)).mean()
            loss_dict['l1'] = l1
            loss_dict['kl'] = total_kld[0]
            loss_dict['loss'] = loss_dict['l1'] + loss_dict['kl'] * self.kl_weight
            return loss_dict
        else: # inference time
            a_hat, _, (_, _) = self.model(qpos, image, env_state) # no action, sample from prior
            return a_hat

    def configure_optimizers(self):
        return self.optimizer


class CNNMLPPolicy(nn.Module):
    def __init__(self, args_override):
        super().__init__()
        model, optimizer = build_CNNMLP_model_and_optimizer(args_override)
        self.model = model # decoder
        self.optimizer = optimizer

    def __call__(self, qpos, image, actions=None, is_pad=None):
        env_state = None # TODO
        normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                         std=[0.229, 0.224, 0.225])
        image = normalize(image)
        if actions is not None: # training time
            actions = actions[:, 0]
            a_hat = self.model(qpos, image, env_state, actions)
            mse = F.mse_loss(actions, a_hat)
            loss_dict = dict()
            loss_dict['mse'] = mse
            loss_dict['loss'] = loss_dict['mse']
            return loss_dict
        else: # inference time
            a_hat = self.model(qpos, image, env_state) # no action, sample from prior
            return a_hat

    def configure_optimizers(self):
        return self.optimizer


def kl_divergence(mu, logvar):
    batch_size = mu.size(0)
    assert batch_size != 0
    if mu.data.ndimension() == 4:
        mu = mu.view(mu.size(0), mu.size(1))
    if logvar.data.ndimension() == 4:
        logvar = logvar.view(logvar.size(0), logvar.size(1))

    klds = -0.5 * (1 + logvar - mu.pow(2) - logvar.exp())
    total_kld = klds.sum(1).mean(0, True)
    dimension_wise_kld = klds.mean(0)
    mean_kld = klds.mean(1).mean(0, True)

    return total_kld, dimension_wise_kld, mean_kld
The provided code is a Python training script for teaching a robot arm tasks with the Action Chunking with Transformers (ACT) technique. Below I'll explain the key components and flow of the script:

- Configuration and argument parsing: the script imports POLICY_CONFIG, TASK_CONFIG, and TRAIN_CONFIG from config/config.py and uses argparse to receive a task name from the command line.
- Device setup: the compute device is read from the DEVICE environment variable and used to place the policy and every batch of data.
- Training and validation functions:
  - forward_pass: moves a batch of data to the device and runs it through the policy model.
  - plot_history: plots and saves the training and validation losses over epochs.
  - train_bc: the main training loop for the behavior cloning approach, iterating over epochs, managing training and validation steps, tracking the best validation loss, and handling model saving.
- Data loading: load_data builds training and validation dataloaders around EpisodicDataset, which handles data loading and preprocessing of the recorded episodes.
- Model definitions: depending on policy_config['policy_class'], either ACTPolicy or CNNMLPPolicy is used. Both are defined in training/policy.py and wrap neural network models that predict robot actions based on input states and images.
- Setup: under __main__, the script sets the random seed, creates the checkpoint directory, counts the episodes in the dataset directory, loads the data, and pickles the normalization stats alongside the checkpoints.
- Training: train_bc is then called with the two dataloaders and the policy configuration.
- Utilities and helpers:
  - training/utils.py contains helper functions and classes for data management (loading and preprocessing), policy instantiation, and optimizer setup.
  - training/policy.py defines the policy models (ACTPolicy and CNNMLPPolicy), including their forward passes, loss computations, and optimizer configurations.

To run the training, you would typically execute the script from the command line, specifying the task and optionally other configurations. For example:

python train.py --task task1

This triggers the training process for the specified task using the configurations and data specified in the script and associated modules.
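Once trained, the policy is used by querying it for a chunk of future actions from a single observation and executing that chunk before querying again; this is the inference branch (the else path) of ACTPolicy.__call__, which returns a_hat. Here is a minimal rollout sketch, assuming the checkpoint layout produced by train.py and using get_observation(), execute(), and max_steps as hypothetical placeholders for your robot interface:

import os
import pickle
import torch
from config.config import POLICY_CONFIG, TASK_CONFIG
from training.utils import get_image, make_policy

device = os.environ['DEVICE']

# load the trained policy and the normalization stats saved by train.py
# (the checkpoint path assumes train_cfg['checkpoint_dir'] == 'checkpoints')
policy = make_policy(POLICY_CONFIG['policy_class'], POLICY_CONFIG)
policy.load_state_dict(torch.load('checkpoints/task1/policy_last.ckpt', map_location=device))
policy.to(device)
policy.eval()
with open('checkpoints/task1/dataset_stats.pkl', 'rb') as f:
    stats = pickle.load(f)

with torch.inference_mode():
    for step in range(max_steps):  # max_steps: placeholder episode length
        qpos_raw, images = get_observation()  # hypothetical robot interface
        qpos = (qpos_raw - stats['qpos_mean']) / stats['qpos_std']
        qpos = torch.from_numpy(qpos).float().unsqueeze(0).to(device)
        image = get_image(images, TASK_CONFIG['camera_names'], device)
        actions = policy(qpos, image)  # inference path: returns a whole action chunk
        for action in actions.squeeze(0).cpu().numpy():
            # de-normalize with the same stats used in training, then send to the arm
            execute(action * stats['action_std'] + stats['action_mean'])

In practice the ACT paper re-queries before a chunk is exhausted and temporally ensembles overlapping chunks for smoother control, but the de-normalization and chunked execution above are the core of the loop.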
How can I modify the code to use ACT to accomplish tasks in a video game? The training data is a set of game screenshots with their corresponding keystrokes, e.g. images in a screenshots folder (screenshots/a/***.png) corresponding to screenshots where the player pressed a.
To train an ACT-style model for a video game with screenshots as inputs and keystrokes as outputs, you'll need to adjust several parts of the codebase: the data loading process must handle your data format, the model must predict keystrokes instead of joint actions, and the data must be correctly preprocessed and normalized.
Here's a step-by-step guide to adapt the code:
Replace the HDF5-based EpisodicDataset with a dataset class that reads game screenshots and their corresponding keystrokes. The folder name serves as the label and is mapped to an integer class index, as in the sketch below.
import glob
import os
import torch
from torchvision.io import read_image, ImageReadMode
from torch.utils.data import Dataset

class GameDataset(Dataset):
    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.data_files = glob.glob(os.path.join(data_dir, "**", "*.png"), recursive=True)
        # Folder names are the labels, e.g. screenshots/a/0001.png -> "a";
        # map each keystroke to a stable integer class index
        keys = sorted({os.path.basename(os.path.dirname(f)) for f in self.data_files})
        self.key_to_idx = {k: i for i, k in enumerate(keys)}
        self.labels = [self.key_to_idx[os.path.basename(os.path.dirname(f))] for f in self.data_files]

    def __len__(self):
        return len(self.data_files)

    def __getitem__(self, idx):
        # Force 3 channels so RGBA or grayscale screenshots don't break the CNN
        image = read_image(self.data_files[idx], ImageReadMode.RGB)
        image = image.float() / 255.0  # normalize to [0, 1]
        if self.transform:
            image = self.transform(image)
        # Integer class index, as expected by F.cross_entropy
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return image, label
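Assuming the folder layout from the question (e.g. screenshots/a/...), a quick sanity check that the files and the keystroke-to-index mapping load as expected:

dataset = GameDataset(data_dir="screenshots")
print(f"{len(dataset)} frames, key mapping: {dataset.key_to_idx}")
image, label = dataset[0]
print(image.shape, label)  # e.g. torch.Size([3, 1080, 1920]) tensor(0)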
Replace the policy model with one that maps a single screenshot to a keystroke class. The original models predict continuous action chunks; for discrete keystrokes a classifier is the natural fit. Here image_size and num_classes are constructor arguments so the layer sizes are well defined:
import torch.nn as nn

class GamePolicy(nn.Module):
    def __init__(self, image_size, num_classes):
        super().__init__()
        # Two stride-2 convolutions downsample each spatial dimension by 4x,
        # hence the image_size // 4 terms in the first linear layer
        self.cnn = nn.Sequential(
            nn.Conv2d(3, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(64 * (image_size // 4) * (image_size // 4), 256),
            nn.ReLU(),
            nn.Linear(256, num_classes)  # num_classes = number of possible keystrokes
        )

    def forward(self, x):
        return self.cnn(x)
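A quick shape check confirms the downsampling arithmetic; the 256x256 input size and 5 keystroke classes here are arbitrary:

import torch

model = GamePolicy(image_size=256, num_classes=5)
logits = model(torch.randn(4, 3, 256, 256))  # batch of 4 RGB frames
print(logits.shape)  # torch.Size([4, 5])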
Update the training loop (train_bc in the original) to handle the new dataset and policy model. Because you are now predicting keystrokes, this is a classification problem, so the L1/KL loss is replaced with cross-entropy:
import torch.nn.functional as F

def train_epoch(model, dataloader, optimizer, device):
    model.train()
    total_loss = 0
    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = F.cross_entropy(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(dataloader)
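The setup below also builds a validation loader, so a matching evaluation pass is useful. This is a sketch, not part of the original script: evaluate mirrors train_epoch but skips gradient updates and additionally reports classification accuracy:

import torch
import torch.nn.functional as F

@torch.no_grad()
def evaluate(model, dataloader, device):
    model.eval()
    total_loss, correct, total = 0.0, 0, 0
    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)
        outputs = model(images)
        total_loss += F.cross_entropy(outputs, labels).item()
        correct += (outputs.argmax(dim=1) == labels).sum().item()
        total += labels.size(0)
    return total_loss / len(dataloader), correct / total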
Finally, wire everything together: load the dataset, create the train/validation split, and set up the policy and optimizer.
import torch
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, random_split

device = 'cuda' if torch.cuda.is_available() else 'cpu'
num_epochs = 50  # placeholder; tune for your dataset

# read_image already returns a tensor, so ToTensor() is unnecessary;
# Resize operates directly on tensors
transform = transforms.Resize((256, 256))

# Load the dataset
dataset = GameDataset(data_dir="screenshots", transform=transform)
train_size = int(0.8 * len(dataset))
val_size = len(dataset) - train_size
train_dataset, val_dataset = random_split(dataset, [train_size, val_size])
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=False)

# Initialize the model; the number of classes comes from the keystroke folders on disk
model = GamePolicy(image_size=256, num_classes=len(dataset.key_to_idx)).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# Train the model
for epoch in range(num_epochs):
    train_loss = train_epoch(model, train_loader, optimizer, device)
    val_loss, val_acc = evaluate(model, val_loader, device)  # see the evaluation sketch above
    print(f"Epoch {epoch}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2%}")
If your dataset labels are raw keystrokes, you need a mapping from each keystroke to a unique integer index; GameDataset above already builds one (key_to_idx) from the folder names, so no extra bookkeeping is required.
This setup should give you a solid foundation for adapting the codebase to game screenshots and keystrokes. Adjust paths, hyperparameters, and model details to your specific requirements and data.
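At inference time, invert that mapping to turn a predicted class index back into a keystroke. A minimal sketch, assuming the model and dataset from above; "current_frame.png" is a placeholder for however you capture live frames, and actually injecting the key into the game (e.g. with a library like pynput) is outside this script:

import torch
import torchvision.transforms as transforms
from torchvision.io import read_image, ImageReadMode

# Invert the keystroke -> index mapping built by GameDataset
idx_to_key = {i: k for k, i in dataset.key_to_idx.items()}
resize = transforms.Resize((256, 256))

model.eval()
with torch.no_grad():
    frame = read_image("current_frame.png", ImageReadMode.RGB).float() / 255.0
    frame = resize(frame).unsqueeze(0).to(device)  # add a batch dimension
    pred = model(frame).argmax(dim=1).item()
print(f"Predicted keystroke: {idx_to_key[pred]}")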