diff --git a/app/preprocessing/transform_dataset.py b/app/preprocessing/transform_dataset.py
index c087805da051863680bb5cd1800fa5ccb60ba772..5711f94a9cc03e5a27b2767eede983db5acde0ee 100644
--- a/app/preprocessing/transform_dataset.py
+++ b/app/preprocessing/transform_dataset.py
@@ -20,6 +20,10 @@ MAX_DATASET_MEMORY_SIZE = 16602933278
 MIN_JOBS = 2
 VARIANCE_THRESHOLD = 0.01
 CORRELATION_THRESHOLD = 0.9
+Y_CLASS_COLUMN = 'Maneuver'
+MANUALLY_EXCLUDED_COLUMNS = [
+    'Tablet_Endpoint',
+]
 
 def _ensure_shape(array: NDArray, shape: Tuple) -> NDArray:
     array_shape = array.shape
@@ -89,9 +93,9 @@ def _apply_labels(df: pd.DataFrame, json_file: Path) -> pd.DataFrame:
         mask = (df.index.get_level_values('TimeStamp') >= maneuver.start_timestamp) & \
                (df.index.get_level_values('TimeStamp') <= maneuver.end_timestamp)
-        df.loc[mask, 'Maneuver'] = maneuver.label
+        df.loc[mask, Y_CLASS_COLUMN] = maneuver.label
 
-    return df.fillna({'Maneuver': JsonManeuverData.NO_MANEUVER_LABEL})
+    return df.fillna({Y_CLASS_COLUMN: JsonManeuverData.NO_MANEUVER_LABEL})
 
 def _transform_parquet_file(
     file: Path,
@@ -118,6 +122,9 @@ def _transform_parquet_file(
     # Rename columns
     df.rename(columns=lambda col: state_id_name_mapping[col], inplace=True)
 
+    # Drop manually evaluated columns
+    df = df.drop(columns=MANUALLY_EXCLUDED_COLUMNS)
+
     # Parse columns
     df = _cast_columns(df, column_name_type_mapping)
@@ -125,7 +132,7 @@ def _transform_parquet_file(
     df = _split_array_column(df)
 
     # Drop string columns
-    df = _remove_string_columns(df)
+    # df = _remove_string_columns(df)
 
     # Add labels
     df = _apply_labels(df, json_file)
@@ -136,9 +143,19 @@ def _transform_parquet_file(
     print(f'Processed {filename}')
 
-def _transform_complete_dataset(dataset_dir: Path, out_dir: Path):
+def _transform_complete_dataset(dataset_dir: Path, out_dir: Path) -> Path:
+    string_columns_as_classes_out_dir = out_dir / 'str_columns_as_classes'
+    preprocessing_utils.recreate_dir(string_columns_as_classes_out_dir)
     parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
-    _remove_unimportant_predictors(parquet_files, out_dir)
+    _string_columns_to_classes(parquet_files, string_columns_as_classes_out_dir)
+
+    unimportant_predictors_removed_out_dir = out_dir / 'removed_unimportant_predictors'
+    preprocessing_utils.recreate_dir(unimportant_predictors_removed_out_dir)
+    train_files = preprocessing_utils.train_files_from_dataset(string_columns_as_classes_out_dir)
+    all_files = preprocessing_utils.files_from_dataset(string_columns_as_classes_out_dir, FileType.Parquet)
+    _remove_unimportant_predictors(train_files, all_files, unimportant_predictors_removed_out_dir)
+
+    return unimportant_predictors_removed_out_dir
 
 def _shared_columns(parquet_files: List[Path]) -> Set[str]:
     if len(parquet_files) == 0:
@@ -151,10 +168,10 @@ def _shared_columns(parquet_files: List[Path]) -> Set[str]:
     return shared_columns
 
-def _remove_unimportant_predictors(parquet_files: List[Path], out_dir: Path) -> None:
-    columns_to_keep: Set[str] = {'Maneuver'}
+def _remove_unimportant_predictors(train_files: List[Path], all_files: List[Path], out_dir: Path) -> None:
+    columns_to_keep: Set[str] = {Y_CLASS_COLUMN}
 
-    for file in parquet_files:
+    for file in train_files:
         print(f'Collecting important predictors from {file.name}')
         df = pd.read_parquet(file)
@@ -172,7 +189,7 @@ def _remove_unimportant_predictors(parquet_files: List[Path], out_dir: Path) ->
         columns_to_keep.update([col for col in upper_tri.columns if all(upper_tri[col] <= CORRELATION_THRESHOLD)])
 
-    for file in parquet_files:
+    for file in all_files:
         print(f'Removing not important predictors from {file.name}')
         df = pd.read_parquet(file)
@@ -181,6 +198,33 @@ def _remove_unimportant_predictors(parquet_files: List[Path], out_dir: Path) ->
         df.to_parquet(out_dir / file.name)
 
+def _string_columns_to_classes(parquet_files: List[Path], out_dir: Path) -> None:
+    str_column_values: Dict[str, Set[str]] = {}
+    for file in parquet_files:
+        print(f'Collecting string classes from {file.stem}')
+        df = pd.read_parquet(file)
+
+        for column in df.columns:
+            if preprocessing_utils.is_column_of_type(df[column], str) and column != Y_CLASS_COLUMN:
+                if str_column_values.get(column) is None:
+                    str_column_values[column] = set()
+
+                str_column_values[column].update(df[column].unique())
+
+    for file in parquet_files:
+        print(f'Applying classes to {file.stem}')
+        df = pd.read_parquet(file)
+
+        for column in str_column_values.keys():
+            one_hot = pd.get_dummies(df[column], prefix=column, dtype=np.int32)
+            one_hot_columns = [f"{column}_{value}" for value in str_column_values[column]]
+            one_hot = one_hot.reindex(columns=one_hot_columns, fill_value=0)
+
+            df = df.drop(columns=[column])
+            df = pd.concat([df, one_hot], axis=1)
+
+        df.to_parquet(out_dir / file.name)
+
 def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> None:
     filewise_transformation_out_dir = out_dir / 'filewise_transformation'
     full_dataset_transformation_out_dir = out_dir / 'full_dataset_transformation'
diff --git a/app/preprocessing/utils.py b/app/preprocessing/utils.py
index 3372b03eedf8853d5f40cd5b631e5a9d1398422e..d18f92d4335d51aab6e9a39b1be79140e226f9aa 100644
--- a/app/preprocessing/utils.py
+++ b/app/preprocessing/utils.py
@@ -1,10 +1,11 @@
-from typing import List
+from typing import List, Callable
 import shutil
 import os
 from pathlib import Path
 import pandas as pd
 
 from .file_type import FileType
+from . import common_filenames
 
 def recreate_dir(dir: Path) -> None:
     if dir.exists():
@@ -12,8 +13,19 @@ def recreate_dir(dir: Path) -> None:
     os.makedirs(dir)
 
-def files_from_dataset(dataset_dir: Path, dataType: FileType):
-    return [path for path in dataset_dir.glob(f'*{dataType.file_extension}') if path.is_file()]
+def files_from_dataset(dataset_dir: Path, file_type: FileType) -> List[Path]:
+    return _files_from_dataset_where(dataset_dir, file_type)
 
-def is_column_of_type(column: pd.Series, type: type):
+def is_column_of_type(column: pd.Series, type: type) -> bool:
     return isinstance(column.values[0], type)
+
+def _files_from_dataset_where(dataset_dir: Path, file_type: FileType, predicate: Callable[[Path], bool] = None) -> List[Path]:
+    if predicate is None:
+        predicate = lambda _: True
+    return [path for path in dataset_dir.glob(f'*{file_type.file_extension}') if path.is_file() and predicate(path)]
+
+def train_files_from_dataset(dataset_dir: Path, file_type: FileType = FileType.Parquet) -> List[Path]:
+    return _files_from_dataset_where(dataset_dir, file_type, lambda p: p.stem not in common_filenames.TEST_DATA_FILES)
+
+def test_files_from_dataset(dataset_dir: Path, file_type: FileType = FileType.Parquet) -> List[Path]:
+    return _files_from_dataset_where(dataset_dir, file_type, lambda p: p.stem in common_filenames.TEST_DATA_FILES)
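
Side note on the new _string_columns_to_classes step: the string values are collected across every parquet file first, and the pd.get_dummies result is then reindexed against that global value set, so each output file ends up with an identical set of one-hot columns even when a given file never contains some of the values. A minimal standalone sketch of that reindex idea (the 'Status' column and its values are made-up examples, not columns from this dataset):

    import numpy as np
    import pandas as pd

    # Values observed for the column across the whole dataset (hypothetical).
    all_values = {'CLIMB', 'CRUISE', 'DESCENT'}

    # A single file that only ever contains two of the three values.
    df = pd.DataFrame({'Status': ['CLIMB', 'CRUISE', 'CLIMB']})

    one_hot = pd.get_dummies(df['Status'], prefix='Status', dtype=np.int32)
    # Reindex against the global value set so the unseen class still gets an all-zero column.
    one_hot = one_hot.reindex(columns=[f'Status_{value}' for value in sorted(all_values)], fill_value=0)

    print(one_hot.columns.tolist())  # ['Status_CLIMB', 'Status_CRUISE', 'Status_DESCENT']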