# app.py - Virtual Quality Control for Injection Molding (vqcfim) command-line tool
from argparse import ArgumentParser
from pathlib import Path
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
from typing import List, Tuple
import statsmodels.api as sm

# Command-line option names (long and short spellings) and their defaults.
TRAIN_DATA_ARG = '--train-data'  # CSV file to analyse
TRAIN_DATA_ARG_SHORT = '-t'
OUT_DIR_ARG = '--out'  # directory that receives plots and text reports
OUT_DIR_ARG_SHORT = '-o'
DEFAULT_OUT_DIR = 'out/'
CORRELATION_THRESHOLD_ARG = '--correlation-threshold'  # minimum |correlation| to report
CORRELATION_THRESHOLD_ARG_SHORT = '-c'
DEFAULT_CORRELATION_THRESHOLD = 0.9
TARGET_ARG = '--target'  # name of the column used as regression target
P_VALUE_THRESHOLD_ARG = '--p-value-threshold'  # features must have a p-value below this
DEFAULT_P_VALUE_THRESHOLD = 0.05

# Column labels of the per-feature regression statistics table.
PVALUE_COLUMN_NAME = 'p-value'
RSQUARED_COLUMN_NAME = 'R^2'

def ensure_directory(directory: Path) -> None:
    """Create ``directory`` together with any missing parent directories.

    An already existing directory is left untouched and is not an error.
    """
    directory.mkdir(exist_ok=True, parents=True)

def correlation_analysis(train_data_file: Path, out_dir: Path, correlation_threshold: float):
    """Plot the correlation matrix of a CSV data set and report strongly correlated pairs.

    The annotated heatmap is saved as ``correlation_matrix.png`` in ``out_dir``.
    Every pair of distinct columns whose absolute correlation is at least
    ``correlation_threshold`` is printed and, when at least one pair exists,
    written to ``correlations.txt`` in ``out_dir`` (one ``a/b: value`` entry
    per line).

    Args:
        train_data_file: CSV file loaded with ``pandas.read_csv``.
        out_dir: Directory that receives the output files; created if missing.
        correlation_threshold: Minimum absolute correlation for a pair to be reported.
    """
    train_data = pd.read_csv(train_data_file)
    correlation_matrix = train_data.corr()

    figure = plt.figure(figsize=(10, 8))
    sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', vmin=-1, vmax=1)
    plt.title('Correlation Matrix')

    ensure_directory(out_dir)
    plt.savefig(out_dir / 'correlation_matrix.png', bbox_inches='tight')
    # pyplot keeps every figure alive until it is closed explicitly.
    plt.close(figure)

    # Boolean-mask indexing keeps entries with |r| >= threshold and turns all
    # other entries into NaN.
    high_correlation_matrix = correlation_matrix[correlation_matrix.abs() >= correlation_threshold]
    print('High correlation matrix')
    print(high_correlation_matrix)

    # Only walk the strict lower triangle: the matrix is symmetric and the
    # diagonal (each column with itself) is not a pair worth reporting.
    high_correlations: List[Tuple[str, str, float]] = []
    for i, first_name in enumerate(correlation_matrix.index):
        for j in range(i):
            correlation_value = high_correlation_matrix.iloc[i, j]
            if not math.isnan(correlation_value):
                high_correlations.append((first_name, correlation_matrix.columns[j], correlation_value))

    if not high_correlations:
        print("No high correlations found.")
        return

    correlations = [
        f'{first_name}/{second_name}: {correlation_value:.5f}'
        for first_name, second_name, correlation_value in high_correlations
    ]
    # Join outside the f-string: reusing the enclosing quote character inside
    # a replacement field is a SyntaxError before Python 3.12.
    joined_correlations = ', '.join(correlations)
    print(f'High correlations found in {joined_correlations}')

    correlations_file = out_dir / "correlations.txt"
    with open(correlations_file, 'w', encoding='utf-8') as f:
        # writelines() adds no separators, so terminate each entry explicitly.
        f.writelines(f'{entry}\n' for entry in correlations)

def single_feature_regression(data: pd.DataFrame, feature: str, target: str) -> Tuple[float, float]:
    """Fit an ordinary least squares model of ``target`` on a single ``feature``.

    Args:
        data: Table containing both the ``feature`` and the ``target`` column.
        feature: Name of the explanatory column.
        target: Name of the response column.

    Returns:
        A ``(p_value, r_squared)`` tuple: the entry at position 1 of the fitted
        model's p-values and the model's R^2.
    """
    response = data[target]
    # add_constant supplies the intercept term; the p-value is then read by
    # position (index 1), which presumably is the feature because the constant
    # is placed first -- verify against the statsmodels defaults.
    design_matrix = sm.add_constant(data[[feature]])
    fitted_model = sm.OLS(response, design_matrix).fit()
    feature_p_value = fitted_model.pvalues.iloc[1]
    return feature_p_value, fitted_model.rsquared

def _save_single_column_heatmap(values: pd.DataFrame, file_path: Path):
    """Draw ``values`` as a narrow annotated heatmap on a 0..1 colour scale and save it to ``file_path``."""
    figure = plt.figure(figsize=(1.75, 4.8))
    sns.heatmap(values, annot=True, cmap='coolwarm', vmin=0, vmax=1)
    plt.savefig(file_path, bbox_inches='tight')
    # pyplot keeps every figure alive until it is closed explicitly.
    plt.close(figure)


