Reward, Cost, Termination, and Step Information
Following the standard OpenAI Gym / Gymnasium API, each call to env.step(...) returns a tuple of five items: (obs, reward, terminated, truncated, info). This page discusses the design of the reward function (reward), the cost function (info["cost"]), the termination criterion (terminated) in various settings, the truncation flag (truncated), and the details of the step information (info).
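As a minimal sketch of how the five returned items are typically consumed, the loop below rolls out one episode and accumulates reward and cost. StubEnv is a hypothetical stand-in written for this example so that the snippet runs without the simulator. It is not part of MetaUrban, and its reward and cost values are made up.

```python
class StubEnv:
    """Hypothetical stand-in that mimics the five-item step() return."""

    def __init__(self, horizon=5):
        self.horizon = horizon
        self.t = 0

    def reset(self):
        self.t = 0
        return [0.0], {}

    def step(self, action):
        self.t += 1
        obs = [float(self.t)]
        reward = 0.5                          # made-up dense reward
        terminated = False                    # this stub never reaches a terminal state
        truncated = self.t >= self.horizon    # cut off by the step limit
        info = {"cost": 1.0 if self.t == 3 else 0.0}  # made-up cost signal
        return obs, reward, terminated, truncated, info


def run_episode(env, policy):
    """Roll out one episode and return (episode_reward, episode_cost, steps)."""
    obs, _ = env.reset()
    episode_reward, episode_cost, steps = 0.0, 0.0, 0
    while True:
        obs, reward, terminated, truncated, info = env.step(policy(obs))
        episode_reward += reward
        episode_cost += info.get("cost", 0.0)
        steps += 1
        # An episode ends when either flag is set.
        if terminated or truncated:
            return episode_reward, episode_cost, steps


result = run_episode(StubEnv(horizon=5), policy=lambda obs: [0.0, 0.0])
print(result)
```

With a real environment, only the construction of env changes; the unpacking of the step tuple and the read of info["cost"] stay the same.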
Reward Function
In all environments, the reward function generally consists of a dense driving reward and a sparse terminal reward. The dense reward is the longitudinal movement along the reference line or lane toward the destination. When the episode terminates, e.g. because the agent arrives at the destination or drives out of the road, a sparse reward is added to the dense reward. In practice, the concrete implementation of the reward function differs slightly across environments.
The reward functions are implemented as follows.
from metaurban.envs import SidewalkStaticMetaUrbanEnv
from metaurban.utils import print_source
print_source(SidewalkStaticMetaUrbanEnv.reward_function)
def reward_function(self, vehicle_id: str):
    """
    Override this func to get a new reward function
    :param vehicle_id: id of BaseVehicle
    :return: reward
    """
    vehicle = self.agents[vehicle_id]
    step_info = dict()
    # Reward for moving forward in current lane
    current_lane = vehicle.lane
    long_last = vehicle.navigation.last_longitude
    long_now = vehicle.navigation.current_longitude
    lateral_now = vehicle.navigation.current_lateral
    # dense driving reward
    reward = 0
    reward += self.config["driving_reward"] * (long_now - long_last)
    # reward for lane keeping, without it vehicle can learn to overtake but fail to keep in lane
    lateral_factor = abs(lateral_now) / self.config["max_lateral_dist"]
    lateral_penalty = -lateral_factor * self.config["lateral_penalty"]
    reward += lateral_penalty
    # heading diff
    ref_line_heading = vehicle.navigation.current_heading_theta_at_long
    heading_diff = abs(wrap_to_pi(vehicle.heading_theta - ref_line_heading)) / np.pi
    heading_penalty = -heading_diff * self.config["heading_penalty"]
    reward += heading_penalty
    # steering_range
    steering = abs(vehicle.current_action[0])
    allowed_steering = (1 / max(vehicle.speed, 1e-2))
    overflowed_steering = min((allowed_steering - steering), 0)
    steering_range_penalty = overflowed_steering * self.config["steering_range_penalty"]
    reward += steering_range_penalty
    # steering smoothness
    steering_reward = 0
    if vehicle_id not in self.previous_agent_actions or "steering_penalty" not in self.config or self.config[
            "steering_penalty"] == 0:
        steering_reward = 0
    else:
        steering = vehicle.current_action[0]
        prev_steering = self.previous_agent_actions[vehicle_id][0]
        steering_diff = abs(steering - prev_steering)
        steering_reward = -steering_diff * self.config["steering_penalty"]  # 0.25 is to make the reward more spiky
        steering_reward = steering_reward * vehicle.speed / vehicle.max_speed_km_h  # when the vehicle is faster, the penalty is more significant
    reward += steering_reward
    if 'speed_reward' in self.config:
        positive_road = 1 if not self._is_out_of_road(vehicle) else -1
        reward += self.config["speed_reward"] * (vehicle.speed_km_h / vehicle.max_speed_km_h) * positive_road
    if self.config["no_negative_reward"]:
        reward = max(reward, 0)
    # crash penalty
    if vehicle.crash_vehicle:
        reward = -self.config["crash_vehicle_penalty"]
    if vehicle.crash_object:
        reward = -self.config["crash_object_penalty"]
    if vehicle.crash_human:
        reward = -self.config["crash_human_penalty"]
    if vehicle.crash_building:
        reward = -self.config["crash_building_penalty"]
    step_info["step_reward"] = reward
    # termination reward
    if self._is_arrive_destination(vehicle) and not self._is_out_of_road(vehicle):
        reward = self.config["success_reward"]
    elif self._is_out_of_road(vehicle):
        reward = -self.config["out_of_road_penalty"]
    # TODO LQY: all a callback to process these keys
    step_info["track_length"] = vehicle.navigation.reference_trajectory.length
    step_info["carsize"] = [vehicle.WIDTH, vehicle.LENGTH]
    # add some new and informative keys
    step_info["route_completion"] = vehicle.navigation.route_completion
    step_info["curriculum_level"] = self.engine.current_level
    step_info["scenario_index"] = self.engine.current_seed
    step_info["lateral_dist"] = lateral_now
    step_info["step_reward_lateral"] = lateral_penalty
    step_info["step_reward_heading"] = heading_penalty
    step_info["step_reward_action_smooth"] = steering_range_penalty
    step_info["steering_reward"] = steering_reward
    self.record_previous_agent_state(vehicle_id)
    return float(reward), step_info
This reward function is composed of five parts as follows:
\(R = R_{term} + c_1 R_{disp} + c_2 R_{lateral} + c_3 R_{steering} + c_4 R_{crash}\)
Terminal reward \(R_{term}\): a sparse reward set to \(+5\) if the vehicle reaches the destination and \(-5\) if it goes out of route. If \(R_{term}\neq 0\) at any time step \(t\), the episode terminates immediately at \(t\).
Displacement reward \(R_{disp}\): a dense reward defined as \(R_{disp}=d_t-d_{t-1}\), where \(d_t\) and \(d_{t-1}\) denote the longitudinal position of the ego agent in the Frenet coordinates of the current lane at time \(t\) and \(t-1\), respectively. We set the weight of \(R_{disp}\) to \(c_1=0.5\).
Lateral reward \(R_{lateral}\): a dense reward defined as \(R_{lateral}=-||l_t||\), where \(l_t\) denotes the lateral offset of the ego agent in the Frenet coordinates of the current lane at time \(t\). It is designed to keep the agent from driving on non-walkable areas. We set the weight of \(R_{lateral}\) to \(c_2=1.0\).
Steering smoothness reward \(R_{steering}\): a dense reward defined as \(R_{steering}=-||s_t-s_{t-1}||\cdot v_t\), where \(s_t\) and \(s_{t-1}\) denote the steering of the agent at \(t\) and \(t-1\), respectively, and \(v_t\) denotes the speed of the agent at time \(t\). This term acts as a regularizer that discourages the agent from changing the steering too frequently. We set the weight of \(R_{steering}\) to \(c_3=0.1\).
Crash reward \(R_{crash}\): a dense negative reward defined as \(-1(c_{t})\), where \(c_{t}\) denotes a collision between the agent and any other object at time \(t\) and \(1(\cdot)\) is the indicator function. Notably, we do not adopt the collision-termination strategy used in MetaDrive (Li et al., 2022). We set the weight of \(R_{crash}\) to \(c_4=1.0\).
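The five terms and their weights can be combined in a few lines. The sketch below evaluates the formula exactly as written above, not the library code shown earlier, which contains additional terms (heading, steering range, speed) and reads its weights from the config. The function name and arguments are hypothetical, and checking out-of-route before arrival is an assumption taken from the order of conditions in the printed source.

```python
# Weights and terminal magnitudes as stated in the text.
C_DISP, C_LATERAL, C_STEERING, C_CRASH = 0.5, 1.0, 0.1, 1.0
TERMINAL_SUCCESS, TERMINAL_OUT_OF_ROUTE = 5.0, -5.0


def composite_reward(d_t, d_prev, l_t, s_t, s_prev, v_t,
                     crashed=False, arrived=False, out_of_route=False):
    """Evaluate R = R_term + c1*R_disp + c2*R_lateral + c3*R_steering + c4*R_crash."""
    r_term = 0.0
    if out_of_route:
        r_term = TERMINAL_OUT_OF_ROUTE
    elif arrived:
        r_term = TERMINAL_SUCCESS
    r_disp = d_t - d_prev                  # longitudinal progress since the last step
    r_lateral = -abs(l_t)                  # penalty on the lateral offset
    r_steering = -abs(s_t - s_prev) * v_t  # steering change scaled by speed
    r_crash = -1.0 if crashed else 0.0     # indicator of a collision
    return (r_term + C_DISP * r_disp + C_LATERAL * r_lateral
            + C_STEERING * r_steering + C_CRASH * r_crash)


# One ordinary step: 1.0 of progress, 0.2 lateral offset, small steering change.
r = composite_reward(d_t=2.0, d_prev=1.0, l_t=0.2, s_t=0.1, s_prev=0.3, v_t=0.5)
print(round(r, 4))
```

For this step the terms are 0.5 for displacement, -0.2 for lateral offset, and -0.01 for steering, so the total is about 0.29.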
Cost Function
Similar to the reward function, we also provide a default cost function to measure safety during driving. The cost is placed in the returned information dict as info["cost"] after each env.step call.
crash_vehicle_cost = 1.0: cost incurred when crashing into other vehicles.
crash_human_cost = 1.0: cost incurred when crashing into humans.
crash_object_cost = 1.0: cost incurred when crashing into objects, such as cones and triangles.
The implementation of the cost function is simple:
from metaurban.utils import print_source
from metaurban.envs import SidewalkStaticMetaUrbanEnv
print_source(SidewalkStaticMetaUrbanEnv.cost_function)
def cost_function(self, vehicle_id: str):
    vehicle = self.agents[vehicle_id]
    step_info = dict()
    step_info["cost"] = 0
    if self._is_out_of_road(vehicle):
        step_info["cost"] = self.config["out_of_road_cost"]
    elif vehicle.crash_vehicle:
        step_info["cost"] = self.config["crash_vehicle_cost"]
    elif vehicle.crash_object:
        step_info["cost"] = self.config["crash_object_cost"]
    return step_info['cost'], step_info
You can modify this function to add more information to the step_info dict. For example, you can log which kind of object raised the cost, and then count how many cars the ego vehicle collides with in one episode by summing the vehicle crashes over all steps.
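One way to log the cause of a cost is sketched below. It mirrors the priority order of the cost function above but runs on a hypothetical VehicleFlags stand-in rather than a real vehicle. The cost_cause key, the helper name, and the config values are assumptions made for illustration.

```python
from collections import Counter
from dataclasses import dataclass


@dataclass
class VehicleFlags:
    """Hypothetical stand-in for the crash / out-of-road flags of a vehicle."""
    out_of_road: bool = False
    crash_vehicle: bool = False
    crash_object: bool = False


def cost_with_cause(flags, config):
    """Same priority order as the cost function above, plus a `cost_cause` key."""
    step_info = {"cost": 0, "cost_cause": None}
    for cause in ("out_of_road", "crash_vehicle", "crash_object"):
        if getattr(flags, cause):
            step_info["cost"] = config[cause + "_cost"]
            step_info["cost_cause"] = cause
            break  # only the first matching cause is charged
    return step_info["cost"], step_info


# Illustrative config values; out_of_road_cost in particular is assumed.
config = {"out_of_road_cost": 1.0, "crash_vehicle_cost": 1.0, "crash_object_cost": 1.0}
steps = [VehicleFlags(), VehicleFlags(crash_vehicle=True),
         VehicleFlags(crash_object=True), VehicleFlags(crash_vehicle=True)]

infos = [cost_with_cause(flags, config)[1] for flags in steps]
episode_cost = sum(info["cost"] for info in infos)
cause_counts = Counter(info["cost_cause"] for info in infos if info["cost_cause"])
print(episode_cost, dict(cause_counts))
```

Here cause_counts["crash_vehicle"] gives the number of steps with a vehicle crash in the episode.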
Termination and Truncation
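MetaUrban will terminate an episode of a vehicle if:
the target vehicle arrives at its destination,
the vehicle drives out of the road,
the vehicle crashes into other agents (vehicles) and crash_vehicle_done is set,
the vehicle crashes into obstacles and crash_object_done is set,
the vehicle crashes into a building and crash_building_done is set,
the vehicle crashes into a human and crash_human_done is set,
the maximum step (horizon) limit is reached and truncate_as_terminate is set.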
The above termination function is implemented as:
print_source(SidewalkStaticMetaUrbanEnv.done_function)
def done_function(self, vehicle_id: str):
    vehicle = self.agents[vehicle_id]
    done = False
    max_step = self.config["horizon"] is not None and self.episode_lengths[vehicle_id] >= self.config["horizon"]
    done_info = {
        TerminationState.CRASH_VEHICLE: vehicle.crash_vehicle,
        TerminationState.CRASH_OBJECT: vehicle.crash_object,
        TerminationState.CRASH_BUILDING: vehicle.crash_building,
        TerminationState.CRASH_HUMAN: vehicle.crash_human,
        TerminationState.CRASH_SIDEWALK: vehicle.crash_sidewalk,
        TerminationState.OUT_OF_ROAD: self._is_out_of_road(vehicle),
        TerminationState.SUCCESS: self._is_arrive_destination(vehicle) and not self._is_out_of_road(vehicle),
        TerminationState.MAX_STEP: max_step,
        TerminationState.ENV_SEED: self.current_seed,
        # TerminationState.CURRENT_BLOCK: self.agent.navigation.current_road.block_ID(),
        # crash_vehicle=False, crash_object=False, crash_building=False, out_of_road=False, arrive_dest=False,
    }
    # for compatibility
    # crash almost equals to crashing with vehicles
    done_info[TerminationState.CRASH] = (
        done_info[TerminationState.CRASH_VEHICLE] or done_info[TerminationState.CRASH_OBJECT]
        or done_info[TerminationState.CRASH_BUILDING] or done_info[TerminationState.CRASH_SIDEWALK]
        or done_info[TerminationState.CRASH_HUMAN]
    )
    # determine env return
    if done_info[TerminationState.SUCCESS]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: arrive_dest.".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.OUT_OF_ROAD]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: out_of_road.".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.CRASH_VEHICLE] and self.config["crash_vehicle_done"]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: crash vehicle ".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.CRASH_OBJECT] and self.config["crash_object_done"]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: crash object ".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.CRASH_BUILDING] and self.config["crash_building_done"]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: crash building ".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.CRASH_HUMAN] and self.config["crash_human_done"]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: crash human".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.MAX_STEP]:
        # single agent horizon has the same meaning as max_step_per_agent
        if self.config["truncate_as_terminate"]:
            done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: max step ".format(self.current_seed),
            extra={"log_once": True}
        )
    return done, done_info
Step Information
The step information dict info contains rich information about the current state of the environment and the target vehicle.
The step info is collected from various sources such as the engine, reward function, termination function, traffic manager, agent manager and so on.
We summarize the dict as follows:
{
    # Number of vehicles overtaken by the ego vehicle in this episode
    'overtake_vehicle_num': 0,
    # Current velocity in km/h
    'velocity': 0.0,
    # The current normalized steering signal in [-1, 1]
    'steering': -0.06901532411575317,
    # The current normalized acceleration signal in [-1, 1]
    'acceleration': -0.2931942343711853,
    # The normalized action, after clipping, that is applied to the ego vehicle
    'raw_action': (-0.06901532411575317, -0.2931942343711853),
    # Whether the vehicle crashes into a vehicle / object / building
    'crash_vehicle': False,
    'crash_object': False,
    'crash_building': False,
    'crash': False,  # Whether any kind of crash happens
    # Whether the vehicle goes out of the road / arrives at the destination
    # or exceeds the maximal episode length
    'out_of_road': False,
    'arrive_dest': False,
    'max_step': False,
    # The reward in this time step / the whole episode so far
    'step_reward': 0.0,
    'episode_reward': 0.0,
    # The cost in this time step
    'cost': 0,
    # The length of current episode
    'episode_length': 1
}
The content of this dict keeps updating, and thus the content above may be out of date. We encourage users to write customized data to this dict, so more status can be exposed to monitor the simulation even without visualization.
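When monitoring a run without visualization, the boolean flags in this dict can be scanned after each step. The helper below is a small sketch that only uses key names appearing in the summary above. The function name and the sample dicts are made up for illustration. Note that a raised crash flag does not necessarily mean the episode has ended, since crash termination depends on the config.

```python
# Flag keys as they appear in the summary dict above.
EVENT_FLAGS = ("arrive_dest", "out_of_road", "crash_vehicle",
               "crash_object", "crash_building", "max_step")


def raised_flags(info):
    """Return the names of all event flags that are set in `info`."""
    return [key for key in EVENT_FLAGS if info.get(key, False)]


# Made-up info dicts for illustration.
running = {"crash": False, "out_of_road": False, "arrive_dest": False, "max_step": False}
finished = {"crash": True, "crash_vehicle": True, "out_of_road": True, "arrive_dest": False}

print(raised_flags(running))
print(raised_flags(finished))
```

Using info.get with a default keeps the helper robust if a key is missing from a given environment's info dict.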
Customization
To compose your own reward, cost, and termination functions, make a new environment and override the reward_function, cost_function, and done_function of the base environment class. You can also record more information in the step_info returned by these functions and deliver it outside the simulator.
from metaurban.envs import SidewalkStaticMetaUrbanEnv

class MyEnv(SidewalkStaticMetaUrbanEnv):
    def reward_function(*args, **kwargs):
        return -10, {"is_customized": True}

env = MyEnv({'object_density': 0.1})
env.reset()
_, r, _, _, info = env.step([0, 0])
assert r == -10 and info["is_customized"]
print("reward: {}, `is_customized` in info: {}".format(r, info["is_customized"]))
env.close()
[INFO] Environment: MyEnv
[INFO] MetaUrban version: 0.0.1
[INFO] Sensors: [lidar: Lidar(), side_detector: SideDetector(), lane_line_detector: LaneLineDetector()]
[INFO] Render Mode: none
[INFO] Horizon (Max steps per agent): None
[INFO] Assets version: 0.0.1
[INFO] Known Pipes: glxGraphicsPipe
[INFO] Start Scenario Index: 0, Num Scenarios : 1
[WARNING] Not set var:walk_on_all_regions, so that agents can walk on all regions (orca_navigation.py:561)
[INFO] Agents can walk on all regions
[INFO] Agents can walk on all regions
reward: -10, `is_customized` in info: True
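The example above replaces the reward entirely. A subclass can also keep the default reward and extend it by calling the parent method first. The sketch below shows that pattern on a hypothetical BaseEnvStub so that it runs without the simulator. The stub, the bonus term, and the extra info key are assumptions for illustration, not MetaUrban code.

```python
class BaseEnvStub:
    """Hypothetical stand-in for the base environment; not MetaUrban code."""

    def reward_function(self, vehicle_id):
        # Pretend default reward and step info.
        return 1.0, {"step_reward": 1.0}


class ShapedEnv(BaseEnvStub):
    def __init__(self, bonus):
        self.bonus = bonus

    def reward_function(self, vehicle_id):
        # Reuse the parent's reward and info, then add a custom term and key.
        reward, step_info = super().reward_function(vehicle_id)
        step_info["bonus"] = self.bonus
        return reward + self.bonus, step_info


reward, info = ShapedEnv(bonus=0.25).reward_function("agent0")
print(reward, info)
```

With a real environment, the same override would subclass SidewalkStaticMetaUrbanEnv, and the added key would appear in the info dict returned by env.step, as in the is_customized example.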