Commit 73a4bb5b authored by Andri Joos :blush:

Merge branch 'preprocessing'

parents 0d8e411d 1afd5952
@@ -5,11 +5,19 @@
         "version": "0.2.0",
         "configurations": [
             {
-                "name": "Python Debugger: main",
+                "name": "Python Debugger: preprocessing",
                 "type": "debugpy",
                 "request": "launch",
                 "program": "${workspaceFolder}/app/main.py",
                 "console": "integratedTerminal",
-            }
+                "args": [
+                    "preprocess",
+                    "--dataset-dir",
+                    "dataset/plain",
+                    "--out",
+                    "dataset/preprocessed",
+                    "--no-parallelization",
+                ],
+            },
         ]
     }
import argparse
from pathlib import Path
from app.preprocessing import preprocess
DATASET_DIR_ARG = '--dataset-dir'
DATASET_DIR_ARG_SHORT = '-d'
OUT_DIR_ARG = '--out'
OUT_DIR_ARG_SHORT = '-o'
TEST_DATA_FILES_ARG = '--test-data-files'
EXCLUDED_FILES_ARG = '--excluded-files'
PARALLELIZATION_ARG = '--parallelization'
def main():
argument_parser = argparse.ArgumentParser('maneuver-detection', description='Maneuver Detection for the Loft Dynamics Helicopter Simulator')
subparsers = argument_parser.add_subparsers(title='action')
preprocess_parser = subparsers.add_parser('preprocess')
preprocess_parser.add_argument(DATASET_DIR_ARG, DATASET_DIR_ARG_SHORT, action='store', type=Path, required=True)
preprocess_parser.add_argument(OUT_DIR_ARG, OUT_DIR_ARG_SHORT, action='store', type=Path, required=True)
preprocess_parser.add_argument(TEST_DATA_FILES_ARG, action='store', type=str, required=False, nargs='*', default=None, help="Only the filenames without the extension")
preprocess_parser.add_argument(EXCLUDED_FILES_ARG, action='store', type=str, required=False, nargs='*', default=None)
preprocess_parser.add_argument(PARALLELIZATION_ARG, action=argparse.BooleanOptionalAction, type=bool, required=False, default=True)
preprocess_parser.set_defaults(func=lambda dataset_dir, out, test_data_files, excluded_files, parallelization, func: preprocess(dataset_dir, out, test_data_files, excluded_files, parallelization))
parsed_args = argument_parser.parse_args()
args = vars(parsed_args)
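# Example CLI invocation (illustrative; mirrors the arguments configured in .vscode/launch.json):
#   python app/main.py preprocess --dataset-dir dataset/plain --out dataset/preprocessed --no-parallelization
# The 'func' default set above is presumably dispatched in the part not shown here, e.g. args['func'](**args).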
from .preprocess import preprocess
from typing import List
STATE_DESCRIPTION_FILENAME = "StateDescriptions.json"
TEST_DATA_FILES: List[str] = [
"0b3f3902-2c04-4625-8576-3bb963e3d709",
"663f573a-74c5-4368-b60b-1fb433cd835d",
"8c36586f-94e9-4ae9-8384-0f3342008677",
"a376807a-82d3-4526-b19f-98d4b3f9078b",
"d76bb0eb-bc08-4b35-8c1f-37369452083d",
"f40f71de-5cc2-4719-8a5a-abcf950cbd71",
]
EXCLUDED_FILES: List[str] = [
STATE_DESCRIPTION_FILENAME,
]
from enum import Enum
class FileType(Enum):
Parquet = 1
Json = 2
@property
def file_extension(self):
match self.value:
case FileType.Parquet.value:
return '.parquet'
case FileType.Json.value:
return '.json'
case _:
raise ValueError('FileType must be Parquet or Json')
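# Usage sketch (illustrative):
#   FileType.Parquet.file_extension  -> '.parquet'
#   FileType.Json.file_extension     -> '.json'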
from typing import List, Tuple
from pathlib import Path
import shutil
import os
from . import utils as preprocessing_utils
from . import common_filenames
from .file_type import FileType
def _train_test_split(files: List[Path], test_data_files: List[str]) -> Tuple[List[Path], List[Path]]:
train_files: List[Path] = []
test_files: List[Path] = []
for file in files:
if file.stem in test_data_files:
test_files.append(file)
else:
train_files.append(file)
return train_files, test_files
def exclude_files_by_stem(files: List[Path], excluded_files: List[str]) -> List[Path]:
return [file for file in files if file.stem not in excluded_files]
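# Illustrative behaviour of the two helpers above (hypothetical file names):
#   files = [Path('a.parquet'), Path('b.parquet')]
#   _train_test_split(files, ['b'])        -> ([Path('a.parquet')], [Path('b.parquet')])
#   exclude_files_by_stem(files, ['a'])    -> [Path('b.parquet')]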
def _copy_files(files: List[Path], destination: Path) -> None:
for file in files:
destination_filename = destination / file.name
shutil.copyfile(file, destination_filename)
def group_dataset(dataset_dir: Path, out_dir: Path, test_data_files: List[str] = None, excluded_files: List[str] = None) -> None:
test_data_files = test_data_files if test_data_files is not None else common_filenames.TEST_DATA_FILES
excluded_files = excluded_files if excluded_files is not None else common_filenames.EXCLUDED_FILES
preprocessing_utils.recreate_dir(out_dir)
training_dir = out_dir / 'training'
test_dir = out_dir / 'test'
parquet_subdir = 'parquet'
json_subdir = 'json'
training_parquet_dir = training_dir / parquet_subdir
training_json_dir = training_dir / json_subdir
test_parquet_dir = test_dir / parquet_subdir
test_json_dir = test_dir / json_subdir
os.makedirs(training_parquet_dir, exist_ok=True)
os.makedirs(training_json_dir, exist_ok=True)
os.makedirs(test_parquet_dir, exist_ok=True)
os.makedirs(test_json_dir, exist_ok=True)
parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
json_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Json)
parquet_files = exclude_files_by_stem(parquet_files, excluded_files)
json_files = exclude_files_by_stem(json_files, excluded_files)
train_parquet_files, test_parquet_files = _train_test_split(parquet_files, test_data_files)
train_json_files, test_json_files = _train_test_split(json_files, test_data_files)
_copy_files(train_parquet_files, training_parquet_dir)
_copy_files(test_parquet_files, test_parquet_dir)
_copy_files(train_json_files, training_json_dir)
_copy_files(test_json_files, test_json_dir)
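# Resulting directory layout under out_dir (derived from the code above):
#   out_dir/
#       training/parquet/, training/json/   <- files whose stem is not in test_data_files
#       test/parquet/,     test/json/       <- files whose stem is in test_data_files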
from pathlib import Path
from typing import List
from .common_filenames import STATE_DESCRIPTION_FILENAME
from .transform_dataset import transform_dataset
def preprocess(
dataset_dir: Path,
out_dir: Path,
test_data_files: List[str] = None,
excluded_files: List[str] = None,
parallelize: bool = True,
) -> None:
transform_out_dir = out_dir / 'transformed_data'
transform_dataset(dataset_dir, transform_out_dir, dataset_dir / STATE_DESCRIPTION_FILENAME, parallelize)
# grouped_out_dir = out_dir / 'grouped'
# _group_dataset(dataset_dir, grouped_out_dir, test_data_files, excluded_files)
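# Minimal usage sketch (assumes the package is importable as app.preprocessing):
#   from pathlib import Path
#   from app.preprocessing import preprocess
#   preprocess(Path('dataset/plain'), Path('dataset/preprocessed'), parallelize=False)
# Note: the grouping step is currently disabled (commented out above), so only
# out_dir / 'transformed_data' is produced.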
from typing import Dict, Any, Tuple
import pandas as pd
from pathlib import Path
import json
import numpy as np
from numpy.typing import NDArray
import re
from joblib import Parallel, delayed
from functools import partial
import psutil
import multiprocessing
from . import utils as preprocessing_utils
from .file_type import FileType
DOUBLE_PATTERN = r'Double(\d+)'
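# Assumed peak memory per worker in bytes (~7.4 GB); used below to bound the number of parallel jobs.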
MAX_DATASET_MEMORY_SIZE = 7408802660
MIN_JOBS = 2
def _ensure_shape(array: NDArray, shape: Tuple) -> NDArray:
array_shape = array.shape
if array_shape != shape:
raise ValueError(f'Array shape {array_shape} does not match the required shape {shape}.')
return array
def _cast_columns(df: pd.DataFrame, column_type_mapping: Dict[str | int, str]) -> pd.DataFrame:
for column in df.columns:
column_type = column_type_mapping[column]
double_match = re.match(DOUBLE_PATTERN, column_type)
if column_type == 'Double':
df[column] = df[column].astype(np.float64)
elif column_type == 'Int32':
df[column] = df[column].astype(np.int32)
elif column_type == 'Boolean':
df[column] = df[column].astype(np.int32)
elif column_type == 'String':
df[column] = df[column].astype(str)
elif double_match:
vector_rows = int(double_match.group(1)) # it is certain that this is an int because of the pattern
df[column] = df[column].apply(lambda vec: np.array(vec, dtype=np.float64)).apply(lambda arr: _ensure_shape(arr, (vector_rows,)))
else:
raise ValueError(f'Unexpected type {column_type}')
return df
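# Illustrative column_type_mapping (hypothetical column names) showing how _cast_columns interprets the type strings:
#   {'Airspeed': 'Double', 'GearDown': 'Boolean', 'EngineTorque': 'Double3'}
# 'Double3' matches DOUBLE_PATTERN, so each cell becomes a float64 array checked against shape (3,).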
def _transform_parquet_file(
file: Path,
state_id_name_mapping: Dict[int, str],
column_name_type_mapping: Dict[str, str],
out_dir: Path
) -> None:
pd.set_option('future.no_silent_downcasting', True)
filename = file.name
print(f'Reading {filename}')
df = pd.read_parquet(file)
print(f'Processing {filename}')
df = df.sort_values(by='FrameCounter')
# Forward fill
df = df.ffill()
# Rename columns
df.rename(columns=lambda col: state_id_name_mapping[col], inplace=True)
# Parse columns
df = _cast_columns(df, column_name_type_mapping)
print(f'Saving {filename}')
df.to_parquet(out_dir / filename)
# df.to_csv(out_dir / f'{file.stem}.csv')
print(f'Processed {filename}')
def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> None:
preprocessing_utils.recreate_dir(out_dir)
state_id_name_mapping: Dict[int, str] = None
column_name_type_mapping: Dict[str, str] = None
with open(state_description_file, 'r') as f:
state_descriptions: Dict[str, Dict[str, Any]] = json.load(f)
state_id_name_mapping = {v['stateId']: k for k, v in state_descriptions.items()}
column_name_type_mapping = {k: v['dataType'] for k, v in state_descriptions.items()}
parquet_files = preprocessing_utils.files_from_dataset(dataset_dir, FileType.Parquet)
_transform_parquet_file_function_with_args = partial(_transform_parquet_file,
state_id_name_mapping=state_id_name_mapping,
column_name_type_mapping=column_name_type_mapping,
out_dir=out_dir,
)
if parallelize:
cpu_count = multiprocessing.cpu_count()
n_jobs_based_on_cpu = max(1, cpu_count - 1) # Leave one core free for the system
available_memory = psutil.virtual_memory().available
n_jobs_based_on_memory = available_memory // MAX_DATASET_MEMORY_SIZE
if n_jobs_based_on_memory < MIN_JOBS:
print('Your system may run out of memory. In this case, don\'t use parallelization.')
n_jobs = max(MIN_JOBS, min(n_jobs_based_on_cpu, n_jobs_based_on_memory))
Parallel(n_jobs=n_jobs)(delayed(_transform_parquet_file_function_with_args)(file) for file in parquet_files)
else:
for file in parquet_files:
_transform_parquet_file_function_with_args(file)
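# Expected shape of StateDescriptions.json, inferred from the mapping construction above (field values are illustrative):
#   {
#       "Airspeed":     { "stateId": 12, "dataType": "Double" },
#       "GearDown":     { "stateId": 47, "dataType": "Boolean" }
#   }
# Keys are state names; 'stateId' maps the raw parquet column ids to names, 'dataType' drives _cast_columns.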
from typing import List
import shutil
import os
from pathlib import Path
from .file_type import FileType
def recreate_dir(dir: Path) -> None:
if dir.exists():
shutil.rmtree(dir)
os.makedirs(dir)
def files_from_dataset(dataset_dir: Path, data_type: FileType) -> List[Path]:
return [path for path in dataset_dir.glob(f'*{data_type.file_extension}') if path.is_file()]
@@ -3,6 +3,10 @@ name = "maneuver-detection"
 version = "1.0.0"
 description = "Maneuver Detection for the Loft Dynamics Helicopter Simulator"
 dependencies = [
+    "pandas >= 2.2.3, < 3.0.0",
+    "pyarrow >= 18.0.0, < 19.0.0",
+    "joblib >= 1.4.2, < 2.0.0",
+    "psutil >= 6.1.0, < 7.0.0",
 ]
 maintainers = [
     { name = "Andri Joos" },