Commit 3b742078 authored by Andri Joos

apply labels

parent ffcc4c81
from __future__ import annotations
from typing import Dict
from datetime import datetime
import pytz


class JsonManeuverData:
    """A single labeled maneuver parsed from a JSON annotation file."""

    _label: str
    _start_timestamp: datetime
    _end_timestamp: datetime

    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
    NO_MANEUVER_LABEL = "no maneuver"

    @property
    def label(self) -> str:
        return self._label

    @property
    def start_timestamp(self) -> datetime:
        return self._start_timestamp

    @property
    def end_timestamp(self) -> datetime:
        return self._end_timestamp

    def __init__(self, label: str, start_timestamp: datetime, end_timestamp: datetime):
        self._label = label
        self._start_timestamp = start_timestamp
        self._end_timestamp = end_timestamp

    @staticmethod
    def fromJson(json_data: Dict[str, str]) -> JsonManeuverData:
        # Parse one annotation entry; timestamps are given in TIMESTAMP_FORMAT and treated as UTC.
        label = json_data['comment'].lower()
        start_timestamp = datetime.strptime(json_data['startTimestamp'], JsonManeuverData.TIMESTAMP_FORMAT).replace(tzinfo=pytz.UTC)
        end_timestamp = datetime.strptime(json_data['endTimeStamp'], JsonManeuverData.TIMESTAMP_FORMAT).replace(tzinfo=pytz.UTC)

        return JsonManeuverData(label, start_timestamp, end_timestamp)
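For orientation, a minimal sketch of how fromJson might be exercised, assuming an annotation entry shaped like the ones loaded in _apply_labels below; the keys 'comment', 'startTimestamp' and 'endTimeStamp' come from the annotation JSON, while the concrete values here are invented:

sample_annotation = {
    'comment': 'Turn Left',                          # invented label
    'startTimestamp': '2023-05-10T12:00:00.000000',  # invented timestamps in TIMESTAMP_FORMAT
    'endTimeStamp': '2023-05-10T12:00:05.500000',
}
maneuver = JsonManeuverData.fromJson(sample_annotation)
assert maneuver.label == 'turn left'                 # labels are lower-cased
assert maneuver.start_timestamp.tzinfo is pytz.UTC   # timestamps are marked as UTC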
@@ -13,6 +13,7 @@ import pyarrow.parquet
from . import utils as preprocessing_utils
from .file_type import FileType
from .json_maneuver_data import JsonManeuverData
DOUBLE_PATTERN = r'Double(\d+)'
MAX_DATASET_MEMORY_SIZE = 7408802660
@@ -78,8 +79,23 @@ def _drop_non_shared_columns(df: pd.DataFrame, shared_columns: Set[str]) -> pd.D
    return df


def _apply_labels(df: pd.DataFrame, json_file: Path) -> pd.DataFrame:
    annotations: List[Dict[str, str]] = None
    with open(json_file, 'r') as f:
        annotations = json.load(f)['annotations']

    maneuvers = [JsonManeuverData.fromJson(m) for m in annotations]
    for maneuver in maneuvers:
        # Label every row whose timestamp falls inside the maneuver window.
        mask = (df.index.get_level_values('TimeStamp') >= maneuver.start_timestamp) & \
            (df.index.get_level_values('TimeStamp') <= maneuver.end_timestamp)

        df.loc[mask, 'Maneuver'] = maneuver.label

    return df.fillna({'Maneuver': JsonManeuverData.NO_MANEUVER_LABEL})
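As a side note, a self-contained sketch of the masking step in isolation, assuming a frame whose index has a 'TimeStamp' level; the column name 'Altitude', the timestamps and the label are invented, only the mask-and-fillna logic mirrors _apply_labels:

import pandas as pd

# hypothetical frame with a 'TimeStamp' index level, as _apply_labels expects
timestamps = pd.date_range('2023-05-10 12:00:00', periods=5, freq='s', tz='UTC', name='TimeStamp')
df = pd.DataFrame({'Altitude': [100.0, 101.0, 102.0, 103.0, 104.0]}, index=timestamps)

window_start, window_end = timestamps[1], timestamps[3]
mask = (df.index.get_level_values('TimeStamp') >= window_start) & \
    (df.index.get_level_values('TimeStamp') <= window_end)
df.loc[mask, 'Maneuver'] = 'turn left'
df = df.fillna({'Maneuver': 'no maneuver'})
# rows 1 through 3 now carry 'turn left', the remaining rows 'no maneuver'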
def _transform_parquet_file(
    file: Path,
    json_file: Path,
    state_id_name_mapping: Dict[int, str],
    column_name_type_mapping: Dict[str, str],
    shared_columns: Set[str],
@@ -111,6 +127,9 @@ def _transform_parquet_file(
    # Drop string columns
    df = _remove_string_columns(df)

    # Add labels
    df = _apply_labels(df, json_file)

    print(f'Saving {filename}')
    df.to_parquet(out_dir / filename)
    # df.to_csv(out_dir / f'{file.stem}.csv')
@@ -170,6 +189,12 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file:
    preprocessing_utils.recreate_dir(full_dataset_transformation_out_dir)

    parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
    json_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Json)
    json_files = [file for file in json_files if file.name != state_description_file.name]

    parquet_files.sort()
    json_files.sort()
    file_tuples = zip(parquet_files, json_files)

    shared_columns = _shared_columns(parquet_files)
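The sort-then-zip pairing above assumes that each recording's parquet file and its annotation JSON sort into the same position, for example because they share a file stem; a small illustration of that assumption with invented file names:

from pathlib import Path

# hypothetical dataset layout: one annotation JSON per parquet recording
parquet_files = sorted([Path('flight_02.parquet'), Path('flight_01.parquet')])
json_files = sorted([Path('flight_02.json'), Path('flight_01.json')])
for parquet_file, json_file in zip(parquet_files, json_files):
    assert parquet_file.stem == json_file.stem  # flight_01 pairs with flight_01, flight_02 with flight_02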
@@ -198,9 +223,9 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file:
        print('Your system may run out of memory. In this case, don\'t use parallelization.')

        n_jobs = max(MIN_JOBS, min(n_jobs_based_on_cpu, n_jobs_based_on_memory))
-       Parallel(n_jobs=n_jobs)(delayed(_transform_parquet_file_function_with_args)(file) for file in parquet_files)
+       Parallel(n_jobs=n_jobs)(delayed(_transform_parquet_file_function_with_args)(parquet_file, json_file) for parquet_file, json_file in file_tuples)
    else:
-       for file in parquet_files:
-           _transform_parquet_file_function_with_args(file)
+       for parquet_file, json_file in file_tuples:
+           _transform_parquet_file_function_with_args(parquet_file, json_file)

    _transform_complete_dataset(filewise_transformation_out_dir, full_dataset_transformation_out_dir)
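For completeness, a stand-alone sketch of the joblib dispatch pattern used above, with a stand-in worker in place of _transform_parquet_file_function_with_args and invented file pairs and job count:

from pathlib import Path
from joblib import Parallel, delayed

def process_pair(parquet_file: Path, json_file: Path) -> str:
    # stand-in for _transform_parquet_file_function_with_args
    return f'{parquet_file.name} labeled from {json_file.name}'

file_tuples = [
    (Path('flight_01.parquet'), Path('flight_01.json')),
    (Path('flight_02.parquet'), Path('flight_02.json')),
]
results = Parallel(n_jobs=2)(
    delayed(process_pair)(parquet_file, json_file) for parquet_file, json_file in file_tuples
)
# results == ['flight_01.parquet labeled from flight_01.json', 'flight_02.parquet labeled from flight_02.json']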