ResetScene-Henrik
From Robin
Here is a complete code example showing how to reset a scene to get deterministic behavior.
Warning: From testing, it seems that the answers Unity gives depend on the type of OS (Windows, Linux), OS version (Ubuntu 18, 22) and type of processor (Intel, AMD). Deterministic here means that you get the same answer on the same set-up. This is not a cross-platform solution.
Contents |
Unity
Here we have two objects: a Robot manager (RobotManager.cs) that connects to ML-Agents on the Python side, and a Robot (Robot.cs) that is our agent. The Robot is controlled by ML-Agents and collects observations for it.
RobotManager
/// <summary>
/// Singleton that survives scene loads, owns the custom ML-Agents side channel and
/// reloads the scene every time the Python side sends the reset command.
/// </summary>
public class RobotManager : MonoBehaviour
{
    // The important part for making a singleton
    private static RobotManager instance;

    /// <summary>The manager that won the singleton election in <see cref="Awake"/>.</summary>
    public static RobotManager Instance { get { return instance; } }

    [Tooltip("Makes sure the physics start after n fixed updates")]
    [SerializeField] int warmupFixedUpdates = 80;

    // This is a custom side channel to send some more data. Not mandatory
    public ConfigSideChannel rc;

    /// <summary>
    /// Elects this object as the singleton (or destroys it if one already exists),
    /// then registers the side channel and the reset handler exactly once.
    /// </summary>
    public void Awake()
    {
        if (instance != null && instance != this)
        {
            // The important part for making a singleton: a manager already exists,
            // so this copy is deactivated and destroyed.
            this.gameObject.SetActive(false);
            Destroy(this.gameObject);
            return;
        }
        else if (instance == null)
        {
            instance = this;
        }

        DontDestroyOnLoad(this.gameObject);

        if (rc == null)
        {
            rc = new ConfigSideChannel();
            SideChannelManager.RegisterSideChannel(rc);
            // This is to reload the entire scene when sending the reset command from Python
            Academy.Instance.OnEnvironmentReset += ResetScene;
        }
    }

    /// <summary>Runs when the reset command is received: reloads the scene and starts the warm-up wait.</summary>
    public void ResetScene()
    {
        // Loads and overwrites the existing scene
        SceneManager.LoadScene("SampleScene");

        // WARNING: nothing pauses the simulation while the coroutine below is waiting,
        // so agents already placed in the scene start acting as soon as it is loaded.
        StartCoroutine(SpawnENV());
    }

    /// <summary>Coroutine that yields for <c>warmupFixedUpdates</c> fixed updates so the scene can load properly.</summary>
    IEnumerator SpawnENV()
    {
        // One yield instruction is enough; it carries no per-iteration state.
        WaitForFixedUpdate fixedUpdate = new WaitForFixedUpdate();
        for (int i = 0; i < warmupFixedUpdates; i++)
        {
            yield return fixedUpdate;
        }

        // Instantiate the robot/agent here, once the warm-up is over!
    }

    /// <summary>Undoes what <see cref="Awake"/> registered: the reset handler and the side channel.</summary>
    public void OnDestroy()
    {
        // Academy.IsInitialized is checked first so that tearing down does not create a new Academy.
        if (Academy.IsInitialized && rc != null)
        {
            // Without this the Academy would keep calling ResetScene on a destroyed object.
            // Removing a handler that was never added is a no-op, so this is safe for duplicates too.
            Academy.Instance.OnEnvironmentReset -= ResetScene;
            SideChannelManager.UnregisterSideChannel(rc);
        }
    }
}
The issue with the existing setup is that it relies on how fast the simulation runs, which is likely due to the way co-routines and the scene are configured. Right now, Unity starts the simulation as soon as it can, without waiting for SpawnENV to complete. This is not a problem if you run the simulation at a normal speed (with a timescale of 1), because you'll get consistent results. However, running it at a different speed can lead to unpredictable outcomes.
Currently, the robot is already placed in the scene. It gets created and starts receiving commands from the machine learning agents as soon as the scene is loaded. What would have been better is to create or "spawn" the agent in your code only after SpawnENV is done waiting. This way, you can ensure the agent only starts receiving commands once everything else is set up properly.
Robot/Agent
This robot is based on the ML-Agents Crawler example.
/// <summary>
/// The agent that is controlled from Python. It reports the local position and
/// rotation of its "Center" child as observations.
/// </summary>
public class Robot : Agent
{
    JointDriveController m_JdController; // Control joints
    private Transform Center; // Center of the robot, a prefab
    private ConfigSideChannel parameterChannel; // The custom side channel; created and registered by RobotManager

    public void Awake() // Do not need anything here, happens before Initialize()
    {
    }

    /// <summary>Caches the side channel, the joint controller and the "Center" child transform.</summary>
    public override void Initialize()
    {
        // Prefer the elected singleton; fall back to a scene search if it has not been set yet.
        // Plain null comparisons are used so that Unity's destroyed-object check applies.
        RobotManager manager = RobotManager.Instance;
        if (manager == null)
        {
            manager = FindObjectOfType<RobotManager>();
        }
        parameterChannel = manager != null ? manager.rc : null;

        m_JdController = GetComponent<JointDriveController>();
        Center = transform.Find("Center");
    }

