from datetime import datetime, timezone
from typing import Dict


class JsonManeuverData:
    """One labelled maneuver interval parsed from an annotation JSON file.

    Holds a lower-cased maneuver label plus the timezone-aware (UTC) start
    and end timestamps delimiting the rows the label applies to.
    """

    # strptime pattern for annotation timestamps, e.g. "2024-11-10T22:46:14.123456".
    # The parsed value is naive; fromJson() stamps it as UTC afterwards.
    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
    # Sentinel label applied to rows not covered by any annotated maneuver.
    NO_MANEUVER_LABEL = "no maneuver"

    def __init__(self, label: str, start_timestamp: datetime, end_timestamp: datetime):
        self._label = label
        self._start_timestamp = start_timestamp
        self._end_timestamp = end_timestamp

    @property
    def label(self) -> str:
        """Lower-cased maneuver name taken from the annotation's 'comment' field."""
        return self._label

    @property
    def start_timestamp(self) -> datetime:
        """UTC start of the maneuver interval."""
        return self._start_timestamp

    @property
    def end_timestamp(self) -> datetime:
        """UTC end of the maneuver interval."""
        return self._end_timestamp

    @staticmethod
    def fromJson(json_data: Dict[str, str]) -> "JsonManeuverData":
        """Build a JsonManeuverData from one annotation dict.

        Expects the keys 'comment', 'startTimestamp' and 'endTimeStamp'.
        NOTE(review): the inconsistent casing of 'endTimeStamp' mirrors the
        annotation schema this parses — do not "fix" it here.

        Raises KeyError on a missing key and ValueError on a malformed
        timestamp.
        """
        label = json_data['comment'].lower()
        # datetime.timezone.utc replaces the former pytz.UTC: identical
        # semantics for a fixed UTC offset with .replace(tzinfo=...), and one
        # fewer third-party dependency.
        start_timestamp = datetime.strptime(
            json_data['startTimestamp'], JsonManeuverData.TIMESTAMP_FORMAT
        ).replace(tzinfo=timezone.utc)
        end_timestamp = datetime.strptime(
            json_data['endTimeStamp'], JsonManeuverData.TIMESTAMP_FORMAT
        ).replace(tzinfo=timezone.utc)

        return JsonManeuverData(label, start_timestamp, end_timestamp)
def _apply_labels(df: pd.DataFrame, json_file: Path) -> pd.DataFrame:
    """Attach a 'Maneuver' label column to *df* from an annotation JSON file.

    Reads the 'annotations' list from *json_file*, and for every maneuver
    writes its label into the 'Maneuver' column of all rows whose
    'TimeStamp' index level falls inside the maneuver's [start, end]
    interval (both bounds inclusive). Rows covered by no maneuver receive
    JsonManeuverData.NO_MANEUVER_LABEL.

    Mutates *df* in place and also returns it (fillna with a dict returns
    a new frame, so callers should use the return value).

    NOTE(review): if the annotations list is empty the 'Maneuver' column is
    never created and the final fillna is a no-op — confirm downstream code
    tolerates a missing column in that case.
    """
    with open(json_file, 'r') as f:
        # Former version pre-declared `annotations = None` with a non-Optional
        # annotation; assign directly from json.load instead.
        annotations: List[Dict[str, str]] = json.load(f)['annotations']

    # Loop-invariant: fetch the timestamp index level once, not per maneuver.
    timestamps = df.index.get_level_values('TimeStamp')

    for maneuver in (JsonManeuverData.fromJson(entry) for entry in annotations):
        in_interval = (timestamps >= maneuver.start_timestamp) & \
                      (timestamps <= maneuver.end_timestamp)
        df.loc[in_interval, 'Maneuver'] = maneuver.label

    return df.fillna({'Maneuver': JsonManeuverData.NO_MANEUVER_LABEL})
print('Your system may run out of memory. In this case, don\'t use parallelization.') n_jobs = max(MIN_JOBS, min(n_jobs_based_on_cpu, n_jobs_based_on_memory)) - Parallel(n_jobs=n_jobs)(delayed(_transform_parquet_file_function_with_args)(file) for file in parquet_files) + Parallel(n_jobs=n_jobs)(delayed(_transform_parquet_file_function_with_args)(parquet_file, json_file) for parquet_file, json_file in file_tuples) else: - for file in parquet_files: - _transform_parquet_file_function_with_args(file) + for parquet_file, json_file in file_tuples: + _transform_parquet_file_function_with_args(parquet_file, json_file) _transform_complete_dataset(filewise_transformation_out_dir, full_dataset_transformation_out_dir) -- GitLab