Source code for dm_robotics.panda.environment

"""RL environment builder for the Panda MoMa model."""
import collections
import typing
from typing import Sequence, Union

from dm_control import composer
from dm_robotics.agentflow.preprocessors import timestep_preprocessor
from dm_robotics.geometry import joint_angles_distribution, pose_distribution
from dm_robotics.moma import base_task, effector, entity_initializer, prop
from dm_robotics.moma import robot as robot_module
from dm_robotics.moma import (scene_initializer, sensor, subtask_env,
                              subtask_env_builder)
from dm_robotics.moma.models.arenas import empty

from dm_robotics import agentflow as af

from . import arm, hardware
from . import parameters as params
from . import utils


[docs] class PandaEnvironment: """Adds Panda robots to an arena and creates a subtask environment."""
[docs] def __init__(self, robot_params: Union[params.RobotParams, Sequence[params.RobotParams]], arena: composer.Arena = empty.Arena(), control_timestep: float = 0.1, physics_timestep: float = 0.002) -> None: self._arena = arena self._robot_params = robot_params if isinstance( robot_params, Sequence) else [robot_params] self._control_timestep = control_timestep self._arena.mjcf_model.option.timestep = physics_timestep self._robots = collections.OrderedDict() for robot_params in self._robot_params: if robot_params.robot_ip is not None: robot = hardware.build_robot(robot_params, control_timestep=self._control_timestep) else: robot = arm.build_robot(robot_params, control_timestep=self._control_timestep) self._robots[robot_params.name] = robot for robot, robot_params in zip(self._robots.values(), self._robot_params): self._arena.attach(robot.arm, robot_params.attach_site) self._scene_initializers = [] self._entity_initializers = [] self._extra_sensors = [] self._extra_effectors = [] self._props = [] self._timestep_preprocessors = []
@property def robots(self) -> typing.Dict[str, robot_module.Robot]: """A map of the configured robots indexed by name.""" return self._robots
[docs] def build_task_environment(self) -> subtask_env.SubTaskEnvironment: """Builds a subtask environment. The environment includes the configured robots as well as any added components such as props, initializers, preprocessors etc. """ self._extra_sensors.append(utils.TimeSensor()) task = base_task.BaseTask( task_name='panda', arena=self._arena, robots=self._robots.values(), props=self._props, extra_sensors=self._extra_sensors, extra_effectors=self._extra_effectors, control_timestep=self._control_timestep, scene_initializer=self._build_scene_initializer(), episode_initializer=self._build_entity_initializer()) env_builder = subtask_env_builder.SubtaskEnvBuilder() env_builder.set_task(task) base_env = env_builder.build_base_env() physics = base_env.physics parent_action_spec = task.effectors_action_spec(physics) full_action_space = af.IdentityActionSpace(parent_action_spec) for preproc in self._timestep_preprocessors: env_builder.add_preprocessor(preproc) env_builder.set_action_space(full_action_space) env = env_builder.build() env._env._raise_exception_on_physics_error = False return env
[docs] def add_props(self, props: Sequence[prop.Prop]): """Add props as free entities to the arena.""" self._props.extend(props) for p in self._props: frame = self._arena.add_free_entity(p) p.set_freejoint(frame.freejoint)
[docs] def add_entity_initializers( self, initializers: Sequence[entity_initializer.base_initializer.Initializer]): """Add entity initializers.""" self._entity_initializers.extend(initializers)
[docs] def add_scene_initializers( self, initializers: Sequence[base_task.SceneInitializer]): """Add scene initializers.""" self._scene_initializers.extend(initializers)
[docs] def add_timestep_preprocessors( self, preprocessors: Sequence[timestep_preprocessor.TimestepPreprocessor]): """Add timestep preprocessors.""" self._timestep_preprocessors.extend(preprocessors)
[docs] def add_extra_sensors(self, extra_sensors: Sequence[sensor.Sensor]): """Add extra sensor.""" self._extra_sensors.extend(extra_sensors)
[docs] def add_extra_effectors(self, extra_effectors: Sequence[effector.Effector]): """Add extra effectors.""" self._extra_effectors.extend(extra_effectors)
def _build_scene_initializer(self) -> base_task.SceneInitializer: for robot, robot_params in zip(self._robots.values(), self._robot_params): if robot_params.pose is not None: arm_pose = pose_distribution.ConstantPoseDistribution(robot_params.pose) self._scene_initializers.insert( 0, scene_initializer.EntityPoseInitializer(robot.arm_frame, arm_pose.sample_pose)) return scene_initializer.CompositeSceneInitializer(self._scene_initializers) def _build_entity_initializer( self) -> entity_initializer.base_initializer.Initializer: for robot, robot_params in zip(self._robots.values(), self._robot_params): arm_joint_values = joint_angles_distribution.ConstantPanTiltDistribution( robot_params.joint_positions) arm_init = entity_initializer.JointsInitializer( robot.position_arm_joints, arm_joint_values.sample_angles) self._entity_initializers.insert(0, arm_init) if robot_params.has_hand: gripper_joint_values = joint_angles_distribution.ConstantPanTiltDistribution( [.04, .04]) initialize_gripper = entity_initializer.JointsInitializer( robot.gripper.set_joint_positions, gripper_joint_values.sample_angles) self._entity_initializers.insert(0, initialize_gripper) return entity_initializer.TaskEntitiesInitializer(self._entity_initializers)