Commit ffcc4c81 authored by Andri Joos

drop unimportant data based on variance and covariance

parent 66876851
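For context, the filtering introduced by this commit can be sketched in isolation as follows. This is a minimal, self-contained illustration of the variance-then-correlation idea, not a reproduction of the commit's exact control flow; the DataFrame, its column names, and the random data are invented for the example, while the two thresholds mirror VARIANCE_THRESHOLD and CORRELATION_THRESHOLD from the diff below.

import numpy as np
import pandas as pd

VARIANCE_THRESHOLD = 0.01
CORRELATION_THRESHOLD = 0.9

rng = np.random.default_rng(0)
signal = rng.normal(0.0, 1.0, 1000)
df = pd.DataFrame({
    'almost_constant': 3.0 + rng.normal(0.0, 1e-4, 1000),             # near-zero spread -> dropped
    'signal': signal,                                                  # informative column -> kept
    'signal_duplicate': 0.99 * signal + rng.normal(0.0, 0.01, 1000),  # near-copy of 'signal'
})

# Step 1: keep only columns with enough spread.
kept = {col for col in df.columns if np.std(df[col]) >= VARIANCE_THRESHOLD}

# Step 2: among those, flag one column of each highly correlated pair via the
# upper triangle of the absolute correlation matrix.
corr = df[sorted(kept)].corr().abs()
upper = corr.where(np.triu(np.ones(corr.shape), k=1).astype(bool))
redundant = [col for col in upper.columns if (upper[col] > CORRELATION_THRESHOLD).any()]

print(sorted(kept))  # ['signal', 'signal_duplicate']
print(redundant)     # ['signal_duplicate']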
@@ -17,6 +17,8 @@ from .file_type import FileType
DOUBLE_PATTERN = r'Double(\d+)'
MAX_DATASET_MEMORY_SIZE = 7408802660
MIN_JOBS = 2
VARIANCE_THRESHOLD = 0.01
CORRELATION_THRESHOLD = 0.9

def _ensure_shape(array: NDArray, shape: Tuple) -> NDArray:
    array_shape = array.shape
@@ -115,6 +117,10 @@ def _transform_parquet_file(
    print(f'Processed {filename}')

def _transform_complete_dataset(dataset_dir: Path, out_dir: Path):
    parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)

    _remove_unimportant_predictors(parquet_files, out_dir)

def _shared_columns(parquet_files: List[Path]) -> Set[str]:
    if len(parquet_files) == 0:
        return set()
@@ -126,8 +132,42 @@ def _shared_columns(parquet_files: List[Path]) -> Set[str]:
    return shared_columns

def _remove_unimportant_predictors(parquet_files: List[Path], out_dir: Path) -> None:
    columns_to_keep: Set[str] = {'Maneuver'}

    for file in parquet_files:
        print(f'Collecting important predictors from {file.name}')
        df = pd.read_parquet(file)

        # Variance step: keep columns whose standard deviation meets the threshold.
        columns_to_analyze = [col for col in df.columns if col not in columns_to_keep]
        columns_to_keep.update([col for col in columns_to_analyze if np.std(df[col]) >= VARIANCE_THRESHOLD])

        # Correlation step: analyze only the columns that are not yet kept.
        df = df.drop(columns=columns_to_keep)
        correlation_matrix = df.corr().abs()

        # Select the upper triangle of the correlation matrix so that each pair of
        # columns is considered once; the diagonal and lower triangle become NaN.
        upper_tri = correlation_matrix.where(
            np.triu(np.ones(correlation_matrix.shape), k=1).astype(bool)
        )

        # Keep columns that are not strongly correlated with any other analyzed column.
        # NaN entries compare False, so `not any(... > threshold)` handles them correctly.
        columns_to_keep.update([col for col in upper_tri.columns if not any(upper_tri[col] > CORRELATION_THRESHOLD)])

    for file in parquet_files:
        print(f'Removing unimportant predictors from {file.name}')
        df = pd.read_parquet(file)
        columns_to_drop = [col for col in df.columns if col not in columns_to_keep]
        df = df.drop(columns=columns_to_drop)
        df.to_parquet(out_dir / file.name)
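
# Illustrative example (hypothetical values, not part of the patched module): how the
# upper-triangle masking in _remove_unimportant_predictors behaves on a tiny 3x3
# correlation matrix with made-up column names 'a', 'b', 'c'.
_example_corr = pd.DataFrame(
    [[1.00, 0.95, 0.10],
     [0.95, 1.00, 0.20],
     [0.10, 0.20, 1.00]],
    index=['a', 'b', 'c'],
    columns=['a', 'b', 'c'],
)
# Keeping only the strict upper triangle turns the diagonal and lower triangle into NaN,
# so each pair of columns is inspected exactly once:
#       a     b    c
#    a NaN  0.95  0.1
#    b NaN   NaN  0.2
#    c NaN   NaN  NaN
_example_upper = _example_corr.where(np.triu(np.ones(_example_corr.shape), k=1).astype(bool))
# Only column 'b' has an entry above CORRELATION_THRESHOLD (0.9), so 'b' would be
# treated as redundant while 'a' and 'c' would be kept.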

def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> None:
    filewise_transformation_out_dir = out_dir / 'filewise_transformation'
    full_dataset_transformation_out_dir = out_dir / 'full_dataset_transformation'

    preprocessing_utils.recreate_dir(out_dir)
    preprocessing_utils.recreate_dir(filewise_transformation_out_dir)
    preprocessing_utils.recreate_dir(full_dataset_transformation_out_dir)

    parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
@@ -144,7 +184,7 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file:
        state_id_name_mapping=state_id_name_mapping,
        column_name_type_mapping=column_name_type_mapping,
        shared_columns=shared_columns,
-       out_dir=out_dir,
+       out_dir=filewise_transformation_out_dir,
    )
    if parallelize:
@@ -162,3 +202,5 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file:
    else:
        for file in parquet_files:
            _transform_parquet_file_function_with_args(file)

    _transform_complete_dataset(filewise_transformation_out_dir, full_dataset_transformation_out_dir)
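For completeness, a hypothetical invocation of the entry point after this change. The module path, dataset locations, and state description file name below are assumptions for illustration, not values taken from the repository:

from pathlib import Path

# from <package>.transform import transform_dataset   # actual module path not shown in this diff

transform_dataset(
    dataset_dir=Path('data/parquet'),                  # hypothetical input directory of parquet files
    out_dir=Path('data/preprocessed'),                 # receives filewise_transformation/ and full_dataset_transformation/
    state_description_file=Path('data/states.json'),   # hypothetical state description file
    parallelize=True,
)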