diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json
index 8a2f1cf2b8446e8f17d367dba8f9f5b1be872f5d..aecac5740699fa991feec216e24a570c21b72f31 100644
--- a/.devcontainer/devcontainer.json
+++ b/.devcontainer/devcontainer.json
@@ -3,7 +3,7 @@
   "workspaceFolder": "/workdir",
   "workspaceMount": "source=${localWorkspaceFolder},target=/workdir,type=bind,consistency=delegated",
   "remoteUser": "vscode",
-  "image": "andrijoos/devcontainer-python:3-bookworm",
+  "image": "andrijoos/devcontainer-python:3.12-bookworm",
   "customizations": {
     "vscode": {
       "extensions": [
diff --git a/.vscode/launch.json b/.vscode/launch.json
index 5a7690422ec58358efbe6e573bd2716a80ed926b..40d12898d73371c3d2072a3d48240d87534e2291 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -20,11 +20,32 @@
             ],
         },
         {
-            "name": "Python Debugger: [test] covariance matrices",
+            "name": "Python Debugger: [eval] covariance matrices",
            "type": "debugpy",
            "request": "launch",
            "program": "${workspaceFolder}/app/preprocessing/covariance_matrices.py",
            "console": "integratedTerminal"
        },
+        {
+            "name": "Python Debugger: [eval] blocksize finding",
+            "type": "debugpy",
+            "request": "launch",
+            "program": "${workspaceFolder}/app/preprocessing/blocksize_finding.py",
+            "console": "integratedTerminal"
+        },
+        {
+            "name": "Python Debugger: [eval] feature selection",
+            "type": "debugpy",
+            "request": "launch",
+            "program": "${workspaceFolder}/app/preprocessing/feature_selection.py",
+            "console": "integratedTerminal"
+        },
+        {
+            "name": "Python Debugger: [eval] qda",
+            "type": "debugpy",
+            "request": "launch",
+            "program": "${workspaceFolder}/app/qda.py",
+            "console": "integratedTerminal"
+        },
     ]
 }
diff --git a/app/preprocessing/blocksize_finding.py b/app/preprocessing/blocksize_finding.py
index 1943d1899bc852f1f55081628b8d5b4421db22c0..0732b55ebb3825222312cd97982959214631c2e9 100644
--- a/app/preprocessing/blocksize_finding.py
+++ b/app/preprocessing/blocksize_finding.py
@@ -1,71 +1,73 @@
 import pandas as pd
 import numpy as np
-from sklearn.model_selection import train_test_split
-from sklearn.metrics import accuracy_score, classification_report
+from sklearn.metrics import make_scorer, accuracy_score
 from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
 import matplotlib.pyplot as plt
-import json
 from typing import List, Dict, Tuple
+import os
+import pyarrow.parquet as pq
+from pathlib import Path
+import pyarrow.types as pat
+import pyarrow as pa
+# import dask.dataframe as dd
+# from dask_ml.model_selection import KFold
+from sklearn.model_selection import cross_val_score

-def load_data(parquet_file: str, json_file: str) -> Tuple[pd.DataFrame, Dict]:
-    """
-    Load the flight data and corresponding maneuver labels
-    """
-    # Load flight data
-    df = pd.read_parquet(parquet_file)
-
-    # Load maneuver labels
-    with open(json_file, 'r') as f:
-        labels = json.load(f)
-
-    return df, labels
+def create_blocks(file: Path, block_size: int, out_dir: Path, step_size: int = 1) -> Path | None:
+    table = pq.read_table(file)
+    rows = []

-def create_blocks(df: pd.DataFrame, block_size: int, overlap: float = 0.5) -> List[pd.DataFrame]:
-    """
-    Divide time series data into overlapping blocks
-
-    Args:
-        df: Input DataFrame with time series data
-        block_size: Number of time steps in each block
-        overlap: Fraction of overlap between consecutive blocks (0 to 1)
-    """
-    blocks = []
-    step_size = int(block_size * (1 - overlap))
-
-    for i in range(0, len(df) - block_size + 1, step_size):
-        block = df.iloc[i:i + block_size]
-        blocks.append(block)
-
-    return blocks
+    # Files shorter than one block cannot produce any windows
+    if table.num_rows < block_size:
+        return None

-def extract_features(block: pd.DataFrame) -> Dict:
-    """
-    Extract statistical features from a block of time series data
-    """
-    features = {}
-
-    # Select numerical columns only
-    numeric_cols = block.drop(columns=['Maneuver']).reset_index(drop=True)
-
-    for col in numeric_cols:
-        # Basic statistical features
-        features[f'{col}_mean'] = block[col].mean()
-        features[f'{col}_std'] = block[col].std()
-        features[f'{col}_min'] = block[col].min()
-        features[f'{col}_max'] = block[col].max()
-        features[f'{col}_median'] = block[col].median()
-
-        # Additional statistical features
-        features[f'{col}_skew'] = block[col].skew()
-        features[f'{col}_kurtosis'] = block[col].kurtosis()
-
-        # Range and rate of change features
-        features[f'{col}_range'] = features[f'{col}_max'] - features[f'{col}_min']
-        features[f'{col}_roc'] = block[col].diff().mean()  # Rate of change
-
-    return features
+    print(f'Creating blocks with size {block_size} from {file}')

-def evaluate_block_size(parquet_files: List[str], block_sizes: List[int]) -> Dict:
+    for i in range(0, table.num_rows - block_size + 1, step_size):
+        block = table.slice(i, block_size)
+        last_row = block.slice(block_size - 1, 1)
+        row = {
+            'TimeStamp': last_row['TimeStamp'][0].as_py(),
+            'FrameCounter': last_row['FrameCounter'][0].as_py()
+        }
+
+        for column in table.column_names:
+            if column not in row.keys():
+                column_type = table.schema.field(column).type
+                if pat.is_floating(column_type) or pat.is_integer(column_type):
+                    column_data = block[column]
+                    row[f'{column}_mean'] = pa.compute.mean(column_data).as_py()
+                    row[f'{column}_min'] = pa.compute.min(column_data).as_py()
+                    row[f'{column}_max'] = pa.compute.max(column_data).as_py()
+                else:
+                    # Non-numeric columns take the value of the window's last row
+                    row[column] = last_row[column][0].as_py()
+
+        rows.append(row)
+
+    df = pd.DataFrame(rows)
+    df.set_index(['TimeStamp', 'FrameCounter'], inplace=True)
+    df.sort_index(level='TimeStamp', inplace=True)
+
+    out_file = out_dir / file.name
+    df.to_parquet(out_file)
+    return out_file
+
+def split_file_to_maneuvers(file: Path, out_dir: Path) -> List[Path]:
+    df = pd.read_parquet(file)
+    # A new group starts whenever the maneuver label changes between consecutive rows
+    df['Group'] = (df['Maneuver'] != df['Maneuver'].shift()).cumsum()
+    split_dataframes = [group for _, group in df.groupby('Group')]
+
+    out_files: List[Path] = []
+    for i, df_segment in enumerate(split_dataframes):
+        df_segment = df_segment.drop(columns=['Group'])
+
+        out_file = Path(f'{out_dir / file.stem}_{i}.parquet')
+        df_segment.to_parquet(out_file)
+        out_files.append(out_file)
+
+    return out_files
+
+def evaluate_block_size(block_size_files: Dict[int, List[Path]]) -> Dict:
     """
     Evaluate different block sizes and return performance metrics
@@ -74,54 +76,21 @@ def evaluate_block_size(parquet_files: List[str], block_sizes: List[int]) -> Dic
-        block_sizes: List of block sizes to evaluate
+        block_size_files: Mapping from block size to the block files built with it
     """
     results = {}
-
-    for block_size in block_sizes:
-        print(f"Evaluating block size: {block_size}")
-
-        # Store features and labels for all files
-        X = []
-        y = []
-
-        # Process each file
-        for parquet_file in parquet_files:
-            # Load data
-            # df, labels = load_data(parquet_file, json_file)
-            df = pd.read_parquet(parquet_file)
-
-            # Create blocks
-            blocks = create_blocks(df, block_size)
-
-            # Extract features from each block
-            for block in blocks:
-                features = extract_features(block)
-                X.append(features)
-                dominant_label = block['Maneuver'].mode()[0]
-
-                # Determine the dominant maneuver in this block
-                # (You'll need to implement logic to match timestamps with labels)
-                # This is a placeholder - implement actual label assignment
-                y.append(dominant_label)
-
-        # Convert to DataFrame
-        X_df = pd.DataFrame(X)
-
-        # Split data
-        X_train, X_test, y_train, y_test = train_test_split(X_df, y, test_size=0.2, random_state=42)
-
-        # Train a simple classifier (Random Forest as an example)
-        # clf = RandomForestClassifier(n_estimators=100, random_state=42)
-        # clf.fit(X_train, y_train)
+
+    for block_size, block_files in block_size_files.items():
+        cv_scores = []
         qda = QuadraticDiscriminantAnalysis()
-        qda.fit(X_train, y_train)
-
-        # Evaluate
-        y_pred = qda.predict(X_test)
-        accuracy = accuracy_score(y_test, y_pred)
-
-        # Store results
+        for file in block_files:
+            df = pd.read_parquet(file)
+            X_df = df.drop(columns=['Maneuver'])
+            y_df = df['Maneuver']
+            y = y_df.to_numpy()
+            scoring = make_scorer(accuracy_score)
+            scores = cross_val_score(qda, X_df, y, cv=5, scoring=scoring)
+            cv_scores.append(np.mean(scores))
+
         results[block_size] = {
-            'accuracy': accuracy,
-            'report': classification_report(y_test, y_pred)
+            'accuracy': np.mean(cv_scores)
         }

     return results
@@ -139,25 +108,32 @@
     plt.ylabel('Accuracy')
     plt.title('Performance vs Block Size')
     plt.grid(True)
-    plt.show()
+    plt.savefig('out/blocks.png')

-# Example usage
-data_files = [
-    ('path/to/file1.parquet', 'path/to/file1.json'),
-    ('path/to/file2.parquet', 'path/to/file2.json')
-]
+if __name__ == '__main__':
+    EVALUATION_FILES = [
+        'dataset/preprocessing/transformed_data/full_dataset_transformation/removed_unimportant_predictors/92b2d28b-21e4-498c-b6dd-c27a47716a25.parquet',
+        'dataset/preprocessing/transformed_data/full_dataset_transformation/removed_unimportant_predictors/39b2c145-c49f-470b-8280-d253fa98153f.parquet',
+        'dataset/preprocessing/transformed_data/full_dataset_transformation/removed_unimportant_predictors/ef4852a4-fcfe-429b-b753-d11e2ad08cac.parquet',
+    ]
+    parquet_files = [Path(f) for f in EVALUATION_FILES]

-block_sizes = [10, 20, 50, 100, 200, 500]
+    block_sizes = [10, 20, 50, 100, 200, 500, 1000]

-# Evaluate different block sizes
-results = evaluate_block_size(data_files, block_sizes)
+    block_files: Dict[int, List[Path]] = {}
+    for block_size in block_sizes:
+        block_files[block_size] = []
+        blocks_out_dir = Path(f'out/artifacts/blocks/{block_size}')
+        os.makedirs(blocks_out_dir, exist_ok=True)
+        for file in parquet_files:
+            block_file = create_blocks(file, block_size, step_size=10, out_dir=blocks_out_dir)
+            if block_file is not None:
+                block_files[block_size].append(block_file)
+    results = evaluate_block_size(block_files)

-# Plot results
-plot_results(results)
+    # Plot results
+    plot_results(results)

-# Print detailed results
-for block_size, metrics in results.items():
-    print(f"\nBlock Size: {block_size}")
-    print(f"Accuracy: {metrics['accuracy']:.4f}")
-    print("Classification Report:")
-    print(metrics['report'])
+    # Print detailed results
+    for block_size, metrics in results.items():
+        print(f"\nBlock Size: {block_size}")
+        print(f"Accuracy: {metrics['accuracy']:.4f}")
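Editorial note: the pyarrow loop in `create_blocks` computes, for every window of `block_size` rows, the mean/min/max of each numeric column plus the last row's value for everything else. For readers checking the windowing logic, here is a minimal pandas sketch of the same aggregation (`rolling` windows are right-aligned, so each window is labelled by its last row; the `iloc` slice reproduces the `step_size` subsampling):

```python
import pandas as pd

def rolling_blocks_sketch(df: pd.DataFrame, block_size: int, step_size: int = 10) -> pd.DataFrame:
    numeric = df.select_dtypes('number')
    agg = numeric.rolling(window=block_size).agg(['mean', 'min', 'max'])
    agg.columns = [f'{col}_{stat}' for col, stat in agg.columns]
    # Non-numeric columns (e.g. the 'Maneuver' label) keep the window's last value
    out = pd.concat([agg, df.select_dtypes(exclude='number')], axis=1)
    # Drop incomplete leading windows, then take every step_size-th window
    return out.iloc[block_size - 1::step_size]
```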
diff --git a/app/preprocessing/feature_selection.py b/app/preprocessing/feature_selection.py
new file mode 100644
index 0000000000000000000000000000000000000000..a0ccab236367626df2e208cad50c7b4e4595a49a
--- /dev/null
+++ b/app/preprocessing/feature_selection.py
@@ -0,0 +1,109 @@
+import numpy as np
+from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
+from sklearn.feature_selection import SequentialFeatureSelector
+from sklearn.metrics import accuracy_score, make_scorer
+import pandas as pd
+from sklearn.model_selection import cross_val_score
+
+# EVALUATION_FILES = [
+#     Path('out/artifacts/blocks/10/39b2c145-c49f-470b-8280-d253fa98153f.parquet'),
+#     Path('out/artifacts/blocks/10/92b2d28b-21e4-498c-b6dd-c27a47716a25.parquet'),
+#     Path('out/artifacts/blocks/10/ef4852a4-fcfe-429b-b753-d11e2ad08cac.parquet'),
+# ]
+
+# Simulated dataset chunk loader (replace with actual logic)
+# def load_file():
+#     for file in EVALUATION_FILES:
+#         df = pd.read_parquet(file)
+#         X = df.drop(columns=['Maneuver'])
+#         y = df['Maneuver']
+
+#         yield X.to_numpy(), y.to_numpy()
+#     # for _ in range(5):  # Simulate 5 chunks
+#     #     X_chunk = np.random.rand(chunk_size, 10)  # 10 features
+#     #     y_chunk = np.random.choice([0, 1], size=chunk_size)  # Binary classes
+#     #     yield X_chunk, y_chunk
+
+# # Custom scorer for SFS with incremental processing
+# def incremental_scorer(estimator, X, y, selected_features):
+#     """
+#     Incrementally evaluate model performance for a subset of features.
+#     """
+#     total_predictions = []
+#     total_true = []
+
+#     for X_chunk, y_chunk in load_file():
+#         # Select only the relevant features
+#         X_chunk_selected = X_chunk[:, selected_features]
+
+#         # Make predictions using the current model
+#         estimator.fit(X_chunk_selected, y_chunk)
+#         y_pred = estimator.predict(X_chunk_selected)
+
+#         # Store predictions and true labels
+#         total_predictions.extend(y_pred)
+#         total_true.extend(y_chunk)
+
+#     # Compute and return the metric (accuracy in this case)
+#     return accuracy_score(total_true, total_predictions)
+
+def evaluate_features():
+    print('Evaluating features')
+    # df = pd.read_parquet('out/artifacts/blocks/20/39b2c145-c49f-470b-8280-d253fa98153f.parquet')
+    # df = pd.read_parquet('out/artifacts/blocks/20/92b2d28b-21e4-498c-b6dd-c27a47716a25.parquet')
+    df = pd.read_parquet([
+        'out/artifacts/blocks/20/39b2c145-c49f-470b-8280-d253fa98153f.parquet',
+        'out/artifacts/blocks/20/92b2d28b-21e4-498c-b6dd-c27a47716a25.parquet',
+    ])
+    X_df = df.drop(columns=['Maneuver'])
+    y_df = df['Maneuver']
+    X = X_df.to_numpy()
+    y = y_df.to_numpy()
+
+    qda = QuadraticDiscriminantAnalysis()
+    scorer = make_scorer(accuracy_score)
+
+    # best_score = -np.inf
+    # best_features = []
+    # for n_features in range(1, max_features):
+    #     print(f'Running with {n_features} features')
+    sfs = SequentialFeatureSelector(
+        qda,
+        direction='forward',
+        scoring=scorer,
+        cv=5,
+        n_jobs=-1,
+        tol=1e-3
+    )
+
+    sfs.fit(X, y)
+
+    # Boolean mask over the columns of X
+    selected_features = sfs.get_support()
+    print(f'Selected Features (mask): {selected_features}')
+
+    # Map the mask onto X_df's columns; df.columns would be misaligned
+    # because it still contains the 'Maneuver' label column
+    column_names = list(X_df.columns[selected_features])
+    print(f'Selected Features (name): {column_names}')
+
+    X_selected = X[:, selected_features]
+    score = cross_val_score(qda, X_selected, y, cv=5, scoring='accuracy').mean()
+    print(f'Best score: {score}')
+
+    return column_names, score
+
+
+if __name__ == '__main__':
+    evaluate_features()
+
+    # features = []
+
+    # df = pd.read_parquet('out/artifacts/blocks/10/92b2d28b-21e4-498c-b6dd-c27a47716a25.parquet')
+    # X_df = df[features]
+    # y_df = df['Maneuver']
+    # qda = QuadraticDiscriminantAnalysis()
+
+    # score = cross_val_score(qda, X_df.to_numpy(), y_df.to_numpy(), cv=5, scoring='accuracy').mean()
+    # print(f'CV Score: {score}')
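Editorial note: the selected names currently have to be copy-pasted into the hardcoded `BEST_FEATURES` list in `transform_dataset.py` (below). A small helper to persist them instead is sketched here; the output path and helper name are illustrative assumptions, not part of this change:

```python
import json
from pathlib import Path

def save_selected_features(column_names, score, out_file=Path('out/selected_features.json')):
    # Hypothetical location; transform_dataset.py could load this file
    # instead of hardcoding BEST_FEATURES.
    out_file.parent.mkdir(parents=True, exist_ok=True)
    out_file.write_text(json.dumps({'features': column_names, 'cv_accuracy': float(score)}, indent=2))
```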
diff --git a/app/preprocessing/lazy_dataset.py b/app/preprocessing/lazy_dataset.py
new file mode 100644
index 0000000000000000000000000000000000000000..7961d5a94b2dbc880129d07a42517128bb61c19a
--- /dev/null
+++ b/app/preprocessing/lazy_dataset.py
@@ -0,0 +1,43 @@
+from pathlib import Path
+from typing import List, Dict, Tuple
+import pandas as pd
+
+class LazyDataset:
+    def __init__(self, files: List[Path]):
+        columns = None
+        combined_length = 0
+        file_start_end: Dict[Path, Tuple[int, int]] = {}
+        for file in files:
+            df = pd.read_parquet(file)
+            if columns is None:
+                columns = df.columns
+
+            df_length = len(df)
+            file_start_end[file] = (combined_length, combined_length + df_length - 1)
+            combined_length += df_length
+
+        self.n_samples = combined_length
+        self.n_features = len(columns)
+        self.file_start_end = file_start_end
+        self.cached_df = None
+
+    def _infer_shape(self):
+        return self.n_samples, self.n_features
+
+    def __getitem__(self, idx):
+        for file, (start, end) in self.file_start_end.items():
+            if start <= idx <= end:
+                # Cache the most recently read file so sequential access
+                # does not re-read the same parquet file for every row
+                if self.cached_df is not None and self.cached_df[0] == file:
+                    df = self.cached_df[1]
+                else:
+                    df = pd.read_parquet(file)
+                    self.cached_df = (file, df)
+
+                return df.iloc[idx - start].to_numpy()
+
+        raise IndexError(f'idx {idx} is out of range, max idx is {self.n_samples - 1}')
+
+    def __len__(self):
+        return self.n_samples
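Editorial note: `LazyDataset` is not referenced anywhere else in this change yet. A brief usage sketch of its global-index translation (file paths are illustrative):

```python
from pathlib import Path

files = [Path('out/artifacts/blocks/20/a.parquet'), Path('out/artifacts/blocks/20/b.parquet')]
ds = LazyDataset(files)

print(len(ds))          # total number of rows across both files
first = ds[0]           # reads (and caches) the first file, returns row 0 as a numpy array
last = ds[len(ds) - 1]  # maps the global index into the second file
```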
diff --git a/app/preprocessing/preprocess.py b/app/preprocessing/preprocess.py
index adc844197117d055fb55f53ba72a33f763864633..b0144ea9ff0068b2ae7e18d3e2283e22a2d03320 100644
--- a/app/preprocessing/preprocess.py
+++ b/app/preprocessing/preprocess.py
@@ -12,7 +12,9 @@ def preprocess(
     parallelize: bool = True,
 ) -> None:
     transform_out_dir = out_dir / 'transformed_data'
-    transform_dataset(dataset_dir, transform_out_dir, dataset_dir / STATE_DESCRIPTION_FILENAME, parallelize)
+    out_dir = transform_dataset(dataset_dir, transform_out_dir, dataset_dir / STATE_DESCRIPTION_FILENAME, parallelize)
+
+    print(f'Preprocessed files saved in {out_dir}')

     # grouped_out_dir = out_dir / 'grouped'
     # _group_dataset(dataset_dir, grouped_out_dir, test_data_files, excluded_files)
diff --git a/app/preprocessing/transform_dataset.py b/app/preprocessing/transform_dataset.py
index 702f92776b928ffe243d13cda8bdc01701d4c127..e78959bdabf125239f5d953cced51a52648d4eb3 100644
--- a/app/preprocessing/transform_dataset.py
+++ b/app/preprocessing/transform_dataset.py
@@ -14,6 +14,7 @@ import pyarrow.parquet
 from . import utils as preprocessing_utils
 from .file_type import FileType
 from .json_maneuver_data import JsonManeuverData
+from . import blocksize_finding as block_helper

 DOUBLE_PATTERN = r'Double(\d+)'
 MAX_DATASET_MEMORY_SIZE = 16602933278
@@ -24,12 +25,24 @@
 Y_CLASS_COLUMN = 'Maneuver'
 MANUALLY_EXCLUDED_COLUMNS = [
     'Tablet_Endpoint',
 ]
+BEST_FEATURES = [
+    'Aerofly_Out_Instrument_EngineTorqueFraction',
+    'Aerofly_Out_Aircraft_VerticalSpeed',
+    'Aerofly_Out_Aircraft_Velocity',
+    'CLS_U',
+    'Hmd_Position',
+    'Aerofly_Out_Aircraft_Acceleration',
+    'Aerofly_Out_Aircraft_AngularVelocity',
+]
+BLOCK_SIZE = 20
+# BEST_FEATURES = None
+# BLOCK_SIZE = None

 def _ensure_shape(array: NDArray, shape: Tuple) -> NDArray:
     array_shape = array.shape
     if array_shape != shape:
         raise ValueError(f'Array shape {array_shape} does not match the required shape {shape}.')
-    
+
     return array

@@ -45,8 +58,8 @@ def _cast_columns(df: pd.DataFrame, column_type_mapping: Dict[str | int, str]) -
             df[column] = df[column].astype('Int32')

         elif column_type == 'Boolean':
-            df[column] = df[column].astype('Int32')
-    
+            df[column] = df[column].astype(bool)
+
         elif column_type == 'String':
             df[column] = df[column].astype(str)

@@ -55,7 +68,7 @@ def _cast_columns(df: pd.DataFrame, column_type_mapping: Dict[str | int, str]) -
             df[column] = df[column] \
                 .apply(lambda vec: np.array(vec, dtype=np.float64)) \
                 .apply(lambda arr: _ensure_shape(arr, (vector_rows,)) if isinstance(arr, np.ndarray) else arr)  # if it is not instance of np.ndarray, it is NaN (empty cell)
-    
+
         else:
             raise ValueError(f'Unexpected type {column_type}')

@@ -92,9 +105,9 @@ def _apply_labels(df: pd.DataFrame, json_file: Path) -> pd.DataFrame:
     for maneuver in maneuvers:
         mask = (df.index.get_level_values('TimeStamp') >= maneuver.start_timestamp) & \
                (df.index.get_level_values('TimeStamp') <= maneuver.end_timestamp)
-    
+
         df.loc[mask, Y_CLASS_COLUMN] = maneuver.label
-    
+
     return df.fillna({Y_CLASS_COLUMN: JsonManeuverData.NO_MANEUVER_LABEL})

 def _transform_parquet_file(
@@ -125,6 +138,10 @@ def _transform_parquet_file(
     # Drop manually evaluated columns
     df = df.drop(columns=MANUALLY_EXCLUDED_COLUMNS)

+    if BEST_FEATURES is not None:
+        columns_to_drop = [col for col in df.columns if col not in BEST_FEATURES]
+        df = df.drop(columns=columns_to_drop)
+
     # Parse columns
     df = _cast_columns(df, column_name_type_mapping)

@@ -155,7 +172,28 @@ def _transform_complete_dataset(dataset_dir: Path, out_dir: Path) -> Path:
     all_files = preprocessing_utils.files_from_dataset(string_columns_as_classes_out_dir, FileType.Parquet)
     _remove_unimportant_predictors(train_files, all_files, unimportant_predictors_removed_out_dir)

-    return unimportant_predictors_removed_out_dir
+    blocks_out_dir = out_dir / 'blocks'
+    preprocessing_utils.recreate_dir(blocks_out_dir)
+    train_files = preprocessing_utils.train_files_from_dataset(unimportant_predictors_removed_out_dir, FileType.Parquet)
+    test_files = preprocessing_utils.test_files_from_dataset(unimportant_predictors_removed_out_dir, FileType.Parquet)
+    _build_blocks(train_files, test_files, blocks_out_dir)
+
+    bool_to_int_out_dir = out_dir / 'bool_as_int'
+    preprocessing_utils.recreate_dir(bool_to_int_out_dir)
+    all_files = preprocessing_utils.files_from_dataset(blocks_out_dir, FileType.Parquet)
+    _cast_boolean_to_int(all_files, bool_to_int_out_dir)
+
+    return bool_to_int_out_dir
+
+def _cast_boolean_to_int(parquet_files: List[Path], out_dir: Path) -> None:
+    for file in parquet_files:
+        df = pd.read_parquet(file)
+        bool_columns = df.select_dtypes(include=[bool]).columns
+
+        for column in bool_columns:
+            df[column] = df[column].astype('Int32')
+
+        df.to_parquet(out_dir / file.name)

 def _shared_columns(parquet_files: List[Path]) -> Set[str]:
     if len(parquet_files) == 0:
@@ -203,7 +241,7 @@ def _string_columns_to_classes(parquet_files: List[Path], out_dir: Path) -> None
     for file in parquet_files:
         print(f'Collecting string classes from {file.stem}')
         df = pd.read_parquet(file)
-    
+
         for column in df.columns:
             if preprocessing_utils.is_column_of_type(df[column], str) and column != Y_CLASS_COLUMN:
                 if str_column_values.get(column) is None:
@@ -216,16 +254,30 @@
         df = pd.read_parquet(file)

         for column in str_column_values.keys():
-            one_hot = pd.get_dummies(df[column], prefix=column, dtype=np.int32)
+            one_hot = pd.get_dummies(df[column], prefix=column, dtype=bool)
             one_hot_columns = [f"{column}_{value}" for value in str_column_values[column]]
-            one_hot = one_hot.reindex(columns=one_hot_columns, fill_value=0)
+            one_hot = one_hot.reindex(columns=one_hot_columns, fill_value=False)

             df = df.drop(columns=[column])
             df = pd.concat([df, one_hot], axis=1)

         df.to_parquet(out_dir / file.name)

-def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> None:
+def _build_blocks(train_files: List[Path], test_files: List[Path], out_dir: Path) -> None:
+    maneuver_split_out_dir = out_dir / 'maneuver_split'
+    preprocessing_utils.recreate_dir(maneuver_split_out_dir)
+
+    # Copy so the caller's test_files list is not mutated
+    files_for_blocking: List[Path] = list(test_files)
+    for file in train_files:
+        print(f'Creating maneuver split from {file.name}')
+        maneuver_files = block_helper.split_file_to_maneuvers(file, maneuver_split_out_dir)
+        files_for_blocking.extend(maneuver_files)
+
+    if BLOCK_SIZE is not None:
+        for file in files_for_blocking:
+            block_helper.create_blocks(file, BLOCK_SIZE, out_dir, step_size=10)
+
+def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file: Path, parallelize: bool = True) -> Path:
     filewise_transformation_out_dir = out_dir / 'filewise_transformation'
     full_dataset_transformation_out_dir = out_dir / 'full_dataset_transformation'
     preprocessing_utils.recreate_dir(out_dir)
@@ -273,4 +325,5 @@ def transform_dataset(dataset_dir: Path, out_dir: Path, state_description_file:
         for parquet_file, json_file in file_tuples:
             _transform_parquet_file_function_with_args(parquet_file, json_file)

-    _transform_complete_dataset(filewise_transformation_out_dir, full_dataset_transformation_out_dir)
+    out_dir = _transform_complete_dataset(filewise_transformation_out_dir, full_dataset_transformation_out_dir)
+    return out_dir
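Editorial note: after this change, `transform_dataset` chains the following stage directories (names taken from the code above; earlier full-dataset stages elided). The final `bool_as_int` stage is what `qda.py` below reads:

```
transformed_data/
├── filewise_transformation/
└── full_dataset_transformation/
    ├── ...                             (string classes, shared columns, ...)
    ├── removed_unimportant_predictors/
    ├── blocks/
    │   └── maneuver_split/
    └── bool_as_int/                    <- returned by _transform_complete_dataset
```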
diff --git a/app/qda.py b/app/qda.py
new file mode 100644
index 0000000000000000000000000000000000000000..2de100dfd56dec740e0c22df1886e4e8615a95fb
--- /dev/null
+++ b/app/qda.py
@@ -0,0 +1,113 @@
+import numpy as np
+from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis
+import pandas as pd
+from pathlib import Path
+from typing import List
+from sklearn.metrics import accuracy_score, recall_score
+
+FILES_PATH = Path('dataset/preprocessing/transformed_data/full_dataset_transformation/bool_as_int')
+
+TEST_FILENAMES = [
+    "0b3f3902-2c04-4625-8576-3bb963e3d709",
+    "663f573a-74c5-4368-b60b-1fb433cd835d",
+    "8c36586f-94e9-4ae9-8384-0f3342008677",
+    "a376807a-82d3-4526-b19f-98d4b3f9078b",
+    "d76bb0eb-bc08-4b35-8c1f-37369452083d",
+    "f40f71de-5cc2-4719-8a5a-abcf950cbd71",
+]
+
+TRAIN_FILES = [path for path in FILES_PATH.glob('*.parquet') if path.is_file() and path.stem not in TEST_FILENAMES]
+TEST_FILES = [path for path in FILES_PATH.glob('*.parquet') if path.is_file() and path.stem in TEST_FILENAMES]
+
+def load_chunk(files: List[Path]):
+    """
+    Generator that yields one (X, y) chunk per parquet file.
+    """
+    for file in files:
+        print(f'Loading file {file.stem}')
+        df = pd.read_parquet(file)
+        X_df = df.drop(columns=['Maneuver'])
+        y_df = df['Maneuver']
+        yield X_df.to_numpy(), y_df.to_numpy()
+
+# # Initialize variables
+# class_sums = {}
+# class_counts = {}
+# class_covariances = {}
+
+# total_samples = 0
+
+# # First pass: Compute class means and priors
+# for X_chunk, y_chunk in load_chunk(TRAIN_FILES):
+#     unique_classes = np.unique(y_chunk)
+#     for cls in unique_classes:
+#         X_cls = X_chunk[y_chunk == cls]
+#         if cls not in class_sums:
+#             class_sums[cls] = np.zeros(X_cls.shape[1])
+#             class_counts[cls] = 0
+#         class_sums[cls] += np.sum(X_cls, axis=0)
+#         class_counts[cls] += X_cls.shape[0]
+#     total_samples += len(y_chunk)
+
+# # Compute class means and priors
+# class_means = {cls: class_sums[cls] / class_counts[cls] for cls in class_sums}
+# priors = {cls: class_counts[cls] / total_samples for cls in class_counts}
+
+# # Initialize covariance placeholders
+# for cls in class_means:
+#     class_covariances[cls] = np.zeros((len(class_means[cls]), len(class_means[cls])))
+
+# # Second pass: Compute covariances
+# for X_chunk, y_chunk in load_chunk(TRAIN_FILES):
+#     unique_classes = np.unique(y_chunk)
+#     for cls in unique_classes:
+#         X_cls = X_chunk[y_chunk == cls]
+#         mean_diff = X_cls - class_means[cls]
+#         class_covariances[cls] += mean_diff.T @ mean_diff
+
+# # Normalize covariance matrices
+# for cls in class_covariances:
+#     class_covariances[cls] /= (class_counts[cls] - 1)
+
+# # Fit QDA using precomputed parameters
+# qda = QuadraticDiscriminantAnalysis(store_covariance=True)
+# qda.means_ = np.array([class_means[cls] for cls in sorted(class_means.keys())])
+# qda.covariance_ = np.array([class_covariances[cls] for cls in sorted(class_covariances.keys())])
+# qda.priors_ = np.array([priors[cls] for cls in sorted(priors.keys())])
+
+qda = QuadraticDiscriminantAnalysis()
+df = pd.read_parquet(TRAIN_FILES)
+X_df = df.drop(columns=['Maneuver'])
+y_df = df['Maneuver']
+
+qda.fit(X_df.to_numpy(), y_df.to_numpy())
+
+# Ready to use the model for predictions
+print("QDA model fitted successfully!")
+
+y_true = []
+y_pred = []
+
+for X, chunk_y_true in load_chunk(TEST_FILES):
+    chunk_y_pred = qda.predict(X)
+    y_true.extend(chunk_y_true)
+    y_pred.extend(chunk_y_pred)
+
+acc_score = accuracy_score(y_true, y_pred)
+
+global_rec_score = recall_score(y_true, y_pred, average='micro')
+class_rec_score = recall_score(y_true, y_pred, average='macro')
+
+print(f'Test accuracy: {acc_score}')
+print(f'Global test recall (micro): {global_rec_score}')
+print(f'Class-averaged test recall (macro): {class_rec_score}')
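Editorial note: the commented-out block in `app/qda.py` sketches a two-pass, chunked estimation of the per-class QDA parameters, but as written it would not yield a usable model: sklearn's `QuadraticDiscriminantAnalysis.predict` also depends on the per-class `rotations_` and `scalings_` (and on `classes_`) that `fit` derives internally, so assigning only `means_`, `covariance_` and `priors_` is not enough. An untested sketch of how the missing pieces could be derived from the streamed statistics, relying on sklearn internals that may change between versions:

```python
import numpy as np
from sklearn.discriminant_analysis import QuadraticDiscriminantAnalysis

def qda_from_stats(means: dict, covariances: dict, priors: dict) -> QuadraticDiscriminantAnalysis:
    """Hypothetical completion of the commented-out approach: build a QDA
    from precomputed per-class statistics. rotations_/scalings_ come from an
    eigendecomposition of each covariance (assumes full-rank covariances;
    regularize if necessary)."""
    classes = sorted(means.keys())
    qda = QuadraticDiscriminantAnalysis()
    qda.classes_ = np.array(classes)
    qda.priors_ = np.array([priors[c] for c in classes])
    qda.means_ = np.array([means[c] for c in classes])
    qda.rotations_, qda.scalings_ = [], []
    for c in classes:
        evals, evecs = np.linalg.eigh(covariances[c])
        qda.scalings_.append(evals)   # variances along the rotated axes
        qda.rotations_.append(evecs)  # orthonormal eigenvectors
    return qda
```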