Skip to content
Snippets Groups Projects
Commit 1cd3be50 authored by Andri Joos's avatar Andri Joos :blush:
Browse files

abstract environment

parent 22438405
No related branches found
No related tags found
No related merge requests found
import numpy as np
from airsim import MultirotorClient
from envs import BaseEnvironment
import abc
from numpy.typing import NDArray
from tf_agents.environments.py_environment import PyEnvironment
from tf_agents.trajectories import time_step as ts
from tf_agents.typing.types import TimeStep, NestedArray, Text
from tf_agents.specs.array_spec import BoundedArraySpec, ArraySpec
class BaseEnvironment(PyEnvironment):
def __init__(self, ip: str = None, client: MultirotorClient = None, handle_auto_reset: bool = False):
if client is None:
self._client = MultirotorClient(ip=ip)
else:
self._client = client
super(BaseEnvironment, self).__init__(handle_auto_reset)
def _reset(self) -> TimeStep:
self._client.reset()
self._client.confirmConnection()
self._client.cancelLastTask()
self._client.armDisarm(True)
self._client.enableApiControl(True)
return ts.restart(self._get_observation())
def _step(self, action: NestedArray) -> TimeStep:
"""
action[0]: vz
action[1]: vx
action[2]: vy
"""
if self._must_restart():
return self.reset()
vz = self._convert_z_velocity_for_airsim(action[0].item())
vx = action[1].item()
vy = action[2].item()
# self._client.cancelLastTask() # Maybe needed when using async commands
self._client.moveByVelocityAsync(vx, vy, vz, 0.0001).join() # execute command for 0.0001 seconds
obs = self._get_observation()
reward = self._calculate_reward(obs)
if self._is_terminated(obs):
return ts.termination(obs, reward)
if self._is_truncated(obs):
return ts.truncation(obs, reward)
return ts.transition(obs, reward=reward)
def _convert_z_velocity_for_airsim(self, z_velocity: np.float64) -> np.float64:
"""
returns converted velocity:
- 1: max down
- 0: stop/hover
- -1: max up
"""
return z_velocity * (-2) + 1
def _convert_z_velocity_for_model(self, z_velocity: np.float64) -> np.float64:
"""
returns converted velocity:
- 0: max down
- 0.5: stop/hover
- 1: max up
"""
return (z_velocity - 1) / (-2)
def render(self, mode: Text = 'rgb_array') -> NestedArray | None:
raise NotImplementedError()
@abc.abstractmethod
def _is_terminated(self, obs: NDArray) -> bool:
""""""
@abc.abstractmethod
def _is_truncated(self, obs: NDArray) -> bool:
""""""
@abc.abstractmethod
def _must_restart(self) -> bool:
""""""
@abc.abstractmethod
def _get_observation(self) -> NDArray:
""""""
@abc.abstractmethod
def _calculate_reward(self, obs: NDArray) -> np.float32:
""""""
from typing import Text
import airsim
import numpy as np
import envs.HelloWorldEnv as HelloWorldEnv
from tf_agents.environments.py_environment import PyEnvironment
from tf_agents.trajectories import time_step as ts
from tf_agents.typing.types import NestedArraySpec, TimeStep, NestedArray
from tf_agents.specs.array_spec import BoundedArraySpec, ArraySpec
class HelloWorldEnv(PyEnvironment):
def __init__(self, ip: str = None, desired_height: np.float64=140.0, client: airsim.MultirotorClient = None, handle_auto_reset: bool = False, check_out_of_bounds: bool = False):
self._desired_height = desired_height
self._action_spec = BoundedArraySpec(shape=(1,), dtype=np.float32, minimum=0, maximum=1, name='action')
self._observation_spec = ArraySpec(shape=(1,), dtype=np.float64, name='observation')
self._client = airsim.MultirotorClient(ip=ip)
self._is_out_of_bounds = False
self._check_out_of_bounds = check_out_of_bounds
if client is None:
self._client = airsim.MultirotorClient(ip=ip)
else:
self._client = client
super(HelloWorldEnv, self).__init__(handle_auto_reset)
def action_spec(self) -> BoundedArraySpec:
return self._action_spec
def observation_spec(self) -> ArraySpec:
return self._observation_spec
def _reset(self) -> TimeStep:
self._is_out_of_bounds = False
self._client.reset()
self._client.confirmConnection()
self._client.cancelLastTask()
self._client.armDisarm(True)
self._client.enableApiControl(True)
return ts.restart(self._getObservation())
def _step(self, action: NestedArray) -> TimeStep:
if self._is_out_of_bounds:
return self.reset()
vz = action.item() * (-2) + 1 # Numbers in range 0-1
self._client.cancelLastTask()
self._client.moveByVelocityAsync(0, 0, vz, 0.0001).join() # execute command for 0.0001 seconds
obs = self._getObservation()
reward = HelloWorldEnv._calculate_reward(obs)
print("Diffrence to desired: {}".format(obs[0]))
if self._check_out_of_bounds and obs[0] > 30: # env could go higher, but crashes at some point (way higher than 170!)
self._is_out_of_bounds = True
return ts.truncation(obs, reward)
return ts.transition(obs, reward=reward)
def render(self, mode: Text = 'rgb_array') -> NestedArray | None:
raise NotImplementedError()
def _getObservation(self):
# geo_point = self._client.getGpsData().gnss.geo_point
geo_point = self._client.getMultirotorState().gps_location
# return geo_point.latitude, geo_point.longitude, geo_point.altitude
return np.array([geo_point.altitude - self._desired_height])
@staticmethod
def _calculate_reward(obs: tuple[float, float, float]):
# return -abs(133 - obs[2])
# if obs[0] > 132 and obs[0] < 134:
# return np.float32(100)
# return np.float32(0)
return np.float32(-(obs[0] ** 2))
from airsim import MultirotorClient
from numpy.typing import NDArray
from envs import BaseEnvironment, StartEnvironment
import numpy as np
from tf_agents.specs.array_spec import BoundedArraySpec, ArraySpec
from tf_agents.typing.types import TimeStep, NestedArray
class StartEnvironment(BaseEnvironment):
def __init__(self,
ip: str = None,
client: MultirotorClient = None,
handle_auto_reset: bool = False,
desired_height: np.float64=140.0,
check_out_of_bounds: bool = False):
self._desired_height = desired_height
self._check_out_of_bounds = check_out_of_bounds
self._is_out_of_bounds = False
self._action_spec = BoundedArraySpec(shape=(1,), dtype=np.float32, minimum=0, maximum=1, name='action')
self._observation_spec = ArraySpec(shape=(1,), dtype=np.float64, name='observation')
super(StartEnvironment, self).__init__(ip, client, handle_auto_reset)
def action_spec(self) -> BoundedArraySpec:
return self._action_spec
def observation_spec(self) -> ArraySpec:
return self._observation_spec
def _reset(self) -> TimeStep:
self._is_out_of_bounds = False
return super()._reset()
def _step(self, action: NestedArray) -> TimeStep:
full_action = np.append(action, [0, 0])
return super()._step(full_action)
def _is_terminated(self, obs: NDArray) -> bool:
return False
def _is_truncated(self, obs: NDArray) -> bool:
self._is_out_of_bounds = self._check_out_of_bounds and obs[0] > 30
return self._is_out_of_bounds
def _must_restart(self) -> bool:
return self._is_out_of_bounds
def _get_observation(self) -> NDArray:
geo_point = self._client.getMultirotorState().gps_location
return np.array([geo_point.altitude - self._desired_height])
def _calculate_reward(self, obs: NDArray) -> np.float32:
return np.float32(-(obs[0] ** 2))
from envs.HelloWorldEnv import HelloWorldEnv from envs.BaseEnvironment import BaseEnvironment
from envs.StartEnvironment import StartEnvironment
# import sys # import sys
# sys.path.append("../") # sys.path.append("../")
from envs import HelloWorldEnv from envs import StartEnvironment
import tensorflow as tf import tensorflow as tf
import airsim import airsim
import numpy as np import numpy as np
...@@ -46,7 +46,7 @@ def _execute_test(env, steps: int): ...@@ -46,7 +46,7 @@ def _execute_test(env, steps: int):
return _step_loop(env, steps, time_step) return _step_loop(env, steps, time_step)
def _make_env(desired_height: int) -> TFPyEnvironment: def _make_env(desired_height: int) -> TFPyEnvironment:
py_env = HelloWorldEnv(desired_height=desired_height, client=client) py_env = StartEnvironment(desired_height=desired_height, client=client)
return TFPyEnvironment(py_env) return TFPyEnvironment(py_env)
def _assert_last_time_steps(time_steps: []): def _assert_last_time_steps(time_steps: []):
......
from envs import HelloWorldEnv from envs import StartEnvironment
import tensorflow as tf import tensorflow as tf
import reverb import reverb
import os import os
...@@ -58,7 +58,7 @@ for gpu in gpus: ...@@ -58,7 +58,7 @@ for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True) tf.config.experimental.set_memory_growth(gpu, True)
py_env = HelloWorldEnv(ip=SIM_IP) py_env = StartEnvironment(ip=SIM_IP)
env = TFPyEnvironment(py_env) env = TFPyEnvironment(py_env)
# env.reset() # env.reset()
# t = env.step(np.float32(-1)) # t = env.step(np.float32(-1))
......
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment