From ffcc4c81ffdfa7af6136105203f90a681dde0fc8 Mon Sep 17 00:00:00 2001
From: Andri Joos <andri@joos.io>
Date: Sat, 9 Nov 2024 17:16:00 +0100
Subject: [PATCH] drop unimportant data based on variance and covariance

---
 app/preprocessing/transform_dataset.py | 44 +++++++++++++++++++++++++-
 1 file changed, 43 insertions(+), 1 deletion(-)

diff --git a/app/preprocessing/transform_dataset.py b/app/preprocessing/transform_dataset.py
index dbb1834..ab2fb31 100644
--- a/app/preprocessing/transform_dataset.py
+++ b/app/preprocessing/transform_dataset.py
@@ -17,6 +17,8 @@ from .file_type import FileType
 DOUBLE_PATTERN = r'Double(\d+)'
 MAX_DATASET_MEMORY_SIZE = 7408802660
 MIN_JOBS = 2
+VARIANCE_THRESHOLD = 0.01
+CORRELATION_THRESHOLD = 0.9
 
 def _ensure_shape(array: NDArray, shape: Tuple) -> NDArray:
     array_shape = array.shape
@@ -115,6 +117,10 @@ def _transform_parquet_file(
 
     print(f'Processed {filename}')
 
+def _transform_complete_dataset(dataset_dir: Path, out_dir: Path):
+    parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
+    _remove_unimportant_predictors(parquet_files, out_dir)
+
 def _shared_columns(parquet_files: List[Path]) -> Set[str]:
     if len(parquet_files) == 0:
         return {}
@@ -126,8 +132,42 @@ def _shared_columns(parquet_files: List[Path]) -> Set[str]:
 
     return shared_columns
 
+def _remove_unimportant_predictors(parquet_files: List[Path], out_dir: Path) -> None:
+    columns_to_keep: Set[str] = {'Maneuver'}
+
+    for file in parquet_files:
+        print(f'Collecting important predictors from {file.name}')
+        df = pd.read_parquet(file)
+
+        columns_to_analyze = [col for col in df.columns if col not in columns_to_keep]
+        columns_to_keep.update([col for col in columns_to_analyze if np.std(df[col]) >= VARIANCE_THRESHOLD])
+
+        df: pd.DataFrame = df.drop(columns=columns_to_keep)
+
+        correlation_matrix = df.corr().abs()
+
+        # Select the upper triangle of the correlation matrix
+        upper_tri = correlation_matrix.where(
+            np.triu(np.ones(correlation_matrix.shape), k=1).astype(bool)
+        )
+
+        columns_to_keep.update([col for col in upper_tri.columns if all(upper_tri[col] <= CORRELATION_THRESHOLD)])
+
+    for file in parquet_files:
+        print(f'Removing unimportant predictors from {file.name}')
+        df = pd.read_parquet(file)
+
+        columns_to_drop = [col for col in df.columns if col not in columns_to_keep]
+        df = df.drop(columns=columns_to_drop)
+
+        df.to_parquet(out_dir / file.name)
+
 def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> None:
+    filewise_transformation_out_dir = out_dir / 'filewise_transformation'
+    full_dataset_transformation_out_dir = out_dir / 'full_dataset_transformation'
     preprocessing_utils.recreate_dir(out_dir)
+    preprocessing_utils.recreate_dir(filewise_transformation_out_dir)
+    preprocessing_utils.recreate_dir(full_dataset_transformation_out_dir)
 
     parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
 
@@ -144,7 +184,7 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file:
         state_id_name_mapping=state_id_name_mapping,
         column_name_type_mapping=column_name_type_mapping,
         shared_columns=shared_columns,
-        out_dir=out_dir,
+        out_dir=filewise_transformation_out_dir,
     )
 
     if parallelize:
@@ -162,3 +202,5 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file:
     else:
         for file in parquet_files:
             _transform_parquet_file_function_with_args(file)
+
+    _transform_complete_dataset(filewise_transformation_out_dir, full_dataset_transformation_out_dir)
-- 
GitLab
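
For reference, a minimal standalone sketch of the idea behind the new _remove_unimportant_predictors step: first keep only columns whose standard deviation reaches VARIANCE_THRESHOLD, then prune columns that are highly correlated with an earlier column using the upper triangle of the absolute correlation matrix. The DataFrame, column names, and data below are invented for illustration, and this sketch applies the correlation check to the high-variance survivors (a common variant) rather than reproducing the patch's exact per-file loop.

# Illustrative sketch only -- not part of the commit above.
import numpy as np
import pandas as pd

VARIANCE_THRESHOLD = 0.01      # minimum standard deviation to keep a column
CORRELATION_THRESHOLD = 0.9    # columns correlated above this are treated as redundant

rng = np.random.default_rng(42)
n = 200
df = pd.DataFrame({
    'Maneuver': ['takeoff'] * n,              # label column, always kept
    'airspeed': rng.normal(65.0, 5.0, n),     # informative predictor
    'altitude': rng.normal(1000.0, 50.0, n),  # informative predictor
    'flaps': np.full(n, 0.25),                # constant -> fails the std check
})
df['airspeed_kts'] = df['airspeed'] * 1.0001  # near-duplicate of airspeed

keep = {'Maneuver'}

# 1) Variance step: keep columns whose standard deviation reaches the threshold.
numeric = [c for c in df.columns if c not in keep]
keep.update(c for c in numeric if np.std(df[c]) >= VARIANCE_THRESHOLD)

# 2) Correlation step: among the surviving numeric columns, inspect only the
#    upper triangle of the absolute correlation matrix and drop any column
#    that is strongly correlated with an earlier one.
survivors = [c for c in df.columns if c in keep and c != 'Maneuver']
corr = df[survivors].corr().abs()
upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool))
redundant = [c for c in upper.columns if (upper[c] > CORRELATION_THRESHOLD).any()]
keep -= set(redundant)

print(sorted(keep))  # expected: ['Maneuver', 'airspeed', 'altitude']

Here 'flaps' is removed by the variance step and 'airspeed_kts' by the correlation step, which mirrors the intent of the patch: constant and redundant predictors are dropped before the full-dataset transformation output is written.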