diff --git a/.vscode/launch.json b/.vscode/launch.json
index 8c911f608c001d9f010f8f014289b44f7ae8c7f6..d27f7ce6ce1aca86a170f9dfcecc720eafb26dd9 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -15,7 +15,7 @@
                 "--dataset-dir",
                 "dataset/plain",
                 "--out",
-                "dataset/preprocessed",
+                "dataset/preprocessing",
                 "--no-parallelization",
             ],
         },
diff --git a/app/preprocessing/json_maneuver_data.py b/app/preprocessing/json_maneuver_data.py
new file mode 100644
index 0000000000000000000000000000000000000000..d04eaa4ace27f3fee8528c3c55949120cd509ff7
--- /dev/null
+++ b/app/preprocessing/json_maneuver_data.py
@@ -0,0 +1,37 @@
+from __future__ import annotations
+from typing import Dict
+from datetime import datetime
+import pytz
+
+class JsonManeuverData:
+    _label: str
+    _start_timestamp: datetime
+    _end_timestamp: datetime
+
+    TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
+    NO_MANEUVER_LABEL = "no maneuver"
+
+    @property
+    def label(self) -> str:
+        return self._label
+
+    @property
+    def start_timestamp(self) -> datetime:
+        return self._start_timestamp
+
+    @property
+    def end_timestamp(self) -> datetime:
+        return self._end_timestamp
+
+    def __init__(self, label: str, start_timestamp: datetime, end_timestamp: datetime):
+        self._label = label
+        self._start_timestamp = start_timestamp
+        self._end_timestamp = end_timestamp
+
+    @staticmethod
+    def fromJson(json_data: Dict[str, str]) -> JsonManeuverData:
+        label = json_data['comment'].lower()
+        start_timestamp = datetime.strptime(json_data['startTimestamp'], JsonManeuverData.TIMESTAMP_FORMAT).replace(tzinfo=pytz.UTC)
+        end_timestamp = datetime.strptime(json_data['endTimeStamp'], JsonManeuverData.TIMESTAMP_FORMAT).replace(tzinfo=pytz.UTC)
+
+        return JsonManeuverData(label, start_timestamp, end_timestamp)
diff --git a/app/preprocessing/transform_dataset.py b/app/preprocessing/transform_dataset.py
index 28c97c1f3f54ec746e84cfceef856dd3b073f3b0..5711f94a9cc03e5a27b2767eede983db5acde0ee 100644
--- a/app/preprocessing/transform_dataset.py
+++ b/app/preprocessing/transform_dataset.py
@@ -1,4 +1,4 @@
-from typing import Dict, Any, Tuple
+from typing import Dict, Any, Tuple, List, Set
 import pandas as pd
 from pathlib import Path
 import json
@@ -9,13 +9,21 @@ from joblib import Parallel, delayed
 from functools import partial
 import psutil
 import multiprocessing
+import pyarrow.parquet
 
 from . import utils as preprocessing_utils
 from .file_type import FileType
+from .json_maneuver_data import JsonManeuverData
 
 DOUBLE_PATTERN = r'Double(\d+)'
-MAX_DATASET_MEMORY_SIZE = 7408802660
+MAX_DATASET_MEMORY_SIZE = 16602933278
 MIN_JOBS = 2
+VARIANCE_THRESHOLD = 0.01
+CORRELATION_THRESHOLD = 0.9
+Y_CLASS_COLUMN = 'Maneuver'
+MANUALLY_EXCLUDED_COLUMNS = [
+    'Tablet_Endpoint',
+]
 
 def _ensure_shape(array: NDArray, shape: Tuple) -> NDArray:
     array_shape = array.shape
@@ -31,31 +39,71 @@ def _cast_columns(df: pd.DataFrame, column_type_mapping: Dict[str | int, str]) -
         double_match = re.match(DOUBLE_PATTERN, column_type)
 
         if column_type == 'Double':
-            df[column] = df[column].astype(np.float64)
+            df[column] = df[column].astype('Float64')
 
         elif column_type == 'Int32':
-            df[column] = df[column].astype(np.int32)
+            df[column] = df[column].astype('Int32')
 
         elif column_type == 'Boolean':
-            df[column] = df[column].astype(np.int32)
+            df[column] = df[column].astype('Int32')
 
         elif column_type == 'String':
            df[column] = df[column].astype(str)
 
        elif double_match:
            vector_rows = int(double_match.group(1)) # it is certain that this is an int because of the pattern
-            df[column] = df[column].apply(lambda vec: np.array(vec, dtype=np.float64)).apply(lambda arr: _ensure_shape(arr, (vector_rows,)))
+            df[column] = df[column] \
+                .apply(lambda vec: np.array(vec, dtype=np.float64)) \
+                .apply(lambda arr: _ensure_shape(arr, (vector_rows,)) if isinstance(arr, np.ndarray) else arr) # if it is not instance of np.ndarray, it is NaN (empty cell)
 
         else:
             raise ValueError(f'Unexpected type {column_type}')
 
     return df
 
+def _split_array_column(df: pd.DataFrame) -> pd.DataFrame:
+    array_columns = [col for col in df.columns if preprocessing_utils.is_column_of_type(df[col], np.ndarray)]
+    for column in array_columns:
+        array_dtype = df[column].iloc[0].dtype # First row must have a value
+        stacked_arrays = np.stack(df[column].values, dtype=array_dtype) # is faster than df[column].apply(lambda vec: pd.Series(vec, dtype=array_dtype))
+        expanded_columns = pd.DataFrame(stacked_arrays, index=df.index)
+        expanded_columns.columns = [f'{column}_{i}' for i in range(expanded_columns.shape[1])]
+
+        df = df.join(expanded_columns).drop(columns=[column])
+
+    return df
+
+def _remove_string_columns(df: pd.DataFrame) -> pd.DataFrame:
+    string_columns = [col for col in df.columns if preprocessing_utils.is_column_of_type(df[col], str)]
+    return df.drop(columns=string_columns)
+
+def _drop_non_shared_columns(df: pd.DataFrame, shared_columns: Set[str]) -> pd.DataFrame:
+    columns_to_drop = [column for column in df.columns if str(column) not in shared_columns]
+    df = df.drop(columns=columns_to_drop)
+
+    return df
+
+def _apply_labels(df: pd.DataFrame, json_file: Path) -> pd.DataFrame:
+    annotations: List[Dict[str, str]] = None
+    with open(json_file, 'r') as f:
+        annotations = json.load(f)['annotations']
+
+    maneuvers = [JsonManeuverData.fromJson(m) for m in annotations]
+    for maneuver in maneuvers:
+        mask = (df.index.get_level_values('TimeStamp') >= maneuver.start_timestamp) & \
+               (df.index.get_level_values('TimeStamp') <= maneuver.end_timestamp)
+
+        df.loc[mask, Y_CLASS_COLUMN] = maneuver.label
+
+    return df.fillna({Y_CLASS_COLUMN: JsonManeuverData.NO_MANEUVER_LABEL})
+
 def _transform_parquet_file(
     file: Path,
+    json_file: Path,
     state_id_name_mapping: Dict[int, str],
     column_name_type_mapping: Dict[str, str],
-    out_dir: Path
+    shared_columns: Set[str],
+    out_dir: Path,
 ) -> None:
     pd.set_option('future.no_silent_downcasting', True)
     filename = file.name
@@ -66,23 +114,133 @@ def _transform_parquet_file(
     print(f'Processing {filename}')
 
     df = df.sort_values(by='FrameCounter')
 
+    df = _drop_non_shared_columns(df, shared_columns)
+
     # Forward fill
     df = df.ffill()
 
     # Rename columns
     df.rename(columns=lambda col: state_id_name_mapping[col], inplace=True)
 
+    # Drop manually evaluated columns
+    df = df.drop(columns=MANUALLY_EXCLUDED_COLUMNS)
+
     # Parse columns
     df = _cast_columns(df, column_name_type_mapping)
 
+    # Split arrays
+    df = _split_array_column(df)
+
+    # Drop string columns
+    # df = _remove_string_columns(df)
+
+    # Add labels
+    df = _apply_labels(df, json_file)
+
     print(f'Saving {filename}')
     df.to_parquet(out_dir / filename)
     # df.to_csv(out_dir / f'{file.stem}.csv')
     print(f'Processed {filename}')
 
+def _transform_complete_dataset(dataset_dir: Path, out_dir: Path) -> Path:
+    string_columns_as_classes_out_dir = out_dir / 'str_columns_as_classes'
+    preprocessing_utils.recreate_dir(string_columns_as_classes_out_dir)
+    parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
+    _string_columns_to_classes(parquet_files, string_columns_as_classes_out_dir)
+
+    unimportant_predictors_removed_out_dir = out_dir / 'removed_unimportant_predictors'
+    preprocessing_utils.recreate_dir(unimportant_predictors_removed_out_dir)
+    train_files = preprocessing_utils.train_files_from_dataset(string_columns_as_classes_out_dir)
+    all_files = preprocessing_utils.files_from_dataset(string_columns_as_classes_out_dir, FileType.Parquet)
+    _remove_unimportant_predictors(train_files, all_files, unimportant_predictors_removed_out_dir)
+
+    return unimportant_predictors_removed_out_dir
+
+def _shared_columns(parquet_files: List[Path]) -> Set[str]:
+    if len(parquet_files) == 0:
+        return {}
+
+    shared_columns: Set[str] = set(pyarrow.parquet.read_schema(parquet_files[0]).names)
+    for file in parquet_files[1:]:
+        columns = pyarrow.parquet.read_schema(file).names
+        shared_columns.intersection_update(columns)
+
+    return shared_columns
+
+def _remove_unimportant_predictors(train_files: List[Path], all_files: List[Path], out_dir: Path) -> None:
+    columns_to_keep: Set[str] = {Y_CLASS_COLUMN}
+
+    for file in train_files:
+        print(f'Collecting important predictors from {file.name}')
+        df = pd.read_parquet(file)
+
+        columns_to_analyze = [col for col in df.columns if col not in columns_to_keep]
+        columns_to_keep.update([col for col in columns_to_analyze if np.std(df[col]) >= VARIANCE_THRESHOLD])
+
+        df: pd.DataFrame = df.drop(columns=columns_to_keep)
+
+        correlation_matrix = df.corr().abs()
+
+        # Select the upper triangle of the correlation matrix
+        upper_tri = correlation_matrix.where(
+            np.triu(np.ones(correlation_matrix.shape), k=1).astype(bool)
+        )
+
+        columns_to_keep.update([col for col in upper_tri.columns if all(upper_tri[col] <= CORRELATION_THRESHOLD)])
+
+    for file in all_files:
+        print(f'Removing not important predictors from {file.name}')
+        df = pd.read_parquet(file)
+
+        columns_to_drop = [col for col in df.columns if col not in columns_to_keep]
+        df.drop(columns=columns_to_drop)
+
+        df.to_parquet(out_dir / file.name)
+
+def _string_columns_to_classes(parquet_files: List[Path], out_dir: Path) -> None:
+    str_column_values: Dict[str, Set[str]] = {}
+    for file in parquet_files:
+        print(f'Collecting string classes from {file.stem}')
+        df = pd.read_parquet(file)
+
+        for column in df.columns:
+            if preprocessing_utils.is_column_of_type(df[column], str) and column != Y_CLASS_COLUMN:
+                if str_column_values.get(column) is None:
+                    str_column_values[column] = set()
+
+                str_column_values[column].update(df[column].unique())
+
+    for file in parquet_files:
+        print(f'Applying classes to {file.stem}')
+        df = pd.read_parquet(file)
+
+        for column in str_column_values.keys():
+            one_hot = pd.get_dummies(df[column], prefix=column, dtype=np.int32)
+            one_hot_columns = [f"{column}_{value}" for value in str_column_values[column]]
+            one_hot = one_hot.reindex(columns=one_hot_columns, fill_value=0)
+
+            df = df.drop(columns=[column])
+            df = pd.concat([df, one_hot], axis=1)
+
+        df.to_parquet(out_dir / file.name)
+
 def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> None:
+    filewise_transformation_out_dir = out_dir / 'filewise_transformation'
+    full_dataset_transformation_out_dir = out_dir / 'full_dataset_transformation'
     preprocessing_utils.recreate_dir(out_dir)
+    preprocessing_utils.recreate_dir(filewise_transformation_out_dir)
+    preprocessing_utils.recreate_dir(full_dataset_transformation_out_dir)
+
+    parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
+    json_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Json)
+    json_files = [file for file in json_files if file.name != state_description_file.name]
+    parquet_files.sort()
+    json_files.sort()
+
+    file_tuples = zip(parquet_files, json_files)
+
+    shared_columns = _shared_columns(parquet_files)
 
     state_id_name_mapping: Dict[int, str] = None
     column_name_type_mapping: Dict[str, str] = None
@@ -91,12 +249,11 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file:
     state_id_name_mapping = {v['stateId']: k for k, v in state_descriptions.items()}
     column_name_type_mapping = {k: v['dataType'] for k, v in state_descriptions.items()}
 
-    parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
-
     _transform_parquet_file_function_with_args = partial(_transform_parquet_file,
         state_id_name_mapping=state_id_name_mapping,
         column_name_type_mapping=column_name_type_mapping,
-        out_dir=out_dir,
+        shared_columns=shared_columns,
+        out_dir=filewise_transformation_out_dir,
     )
 
     if parallelize:
@@ -110,7 +267,10 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file:
             print('Your system may run out of memory. In this case, don\'t use parallelization.')
 
         n_jobs = max(MIN_JOBS, min(n_jobs_based_on_cpu, n_jobs_based_on_memory))
-        Parallel(n_jobs=n_jobs)(delayed(_transform_parquet_file_function_with_args)(file) for file in parquet_files)
+        print(f'Using {n_jobs} jobs')
+        Parallel(n_jobs=n_jobs)(delayed(_transform_parquet_file_function_with_args)(parquet_file, json_file) for parquet_file, json_file in file_tuples)
     else:
-        for file in parquet_files:
-            _transform_parquet_file_function_with_args(file)
+        for parquet_file, json_file in file_tuples:
+            _transform_parquet_file_function_with_args(parquet_file, json_file)
+
+    _transform_complete_dataset(filewise_transformation_out_dir, full_dataset_transformation_out_dir)
diff --git a/app/preprocessing/utils.py b/app/preprocessing/utils.py
index 56d1250e663f3289054faf110c3a50a9daa44fd6..d18f92d4335d51aab6e9a39b1be79140e226f9aa 100644
--- a/app/preprocessing/utils.py
+++ b/app/preprocessing/utils.py
@@ -1,8 +1,11 @@
-from typing import List
+from typing import List, Callable
 import shutil
 import os
 from pathlib import Path
+import pandas as pd
+
 from .file_type import FileType
+from . import common_filenames
 
 def recreate_dir(dir: Path) -> None:
     if dir.exists():
@@ -10,5 +13,19 @@ def recreate_dir(dir: Path) -> None:
         os.makedirs(dir)
 
-def files_from_dataset(dataset_dir: Path, dataType: FileType):
-    return [path for path in dataset_dir.glob(f'*{dataType.file_extension}') if path.is_file()]
+def files_from_dataset(dataset_dir: Path, file_type: FileType) -> List[Path]:
+    return _files_from_dataset_where(dataset_dir, file_type)
+
+def is_column_of_type(column: pd.Series, type: type) -> bool:
+    return isinstance(column.values[0], type)
+
+def _files_from_dataset_where(dataset_dir: Path, file_type: FileType, predicate: Callable[[Path], bool] = None) -> List[Path]:
+    if predicate is None:
+        predicate = lambda _: True
+    return [path for path in dataset_dir.glob(f'*{file_type.file_extension}') if path.is_file() and predicate(path)]
+
+def train_files_from_dataset(dataset_dir: Path, file_type: FileType = FileType.Parquet) -> List[Path]:
+    return _files_from_dataset_where(dataset_dir, file_type, lambda p: p.stem not in common_filenames.TEST_DATA_FILES)
+
+def test_files_from_dataset(dataset_dir: Path, file_type: FileType = FileType.Parquet) -> List[Path]:
+    return _files_from_dataset_where(dataset_dir, file_type, lambda p: p.stem in common_filenames.TEST_DATA_FILES)
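
Note (not part of the patch): a minimal usage sketch of the new JsonManeuverData.fromJson helper added above. The annotation values and the app.preprocessing import path are illustrative assumptions; the key names ('comment', 'startTimestamp', 'endTimeStamp') and the UTC handling mirror the implementation in the diff.

    # Illustrative only: a hypothetical annotation entry, not taken from the real dataset.
    from datetime import datetime
    import pytz

    from app.preprocessing.json_maneuver_data import JsonManeuverData

    annotation = {
        'comment': 'Left Turn',                          # lowercased into the class label
        'startTimestamp': '2024-01-01T12:00:00.000000',  # parsed with TIMESTAMP_FORMAT, tagged as UTC
        'endTimeStamp': '2024-01-01T12:00:30.000000',
    }

    maneuver = JsonManeuverData.fromJson(annotation)
    assert maneuver.label == 'left turn'
    assert maneuver.start_timestamp == datetime(2024, 1, 1, 12, 0, 0, tzinfo=pytz.UTC)

Rows of a recording whose 'TimeStamp' index falls between start_timestamp and end_timestamp are assigned this label in _apply_labels; all remaining rows fall back to JsonManeuverData.NO_MANEUVER_LABEL.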