# app.py (6.87 KiB) - authored by Andri Joos, commit f216d713
from argparse import ArgumentParser
from pathlib import Path
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import math
from typing import List, Tuple
import statsmodels.api as sm
# Command-line option names, their short aliases, and default values.
TRAIN_DATA_ARG = '--train-data'  # path of the training-data CSV file
TRAIN_DATA_ARG_SHORT = '-t'
OUT_DIR_ARG = '--out'  # directory that receives the generated plots and text reports
OUT_DIR_ARG_SHORT = '-o'
DEFAULT_OUT_DIR = 'out/'
CORRELATION_THRESHOLD_ARG = '--correlation-threshold'  # minimum absolute correlation that is reported
CORRELATION_THRESHOLD_ARG_SHORT = '-c'
DEFAULT_CORRELATION_THRESHOLD = 0.9
TARGET_ARG = '--target'  # name of the column used as regression target
P_VALUE_THRESHOLD_ARG = '--p-value-threshold'  # a feature's p-value must be below this to be considered
DEFAULT_P_VALUE_THRESHOLD = 0.05

# Column labels of the per-feature regression summary table.
PVALUE_COLUMN_NAME = 'p-value'
RSQUARED_COLUMN_NAME = 'R^2'
def ensure_directory(directory: Path):
    """Create ``directory`` together with any missing parents; do nothing if it already exists."""
    directory.mkdir(exist_ok=True, parents=True)
def correlation_analysis(train_data_file: Path, out_dir: Path, correlation_threshold: float):
    """Plot the correlation matrix of the training data and report strongly correlated column pairs.

    The heatmap is saved as ``correlation_matrix.png`` in ``out_dir``. Every
    pair of distinct columns whose absolute correlation is at least
    ``correlation_threshold`` is printed and, when at least one such pair
    exists, written to ``correlations.txt`` in ``out_dir`` (one pair per line).

    Args:
        train_data_file: CSV file holding the training data.
        out_dir: Directory for the generated files; created if it is missing.
        correlation_threshold: Minimum absolute correlation for a pair to be reported.
    """
    train_data = pd.read_csv(train_data_file)
    correlation_matrix = train_data.corr()

    ensure_directory(out_dir)

    # Render and save the full correlation matrix.
    figure = plt.figure(figsize=(10, 8))
    sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', vmin=-1, vmax=1)
    plt.title('Correlation Matrix')
    correlation_matrix_file = out_dir / 'correlation_matrix.png'
    plt.savefig(correlation_matrix_file, bbox_inches='tight')
    plt.close(figure)  # the figure is on disk; release it instead of leaving it open

    # Entries whose absolute value is below the threshold become NaN,
    # all other entries keep their correlation value.
    high_correlation_matrix = correlation_matrix[correlation_matrix.abs() >= correlation_threshold]
    print('High correlation matrix')
    print(high_correlation_matrix)

    # Walk the strict lower triangle only: every pair is visited once and the
    # diagonal (each column against itself) is skipped.
    high_correlations: List[Tuple[str, str, float]] = []
    for i, row_label in enumerate(correlation_matrix.index):
        for j in range(i):
            correlation_value: float = high_correlation_matrix.iloc[i, j]
            if not math.isnan(correlation_value):
                high_correlations.append((row_label, correlation_matrix.columns[j], correlation_value))

    if not high_correlations:
        print("No high correlations found.")
        return

    correlations = [f'{first}/{second}: {value:.5f}' for first, second, value in high_correlations]
    # Join outside the f-string: reusing the same quote type inside an f-string
    # expression is a SyntaxError before Python 3.12.
    joined_correlations = ', '.join(correlations)
    print(f'High correlations found in {joined_correlations}')

    correlations_file = out_dir / "correlations.txt"
    with open(correlations_file, 'w', encoding='utf-8') as f:
        # writelines() adds no separators, so terminate every entry explicitly.
        f.write('\n'.join(correlations) + '\n')
def single_feature_regression(data: pd.DataFrame, feature: str, target: str):
    """Fit an ordinary least squares model of ``target`` on ``feature`` plus an intercept.

    Args:
        data: Table containing both the ``feature`` and the ``target`` column.
        feature: Name of the single explanatory column.
        target: Name of the response column.

    Returns:
        A ``(p_value, r_squared)`` tuple: the p-value found at position 1 of
        the fitted model's p-values and the model's R^2.
    """
    design_matrix = sm.add_constant(data[[feature]])  # intercept column + feature column
    response = data[target]
    fitted = sm.OLS(response, design_matrix).fit()
    # Position 1 is taken to be the feature's coefficient, i.e. the intercept
    # column is assumed to come first - NOTE(review): confirm against add_constant.
    feature_pvalue = fitted.pvalues.iloc[1]
    return feature_pvalue, fitted.rsquared
def _save_column_heatmap(values: pd.DataFrame, file_path: Path):
    """Draw ``values`` as a narrow annotated heatmap on a 0..1 colour scale and save it to ``file_path``."""
    figure = plt.figure(figsize=(1.75, 4.8))
    sns.heatmap(values, annot=True, cmap='coolwarm', vmin=0, vmax=1)
    plt.savefig(file_path, bbox_inches='tight')
    plt.close(figure)  # release the figure once it has been written