    public override void OnEpisodeBegin() // Do not need anything here, because the whole scene is reset
    {
    }

    /// <summary>Receives the actions sent from the Python side.</summary>
    public override void OnActionReceived(ActionBuffers actions)
    {
        // For the data from the Python part.
        // ActionBuffers is like an array. To access a continuous action: actions.ContinuousActions[0]
    }

    /// <summary>Adds six observations: local position (3) and local rotation as Euler angles (3).</summary>
    public override void CollectObservations(VectorSensor sensor)
    {
        sensor.AddObservation(Center.localPosition); // 3 observations
        // Observe the agent's local rotation in Euler angles (3 observations)
        sensor.AddObservation(Center.localRotation.eulerAngles);
    }

    /// <summary>
    /// Intentionally leaves the side channel registered. The channel belongs to RobotManager,
    /// which registers it only once and outlives scene reloads; unregistering it here would
    /// silently disable it the first time this robot is destroyed by a scene reset.
    /// </summary>
    public void OnDestroy()
    {
        parameterChannel = null;
    }
}
Python
class UnityEvaluator:
    """Runs a Unity ML-Agents environment for a fixed number of steps per evaluation.

    The environment is either the Unity editor (``editor_mode``) or the platform-specific
    build returned by ``_getBuild_Path``.
    """

    def __init__(self, evaluation_steps: int, qutee_config=None, editor_mode: bool = False,
                 headless: bool = False, worker_id: int = 0, time_scale: float = 1):
        """Create the environment and perform an initial reset.

        Args:
            evaluation_steps: Maximum number of environment steps per ``evaluate`` call.
            qutee_config: When not None, the custom config side channel is attached.
            editor_mode: Connect to a running Unity editor instead of launching a build.
            headless: Launch the build without graphics (ignored in editor mode).
            worker_id: Passed to ``UnityEnvironment`` when launching a build.
            time_scale: Only applied when greater than 1.
        """
        self.MAX_N_STEPS_PER_EVALUATION = evaluation_steps

        config_sideChannel = ConfigSideChannel()
        EngineChannel = EngineConfigurationChannel()
        side_channels = []
        # Identity check: "!= None" breaks for objects that overload comparison (e.g. arrays).
        if qutee_config is not None:
            side_channels.append(config_sideChannel)
        if time_scale > 1:
            side_channels.append(EngineChannel)

        self.BUILD_PATH = self._getBuild_Path()
        if editor_mode:
            self.env = UnityEnvironment(file_name=None, side_channels=side_channels, seed=1)
        else:
            # The headless and windowed launches differ only in no_graphics.
            self.env = UnityEnvironment(file_name=self.BUILD_PATH, side_channels=side_channels,
                                        seed=1, no_graphics=bool(headless), worker_id=worker_id)

        if time_scale > 1:
            EngineChannel.set_configuration_parameters(time_scale=time_scale)
        self.env.reset()

    def _getBuild_Path(self) -> str:
        """Return the relative path of the build for the current operating system.

        Raises:
            NotImplementedError: On macOS or on an unrecognised system.
        """
        plt = platform.system()
        if plt == "Windows":
            return "../Build/Windows/Program.exe"
        elif plt == "Linux":
            return "../Build/Linux/Program.x86_64"
        elif plt == "Darwin":
            raise NotImplementedError("Mac er ikke implementer ennå")
        else:
            print("Unidentified system")
            raise NotImplementedError("OS ikke gjennkjent")

    def close(self):
        """Close the Unity environment."""
        self.env.close()

    def evaluate(self):
        """Reset the environment and drive it for up to MAX_N_STEPS_PER_EVALUATION steps.

        Each step sends the action from ``generate_action(t)`` to every agent that
        requests a decision. The loop stops early, after printing a message, as soon
        as no agent requests a decision. Nothing is returned.
        """
        self.env.reset()
        # Name of the first registered behavior; in this project expected to be
        # Qutee_behavior (TODO confirm the exact name reported by Unity).
        individual_name = list(self.env.behavior_specs)[0]

        # Must exist before the loop; accumulating into an unset name raised UnboundLocalError.
        reward = 0.0
        for t in range(self.MAX_N_STEPS_PER_EVALUATION):  # max number of steps per episode
            decision_steps, _terminal_steps = self.env.get_steps(individual_name)
            if len(decision_steps.agent_id) == 0:
                print("Ingen obs fanget opp")
                return

            action = generate_action(t)
            obs = decision_steps.obs
            # Observations of agent 0; shown to illustrate how to read them.
            end_position = obs[0][0][:3]
            end_rotation = obs[0][0][3:6]
            # Total reward over all agents that requested a decision this step.
            reward += float(sum(decision_steps.reward))
            # Go through all agents and set their actions
            for agent_id in decision_steps.agent_id:
                self.env.set_action_for_agent(individual_name, agent_id, ActionTuple(action))
            # Once all data is set, advance the simulation one step
            self.env.step()