def best_single_feature_regression(train_data_file: Path, target: str, p_value_threshold: float, out_dir: Path):
    """Regress ``target`` on every other column individually and report the best feature.

    For each column except ``target`` a single-feature regression is evaluated
    via ``single_feature_regression``. The p-values and R^2 values are printed
    and saved as heatmaps (``evaluated_pvalues.png``, ``evaluated_rsquares.png``)
    in ``out_dir``. Among the features whose p-value is below
    ``p_value_threshold`` the one with the highest R^2 is printed and written
    to ``best_feature.txt``. When no feature qualifies, a message is printed
    and no ``best_feature.txt`` is written.

    Args:
        train_data_file: CSV file loaded with ``pandas.read_csv``.
        target: Name of the response column; must exist in the CSV.
        p_value_threshold: Exclusive upper bound on a feature's p-value.
        out_dir: Directory that receives the output files; created if missing.
    """
    train_data = pd.read_csv(train_data_file)
    features = train_data.columns.drop(target)

    # Build the statistics table in one go instead of enlarging a DataFrame
    # row by row.
    statistics = [single_feature_regression(train_data, feature, target) for feature in features]
    evaluated_features = pd.DataFrame(
        statistics,
        index=features,
        columns=[PVALUE_COLUMN_NAME, RSQUARED_COLUMN_NAME],
        dtype='float',
    )

    print('Evaluated features')
    print(evaluated_features)

    ensure_directory(out_dir)
    _save_single_column_heatmap(evaluated_features[[PVALUE_COLUMN_NAME]], out_dir / "evaluated_pvalues.png")
    _save_single_column_heatmap(evaluated_features[[RSQUARED_COLUMN_NAME]], out_dir / "evaluated_rsquares.png")

    # Keep features below the p-value threshold; dropna() additionally discards
    # rows with a missing statistic, as the previous where()/dropna() did.
    is_significant = evaluated_features[PVALUE_COLUMN_NAME] < p_value_threshold
    possible_features = evaluated_features[is_significant].dropna()
    possible_features = possible_features.sort_values(RSQUARED_COLUMN_NAME, ascending=False)

    print()
    print('Features matching p-value-threshold')
    print(possible_features)

    if possible_features.empty:
        print('No feature meets all criteria')
        return

    # Rows are sorted by descending R^2, so the first one is the best feature.
    best_feature = possible_features.iloc[0]

    print()
    print('Best Feature')
    print(best_feature)

    report = (
        f'Name: {best_feature.name}\n'
        f'p-value: {best_feature[PVALUE_COLUMN_NAME]}\n'
        f'R^2: {best_feature[RSQUARED_COLUMN_NAME]}\n'
    )
    best_feature_file = out_dir / 'best_feature.txt'
    with open(best_feature_file, 'w', encoding='utf-8') as f:
        f.write(report)

def main():
    """Parse the command line and run the selected action.

    Two actions are available:

    * ``correlation-analysis`` (alias ``ca``) runs ``correlation_analysis``.
    * ``best-single-feature-regression`` (alias ``bsfr``) runs
      ``best_single_feature_regression``.

    Invoking the program without an action ends with an argparse usage error.
    """
    argument_parser = ArgumentParser('vqcfim', description='Virtual Quality Control for Injection Molding')
    # required=True makes argparse reject a missing action with a usage message
    # instead of failing later on a namespace without a ``func`` attribute.
    subparsers = argument_parser.add_subparsers(title='action', dest='action', required=True)

    correlation_analysis_subparser = subparsers.add_parser(
        'correlation-analysis',
        aliases=['ca'],
        description=f'Gets variables with correlation coefficients >= {CORRELATION_THRESHOLD_ARG}',
    )
    correlation_analysis_subparser.add_argument(TRAIN_DATA_ARG, TRAIN_DATA_ARG_SHORT, type=Path, required=True)
    correlation_analysis_subparser.add_argument(OUT_DIR_ARG, OUT_DIR_ARG_SHORT, type=Path, default=DEFAULT_OUT_DIR)
    correlation_analysis_subparser.add_argument(
        CORRELATION_THRESHOLD_ARG,
        CORRELATION_THRESHOLD_ARG_SHORT,
        type=float,
        default=DEFAULT_CORRELATION_THRESHOLD,
    )
    # Each handler receives the parsed namespace and picks the values it needs.
    correlation_analysis_subparser.set_defaults(
        func=lambda args: correlation_analysis(args.train_data, args.out, args.correlation_threshold)
    )

    best_single_feature_regression_subparser = subparsers.add_parser(
        'best-single-feature-regression',
        aliases=['bsfr'],
        description='Evaluates the best single feature regression feature from the dataset',
    )
    best_single_feature_regression_subparser.add_argument(TRAIN_DATA_ARG, TRAIN_DATA_ARG_SHORT, type=Path, required=True)
    best_single_feature_regression_subparser.add_argument(TARGET_ARG, type=str, required=True)
    best_single_feature_regression_subparser.add_argument(OUT_DIR_ARG, OUT_DIR_ARG_SHORT, type=Path, default=DEFAULT_OUT_DIR)
    best_single_feature_regression_subparser.add_argument(
        P_VALUE_THRESHOLD_ARG,
        type=float,
        default=DEFAULT_P_VALUE_THRESHOLD,
    )
    best_single_feature_regression_subparser.set_defaults(
        func=lambda args: best_single_feature_regression(args.train_data, args.target, args.p_value_threshold, args.out)
    )

    parsed_args = argument_parser.parse_args()
    parsed_args.func(parsed_args)

# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()