Reward, Cost, Termination, and Step Information

Following the standard OpenAI Gym API, each call to env.step(...) returns a tuple of five items: (obs, reward, terminated, truncated, info). This page discusses the design of the reward function reward, the cost function info["cost"], the termination criterion terminated in various settings, the truncation flag truncated, and the contents of the step information info.
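The five-item return value can be consumed with a loop like the one below. This is a minimal sketch rather than MetaUrban code: DummyEnv is a hypothetical stand-in so the snippet runs without the simulator, and its reset() returning (obs, info) is an assumption that follows the Gymnasium convention. With MetaUrban you would pass a real environment to rollout instead.

```python
class DummyEnv:
    """Hypothetical stand-in that mimics the five-item step API (not part of MetaUrban)."""

    def __init__(self, horizon=3):
        self.horizon = horizon
        self.t = 0

    def reset(self):
        # Assumption: reset returns (obs, info), as in Gymnasium.
        self.t = 0
        return [0.0], {}

    def step(self, action):
        self.t += 1
        obs = [float(self.t)]
        reward = 1.0
        terminated = False                   # this toy env never succeeds or fails
        truncated = self.t >= self.horizon   # time limit reached
        info = {"cost": 0.0}
        return obs, reward, terminated, truncated, info


def rollout(env, policy):
    """Run one episode and return (total_reward, total_cost, steps)."""
    obs, _ = env.reset()
    total_reward, total_cost, steps = 0.0, 0.0, 0
    while True:
        obs, reward, terminated, truncated, info = env.step(policy(obs))
        total_reward += reward
        total_cost += info.get("cost", 0.0)
        steps += 1
        # The episode is over when either flag is set.
        if terminated or truncated:
            break
    return total_reward, total_cost, steps


result = rollout(DummyEnv(horizon=3), policy=lambda obs: [0.0, 0.0])
print(result)
```

Keeping terminated and truncated separate lets a training loop distinguish a true end state from a time-limit cut-off.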

Reward Function

In all environments, the reward function generally consists of a dense driving reward and a sparse terminal reward. The dense reward measures the longitudinal movement along the reference line or lane toward the destination. When the episode terminates, e.g. because the agent arrives at the destination or drives out of the road, a sparse reward is added to the dense reward. In practice, the concrete implementation of the reward function differs slightly across environments.

The reward functions are implemented as follows.

from metaurban.envs import SidewalkStaticMetaUrbanEnv
from metaurban.utils import print_source
print_source(SidewalkStaticMetaUrbanEnv.reward_function)
def reward_function(self, vehicle_id: str):
    """
    Override this func to get a new reward function
    :param vehicle_id: id of BaseVehicle
    :return: reward
    """
    vehicle = self.agents[vehicle_id]
    step_info = dict()

    # Reward for moving forward in current lane
    current_lane = vehicle.lane
    long_last = vehicle.navigation.last_longitude
    long_now = vehicle.navigation.current_longitude
    lateral_now = vehicle.navigation.current_lateral

    # dense driving reward
    reward = 0
    reward += self.config["driving_reward"] * (long_now - long_last)

    # reward for lane keeping, without it vehicle can learn to overtake but fail to keep in lane
    lateral_factor = abs(lateral_now) / self.config["max_lateral_dist"]
    lateral_penalty = -lateral_factor * self.config["lateral_penalty"]
    reward += lateral_penalty

    # heading diff
    ref_line_heading = vehicle.navigation.current_heading_theta_at_long
    heading_diff = abs(wrap_to_pi(vehicle.heading_theta - ref_line_heading)) / np.pi
    heading_penalty = -heading_diff * self.config["heading_penalty"]
    reward += heading_penalty

    # steering_range
    steering = abs(vehicle.current_action[0])
    allowed_steering = (1 / max(vehicle.speed, 1e-2))
    overflowed_steering = min((allowed_steering - steering), 0)
    steering_range_penalty = overflowed_steering * self.config["steering_range_penalty"]
    reward += steering_range_penalty

    # steering smoothness
    steering_reward = 0
    if vehicle_id not in self.previous_agent_actions or "steering_penalty" not in self.config or self.config[
            "steering_penalty"] == 0:
        steering_reward = 0
    else:
        steering = vehicle.current_action[0]
        prev_steering = self.previous_agent_actions[vehicle_id][0]
        steering_diff = abs(steering - prev_steering)
        steering_reward = -steering_diff * self.config["steering_penalty"]  # 0.25 is to make the reward more spiky
        steering_reward = steering_reward * vehicle.speed / vehicle.max_speed_km_h  # when the vehicle is faster, the penalty is more significant
    reward += steering_reward

    if 'speed_reward' in self.config:
        positive_road = 1 if not self._is_out_of_road(vehicle) else -1
        reward += self.config["speed_reward"] * (vehicle.speed_km_h / vehicle.max_speed_km_h) * positive_road

    if self.config["no_negative_reward"]:
        reward = max(reward, 0)

    # crash penalty
    if vehicle.crash_vehicle:
        reward = -self.config["crash_vehicle_penalty"]
    if vehicle.crash_object:
        reward = -self.config["crash_object_penalty"]
    if vehicle.crash_human:
        reward = -self.config["crash_human_penalty"]
    if vehicle.crash_building:
        reward = -self.config["crash_building_penalty"]

    step_info["step_reward"] = reward

    # termination reward
    if self._is_arrive_destination(vehicle) and not self._is_out_of_road(vehicle):
        reward = self.config["success_reward"]
    elif self._is_out_of_road(vehicle):
        reward = -self.config["out_of_road_penalty"]

    # TODO LQY: all a callback to process these keys
    step_info["track_length"] = vehicle.navigation.reference_trajectory.length
    step_info["carsize"] = [vehicle.WIDTH, vehicle.LENGTH]
    # add some new and informative keys
    step_info["route_completion"] = vehicle.navigation.route_completion
    step_info["curriculum_level"] = self.engine.current_level
    step_info["scenario_index"] = self.engine.current_seed
    step_info["lateral_dist"] = lateral_now

    step_info["step_reward_lateral"] = lateral_penalty
    step_info["step_reward_heading"] = heading_penalty
    step_info["step_reward_action_smooth"] = steering_range_penalty
    step_info["steering_reward"] = steering_reward

    self.record_previous_agent_state(vehicle_id)

    return float(reward), step_info

This reward function is composed of five terms as follows:

\(R = R_{term} + c_1 R_{disp} + c_2 R_{lateral} + c_3 R_{steering} + c_4 R_{crash}\)

  • Terminal reward \(R_{term}\): a sparse reward set to \(+5\) if the vehicle reaches the destination, and \(-5\) if it goes out of route. If \(R_{term}\neq 0\) at any time step \(t\), the episode terminates immediately at \(t\).

  • Displacement reward \(R_{disp}\): a dense reward defined as \(R_{disp}=d_t-d_{t-1}\), where \(d_t\) and \(d_{t-1}\) denote the longitudinal position of the ego agent in the Frenet coordinates of the current lane at time \(t\) and \(t-1\), respectively. We set the weight of \(R_{disp}\) to \(c_1=0.5\).

  • Lateral reward \(R_{lateral}\): a dense reward defined as \(R_{lateral}=-||l_t||\), where \(l_t\) denotes the lateral offset of the ego agent in the Frenet coordinates of the current lane at time \(t\). It is designed to keep the agent from driving on non-walkable areas. We set the weight of \(R_{lateral}\) to \(c_2=1.0\).

  • Steering smoothness reward \(R_{steering}\): a dense reward defined as \(R_{steering}=-||s_t-s_{t-1}||\cdot v_t\), where \(s_t\) and \(s_{t-1}\) denote the steering of the agent at \(t\) and \(t-1\), respectively, and \(v_t\) denotes the speed of the agent at time \(t\). This term acts as a regularizer that discourages the agent from changing the steering too frequently. We set the weight of \(R_{steering}\) to \(c_3=0.1\).

  • Crash reward \(R_{crash}\): a dense negative reward defined as \(R_{crash}=-1(c_{t})\), where \(c_{t}\) denotes a collision between the agent and any other object at time \(t\) and \(1(\cdot)\) is the indicator function. Note that, unlike MetaDrive (Li et al., 2022), we do not terminate the episode upon collision. We set the weight of \(R_{crash}\) to \(c_4=1.0\).
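As a worked example of the formula above, the following sketch evaluates \(R\) for a single step from the listed terms and weights. It follows the formula as written, not the printed source, which adds further penalty terms and overrides the reward on crashes. The function name combined_reward and its arguments are hypothetical, and the speed is treated as a unitless factor, which is an assumption.

```python
def combined_reward(d_t, d_prev, lateral_offset, steer_t, steer_prev, speed,
                    crashed, r_term=0.0, c1=0.5, c2=1.0, c3=0.1, c4=1.0):
    """Evaluate R = R_term + c1*R_disp + c2*R_lateral + c3*R_steering + c4*R_crash."""
    r_disp = d_t - d_prev                            # longitudinal progress
    r_lateral = -abs(lateral_offset)                 # penalty for lateral offset
    r_steering = -abs(steer_t - steer_prev) * speed  # penalty for abrupt steering
    r_crash = -1.0 if crashed else 0.0               # indicator of a collision
    return r_term + c1 * r_disp + c2 * r_lateral + c3 * r_steering + c4 * r_crash


# One step: 1.0 of progress, 0.25 lateral offset, steering change of 0.5 at speed 2.0.
# R = 0.5*1.0 - 1.0*0.25 - 0.1*(0.5*2.0) = 0.15
r_no_crash = combined_reward(2.0, 1.0, 0.25, 0.5, 0.0, 2.0, crashed=False)

# Same step with a collision: the crash term subtracts c4 = 1.0.
r_crash = combined_reward(2.0, 1.0, 0.25, 0.5, 0.0, 2.0, crashed=True)

print(round(r_no_crash, 6), round(r_crash, 6))
```

With these weights, a single collision outweighs roughly two units of forward progress.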

Cost Function

Similar to the reward function, we also provide a default cost function to measure safety during driving. The cost is placed in the returned information dict as info["cost"] after each call to env.step.

  • crash_vehicle_cost = 1.0: cost incurred when crashing into other vehicles.

  • crash_human_cost = 1.0: cost incurred when crashing into humans.

  • crash_object_cost = 1.0: cost incurred when crashing into objects, such as cones and triangles.

The implementation of the cost function is simple:

from metaurban.utils import print_source
from metaurban.envs import SidewalkStaticMetaUrbanEnv
print_source(SidewalkStaticMetaUrbanEnv.cost_function)
def cost_function(self, vehicle_id: str):
    vehicle = self.agents[vehicle_id]
    step_info = dict()
    step_info["cost"] = 0
    if self._is_out_of_road(vehicle):
        step_info["cost"] = self.config["out_of_road_cost"]
    elif vehicle.crash_vehicle:
        step_info["cost"] = self.config["crash_vehicle_cost"]
    elif vehicle.crash_object:
        step_info["cost"] = self.config["crash_object_cost"]
    return step_info['cost'], step_info

You can modify this function to add more information to the step_info dict. For example, you can log what kind of object raises this cost. Thus you can calculate how many cars the ego vehicle collides with in one episode by summing up the number of vehicle crashes in each step.
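The idea of logging the cause of each cost can be sketched without the simulator as follows. The helper cost_with_cause, the key "cost_cause", and the cost values in ILLUSTRATIVE_COSTS are hypothetical and only for illustration. In particular, the value used for out_of_road_cost is an assumption. The precedence of causes mirrors the if/elif chain in the printed cost_function.

```python
from collections import Counter

# Illustrative values only; the out_of_road_cost value is an assumption.
ILLUSTRATIVE_COSTS = {
    "out_of_road_cost": 1.0,
    "crash_vehicle_cost": 1.0,
    "crash_object_cost": 1.0,
}


def cost_with_cause(flags, config):
    """Return (cost, step_info), recording which event raised the cost."""
    step_info = {"cost": 0.0, "cost_cause": None}  # "cost_cause" is a hypothetical key
    # Same precedence as the printed cost_function: only the first match counts.
    for cause in ("out_of_road", "crash_vehicle", "crash_object"):
        if flags.get(cause, False):
            step_info["cost"] = config[cause + "_cost"]
            step_info["cost_cause"] = cause
            break
    return step_info["cost"], step_info


# Per-step event flags of a made-up four-step episode.
episode_flags = [
    {},
    {"crash_vehicle": True},
    {"crash_vehicle": True, "crash_object": True},
    {"crash_object": True},
]

causes = Counter()
total_cost = 0.0
for flags in episode_flags:
    cost, info = cost_with_cause(flags, ILLUSTRATIVE_COSTS)
    total_cost += cost
    if info["cost_cause"] is not None:
        causes[info["cost_cause"]] += 1

print(total_cost, dict(causes))
```

Summing the per-step causes in this way gives the number of steps with a vehicle crash in the episode.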

Termination and Truncation

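MetaUrban will end an episode of a vehicle if:

  1. the target vehicle arrives at its destination,

  2. the vehicle drives out of the road,

  3. the vehicle crashes into other agents (vehicles) and crash_vehicle_done is enabled,

  4. the vehicle crashes into obstacles (objects or buildings) and crash_object_done or crash_building_done is enabled,

  5. the vehicle crashes into a human and crash_human_done is enabled,

  6. the episode reaches the maximum step (horizon) limit and truncate_as_terminate is enabled.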

The above termination function is implemented as:

print_source(SidewalkStaticMetaUrbanEnv.done_function)
def done_function(self, vehicle_id: str):
    vehicle = self.agents[vehicle_id]
    done = False
    max_step = self.config["horizon"] is not None and self.episode_lengths[vehicle_id] >= self.config["horizon"]
    done_info = {
        TerminationState.CRASH_VEHICLE: vehicle.crash_vehicle,
        TerminationState.CRASH_OBJECT: vehicle.crash_object,
        TerminationState.CRASH_BUILDING: vehicle.crash_building,
        TerminationState.CRASH_HUMAN: vehicle.crash_human,
        TerminationState.CRASH_SIDEWALK: vehicle.crash_sidewalk,
        TerminationState.OUT_OF_ROAD: self._is_out_of_road(vehicle),
        TerminationState.SUCCESS: self._is_arrive_destination(vehicle) and not self._is_out_of_road(vehicle),
        TerminationState.MAX_STEP: max_step,
        TerminationState.ENV_SEED: self.current_seed,
        # TerminationState.CURRENT_BLOCK: self.agent.navigation.current_road.block_ID(),
        # crash_vehicle=False, crash_object=False, crash_building=False, out_of_road=False, arrive_dest=False,
    }

    # for compatibility
    # crash almost equals to crashing with vehicles
    done_info[TerminationState.CRASH] = (
        done_info[TerminationState.CRASH_VEHICLE] or done_info[TerminationState.CRASH_OBJECT]
        or done_info[TerminationState.CRASH_BUILDING] or done_info[TerminationState.CRASH_SIDEWALK]
        or done_info[TerminationState.CRASH_HUMAN]
    )

    # determine env return
    if done_info[TerminationState.SUCCESS]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: arrive_dest.".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.OUT_OF_ROAD]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: out_of_road.".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.CRASH_VEHICLE] and self.config["crash_vehicle_done"]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: crash vehicle ".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.CRASH_OBJECT] and self.config["crash_object_done"]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: crash object ".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.CRASH_BUILDING] and self.config["crash_building_done"]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: crash building ".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.CRASH_HUMAN] and self.config["crash_human_done"]:
        done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: crash human".format(self.current_seed),
            extra={"log_once": True}
        )
    if done_info[TerminationState.MAX_STEP]:
        # single agent horizon has the same meaning as max_step_per_agent
        if self.config["truncate_as_terminate"]:
            done = True
        self.logger.info(
            "Episode ended! Scenario Index: {} Reason: max step ".format(self.current_seed),
            extra={"log_once": True}
        )
    return done, done_info

Step Information

The step information dict info contains rich information about the current state of the environment and the target vehicle. It is collected from various sources such as the engine, reward function, termination function, traffic manager, and agent manager. We summarize the dict as follows:

    {
        # Number of vehicles being overtaken by ego vehicle in this episode
        'overtake_vehicle_num': 0,

        # Current velocity in km/h
        'velocity': 0.0,

        # The current normalized steering signal in [-1, 1]
        'steering': -0.06901532411575317,

        # The current normalized acceleration signal in [-1, 1]
        'acceleration': -0.2931942343711853,

        # The normalized action, after clipping, that is applied to the ego vehicle
        'raw_action': (-0.06901532411575317, -0.2931942343711853),

        # Whether the vehicle crashes into a vehicle / object / building
        'crash_vehicle': False,
        'crash_object': False,
        'crash_building': False,
        'crash': False,  # Whether any kind of crash happens

        # Whether going out of the road / arrive destination
        # or exceeding the maximal episode length
        'out_of_road': False,
        'arrive_dest': False,
        'max_step': False,

        # The reward in this time step / the whole episode so far
        'step_reward': 0.0,
        'episode_reward': 0.0,

        # The cost in this time step
        'cost': 0,

        # The length of current episode
        'episode_length': 1
    }

The content of this dict keeps updating, and thus the content above may be out of date. We encourage users to write customized data to this dict, so more status can be exposed to monitor the simulation even without visualization.
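The boolean flags in this dict can be turned into a single label for logging, as in the sketch below. The helper step_status and its label strings are hypothetical, and the priority order among flags is a choice made here for illustration. Only keys listed in the summary above are read.

```python
def step_status(info):
    """Map the boolean flags of a step info dict to one label (hypothetical helper)."""
    if info.get("arrive_dest", False):
        return "success"
    if info.get("out_of_road", False):
        return "out_of_road"
    if info.get("crash", False):
        # A crash does not necessarily end the episode; this only labels the step.
        return "crash"
    if info.get("max_step", False):
        return "max_step"
    return "running"


# Made-up info dicts containing only the keys relevant here.
samples = [
    {"crash": False, "out_of_road": False, "arrive_dest": False, "max_step": False},
    {"crash": True, "out_of_road": False, "arrive_dest": False, "max_step": False},
    {"crash": False, "out_of_road": False, "arrive_dest": True, "max_step": False},
]
labels = [step_status(info) for info in samples]
print(labels)
```

Using info.get with a default keeps the helper robust if a key is missing in a given version of the dict.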

Customization

To compose your own reward, cost, and termination functions, make a new environment class and override the reward_function, cost_function, and done_function of the base environment class. You can also record more information in the step_info returned by these functions and deliver it outside the simulator.

from metaurban.envs import SidewalkStaticMetaUrbanEnv

class MyEnv(SidewalkStaticMetaUrbanEnv):
    
    def reward_function(*args, **kwargs):
        return -10, {"is_customized": True}
    
env=MyEnv({'object_density': 0.1})
env.reset()
_,r,_,_,info = env.step([0,0])
assert r==-10 and info["is_customized"]
print("reward: {}, `is_customized` in info: {}".format(r, info["is_customized"]))
env.close()
[INFO] Environment: MyEnv
[INFO] MetaUrban version: 0.0.1
[INFO] Sensors: [lidar: Lidar(), side_detector: SideDetector(), lane_line_detector: LaneLineDetector()]
[INFO] Render Mode: none
[INFO] Horizon (Max steps per agent): None
[INFO] Assets version: 0.0.1
[INFO] Known Pipes: glxGraphicsPipe
[INFO] Start Scenario Index: 0, Num Scenarios : 1
[WARNING] Not set var:walk_on_all_regions, so that agents can walk on all regions (orca_navigation.py:561)
[INFO] Agents can walk on all regions
[INFO] Agents can walk on all regions
reward: -10, `is_customized` in info: True