diff --git a/linear_regression.py b/linear_regression.py
index 3babcbf66498e74c26857f97de8a77b14ba03be1..80a60ff4fc9f347da823293333158cddac8b797d 100644
--- a/linear_regression.py
+++ b/linear_regression.py
@@ -9,12 +9,12 @@ from pyspark.ml.feature import VectorAssembler
 import os
 import re
 
-
+# map the string "null" to None
 def _set_null(item: str):
     return None if item == "null" else item
 
-def _move_hdfs_file(src: str, dest: str, spark: SparkSession):
-    # Very good, accessing via private variable over some jvm stuff!
+# move a file on HDFS
+def _move_hdfs_file(src: str, dest: str, spark: SparkSession):
     Path = spark.sparkContext._gateway.jvm.org.apache.hadoop.fs.Path
     FileUtil = spark.sparkContext._gateway.jvm.org.apache.hadoop.fs.FileUtil
     conf = spark.sparkContext._jsc.hadoopConfiguration()
@@ -32,7 +32,7 @@ builder = SparkSession.builder.appName("LinearRegression")\
     .config("spark.driver.memory", "4g")
 spark: SparkSession = configure_spark_with_delta_pip(builder).getOrCreate()
 
-# Read unstructured data, convert to structured, and save as CSV
+# paths for the different directories
 dataset_file_regex = "/dataset_rearranged/*.op"
 namenode_dir = "hdfs://namenode:9000"
 structured_dir = os.path.join(namenode_dir, "structured")
@@ -43,8 +43,7 @@ lr_dir = os.path.join(delta_dir, "linear_regression")
 structured_files = os.path.join(structured_dir, "*", "part-*")
 delimiter = r'\s+'
 
-
-# Parallelize the file processing
+# split a line at whitespace, creating the columns
 def structure_line(line: str):
     splitted_line = re.split(delimiter, line)
 
@@ -61,16 +60,18 @@ def structure_line(line: str):
     ]
     return ",".join(str(v) for v in values)
 
+# read the file and apply structure_line to every line
 def preprocess_file(file: str):
     rdd = spark.sparkContext.textFile(file, minPartitions=81)
-    result_rdd = rdd.map(structure_line)
+    result_rdd = rdd.map(structure_line) # !Serialization optimization!
     output_path = os.path.join(structured_dir, os.path.basename(file) + ".csv")
     result_rdd.saveAsTextFile(output_path)
     return output_path
 
+# apply and check the schema, move invalid data, reduce to the relevant columns, calculate the medians
 def process_files(folder: str):
     files_path = os.path.join(folder, "*")
-    # Read CSV files from the "structured" directory
+
     schema = StructType([
         StructField("stn", IntegerType(), nullable=False),
         StructField("wban", IntegerType(), nullable=False),
@@ -101,8 +102,7 @@ def process_files(folder: str):
     )
 
     invalid_paths = invalid_data.select("filename").dropDuplicates(["filename"])
-    # kind of udf, must be optimized
-    for file in invalid_paths.collect(): # does not work with invalid_paths.foreach() since spark resources are accessed
+    for file in invalid_paths.collect():
         path = file["filename"]
         _move_hdfs_file(path, quarantine_dir, spark)
 
@@ -114,6 +114,7 @@ def process_files(folder: str):
     structured_data = structured_data.withColumn("date", F.to_date(F.col("yearmoda"), "yyyyMMdd")).drop("yearmoda")
 
     medians_pre_df = structured_data.select("stn", "temp", F.year("date").alias("year"))
+    # !spill optimization! because of missing repartition
     medians_per_station_per_year = medians_pre_df.groupBy("stn", "year").agg(F.median("temp").alias("median_temp"))
     medians_per_year = medians_per_station_per_year.groupBy("year").agg(F.median("median_temp").alias("median"))
     medians_per_year = medians_per_year.select("year", "median")
@@ -122,6 +123,7 @@ def process_files(folder: str):
 preprocessed_path = preprocess_file(dataset_file_regex)
 medians_per_year = process_files(preprocessed_path)
 
+# create the delta table and merge in the yearly medians
 DeltaTable.createIfNotExists(spark).addColumns(medians_per_year.schema).location(year_medians_dir).execute()
 yearly_median_table = DeltaTable.forPath(spark, year_medians_dir)
 yearly_median_table.alias("oldData").merge(medians_per_year.alias("newData"), "oldData.year = newData.year")\
@@ -138,10 +140,12 @@ lr_data = lr_data.select("year_vec", "median")
 lr = LinearRegression(featuresCol="year_vec", labelCol="median")
 lr_model: LinearRegressionModel = lr.fit(lr_data)
 
+# schema for the linear regression results
 lr_schema = StructType([
     StructField("coefficient", FloatType(), nullable=True),
     StructField("intercept", FloatType(), nullable=True)
 ])
 
+# save the linear regression results
 lr_df = spark.createDataFrame([(lr_model.coefficients[0].item(), lr_model.intercept)], schema=lr_schema) # since lr only has one coefficient, lr_model.coefficients[0] is fine
 lr_df.write.format("delta").mode("overwrite").save(lr_dir)
diff --git a/rearrange_dataset.py b/rearrange_dataset.py
index 642c1a238efdea72ac5597bb0b7e42e39de3ccf4..b5ba2853026e87a337d1d6f621db12821148f773 100644
--- a/rearrange_dataset.py
+++ b/rearrange_dataset.py
@@ -5,6 +5,7 @@ import shutil
 dataset_years = "/dataset/gsod_all_years/*"
 dataset_output = "/dataset_rearranged"
 
+# save all stations in one file per year instead of one file per station per year
 year_folders = glob.glob(dataset_years)
 year_folders.sort()
 for folder in year_folders:
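Note: the patch only touches the first lines of _move_hdfs_file, so its full body is not visible here. As context for reviewers, below is a minimal sketch (not the patch itself) of how such a helper can complete the move through the same Py4J gateway, assuming Hadoop's FileUtil.copy with deleteSource=True and that overwriting the destination is acceptable; the actual implementation in the repository may differ.

# Hypothetical sketch only; the function name and the copy-then-delete strategy are assumptions.
def _move_hdfs_file_sketch(src: str, dest: str, spark):
    # reach the Hadoop classes through the Py4J gateway, as the patched helper does
    Path = spark.sparkContext._gateway.jvm.org.apache.hadoop.fs.Path
    FileUtil = spark.sparkContext._gateway.jvm.org.apache.hadoop.fs.FileUtil
    conf = spark.sparkContext._jsc.hadoopConfiguration()
    fs = Path(src).getFileSystem(conf)
    # copy src to dest and delete the source (deleteSource=True, overwrite=True)
    FileUtil.copy(fs, Path(src), fs, Path(dest), True, True, conf)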