diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 5426f91f8a0bacc92a20aa7e3ffec73f3a498433..aecac5740699fa991feec216e24a570c21b72f31 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -3,7 +3,7 @@ "workspaceFolder": "/workdir", "workspaceMount": "source=${localWorkspaceFolder},target=/workdir,type=bind,consistency=delegated", "remoteUser": "vscode", - "image": "andrijoos/devcontainer-python:3-bookworm", + "image": "andrijoos/devcontainer-python:3.12-bookworm", "customizations": { "vscode": { "extensions": [ @@ -19,5 +19,5 @@ "remoteEnv": { "PATH": "/home/vscode/.local/bin:${containerEnv:PATH}" }, - "postStartCommand": "pip install -e ." + "postCreateCommand": "pip install -e ." } diff --git a/.vscode/launch.json b/.vscode/launch.json index d27f7ce6ce1aca86a170f9dfcecc720eafb26dd9..40d12898d73371c3d2072a3d48240d87534e2291 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -19,5 +19,33 @@ "--no-parallelization", ], }, + { + "name": "Python Debugger: [eval] covariance matrices", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/app/preprocessing/covariance_matrices.py", + "console": "integratedTerminal" + }, + { + "name": "Python Debugger: [eval] blocksize finding", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/app/preprocessing/blocksize_finding.py", + "console": "integratedTerminal" + }, + { + "name": "Python Debugger: [eval] feature selection", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/app/preprocessing/feature_selection.py", + "console": "integratedTerminal" + }, + { + "name": "Python Debugger: [eval] qda", + "type": "debugpy", + "request": "launch", + "program": "${workspaceFolder}/app/qda.py", + "console": "integratedTerminal" + }, ] } diff --git a/app/preprocessing/blocksize_finding.py b/app/preprocessing/blocksize_finding.py new file mode 100644 index 0000000000000000000000000000000000000000..0732b55ebb3825222312cd97982959214631c2e9 --- /dev/null +++ b/app/preprocessing/blocksize_finding.py @@ -0,0 +1,139 @@ +import pandas as pd +import numpy as np +from sklearn.metrics import make_scorer, accuracy_score +from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis +import matplotlib.pyplot as plt +from typing import List, Dict, Tuple +import os +import pyarrow.parquet as pq +from pathlib import Path +import pyarrow.types as pat +import pyarrow as pa +# import dask.dataframe as dd +# from dask_ml.model_selection import KFold +from sklearn.model_selection import cross_val_score + +def create_blocks(file: Path, block_size: int, out_dir: Path, step_size: int = 1): + table = pq.read_table(file) + rows = [] + + if table.num_rows < block_size: + return [] + + print(f'Creating blocks with size {block_size} from {file}') + + for i in range(0, table.num_rows - block_size + 1, step_size): + block = table.slice(i, block_size) + last_row = block.slice(block_size - 1, 1) + row = { + 'TimeStamp': last_row['TimeStamp'][0].as_py(), + 'FrameCounter': last_row['FrameCounter'][0].as_py() + } + + for column in table.column_names: + if column not in row.keys(): + column_type = table.schema.field(column).type + if pat.is_floating(column_type) or pat.is_integer(column_type): + column_data = block[column] + row[f'{column}_mean'] = pa.compute.mean(column_data).as_py() + row[f'{column}_min'] = pa.compute.min(column_data).as_py() + row[f'{column}_max'] = pa.compute.max(column_data).as_py() + else: + row[column] = last_row[column][0].as_py() + + rows.append(row) + + df = pd.DataFrame(rows) + df.set_index(['TimeStamp', 'FrameCounter'], inplace=True) + df.sort_index(level=['TimeStamp']) + + out_file = out_dir / file.name + df.to_parquet(out_file) + return out_file + +def split_file_to_maneuvers(file: Path, out_dir: Path) -> List[Path]: + df = pd.read_parquet(file) + df['Group'] = (df['Maneuver'] != df['Maneuver'].shift()).cumsum() + split_dataframes = [group for _, group in df.groupby('Group')] + + out_files: List[Path] = [] + for i in range(len(split_dataframes)): + df_segment = split_dataframes[i] + df_segment.drop(columns=['Group'], inplace=True) + + out_file = Path(f'{out_dir / file.stem}_{i}.parquet') + df_segment.to_parquet(out_file) + out_files.append(out_file) + + return out_files + +def evaluate_block_size(block_size_files: Dict[int, List[Path]]) -> Dict: + """ + Evaluate different block sizes and return performance metrics + + Args: + data_files: List of tuples containing (parquet_file, json_file) paths + block_sizes: List of block sizes to evaluate + """ + results = {} + + for block_size, block_files in block_size_files.items(): + cv_scores = [] + qda = QuadraticDiscriminantAnalysis() + for file in block_files: + df = pd.read_parquet(file) + X_df = df.drop(columns=['Maneuver']) + y_df = df['Maneuver'] + y = y_df.to_numpy() + scoring = make_scorer(accuracy_score) + scores = cross_val_score(qda, X_df, y, cv=5, scoring=scoring) + cv_scores.append(np.mean(scores)) + + results[block_size] = { + 'accuracy': np.mean(cv_scores) + } + + return results + +def plot_results(results: Dict): + """ + Plot the performance metrics for different block sizes + """ + block_sizes = list(results.keys()) + accuracies = [results[size]['accuracy'] for size in block_sizes] + + plt.figure(figsize=(10, 6)) + plt.plot(block_sizes, accuracies, marker='o') + plt.xlabel('Block Size (time steps)') + plt.ylabel('Accuracy') + plt.title('Performance vs Block Size') + plt.grid(True) + plt.savefig('out/blocks.png') + +if __name__ == '__main__': + EVALUATION_FILES = [ + 'dataset/preprocessing/transformed_data/full_dataset_transformation/removed_unimportant_predictors/92b2d28b-21e4-498c-b6dd-c27a47716a25.parquet', + 'dataset/preprocessing/transformed_data/full_dataset_transformation/removed_unimportant_predictors/39b2c145-c49f-470b-8280-d253fa98153f.parquet', + 'dataset/preprocessing/transformed_data/full_dataset_transformation/removed_unimportant_predictors/ef4852a4-fcfe-429b-b753-d11e2ad08cac.parquet', + ] + parquet_files = [Path(f) for f in EVALUATION_FILES] + + block_sizes = [10, 20, 50, 100, 200, 500, 1000] + + block_files: Dict[int, List[Path]] = {} + for block_size in block_sizes: + block_files[block_size] = [] + blocks_out_dir = Path(f'out/artifacts/blocks/{block_size}') + os.makedirs(blocks_out_dir, exist_ok=True) + for file in parquet_files: + block_file = create_blocks(file, block_size, step_size=10, out_dir=blocks_out_dir) + block_files[block_size].append(block_file) + results = evaluate_block_size(block_files) + + # Plot results + plot_results(results) + + # Print detailed results + for block_size, metrics in results.items(): + print(f"\nBlock Size: {block_size}") + print(f"Accuracy: {metrics['accuracy']:.4f}") diff --git a/app/preprocessing/covariance_matrices.py b/app/preprocessing/covariance_matrices.py new file mode 100644 index 0000000000000000000000000000000000000000..a954af994f264dd68a2534b67b6011a53053dd86 --- /dev/null +++ b/app/preprocessing/covariance_matrices.py @@ -0,0 +1,48 @@ +import pandas as pd +import numpy as np +import os +from collections import defaultdict +from sklearn.preprocessing import StandardScaler +import matplotlib.pyplot as plt +import seaborn as sns + +# Define the path to your parquet files directory +parquet_dir = 'dataset/preprocessing/transformed_data/full_dataset_transformation/removed_unimportant_predictors' + +# Dictionary to store covariance matrices per class +covariances = defaultdict(list) + +parquet_files = [f for f in os.listdir(parquet_dir) if f.endswith('.parquet')] # TODO: use utils + +# Process each parquet file individually +for file in parquet_files: + # Load the current parquet file + file_path = os.path.join(parquet_dir, file) + print(f'Collecting covariance from {file}') + df = pd.read_parquet(file_path) + + # Compute covariance for each class (Maneuver) in the current file + for maneuver_class in df['Maneuver'].unique(): + class_data: pd.DataFrame = df[df['Maneuver'] == maneuver_class].drop(columns=['Maneuver']) + # Calculate and store the covariance matrix for the class + # cov_matrix = np.cov(class_data, rowvar=False) + cov_matrix = class_data.cov().abs() + covariances[maneuver_class].append(cov_matrix) + +# Now, average the covariances for each class across all files +avg_covariances = {} +for maneuver_class, cov_matrices in covariances.items(): + # Stack matrices along a new axis and compute mean along that axis + avg_covariances[maneuver_class] = np.mean(cov_matrices, axis=0) + +# Display results +for maneuver_class, cov_matrix in avg_covariances.items(): + print(f"Average Covariance Matrix for class {maneuver_class}:\n{cov_matrix}\n") + +for maneuver_class, cov_matrix in avg_covariances.items(): + plt.figure(figsize=(10, 8)) + sns.heatmap(cov_matrix, annot=False, fmt=".2f", cmap="viridis", cbar=True) + plt.title(f"Average Covariance Matrix for Class '{maneuver_class}'") + plt.xlabel("Features") + plt.ylabel("Features") + plt.savefig(f'out/{maneuver_class}.png') diff --git a/app/preprocessing/feature_selection.py b/app/preprocessing/feature_selection.py new file mode 100644 index 0000000000000000000000000000000000000000..a0ccab236367626df2e208cad50c7b4e4595a49a --- /dev/null +++ b/app/preprocessing/feature_selection.py @@ -0,0 +1,109 @@ +import numpy as np +from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis +from sklearn.feature_selection import SequentialFeatureSelector +from sklearn.metrics import accuracy_score, make_scorer +import pandas as pd +from sklearn.model_selection import cross_val_score + +# EVALUATION_FILES = [ +# Path('out/artifacts/blocks/10/39b2c145-c49f-470b-8280-d253fa98153f.parquet'), +# Path('out/artifacts/blocks/10/92b2d28b-21e4-498c-b6dd-c27a47716a25.parquet'), +# Path('out/artifacts/blocks/10/ef4852a4-fcfe-429b-b753-d11e2ad08cac.parquet'), +# ] + +# Simulated dataset chunk loader (replace with actual logic) +# def load_file(): +# for file in EVALUATION_FILES: +# df = pd.read_parquet(file) +# X = df.drop(columns=['Maneuver']) +# y = df['Maneuver'] + +# yield X.to_numpy(), y.to_numpy() +# # for _ in range(5): # Simulate 5 chunks +# # X_chunk = np.random.rand(chunk_size, 10) # 10 features +# # y_chunk = np.random.choice([0, 1], size=chunk_size) # Binary classes +# # yield X_chunk, y_chunk + +# # Custom scorer for SFS with incremental processing +# def incremental_scorer(estimator, X, y, selected_features): +# """ +# Incrementally evaluate model performance for a subset of features. +# """ +# total_predictions = [] +# total_true = [] + +# for X_chunk, y_chunk in load_file(): +# # Select only the relevant features +# X_chunk_selected = X_chunk[:, selected_features] + +# # Make predictions using the current model +# estimator.fit(X_chunk_selected, y_chunk) +# y_pred = estimator.predict(X_chunk_selected) + +# # Store predictions and true labels +# total_predictions.extend(y_pred) +# total_true.extend(y_chunk) + +# # Compute and return the metric (accuracy in this case) +# return accuracy_score(total_true, total_predictions) + +def evaluate_features(): + print('Evaluating features') + # df = pd.read_parquet('out/artifacts/blocks/20/39b2c145-c49f-470b-8280-d253fa98153f.parquet') + # df = pd.read_parquet('out/artifacts/blocks/20/92b2d28b-21e4-498c-b6dd-c27a47716a25.parquet') + df = pd.read_parquet([ + 'out/artifacts/blocks/20/39b2c145-c49f-470b-8280-d253fa98153f.parquet', + 'out/artifacts/blocks/20/92b2d28b-21e4-498c-b6dd-c27a47716a25.parquet', + ]) + X_df = df.drop(columns=['Maneuver']) + y_df = df['Maneuver'] + X = X_df.to_numpy() + y = y_df.to_numpy() + + qda = QuadraticDiscriminantAnalysis() + scorer = make_scorer(accuracy_score) + + # best_score = -np.inf + # best_features = [] + # for n_features in range(1, max_features): + # print(f'Running with {n_features} features') + sfs = SequentialFeatureSelector( + qda, + direction='forward', + scoring=scorer, + cv=5, + n_jobs=-1, + tol=1e-3 + ) + + # Simulate fitting the SFS on an "empty" dataset (data loading is done in chunks) + # X_placeholder = np.zeros((1, 10)) # Placeholder for 10 features + # y_placeholder = np.zeros(1) # Placeholder for labels + sfs.fit(X, y) + + # Get selected features + selected_features = sfs.get_support() + print(f'Selected Features (index): {selected_features}') + + column_names = [df.columns[idx] for idx in range(len(selected_features)) if selected_features[idx]] + print(f'Selected Features (name): {column_names}') + + X_selected = X[:, selected_features] + score = cross_val_score(qda, X_selected, y, cv=5, scoring='accuracy').mean() + print(f'Best score: {score}') + + return column_names, score + + +if __name__ == '__main__': + evaluate_features() + + # features = [] + + # df = pd.read_parquet('out/artifacts/blocks/10/92b2d28b-21e4-498c-b6dd-c27a47716a25.parquet') + # X_df = df[features] + # y_df = df['Maneuver'] + # qda = QuadraticDiscriminantAnalysis() + + # score = cross_val_score(qda, X_df.to_numpy(), y_df.to_numpy(), cv=5, scoring='accuracy').mean() + # print(f'CV Score: {score}') diff --git a/app/preprocessing/lazy_dataset.py b/app/preprocessing/lazy_dataset.py new file mode 100644 index 0000000000000000000000000000000000000000..7961d5a94b2dbc880129d07a42517128bb61c19a --- /dev/null +++ b/app/preprocessing/lazy_dataset.py @@ -0,0 +1,43 @@ +from pathlib import Path +from typing import List, Dict, Tuple +import pandas as pd + +class LazyDataset: + def __init__(self, files: List[Path]): + columns = None + combined_length = 0 + file_start_end: Dict[str, Tuple[int, int]] = {} + for file in files: + df = pd.read_parquet(file) + if columns is not None: + columns = df.columns + + df_length = len(df) + file_start_end[file] = (combined_length, combined_length + df_length - 1) + combined_length += len(df) + + + + self.n_samples = combined_length + self.n_features = len(columns) + self.file_start_end = file_start_end + self.cached_df = None + + def _infer_shape(self): + return self.n_samples, self.n_features + + def __getitem__(self, idx): + for file, (start, end) in self.file_start_end.items(): + if idx >= start and idx <= end: + if self.cached_df is not None and self.cached_df[0] == file: + df = self.cached_df[1] + else: + df = pd.read_parquet(file) + self.cached_df = (file, df) + + return df.iloc[idx - start].to_numpy() + + raise OverflowError(f'idx {idx} is out of range, max idx is {self.n_samples}') + + def __len__(self): + return self.n_samples diff --git a/app/preprocessing/preprocess.py b/app/preprocessing/preprocess.py index adc844197117d055fb55f53ba72a33f763864633..b0144ea9ff0068b2ae7e18d3e2283e22a2d03320 100644 --- a/app/preprocessing/preprocess.py +++ b/app/preprocessing/preprocess.py @@ -12,7 +12,9 @@ def preprocess( parallelize: bool = True, ) -> None: transform_out_dir = out_dir / 'transformed_data' - transform_dataset(dataset_dir, transform_out_dir, dataset_dir / STATE_DESCRIPTION_FILENAME, parallelize) + out_dir = transform_dataset(dataset_dir, transform_out_dir, dataset_dir / STATE_DESCRIPTION_FILENAME, parallelize) + + print(f'Preprocessed files saved in {out_dir}') # grouped_out_dir = out_dir / 'grouped' # _group_dataset(dataset_dir, grouped_out_dir, test_data_files, excluded_files) diff --git a/app/preprocessing/transform_dataset.py b/app/preprocessing/transform_dataset.py index 5711f94a9cc03e5a27b2767eede983db5acde0ee..e78959bdabf125239f5d953cced51a52648d4eb3 100644 --- a/app/preprocessing/transform_dataset.py +++ b/app/preprocessing/transform_dataset.py @@ -14,22 +14,35 @@ import pyarrow.parquet from . import utils as preprocessing_utils from .file_type import FileType from .json_maneuver_data import JsonManeuverData +from . import blocksize_finding as block_helper DOUBLE_PATTERN = r'Double(\d+)' MAX_DATASET_MEMORY_SIZE = 16602933278 MIN_JOBS = 2 -VARIANCE_THRESHOLD = 0.01 +VARIANCE_THRESHOLD = 1e-10 CORRELATION_THRESHOLD = 0.9 Y_CLASS_COLUMN = 'Maneuver' MANUALLY_EXCLUDED_COLUMNS = [ 'Tablet_Endpoint', ] +BEST_FEATURES = [ + 'Aerofly_Out_Instrument_EngineTorqueFraction', + 'Aerofly_Out_Aircraft_VerticalSpeed', + 'Aerofly_Out_Aircraft_Velocity', + 'CLS_U', + 'Hmd_Position', + 'Aerofly_Out_Aircraft_Acceleration', + 'Aerofly_Out_Aircraft_AngularVelocity', +] +BLOCK_SIZE = 20 +# BEST_FEATURES = None +# BLOCK_SIZE = None def _ensure_shape(array: NDArray, shape: Tuple) -> NDArray: array_shape = array.shape if array_shape != shape: raise ValueError(f'Array shape {array_shape} does not match the required shape {shape}.') - + return array @@ -45,8 +58,8 @@ def _cast_columns(df: pd.DataFrame, column_type_mapping: Dict[str | int, str]) - df[column] = df[column].astype('Int32') elif column_type == 'Boolean': - df[column] = df[column].astype('Int32') - + df[column] = df[column].astype(bool) + elif column_type == 'String': df[column] = df[column].astype(str) @@ -55,7 +68,7 @@ def _cast_columns(df: pd.DataFrame, column_type_mapping: Dict[str | int, str]) - df[column] = df[column] \ .apply(lambda vec: np.array(vec, dtype=np.float64)) \ .apply(lambda arr: _ensure_shape(arr, (vector_rows,)) if isinstance(arr, np.ndarray) else arr) # if it is not instance of np.ndarray, it is NaN (empty cell) - + else: raise ValueError(f'Unexpected type {column_type}') @@ -92,9 +105,9 @@ def _apply_labels(df: pd.DataFrame, json_file: Path) -> pd.DataFrame: for maneuver in maneuvers: mask = (df.index.get_level_values('TimeStamp') >= maneuver.start_timestamp) & \ (df.index.get_level_values('TimeStamp') <= maneuver.end_timestamp) - + df.loc[mask, Y_CLASS_COLUMN] = maneuver.label - + return df.fillna({Y_CLASS_COLUMN: JsonManeuverData.NO_MANEUVER_LABEL}) def _transform_parquet_file( @@ -125,6 +138,10 @@ def _transform_parquet_file( # Drop manually evaluated columns df = df.drop(columns=MANUALLY_EXCLUDED_COLUMNS) + if BEST_FEATURES is not None: + columns_to_drop = [col for col in df.columns if col not in BEST_FEATURES] + df = df.drop(columns=columns_to_drop) + # Parse columns df = _cast_columns(df, column_name_type_mapping) @@ -155,7 +172,28 @@ def _transform_complete_dataset(dataset_dir: Path, out_dir: Path) -> Path: all_files = preprocessing_utils.files_from_dataset(string_columns_as_classes_out_dir, FileType.Parquet) _remove_unimportant_predictors(train_files, all_files, unimportant_predictors_removed_out_dir) - return unimportant_predictors_removed_out_dir + blocks_out_dir = out_dir / 'blocks' + preprocessing_utils.recreate_dir(blocks_out_dir) + train_files = preprocessing_utils.train_files_from_dataset(unimportant_predictors_removed_out_dir, FileType.Parquet) + test_files = preprocessing_utils.test_files_from_dataset(unimportant_predictors_removed_out_dir, FileType.Parquet) + _build_blocks(train_files, test_files, blocks_out_dir) + + bool_to_int_out_dir = out_dir / 'bool_as_int' + preprocessing_utils.recreate_dir(bool_to_int_out_dir) + all_files = preprocessing_utils.files_from_dataset(blocks_out_dir, FileType.Parquet) + _cast_boolean_to_int(all_files, bool_to_int_out_dir) + + return bool_to_int_out_dir + +def _cast_boolean_to_int(parquet_files: List[Path], out_dir: Path) -> None: + for file in parquet_files: + df = pd.read_parquet(file) + bool_columns = df.select_dtypes(include=[bool]).columns + + for column in bool_columns: + df[column] = df[column].astype('Int32') + + df.to_parquet(out_dir / file.name) def _shared_columns(parquet_files: List[Path]) -> Set[str]: if len(parquet_files) == 0: @@ -176,25 +214,25 @@ def _remove_unimportant_predictors(train_files: List[Path], all_files: List[Path df = pd.read_parquet(file) columns_to_analyze = [col for col in df.columns if col not in columns_to_keep] - columns_to_keep.update([col for col in columns_to_analyze if np.std(df[col]) >= VARIANCE_THRESHOLD]) + columns_to_keep.update([col for col in columns_to_analyze if np.var(df[col]) > VARIANCE_THRESHOLD]) - df: pd.DataFrame = df.drop(columns=columns_to_keep) + # df: pd.DataFrame = df.drop(columns=columns_to_keep) - correlation_matrix = df.corr().abs() + # correlation_matrix = df.corr().abs() - # Select the upper triangle of the correlation matrix - upper_tri = correlation_matrix.where( - np.triu(np.ones(correlation_matrix.shape), k=1).astype(bool) - ) + # # Select the upper triangle of the correlation matrix + # upper_tri = correlation_matrix.where( + # np.triu(np.ones(correlation_matrix.shape), k=1).astype(bool) + # ) - columns_to_keep.update([col for col in upper_tri.columns if all(upper_tri[col] <= CORRELATION_THRESHOLD)]) + # columns_to_keep.update([col for col in upper_tri.columns if all(upper_tri[col] <= CORRELATION_THRESHOLD)]) for file in all_files: print(f'Removing not important predictors from {file.name}') df = pd.read_parquet(file) columns_to_drop = [col for col in df.columns if col not in columns_to_keep] - df.drop(columns=columns_to_drop) + df = df.drop(columns=columns_to_drop) df.to_parquet(out_dir / file.name) @@ -203,7 +241,7 @@ def _string_columns_to_classes(parquet_files: List[Path], out_dir: Path) -> None for file in parquet_files: print(f'Collecting string classes from {file.stem}') df = pd.read_parquet(file) - + for column in df.columns: if preprocessing_utils.is_column_of_type(df[column], str) and column != Y_CLASS_COLUMN: if str_column_values.get(column) is None: @@ -216,16 +254,30 @@ def _string_columns_to_classes(parquet_files: List[Path], out_dir: Path) -> None df = pd.read_parquet(file) for column in str_column_values.keys(): - one_hot = pd.get_dummies(df[column], prefix=column, dtype=np.int32) + one_hot = pd.get_dummies(df[column], prefix=column, dtype=bool) one_hot_columns = [f"{column}_{value}" for value in str_column_values[column]] - one_hot = one_hot.reindex(columns=one_hot_columns, fill_value=0) + one_hot = one_hot.reindex(columns=one_hot_columns, fill_value=False) df = df.drop(columns=[column]) df = pd.concat([df, one_hot], axis=1) df.to_parquet(out_dir / file.name) -def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> None: +def _build_blocks(train_files: List[Path], test_files: List[Path], out_dir: Path) -> None: + maneuver_split_out_dir = out_dir / 'maneuver_split' + preprocessing_utils.recreate_dir(maneuver_split_out_dir) + + files_for_blocking: List[Path] = test_files + for file in train_files: + print(f'Creating maneuver split from {file.name}') + maneuver_files = block_helper.split_file_to_maneuvers(file, maneuver_split_out_dir) + files_for_blocking.extend(maneuver_files) + + if BLOCK_SIZE is not None: + for file in files_for_blocking: + block_helper.create_blocks(file, BLOCK_SIZE, out_dir, step_size=10) + +def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> Path: filewise_transformation_out_dir = out_dir / 'filewise_transformation' full_dataset_transformation_out_dir = out_dir / 'full_dataset_transformation' preprocessing_utils.recreate_dir(out_dir) @@ -273,4 +325,5 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: for parquet_file, json_file in file_tuples: _transform_parquet_file_function_with_args(parquet_file, json_file) - _transform_complete_dataset(filewise_transformation_out_dir, full_dataset_transformation_out_dir) + out_dir = _transform_complete_dataset(filewise_transformation_out_dir, full_dataset_transformation_out_dir) + return out_dir diff --git a/app/qda.py b/app/qda.py new file mode 100644 index 0000000000000000000000000000000000000000..2de100dfd56dec740e0c22df1886e4e8615a95fb --- /dev/null +++ b/app/qda.py @@ -0,0 +1,113 @@ +import numpy as np +from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis +import pandas as pd +from pathlib import Path +from typing import List +from sklearn.metrics import accuracy_score +from sklearn.metrics import recall_score + +FILES_PATH = Path('dataset/preprocessing/transformed_data/full_dataset_transformation/bool_as_int') + +TEST_FILENAMES = [ + "0b3f3902-2c04-4625-8576-3bb963e3d709", + "663f573a-74c5-4368-b60b-1fb433cd835d", + "8c36586f-94e9-4ae9-8384-0f3342008677", + "a376807a-82d3-4526-b19f-98d4b3f9078b", + "d76bb0eb-bc08-4b35-8c1f-37369452083d", + "f40f71de-5cc2-4719-8a5a-abcf950cbd71", +] + +TRAIN_FILES = [path for path in FILES_PATH.glob(f'*.parquet') if path.is_file() and path.stem not in TEST_FILENAMES] +TEST_FILES = [path for path in FILES_PATH.glob(f'*.parquet') if path.is_file() and path.stem in TEST_FILENAMES] + +# Simulated dataset chunk loader (replace with your data loader) +def load_chunk(files: List[Path]): + """ + Generator function to simulate loading chunks of data. + Replace this with your data loading logic. + Example: yield X_chunk, y_chunk + """ + # for _ in range(5): # Simulate 5 chunks + # X_chunk = np.random.rand(chunk_size, 10) # 10 features + # y_chunk = np.random.choice([0, 1], size=chunk_size) # Binary classes + # yield X_chunk, y_chunk + + for file in files: + print(f'Loading file {file.stem}') + df = pd.read_parquet(file) + X_df = df.drop(columns=['Maneuver']) + y_df = df['Maneuver'] + yield X_df.to_numpy(), y_df.to_numpy() + +# # Initialize variables +# class_sums = {} +# class_counts = {} +# class_covariances = {} + +# total_samples = 0 + +# # First pass: Compute class means and priors +# for X_chunk, y_chunk in load_chunk(TRAIN_FILES): +# unique_classes = np.unique(y_chunk) +# for cls in unique_classes: +# X_cls = X_chunk[y_chunk == cls] +# if cls not in class_sums: +# class_sums[cls] = np.zeros(X_cls.shape[1]) +# class_counts[cls] = 0 +# class_sums[cls] += np.sum(X_cls, axis=0) +# class_counts[cls] += X_cls.shape[0] +# total_samples += len(y_chunk) + +# # Compute class means and priors +# class_means = {cls: class_sums[cls] / class_counts[cls] for cls in class_sums} +# priors = {cls: class_counts[cls] / total_samples for cls in class_counts} + +# # Initialize covariance placeholders +# for cls in class_means: +# class_covariances[cls] = np.zeros((len(class_means[cls]), len(class_means[cls]))) + +# # Second pass: Compute covariances +# for X_chunk, y_chunk in load_chunk(TRAIN_FILES): +# unique_classes = np.unique(y_chunk) +# for cls in unique_classes: +# X_cls = X_chunk[y_chunk == cls] +# mean_diff = X_cls - class_means[cls] +# class_covariances[cls] += mean_diff.T @ mean_diff + +# # Normalize covariance matrices +# for cls in class_covariances: +# class_covariances[cls] /= (class_counts[cls] - 1) + +# # Fit QDA using precomputed parameters +# qda = QuadraticDiscriminantAnalysis(store_covariance=True) +# qda.means_ = np.array([class_means[cls] for cls in sorted(class_means.keys())]) +# qda.covariance_ = np.array([class_covariances[cls] for cls in sorted(class_covariances.keys())]) +# qda.priors_ = np.array([priors[cls] for cls in sorted(priors.keys())]) + +qda = QuadraticDiscriminantAnalysis() +df = pd.read_parquet(TRAIN_FILES) +X_df = df.drop(columns=['Maneuver']) +y_df = df['Maneuver'] + +qda.fit(X_df.to_numpy(), y_df.to_numpy()) + +# Ready to use the model for predictions +print("QDA model fitted successfully with precomputed parameters!") + +y_true = [] +y_pred = [] + +n_rows = 0 +for X, chunk_y_true in load_chunk(TEST_FILES): + chunk_y_pred = qda.predict(X) + y_true.extend(chunk_y_true) + y_pred.extend(chunk_y_pred) + +acc_score = accuracy_score(y_true, y_pred) + +global_rec_score = recall_score(y_true, y_pred, average='micro') +class_rec_score = recall_score(y_true, y_pred, average='macro') + +print(f'Test accuracy: {np.mean(acc_score)}') +print(f'Global test recall: {np.mean(global_rec_score)}') +print(f'Class test recall: {class_rec_score}') diff --git a/pyproject.toml b/pyproject.toml index ecccbe70c040824f9a3ab5e4d1af7e93fdd5be94..b15d5863b4c2decc4b0a839d5c69cdb595606321 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -7,6 +7,9 @@ dependencies = [ "pyarrow >= 18.0.0, < 19.0.0", "joblib >= 1.4.2, < 2.0.0", "psutil >= 6.1.0, < 7.0.0", + "scikit-learn >= 1.5.2, < 2.0.0", + "matplotlib >= 3.9.2, < 4.0.0", + "seaborn >= 0.13.2, < 1.0.0", ] maintainers = [ { name = "Andri Joos" },