def best_single_feature_regression(train_data_file: Path, target: str, p_value_threshold: float, out_dir: Path):
    """Regress ``target`` on every other column separately and report the best single feature.

    Each column except ``target`` is evaluated with ``single_feature_regression``.
    The p-values and R^2 values are printed and saved as heatmaps
    (``evaluated_pvalues.png`` and ``evaluated_rsquares.png``). Among the
    features whose p-value is below ``p_value_threshold`` the one with the
    highest R^2 is printed and written to ``best_feature.txt``. If no feature
    passes the threshold, a message is printed and no text file is written.

    Args:
        train_data_file: CSV file holding the training data.
        target: Name of the column to predict.
        p_value_threshold: Exclusive upper bound for a feature's p-value.
        out_dir: Directory for the generated files; created if it is missing.
    """
    train_data = pd.read_csv(train_data_file)
    features = train_data.columns.drop(target)

    # Collect all (p-value, R^2) results first and build the table in one go
    # instead of enlarging the DataFrame one row at a time.
    results = [single_feature_regression(train_data, feature, target) for feature in features]
    evaluated_features = pd.DataFrame(
        results,
        index=list(features),
        columns=[PVALUE_COLUMN_NAME, RSQUARED_COLUMN_NAME],
        dtype='float',
    )

    print('Evaluated features')
    print(evaluated_features)

    ensure_directory(out_dir)
    _save_column_heatmap(evaluated_features[[PVALUE_COLUMN_NAME]], out_dir / "evaluated_pvalues.png")
    _save_column_heatmap(evaluated_features[[RSQUARED_COLUMN_NAME]], out_dir / "evaluated_rsquares.png")

    # Keep the rows below the p-value threshold; dropna() additionally discards
    # rows with a missing value, as the previous where()/dropna() chain did.
    below_threshold = evaluated_features[PVALUE_COLUMN_NAME] < p_value_threshold
    possible_features = evaluated_features[below_threshold].dropna()
    possible_features = possible_features.sort_values(RSQUARED_COLUMN_NAME, ascending=False)

    print()
    print('Features matching p-value-threshold')
    print(possible_features)

    if possible_features.empty:
        print('No feature meets all criteria')
        return

    # Rows are sorted by descending R^2, so the first one is the best feature.
    best_feature = possible_features.iloc[0]

    print()
    print('Best Feature')
    print(best_feature)

    # Explicit newlines keep the report layout independent of source indentation.
    report = (
        f'Name: {best_feature.name}\n'
        f'p-value: {best_feature[PVALUE_COLUMN_NAME]}\n'
        f'R^2: {best_feature[RSQUARED_COLUMN_NAME]}\n'
    )
    best_feature_file = out_dir / 'best_feature.txt'
    with open(best_feature_file, 'w', encoding='utf-8') as f:
        f.write(report)
def main(argv: 'List[str] | None' = None):
    """Parse the command line and run the selected analysis.

    Two sub-commands are available: ``correlation-analysis`` (alias ``ca``)
    and ``best-single-feature-regression`` (alias ``bsfr``).

    Args:
        argv: Arguments to parse instead of the process command line; ``None``
            (the default) makes argparse read ``sys.argv``.
    """
    argument_parser = ArgumentParser('vqcfim', description='Virtual Quality Control for Injection Molding')
    # A sub-command is mandatory: without required=True a bare invocation would
    # reach the dispatch at the end with no ``func`` set and fail with AttributeError.
    subparsers = argument_parser.add_subparsers(title='action', dest='action', required=True)

    correlation_analysis_subparser = subparsers.add_parser(
        'correlation-analysis',
        aliases=['ca'],
        description=f'Gets variables with correlation coefficients >= {CORRELATION_THRESHOLD_ARG}',
    )
    correlation_analysis_subparser.add_argument(
        TRAIN_DATA_ARG, TRAIN_DATA_ARG_SHORT, action='store', type=Path, required=True,
        help='CSV file with the training data',
    )
    correlation_analysis_subparser.add_argument(
        OUT_DIR_ARG, OUT_DIR_ARG_SHORT, action='store', type=Path, required=False, default=DEFAULT_OUT_DIR,
        help='directory for the generated files',
    )
    correlation_analysis_subparser.add_argument(
        CORRELATION_THRESHOLD_ARG, CORRELATION_THRESHOLD_ARG_SHORT, action='store', type=float, required=False,
        default=DEFAULT_CORRELATION_THRESHOLD,
        help='minimum absolute correlation that is reported',
    )
    # Handlers receive the parsed namespace and pick the attributes they need.
    correlation_analysis_subparser.set_defaults(
        func=lambda args: correlation_analysis(args.train_data, args.out, args.correlation_threshold)
    )

    best_single_feature_regression_subparser = subparsers.add_parser(
        'best-single-feature-regression',
        aliases=['bsfr'],
        description='Evaluates the best single feature regression feature from the dataset',
    )
    best_single_feature_regression_subparser.add_argument(
        TRAIN_DATA_ARG, TRAIN_DATA_ARG_SHORT, action='store', type=Path, required=True,
        help='CSV file with the training data',
    )
    best_single_feature_regression_subparser.add_argument(
        TARGET_ARG, action='store', type=str, required=True,
        help='name of the column used as regression target',
    )
    best_single_feature_regression_subparser.add_argument(
        OUT_DIR_ARG, OUT_DIR_ARG_SHORT, action='store', type=Path, required=False, default=DEFAULT_OUT_DIR,
        help='directory for the generated files',
    )
    best_single_feature_regression_subparser.add_argument(
        P_VALUE_THRESHOLD_ARG, action='store', type=float, required=False, default=DEFAULT_P_VALUE_THRESHOLD,
        help='a feature is only considered if its p-value is below this value',
    )
    best_single_feature_regression_subparser.set_defaults(
        func=lambda args: best_single_feature_regression(
            args.train_data, args.target, args.p_value_threshold, args.out
        )
    )

    parsed_args = argument_parser.parse_args(argv)
    parsed_args.func(parsed_args)
# Run the command-line interface only when this file is executed as a script.
if __name__ == "__main__":
    main()