diff --git a/.vscode/launch.json b/.vscode/launch.json
index e3fe23f6bd59667d905188d540035a620e9cf882..8c911f608c001d9f010f8f014289b44f7ae8c7f6 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -5,11 +5,19 @@
     "version": "0.2.0",
     "configurations": [
         {
-            "name": "Python Debugger: main",
+            "name": "Python Debugger: preprocessing",
             "type": "debugpy",
             "request": "launch",
             "program": "${workspaceFolder}/app/main.py",
             "console": "integratedTerminal",
-        }
+            "args": [
+                "preprocess",
+                "--dataset-dir",
+                "dataset/plain",
+                "--out",
+                "dataset/preprocessed",
+                "--no-parallelization",
+            ],
+        },
     ]
 }
diff --git a/app/main.py b/app/main.py
index 2910736c01cfa4f41e438536dd3e4b379d3a1be7..94306b9a2bfdcbdbcb60a459bef289fbd4d7351d 100644
--- a/app/main.py
+++ b/app/main.py
@@ -1,7 +1,26 @@
-from argparse import ArgumentParser
+import argparse
+from pathlib import Path
+from app.preprocessing import preprocess
+
+DATASET_DIR_ARG = '--dataset-dir'
+DATASET_DIR_ARG_SHORT = '-d'
+OUT_DIR_ARG = '--out'
+OUT_DIR_ARG_SHORT = '-o'
+TEST_DATA_FILES_ARG = '--test-data-files'
+EXCLUDED_FILES_ARG = '--excluded-files'
+PARALLELIZATION_ARG = '--parallelization'
 
 def main():
-    argument_parser = ArgumentParser('vqcfim', description='Virtual Quality Control for Injection Molding')
+    argument_parser = argparse.ArgumentParser('maneuver-detection', description='Maneuver Detection for the Loft Dynamics Helicopter Simulator')
+    subparsers = argument_parser.add_subparsers(title='action')
+
+    preprocess_parser = subparsers.add_parser('preprocess')
+    preprocess_parser.add_argument(DATASET_DIR_ARG, DATASET_DIR_ARG_SHORT, action='store', type=Path, required=True)
+    preprocess_parser.add_argument(OUT_DIR_ARG, OUT_DIR_ARG_SHORT, action='store', type=Path, required=True)
+    preprocess_parser.add_argument(TEST_DATA_FILES_ARG, action='store', type=str, required=False, nargs='*', default=None, help="Only the filenames without the extension")
+    preprocess_parser.add_argument(EXCLUDED_FILES_ARG, action='store', type=str, required=False, nargs='*', default=None)
+    preprocess_parser.add_argument(PARALLELIZATION_ARG, action=argparse.BooleanOptionalAction, type=bool, required=False, default=True)
+    preprocess_parser.set_defaults(func=lambda dataset_dir, out, test_data_files, excluded_files, parallelization, func: preprocess(dataset_dir, out, test_data_files, excluded_files, parallelization))
 
     parsed_args = argument_parser.parse_args()
     args = vars(parsed_args)
diff --git a/app/preprocessing/__init__.py b/app/preprocessing/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..a465ee7fdbdfbb837dbdb8d9b1eb8fb15d8e6cd1
--- /dev/null
+++ b/app/preprocessing/__init__.py
@@ -0,0 +1 @@
+from .preprocess import preprocess
diff --git a/app/preprocessing/common_filenames.py b/app/preprocessing/common_filenames.py
new file mode 100644
index 0000000000000000000000000000000000000000..dacd7b0b3bf5284a6476a8e6578ed1f1180ffebd
--- /dev/null
+++ b/app/preprocessing/common_filenames.py
@@ -0,0 +1,16 @@
+from typing import List
+
+STATE_DESCRIPTION_FILENAME = "StateDescriptions.json"
+
+TEST_DATA_FILES: List[str] = [
+    "0b3f3902-2c04-4625-8576-3bb963e3d709",
+    "663f573a-74c5-4368-b60b-1fb433cd835d",
+    "8c36586f-94e9-4ae9-8384-0f3342008677",
+    "a376807a-82d3-4526-b19f-98d4b3f9078b",
+    "d76bb0eb-bc08-4b35-8c1f-37369452083d",
+    "f40f71de-5cc2-4719-8a5a-abcf950cbd71",
+]
+
+EXCLUDED_FILES: List[str] = [
+    STATE_DESCRIPTION_FILENAME,
+]
diff --git a/app/preprocessing/file_type.py b/app/preprocessing/file_type.py
new file mode 100644
index 0000000000000000000000000000000000000000..94ecd5fbca0c46e2c3d52d5619fceeac1f1e40ff
--- /dev/null
+++ b/app/preprocessing/file_type.py
@@ -0,0 +1,18 @@
+from enum import Enum
+
+
+class FileType(Enum):
+    Parquet = 1
+    Json = 2
+
+    @property
+    def file_extension(self):
+        match self.value:
+            case FileType.Parquet.value:
+                return '.parquet'
+
+            case FileType.Json.value:
+                return '.json'
+
+            case _:
+                raise ValueError('FileType must be Parquet or Json')
diff --git a/app/preprocessing/group_files.py b/app/preprocessing/group_files.py
new file mode 100644
index 0000000000000000000000000000000000000000..d1fabda44f27f0df4213f340fc1a92833302bb25
--- /dev/null
+++ b/app/preprocessing/group_files.py
@@ -0,0 +1,63 @@
+from typing import List, Tuple
+from pathlib import Path
+import shutil
+import os
+
+from . import utils as preprocessing_utils
+from . import common_filenames
+from .file_type import FileType
+
+def _train_test_split(files: List[Path], test_data_files: List[str]) -> Tuple[List[Path], List[Path]]:
+    train_files: List[Path] = []
+    test_files: List[Path] = []
+
+    for file in files:
+        if file.stem in test_data_files:
+            test_files.append(file)
+        else:
+            train_files.append(file)
+
+    return train_files, test_files
+
+def exclude_files_by_stem(files: List[Path], excluded_files: List[str]) -> List[Path]:
+    return [file for file in files if file.stem not in excluded_files]
+
+def _copy_files(files: List[Path], destination: Path) -> None:
+    for file in files:
+        destination_filename = destination / file.name
+        shutil.copyfile(file, destination_filename)
+
+def group_dataset(dataset_dir: Path, out_dir: Path, test_data_files: List[str] = None, excluded_files: List[str] = None) -> None:
+    test_data_files = test_data_files if test_data_files is not None else common_filenames.TEST_DATA_FILES
+    excluded_files = excluded_files if excluded_files is not None else common_filenames.EXCLUDED_FILES
+
+    preprocessing_utils.recreate_dir(out_dir)
+
+    training_dir = out_dir / 'training'
+    test_dir = out_dir / 'test'
+
+    parquet_subdir = 'parquet'
+    json_subdir = 'json'
+    training_parquet_dir = training_dir / parquet_subdir
+    training_json_dir = training_dir / json_subdir
+    test_parquet_dir = test_dir / parquet_subdir
+    test_json_dir = test_dir / json_subdir
+
+    os.makedirs(training_parquet_dir, exist_ok=True)
+    os.makedirs(training_json_dir, exist_ok=True)
+    os.makedirs(test_parquet_dir, exist_ok=True)
+    os.makedirs(test_json_dir, exist_ok=True)
+
+    parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
+    json_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Json)
+
+    parquet_files = exclude_files_by_stem(parquet_files, excluded_files)
+    json_files = exclude_files_by_stem(json_files, excluded_files)
+
+    train_parquet_files, test_parquet_files = _train_test_split(parquet_files, test_data_files)
+    train_json_files, test_json_files = _train_test_split(json_files, test_data_files)
+
+    _copy_files(train_parquet_files, training_parquet_dir)
+    _copy_files(test_parquet_files, test_parquet_dir)
+    _copy_files(train_json_files, training_json_dir)
+    _copy_files(test_json_files, test_json_dir)
diff --git a/app/preprocessing/preprocess.py b/app/preprocessing/preprocess.py
new file mode 100644
index 0000000000000000000000000000000000000000..adc844197117d055fb55f53ba72a33f763864633
--- /dev/null
+++ b/app/preprocessing/preprocess.py
@@ -0,0 +1,18 @@
+from pathlib import Path
+from typing import List
+
+from .common_filenames import STATE_DESCRIPTION_FILENAME
+from .transform_dataset import transform_dataset
+
+def preprocess(
+    dataset_dir: Path,
+    out_dir: Path,
+    test_data_files: List[str] = None,
+    excluded_files: List[str] = None,
+    parallelize: bool = True,
+) -> None:
+    transform_out_dir = out_dir / 'transformed_data'
+    transform_dataset(dataset_dir, transform_out_dir, dataset_dir / STATE_DESCRIPTION_FILENAME, parallelize)
+
+    # grouped_out_dir = out_dir / 'grouped'
+    # _group_dataset(dataset_dir, grouped_out_dir, test_data_files, excluded_files)
diff --git a/app/preprocessing/transform_dataset.py b/app/preprocessing/transform_dataset.py
new file mode 100644
index 0000000000000000000000000000000000000000..28c97c1f3f54ec746e84cfceef856dd3b073f3b0
--- /dev/null
+++ b/app/preprocessing/transform_dataset.py
@@ -0,0 +1,116 @@
+from typing import Dict, Any, Tuple
+import pandas as pd
+from pathlib import Path
+import json
+import numpy as np
+from numpy.typing import NDArray
+import re
+from joblib import Parallel, delayed
+from functools import partial
+import psutil
+import multiprocessing
+
+from . import utils as preprocessing_utils
+from .file_type import FileType
+
+DOUBLE_PATTERN = r'Double(\d+)'
+MAX_DATASET_MEMORY_SIZE = 7408802660
+MIN_JOBS = 2
+
+def _ensure_shape(array: NDArray, shape: Tuple) -> NDArray:
+    array_shape = array.shape
+    if array_shape != shape:
+        raise ValueError(f'Array shape {array_shape} does not match the required shape {shape}.')
+
+    return array
+
+
+def _cast_columns(df: pd.DataFrame, column_type_mapping: Dict[str | int, str]) -> pd.DataFrame:
+    for column in df.columns:
+        column_type = column_type_mapping[column]
+        double_match = re.match(DOUBLE_PATTERN, column_type)
+
+        if column_type == 'Double':
+            df[column] = df[column].astype(np.float64)
+
+        elif column_type == 'Int32':
+            df[column] = df[column].astype(np.int32)
+
+        elif column_type == 'Boolean':
+            df[column] = df[column].astype(np.int32)
+
+        elif column_type == 'String':
+            df[column] = df[column].astype(str)
+
+        elif double_match:
+            vector_rows = int(double_match.group(1))  # it is certain that this is an int because of the pattern
+            df[column] = df[column].apply(lambda vec: np.array(vec, dtype=np.float64)).apply(lambda arr: _ensure_shape(arr, (vector_rows,)))
+
+        else:
+            raise ValueError(f'Unexpected type {column_type}')
+
+    return df
+
+def _transform_parquet_file(
+    file: Path,
+    state_id_name_mapping: Dict[int, str],
+    column_name_type_mapping: Dict[str, str],
+    out_dir: Path
+) -> None:
+    pd.set_option('future.no_silent_downcasting', True)
+    filename = file.name
+
+    print(f'Reading {filename}')
+    df = pd.read_parquet(file)
+
+    print(f'Processing {filename}')
+    df = df.sort_values(by='FrameCounter')
+
+    # Forward fill
+    df = df.ffill()
+
+    # Rename columns
+    df.rename(columns=lambda col: state_id_name_mapping[col], inplace=True)
+
+    # Parse columns
+    df = _cast_columns(df, column_name_type_mapping)
+
+    print(f'Saving {filename}')
+    df.to_parquet(out_dir / filename)
+    # df.to_csv(out_dir / f'{file.stem}.csv')
+
+    print(f'Processed {filename}')
+
+def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> None:
+    preprocessing_utils.recreate_dir(out_dir)
+
+    state_id_name_mapping: Dict[int, str] = None
+    column_name_type_mapping: Dict[str, str] = None
+    with open(state_description_file, 'r') as f:
+        state_descriptions: Dict[str, Dict[str, Any]] = json.load(f)
+        state_id_name_mapping = {v['stateId']: k for k, v in state_descriptions.items()}
+        column_name_type_mapping = {k: v['dataType'] for k, v in state_descriptions.items()}
+
+    parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
+
+    _transform_parquet_file_function_with_args = partial(_transform_parquet_file,
+        state_id_name_mapping=state_id_name_mapping,
+        column_name_type_mapping=column_name_type_mapping,
+        out_dir=out_dir,
+    )
+
+    if parallelize:
+        cpu_count = multiprocessing.cpu_count()
+        n_jobs_based_on_cpu = max(1, cpu_count - 1)  # Leave one core free for the system
+
+        available_memory = psutil.virtual_memory().available
+        n_jobs_based_on_memory = available_memory // MAX_DATASET_MEMORY_SIZE
+
+        if n_jobs_based_on_memory < MIN_JOBS:
+            print('Your system may run out of memory. In this case, don\'t use parallelization.')
+
+        n_jobs = max(MIN_JOBS, min(n_jobs_based_on_cpu, n_jobs_based_on_memory))
+        Parallel(n_jobs=n_jobs)(delayed(_transform_parquet_file_function_with_args)(file) for file in parquet_files)
+    else:
+        for file in parquet_files:
+            _transform_parquet_file_function_with_args(file)
diff --git a/app/preprocessing/utils.py b/app/preprocessing/utils.py
new file mode 100644
index 0000000000000000000000000000000000000000..56d1250e663f3289054faf110c3a50a9daa44fd6
--- /dev/null
+++ b/app/preprocessing/utils.py
@@ -0,0 +1,14 @@
+from typing import List
+import shutil
+import os
+from pathlib import Path
+from .file_type import FileType
+
+def recreate_dir(dir: Path) -> None:
+    if dir.exists():
+        shutil.rmtree(dir)
+
+    os.makedirs(dir)
+
+def files_from_dataset(dataset_dir: Path, dataType: FileType):
+    return [path for path in dataset_dir.glob(f'*{dataType.file_extension}') if path.is_file()]
diff --git a/pyproject.toml b/pyproject.toml
index b7ffa8ee181b35c77371c2da88521fe443d94a3d..ecccbe70c040824f9a3ab5e4d1af7e93fdd5be94 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -3,6 +3,10 @@
 name = "maneuver-detection"
 version = "1.0.0"
 description = "Maneuver Detection for the Loft Dynamics Helicopter Simulator"
 dependencies = [
+    "pandas >= 2.2.3, < 3.0.0",
+    "pyarrow >= 18.0.0, < 19.0.0",
+    "joblib >= 1.4.2, < 2.0.0",
+    "psutil >= 6.1.0, < 7.0.0",
 ]
 maintainers = [
     { name = "Andri Joos" },
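
Usage sketch (illustrative, not part of the patch): with these changes applied and the repository root importable (for example after pip install -e ., so that from app.preprocessing import preprocess resolves), the new subcommand would be invoked roughly as in the launch configuration above:

    python app/main.py preprocess --dataset-dir dataset/plain --out dataset/preprocessed --no-parallelization

--parallelization is the default (argparse.BooleanOptionalAction with default=True) and runs the per-file transform through joblib's Parallel; --no-parallelization processes the parquet files sequentially instead.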