Commit 771a5349 authored by Andri Joos

string columns to classes

parent 6bfb8962
@@ -20,6 +20,10 @@ MAX_DATASET_MEMORY_SIZE = 16602933278
 MIN_JOBS = 2
 VARIANCE_THRESHOLD = 0.01
 CORRELATION_THRESHOLD = 0.9
+Y_CLASS_COLUMN = 'Maneuver'
+MANUALLY_EXCLUDED_COLUMNS = [
+    'Tablet_Endpoint',
+]
 
 def _ensure_shape(array: NDArray, shape: Tuple) -> NDArray:
     array_shape = array.shape
@@ -89,9 +93,9 @@ def _apply_labels(df: pd.DataFrame, json_file: Path) -> pd.DataFrame:
         mask = (df.index.get_level_values('TimeStamp') >= maneuver.start_timestamp) & \
             (df.index.get_level_values('TimeStamp') <= maneuver.end_timestamp)
-        df.loc[mask, 'Maneuver'] = maneuver.label
+        df.loc[mask, Y_CLASS_COLUMN] = maneuver.label
 
-    return df.fillna({'Maneuver': JsonManeuverData.NO_MANEUVER_LABEL})
+    return df.fillna({Y_CLASS_COLUMN: JsonManeuverData.NO_MANEUVER_LABEL})
 
 def _transform_parquet_file(
     file: Path,
@@ -118,6 +122,9 @@ def _transform_parquet_file(
     # Rename columns
     df.rename(columns=lambda col: state_id_name_mapping[col], inplace=True)
 
+    # Drop manually evaluated columns
+    df = df.drop(columns=MANUALLY_EXCLUDED_COLUMNS)
+
     # Parse columns
     df = _cast_columns(df, column_name_type_mapping)
@@ -125,7 +132,7 @@ def _transform_parquet_file(
     df = _split_array_column(df)
 
     # Drop string columns
-    df = _remove_string_columns(df)
+    # df = _remove_string_columns(df)
 
     # Add labels
     df = _apply_labels(df, json_file)
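
The `_apply_labels` step referenced above (updated earlier in this diff to write into `Y_CLASS_COLUMN`) selects rows whose timestamps fall inside a maneuver window and fills everything else with a default label. A minimal, self-contained sketch of that masking pattern, with made-up index names, timestamps, and labels:

import pandas as pd

# Toy frame indexed by a 'TimeStamp' level, loosely mimicking the preprocessed flight data.
index = pd.MultiIndex.from_product(
    [[0], pd.to_datetime(['2023-01-01 10:00', '2023-01-01 10:01', '2023-01-01 10:02'])],
    names=['Flight', 'TimeStamp'],  # 'Flight' is an illustrative level name, not from the diff
)
df = pd.DataFrame({'Altitude': [100.0, 110.0, 120.0]}, index=index)

# Label every row whose timestamp falls inside a (made-up) maneuver window.
start, end = pd.Timestamp('2023-01-01 10:00'), pd.Timestamp('2023-01-01 10:01')
mask = (df.index.get_level_values('TimeStamp') >= start) & \
       (df.index.get_level_values('TimeStamp') <= end)
df.loc[mask, 'Maneuver'] = 'loop'

# Rows outside every window keep NaN and get a default label (the real code uses
# JsonManeuverData.NO_MANEUVER_LABEL).
df = df.fillna({'Maneuver': 'no_maneuver'})
print(df)
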
@@ -136,9 +143,19 @@ def _transform_parquet_file(
     print(f'Processed {filename}')
 
-def _transform_complete_dataset(dataset_dir: Path, out_dir: Path):
+def _transform_complete_dataset(dataset_dir: Path, out_dir: Path) -> Path:
+    string_columns_as_classes_out_dir = out_dir / 'str_columns_as_classes'
+    preprocessing_utils.recreate_dir(string_columns_as_classes_out_dir)
+
     parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
-    _remove_unimportant_predictors(parquet_files, out_dir)
+    _string_columns_to_classes(parquet_files, string_columns_as_classes_out_dir)
+
+    unimportant_predictors_removed_out_dir = out_dir / 'removed_unimportant_predictors'
+    preprocessing_utils.recreate_dir(unimportant_predictors_removed_out_dir)
+
+    train_files = preprocessing_utils.train_files_from_dataset(string_columns_as_classes_out_dir)
+    all_files = preprocessing_utils.files_from_dataset(string_columns_as_classes_out_dir, FileType.Parquet)
+    _remove_unimportant_predictors(train_files, all_files, unimportant_predictors_removed_out_dir)
+
+    return unimportant_predictors_removed_out_dir
 
 def _shared_columns(parquet_files: List[Path]) -> Set[str]:
     if len(parquet_files) == 0:
@@ -151,10 +168,10 @@ def _shared_columns(parquet_files: List[Path]) -> Set[str]:
     return shared_columns
 
-def _remove_unimportant_predictors(parquet_files: List[Path], out_dir: Path) -> None:
-    columns_to_keep: Set[str] = {'Maneuver'}
+def _remove_unimportant_predictors(train_files: List[Path], all_files: List[Path], out_dir: Path) -> None:
+    columns_to_keep: Set[str] = {Y_CLASS_COLUMN}
 
-    for file in parquet_files:
+    for file in train_files:
         print(f'Collecting important predictors from {file.name}')
         df = pd.read_parquet(file)
@@ -172,7 +189,7 @@ def _remove_unimportant_predictors(parquet_files: List[Path], out_dir: Path) -> None:
         columns_to_keep.update([col for col in upper_tri.columns if all(upper_tri[col] <= CORRELATION_THRESHOLD)])
 
-    for file in parquet_files:
+    for file in all_files:
         print(f'Removing not important predictors from {file.name}')
         df = pd.read_parquet(file)
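
Only the signature and the two loop headers of `_remove_unimportant_predictors` change in this diff; the selection logic in between is collapsed. Going by the visible `upper_tri` line and the `VARIANCE_THRESHOLD` / `CORRELATION_THRESHOLD` constants, the function appears to keep predictors that vary enough and are not strongly correlated with another kept column, choosing them from the training files only so the test files cannot leak into feature selection. A rough, self-contained sketch of that kind of filter (names and details are illustrative, not the actual implementation):

import numpy as np
import pandas as pd

VARIANCE_THRESHOLD = 0.01
CORRELATION_THRESHOLD = 0.9

def select_columns(train_df: pd.DataFrame, y_column: str = 'Maneuver') -> set:
    # Pick predictors from training data only, so the test files never influence the choice.
    numeric = train_df.drop(columns=[y_column]).select_dtypes(include=np.number)

    # 1) Drop near-constant predictors.
    varying = numeric.loc[:, numeric.var() > VARIANCE_THRESHOLD]

    # 2) Inspect each correlated pair once via the upper triangle of |corr| and
    #    drop a column if it correlates too strongly with an earlier one.
    corr = varying.corr().abs()
    upper_tri = corr.where(np.triu(np.ones(corr.shape, dtype=bool), k=1))
    keep = {col for col in upper_tri.columns if (upper_tri[col].fillna(0) <= CORRELATION_THRESHOLD).all()}
    return keep | {y_column}

train_df = pd.DataFrame({
    'a': [1.0, 2.0, 3.0, 4.0],
    'b': [2.0, 4.0, 6.0, 8.0],      # perfectly correlated with 'a' -> dropped
    'c': [0.0, 0.001, 0.0, 0.001],  # nearly constant -> dropped
    'Maneuver': ['x', 'x', 'y', 'y'],
})
print(select_columns(train_df))  # keeps 'a' and 'Maneuver'
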
@@ -181,6 +198,33 @@ def _remove_unimportant_predictors(parquet_files: List[Path], out_dir: Path) -> None:
         df.to_parquet(out_dir / file.name)
 
+def _string_columns_to_classes(parquet_files: List[Path], out_dir: Path) -> None:
+    str_column_values: Dict[str, Set[str]] = {}
+    for file in parquet_files:
+        print(f'Collecting string classes from {file.stem}')
+        df = pd.read_parquet(file)
+
+        for column in df.columns:
+            if preprocessing_utils.is_column_of_type(df[column], str) and column != Y_CLASS_COLUMN:
+                if str_column_values.get(column) is None:
+                    str_column_values[column] = set()
+
+                str_column_values[column].update(df[column].unique())
+
+    for file in parquet_files:
+        print(f'Applying classes to {file.stem}')
+        df = pd.read_parquet(file)
+
+        for column in str_column_values.keys():
+            one_hot = pd.get_dummies(df[column], prefix=column, dtype=np.int32)
+            one_hot_columns = [f"{column}_{value}" for value in str_column_values[column]]
+            one_hot = one_hot.reindex(columns=one_hot_columns, fill_value=0)
+
+            df = df.drop(columns=[column])
+            df = pd.concat([df, one_hot], axis=1)
+
+        df.to_parquet(out_dir / file.name)
+
 def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> None:
     filewise_transformation_out_dir = out_dir / 'filewise_transformation'
     full_dataset_transformation_out_dir = out_dir / 'full_dataset_transformation'
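
The new `_string_columns_to_classes` makes two passes: it first collects every value each string column takes across all files (the shared vocabulary), then one-hot encodes each file against that vocabulary, so every output file ends up with the same class columns even when a given file never contains some value. A minimal standalone illustration of the `get_dummies` + `reindex` idea, with a made-up column and values:

import numpy as np
import pandas as pd

# Pretend these are two parquet files whose 'Gear' column covers different value subsets.
df_a = pd.DataFrame({'Gear': ['up', 'down', 'up']})
df_b = pd.DataFrame({'Gear': ['down', 'down', 'transit']})

# Pass 1: union of observed values across all frames (the shared vocabulary).
values = set(df_a['Gear'].unique()) | set(df_b['Gear'].unique())
columns = [f'Gear_{value}' for value in sorted(values)]

# Pass 2: encode each frame, then reindex so classes missing from a frame become all-zero columns.
for df in (df_a, df_b):
    one_hot = pd.get_dummies(df['Gear'], prefix='Gear', dtype=np.int32)
    one_hot = one_hot.reindex(columns=columns, fill_value=0)
    print(one_hot.columns.tolist())  # identical column order for both frames
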
......
-from typing import List
+from typing import List, Callable
 import shutil
 import os
 from pathlib import Path
 import pandas as pd
 from .file_type import FileType
+from . import common_filenames
 
 def recreate_dir(dir: Path) -> None:
     if dir.exists():
@@ -12,8 +13,19 @@ def recreate_dir(dir: Path) -> None:
     os.makedirs(dir)
 
-def files_from_dataset(dataset_dir: Path, dataType: FileType):
-    return [path for path in dataset_dir.glob(f'*{dataType.file_extension}') if path.is_file()]
+def files_from_dataset(dataset_dir: Path, file_type: FileType) -> List[Path]:
+    return _files_from_dataset_where(dataset_dir, file_type)
 
-def is_column_of_type(column: pd.Series, type: type):
+def is_column_of_type(column: pd.Series, type: type) -> bool:
     return isinstance(column.values[0], type)
 
+def _files_from_dataset_where(dataset_dir: Path, file_type: FileType, predicate: Callable[[Path], bool] = None) -> List[Path]:
+    if predicate is None:
+        predicate = lambda _: True
+
+    return [path for path in dataset_dir.glob(f'*{file_type.file_extension}') if path.is_file() and predicate(path)]
+
+def train_files_from_dataset(dataset_dir: Path, file_type: FileType = FileType.Parquet) -> List[Path]:
+    return _files_from_dataset_where(dataset_dir, file_type, lambda p: p.stem not in common_filenames.TEST_DATA_FILES)
+
+def test_files_from_dataset(dataset_dir: Path, file_type: FileType = FileType.Parquet) -> List[Path]:
+    return _files_from_dataset_where(dataset_dir, file_type, lambda p: p.stem in common_filenames.TEST_DATA_FILES)
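
The new helpers filter dataset files with a predicate on the file stem; `common_filenames.TEST_DATA_FILES` (not shown in this diff) presumably lists the stems reserved for testing. A self-contained sketch of that predicate-based split, using a temporary directory and an invented test-file list:

from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Callable, List

# Stand-in for common_filenames.TEST_DATA_FILES; the real list is not part of this diff.
TEST_DATA_FILES = {'flight_07', 'flight_12'}

def files_where(dataset_dir: Path, extension: str, predicate: Callable[[Path], bool] = None) -> List[Path]:
    if predicate is None:
        predicate = lambda _: True
    return [p for p in dataset_dir.glob(f'*{extension}') if p.is_file() and predicate(p)]

with TemporaryDirectory() as tmp:
    dataset_dir = Path(tmp)
    for stem in ('flight_01', 'flight_07', 'flight_12', 'flight_15'):
        (dataset_dir / f'{stem}.parquet').touch()  # empty placeholders are enough to demo the split

    train = files_where(dataset_dir, '.parquet', lambda p: p.stem not in TEST_DATA_FILES)
    test = files_where(dataset_dir, '.parquet', lambda p: p.stem in TEST_DATA_FILES)
    print(sorted(p.stem for p in train))  # ['flight_01', 'flight_15']
    print(sorted(p.stem for p in test))   # ['flight_07', 'flight_12']
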