Commit ab6f7296 authored by Andri Joos

Merge branch 'labels'

parents 73a4bb5b 771a5349
@@ -15,7 +15,7 @@
"--dataset-dir",
"dataset/plain",
"--out",
"dataset/preprocessed",
"dataset/preprocessing",
"--no-parallelization",
],
},
from __future__ import annotations
from typing import Dict
from datetime import datetime
import pytz
class JsonManeuverData:
_label: str
_start_timestamp: datetime
_end_timestamp: datetime
TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
NO_MANEUVER_LABEL = "no maneuver"
@property
def label(self) -> str:
return self._label
@property
def start_timestamp(self) -> datetime:
return self._start_timestamp
@property
def end_timestamp(self) -> datetime:
return self._end_timestamp
def __init__(self, label: str, start_timestamp: datetime, end_timestamp: datetime):
self._label = label
self._start_timestamp = start_timestamp
self._end_timestamp = end_timestamp
@staticmethod
def fromJson(json_data: Dict[str, str]) -> JsonManeuverData:
label = json_data['comment'].lower()
start_timestamp = datetime.strptime(json_data['startTimestamp'], JsonManeuverData.TIMESTAMP_FORMAT).replace(tzinfo=pytz.UTC)
end_timestamp = datetime.strptime(json_data['endTimeStamp'], JsonManeuverData.TIMESTAMP_FORMAT).replace(tzinfo=pytz.UTC)
return JsonManeuverData(label, start_timestamp, end_timestamp)
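# Illustrative usage, not part of this commit: the sample values are invented; only
# the keys ('comment', 'startTimestamp', 'endTimeStamp') come from fromJson above.
#
#     sample = {
#         'comment': 'Left Turn',
#         'startTimestamp': '2024-01-01T12:00:00.000000',
#         'endTimeStamp': '2024-01-01T12:00:05.500000',
#     }
#     maneuver = JsonManeuverData.fromJson(sample)
#     maneuver.label                     # -> 'left turn' (labels are lower-cased)
#     maneuver.start_timestamp.tzinfo    # -> UTC (timestamps are made timezone-aware)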
- from typing import Dict, Any, Tuple
+ from typing import Dict, Any, Tuple, List, Set
import pandas as pd
from pathlib import Path
import json
@@ -9,13 +9,21 @@ from joblib import Parallel, delayed
from functools import partial
import psutil
import multiprocessing
import pyarrow.parquet
from . import utils as preprocessing_utils
from .file_type import FileType
from .json_maneuver_data import JsonManeuverData
DOUBLE_PATTERN = r'Double(\d+)'
- MAX_DATASET_MEMORY_SIZE = 7408802660
+ MAX_DATASET_MEMORY_SIZE = 16602933278
MIN_JOBS = 2
VARIANCE_THRESHOLD = 0.01
CORRELATION_THRESHOLD = 0.9
Y_CLASS_COLUMN = 'Maneuver'
MANUALLY_EXCLUDED_COLUMNS = [
'Tablet_Endpoint',
]
def _ensure_shape(array: NDArray, shape: Tuple) -> NDArray:
array_shape = array.shape
@@ -31,31 +39,71 @@ def _cast_columns(df: pd.DataFrame, column_type_mapping: Dict[str | int, str]) -
double_match = re.match(DOUBLE_PATTERN, column_type)
if column_type == 'Double':
- df[column] = df[column].astype(np.float64)
+ df[column] = df[column].astype('Float64')
elif column_type == 'Int32':
- df[column] = df[column].astype(np.int32)
+ df[column] = df[column].astype('Int32')
elif column_type == 'Boolean':
- df[column] = df[column].astype(np.int32)
+ df[column] = df[column].astype('Int32')
elif column_type == 'String':
df[column] = df[column].astype(str)
elif double_match:
vector_rows = int(double_match.group(1)) # it is certain that this is an int because of the pattern
- df[column] = df[column].apply(lambda vec: np.array(vec, dtype=np.float64)).apply(lambda arr: _ensure_shape(arr, (vector_rows,)))
+ df[column] = df[column] \
+     .apply(lambda vec: np.array(vec, dtype=np.float64)) \
+     .apply(lambda arr: _ensure_shape(arr, (vector_rows,)) if isinstance(arr, np.ndarray) else arr)  # if it is not an instance of np.ndarray, it is NaN (empty cell)
else:
raise ValueError(f'Unexpected type {column_type}')
return df
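# Illustrative note, not part of this commit: 'Float64' / 'Int32' (capitalised) are
# pandas' nullable extension dtypes, so columns that still contain empty cells can be
# cast without error, unlike the plain numpy dtypes used before (np.int32 cannot hold
# a missing value). A 'Double<N>' column holds per-row vectors: e.g. a 'Double3' cell
# [1.0, 2.0, 3.0] is coerced to an np.float64 array and checked against shape (3,).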
def _split_array_column(df: pd.DataFrame) -> pd.DataFrame:
array_columns = [col for col in df.columns if preprocessing_utils.is_column_of_type(df[col], np.ndarray)]
for column in array_columns:
array_dtype = df[column].iloc[0].dtype # First row must have a value
stacked_arrays = np.stack(df[column].values, dtype=array_dtype) # is faster than df[column].apply(lambda vec: pd.Series(vec, dtype=array_dtype))
expanded_columns = pd.DataFrame(stacked_arrays, index=df.index)
expanded_columns.columns = [f'{column}_{i}' for i in range(expanded_columns.shape[1])]
df = df.join(expanded_columns).drop(columns=[column])
return df
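# Illustrative sketch, not part of this commit; the sample frame is invented.
# _split_array_column turns a column of fixed-length arrays into one scalar
# column per element:
#
#     wide = pd.DataFrame({'Acceleration': [np.array([0.1, 0.2, 0.3]),
#                                           np.array([0.4, 0.5, 0.6])]})
#     wide = _split_array_column(wide)
#     list(wide.columns)  # -> ['Acceleration_0', 'Acceleration_1', 'Acceleration_2']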
def _remove_string_columns(df: pd.DataFrame) -> pd.DataFrame:
string_columns = [col for col in df.columns if preprocessing_utils.is_column_of_type(df[col], str)]
return df.drop(columns=string_columns)
def _drop_non_shared_columns(df: pd.DataFrame, shared_columns: Set[str]) -> pd.DataFrame:
columns_to_drop = [column for column in df.columns if str(column) not in shared_columns]
df = df.drop(columns=columns_to_drop)
return df
def _apply_labels(df: pd.DataFrame, json_file: Path) -> pd.DataFrame:
annotations: List[Dict[str, str]] = None
with open(json_file, 'r') as f:
annotations = json.load(f)['annotations']
maneuvers = [JsonManeuverData.fromJson(m) for m in annotations]
for maneuver in maneuvers:
mask = (df.index.get_level_values('TimeStamp') >= maneuver.start_timestamp) & \
(df.index.get_level_values('TimeStamp') <= maneuver.end_timestamp)
df.loc[mask, Y_CLASS_COLUMN] = maneuver.label
return df.fillna({Y_CLASS_COLUMN: JsonManeuverData.NO_MANEUVER_LABEL})
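# Illustrative sketch, not part of this commit: the label JSON read by _apply_labels
# is expected to look roughly like the structure below (keys taken from the parsing
# code above, values invented). Rows whose 'TimeStamp' index falls inside an interval
# get that maneuver's lower-cased comment in the 'Maneuver' column; everything else
# falls back to 'no maneuver'.
#
#     {
#         "annotations": [
#             {
#                 "comment": "Left Turn",
#                 "startTimestamp": "2024-01-01T12:00:00.000000",
#                 "endTimeStamp": "2024-01-01T12:00:05.500000"
#             }
#         ]
#     }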
def _transform_parquet_file(
file: Path,
json_file: Path,
state_id_name_mapping: Dict[int, str],
column_name_type_mapping: Dict[str, str],
- out_dir: Path
+ shared_columns: Set[str],
+ out_dir: Path,
) -> None:
pd.set_option('future.no_silent_downcasting', True)
filename = file.name
@@ -66,23 +114,133 @@ def _transform_parquet_file(
print(f'Processing {filename}')
df = df.sort_values(by='FrameCounter')
df = _drop_non_shared_columns(df, shared_columns)
# Forward fill
df = df.ffill()
# Rename columns
df.rename(columns=lambda col: state_id_name_mapping[col], inplace=True)
# Drop manually evaluated columns
df = df.drop(columns=MANUALLY_EXCLUDED_COLUMNS)
# Parse columns
df = _cast_columns(df, column_name_type_mapping)
# Split arrays
df = _split_array_column(df)
# Drop string columns
# df = _remove_string_columns(df)
# Add labels
df = _apply_labels(df, json_file)
print(f'Saving {filename}')
df.to_parquet(out_dir / filename)
# df.to_csv(out_dir / f'{file.stem}.csv')
print(f'Processed {filename}')
def _transform_complete_dataset(dataset_dir: Path, out_dir: Path) -> Path:
string_columns_as_classes_out_dir = out_dir / 'str_columns_as_classes'
preprocessing_utils.recreate_dir(string_columns_as_classes_out_dir)
parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
_string_columns_to_classes(parquet_files, string_columns_as_classes_out_dir)
unimportant_predictors_removed_out_dir = out_dir / 'removed_unimportant_predictors'
preprocessing_utils.recreate_dir(unimportant_predictors_removed_out_dir)
train_files = preprocessing_utils.train_files_from_dataset(string_columns_as_classes_out_dir)
all_files = preprocessing_utils.files_from_dataset(string_columns_as_classes_out_dir, FileType.Parquet)
_remove_unimportant_predictors(train_files, all_files, unimportant_predictors_removed_out_dir)
return unimportant_predictors_removed_out_dir
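# Illustrative note, not part of this commit: with the two stages chained, the output
# tree under --out ends up roughly as
#
#     filewise_transformation/                  per-file cleanup, casting and labels
#     full_dataset_transformation/
#         str_columns_as_classes/               string columns one-hot encoded
#         removed_unimportant_predictors/       low-variance / highly correlated
#                                               predictors dropped (final stage)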
def _shared_columns(parquet_files: List[Path]) -> Set[str]:
if len(parquet_files) == 0:
return set()
shared_columns: Set[str] = set(pyarrow.parquet.read_schema(parquet_files[0]).names)
for file in parquet_files[1:]:
columns = pyarrow.parquet.read_schema(file).names
shared_columns.intersection_update(columns)
return shared_columns
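# Illustrative note, not part of this commit: pyarrow.parquet.read_schema only reads
# the file footer, so the shared-column set is computed without loading any row data.
# The result is handed to _drop_non_shared_columns so every file keeps exactly the
# columns that exist in all files.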
def _remove_unimportant_predictors(train_files: List[Path], all_files: List[Path], out_dir: Path) -> None:
columns_to_keep: Set[str] = {Y_CLASS_COLUMN}
for file in train_files:
print(f'Collecting important predictors from {file.name}')
df = pd.read_parquet(file)
columns_to_analyze = [col for col in df.columns if col not in columns_to_keep]
columns_to_keep.update([col for col in columns_to_analyze if np.std(df[col]) >= VARIANCE_THRESHOLD])
df: pd.DataFrame = df.drop(columns=columns_to_keep)
correlation_matrix = df.corr().abs()
# Select the upper triangle of the correlation matrix
upper_tri = correlation_matrix.where(
np.triu(np.ones(correlation_matrix.shape), k=1).astype(bool)
)
columns_to_keep.update([col for col in upper_tri.columns if all(upper_tri[col] <= CORRELATION_THRESHOLD)])
for file in all_files:
print(f'Removing not important predictors from {file.name}')
df = pd.read_parquet(file)
columns_to_drop = [col for col in df.columns if col not in columns_to_keep]
df = df.drop(columns=columns_to_drop)
df.to_parquet(out_dir / file.name)
def _string_columns_to_classes(parquet_files: List[Path], out_dir: Path) -> None:
str_column_values: Dict[str, Set[str]] = {}
for file in parquet_files:
print(f'Collecting string classes from {file.stem}')
df = pd.read_parquet(file)
for column in df.columns:
if preprocessing_utils.is_column_of_type(df[column], str) and column != Y_CLASS_COLUMN:
if str_column_values.get(column) is None:
str_column_values[column] = set()
str_column_values[column].update(df[column].unique())
for file in parquet_files:
print(f'Applying classes to {file.stem}')
df = pd.read_parquet(file)
for column in str_column_values.keys():
one_hot = pd.get_dummies(df[column], prefix=column, dtype=np.int32)
one_hot_columns = [f"{column}_{value}" for value in str_column_values[column]]
one_hot = one_hot.reindex(columns=one_hot_columns, fill_value=0)
df = df.drop(columns=[column])
df = pd.concat([df, one_hot], axis=1)
df.to_parquet(out_dir / file.name)
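# Illustrative sketch, not part of this commit; names and values are invented.
# The two-pass scheme above guarantees that every file gets the same dummy columns,
# even when a class value never occurs in a particular file:
#
#     str_column_values = {'Gear': {'P', 'R', 'D'}}            # collected in pass 1
#     one_file = pd.DataFrame({'Gear': ['D', 'D', 'P']})       # 'R' missing here
#     one_hot = pd.get_dummies(one_file['Gear'], prefix='Gear', dtype=np.int32)
#     one_hot = one_hot.reindex(columns=[f'Gear_{v}' for v in str_column_values['Gear']],
#                               fill_value=0)
#     # -> Gear_P, Gear_R and Gear_D are all present; Gear_R is all zeros for this file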
def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> None:
filewise_transformation_out_dir = out_dir / 'filewise_transformation'
full_dataset_transformation_out_dir = out_dir / 'full_dataset_transformation'
preprocessing_utils.recreate_dir(out_dir)
preprocessing_utils.recreate_dir(filewise_transformation_out_dir)
preprocessing_utils.recreate_dir(full_dataset_transformation_out_dir)
parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
json_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Json)
json_files = [file for file in json_files if file.name != state_description_file.name]
parquet_files.sort()
json_files.sort()
file_tuples = zip(parquet_files, json_files)
shared_columns = _shared_columns(parquet_files)
state_id_name_mapping: Dict[int, str] = None
column_name_type_mapping: Dict[str, str] = None
@@ -91,12 +249,11 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file:
state_id_name_mapping = {v['stateId']: k for k, v in state_descriptions.items()}
column_name_type_mapping = {k: v['dataType'] for k, v in state_descriptions.items()}
- parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
_transform_parquet_file_function_with_args = partial(_transform_parquet_file,
state_id_name_mapping=state_id_name_mapping,
column_name_type_mapping=column_name_type_mapping,
- out_dir=out_dir,
+ shared_columns=shared_columns,
+ out_dir=filewise_transformation_out_dir,
)
if parallelize:
@@ -110,7 +267,10 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file:
print('Your system may run out of memory. In this case, don\'t use parallelization.')
n_jobs = max(MIN_JOBS, min(n_jobs_based_on_cpu, n_jobs_based_on_memory))
- Parallel(n_jobs=n_jobs)(delayed(_transform_parquet_file_function_with_args)(file) for file in parquet_files)
+ print(f'Using {n_jobs} jobs')
+ Parallel(n_jobs=n_jobs)(delayed(_transform_parquet_file_function_with_args)(parquet_file, json_file) for parquet_file, json_file in file_tuples)
else:
- for file in parquet_files:
-     _transform_parquet_file_function_with_args(file)
+ for parquet_file, json_file in file_tuples:
+     _transform_parquet_file_function_with_args(parquet_file, json_file)
_transform_complete_dataset(filewise_transformation_out_dir, full_dataset_transformation_out_dir)
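# Illustrative usage, not part of this commit, mirroring the launch configuration at
# the top of this merge (dataset/plain -> dataset/preprocessing, --no-parallelization).
# The state description filename is a placeholder, not taken from the repository.
#
#     transform_dataset(
#         dataset_dir=Path('dataset/plain'),
#         out_dir=Path('dataset/preprocessing'),
#         state_description_file=Path('dataset/plain/state_descriptions.json'),  # hypothetical
#         parallelize=False,
#     )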
- from typing import List
+ from typing import List, Callable
import shutil
import os
from pathlib import Path
import pandas as pd
from .file_type import FileType
from . import common_filenames
def recreate_dir(dir: Path) -> None:
if dir.exists():
@@ -10,5 +13,19 @@ def recreate_dir(dir: Path) -> None:
os.makedirs(dir)
- def files_from_dataset(dataset_dir: Path, dataType: FileType):
-     return [path for path in dataset_dir.glob(f'*{dataType.file_extension}') if path.is_file()]
+ def files_from_dataset(dataset_dir: Path, file_type: FileType) -> List[Path]:
+     return _files_from_dataset_where(dataset_dir, file_type)
def is_column_of_type(column: pd.Series, type: type) -> bool:
return isinstance(column.values[0], type)
def _files_from_dataset_where(dataset_dir: Path, file_type: FileType, predicate: Callable[[Path], bool] = None) -> List[Path]:
if predicate is None:
predicate = lambda _: True
return [path for path in dataset_dir.glob(f'*{file_type.file_extension}') if path.is_file() and predicate(path)]
def train_files_from_dataset(dataset_dir: Path, file_type: FileType = FileType.Parquet) -> List[Path]:
return _files_from_dataset_where(dataset_dir, file_type, lambda p: p.stem not in common_filenames.TEST_DATA_FILES)
def test_files_from_dataset(dataset_dir: Path, file_type: FileType = FileType.Parquet) -> List[Path]:
return _files_from_dataset_where(dataset_dir, file_type, lambda p: p.stem in common_filenames.TEST_DATA_FILES)
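# Illustrative sketch, not part of this commit: the helpers above partition the parquet
# files by stem against common_filenames.TEST_DATA_FILES (defined elsewhere in the
# package), so train and test sets never overlap. The directory path is just an example.
#
#     train = train_files_from_dataset(Path('dataset/preprocessing/filewise_transformation'))
#     test = test_files_from_dataset(Path('dataset/preprocessing/filewise_transformation'))
#     assert set(train).isdisjoint(test)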