Commit ca298235 authored by Andri Joos

Merge branch 'abgabe' into 'master'

Abgabe

See merge request !1
parents b029d13c 8c5871fa
@@ -9,12 +9,12 @@ from pyspark.ml.feature import VectorAssembler
import os
import re
# set an entry to None if it is the literal string "null"
def _set_null(item: str):
return None if item == "null" else item
# move a file on the HDFS
# note: this reaches into Spark's private _gateway/_jsc attributes to call the Hadoop filesystem API on the JVM, which is not a public interface
def _move_hdfs_file(src: str, dest: str, spark: SparkSession):
Path = spark.sparkContext._gateway.jvm.org.apache.hadoop.fs.Path
FileUtil = spark.sparkContext._gateway.jvm.org.apache.hadoop.fs.FileUtil
conf = spark.sparkContext._jsc.hadoopConfiguration()
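# a minimal sketch of how the collapsed remainder of _move_hdfs_file could look, assuming it
# uses the standard Hadoop FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, conf) call;
# this is an illustration, not necessarily the exact lines hidden by the diff
fs = Path(src).getFileSystem(conf)
FileUtil.copy(fs, Path(src), fs, Path(dest), True, conf) # deleteSource=True turns the copy into a move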
@@ -32,7 +32,7 @@ builder = SparkSession.builder.appName("LinearRegression")\
.config("spark.driver.memory", "4g")
spark: SparkSession = configure_spark_with_delta_pip(builder).getOrCreate()
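# for illustration: configure_spark_with_delta_pip mainly adds the delta-spark package, so the
# collapsed .config lines above presumably also set the usual Delta session options; these
# standard option names can be inspected at runtime (the fallback strings are placeholders):
print(spark.conf.get("spark.sql.extensions", "<not set>"))
print(spark.conf.get("spark.sql.catalog.spark_catalog", "<not set>"))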
# Read unstructured data, convert to structured, and save as CSV
# paths for all the different directories
dataset_file_regex = "/dataset_rearranged/*.op"
namenode_dir = "hdfs://namenode:9000"
structured_dir = os.path.join(namenode_dir, "structured")
@@ -43,8 +43,7 @@ lr_dir = os.path.join(delta_dir, "linear_regression")
structured_files = os.path.join(structured_dir, "*", "part-*")
delimiter = r'\s+'
# the files are processed in parallel as RDDs; structure_line splits a line at whitespace and turns it into CSV columns
def structure_line(line: str):
splitted_line = re.split(delimiter, line)
@@ -61,16 +60,18 @@ def structure_line(line: str):
]
return ",".join(str(v) for v in values)
# read a file and apply structure_line to every line
def preprocess_file(file: str):
rdd = spark.sparkContext.textFile(file, minPartitions=81)
result_rdd = rdd.map(structure_line) # !Serialization optimization!
output_path = os.path.join(structured_dir, os.path.basename(file) + ".csv")
result_rdd.saveAsTextFile(output_path)
return output_path
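# note: saveAsTextFile writes one part-NNNNN file per partition into output_path, which is why
# the structured_files pattern above matches "*/part-*" when the structured data is read back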
# apply the schema, validate it, move invalid files to quarantine, reduce to the relevant columns, and calculate the medians
def process_files(folder: str):
files_path = os.path.join(folder, "*")
# Read CSV files from the "structured" directory
schema = StructType([
StructField("stn", IntegerType(), nullable=False),
StructField("wban", IntegerType(), nullable=False),
@@ -101,8 +102,7 @@ def process_files(folder: str):
)
invalid_paths = invalid_data.select("filename").dropDuplicates(["filename"])
# driver-side loop (similar to a UDF, could be optimized); invalid_paths.foreach() cannot be used here since _move_hdfs_file needs the driver-side SparkSession
for file in invalid_paths.collect():
path = file["filename"]
_move_hdfs_file(path, quarantine_dir, spark)
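# the construction of invalid_data is collapsed in this diff; a common pattern (an assumption,
# not necessarily the author's code) tags each row with its source file and keeps the rows
# that violate the schema, roughly:
# tagged = structured_data.withColumn("filename", F.input_file_name())
# invalid_data = tagged.filter(F.col("stn").isNull() | F.col("yearmoda").isNull())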
@@ -114,6 +114,7 @@ def process_files(folder: str):
structured_data = structured_data.withColumn("date", F.to_date(F.col("yearmoda"), "yyyyMMdd")).drop("yearmoda")
medians_pre_df = structured_data.select("stn", "temp", F.year("date").alias("year"))
# !spill optimization! because of the missing repartition
medians_per_station_per_year = medians_pre_df.groupBy("stn", "year").agg(F.median("temp").alias("median_temp"))
medians_per_year = medians_per_station_per_year.groupBy("year").agg(F.median("median_temp").alias("median"))
medians_per_year = medians_per_year.select("year", "median")
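# one way to act on the spill note above (an assumption, not a change made in this commit) is to
# repartition by the grouping keys before the aggregation so each shuffle partition stays small:
# medians_pre_df = medians_pre_df.repartition("stn", "year")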
@@ -122,6 +123,7 @@ def process_files(folder: str):
preprocessed_path = preprocess_file(dataset_file_regex)
medians_per_year = process_files(preprocessed_path)
# create the Delta table if it does not exist and merge in the new yearly medians
DeltaTable.createIfNotExists(spark).addColumns(medians_per_year.schema).location(year_medians_dir).execute()
yearly_median_table = DeltaTable.forPath(spark, year_medians_dir)
yearly_median_table.alias("oldData").merge(medians_per_year.alias("newData"), "oldData.year = newData.year")\
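# the merge clauses are collapsed in this diff; a typical completion of this upsert (an
# assumption, not necessarily the exact clauses used) would be:
# .whenMatchedUpdateAll() \
# .whenNotMatchedInsertAll() \
# .execute()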
@@ -138,10 +140,12 @@ lr_data = lr_data.select("year_vec", "median")
lr = LinearRegression(featuresCol="year_vec", labelCol="median")
lr_model: LinearRegressionModel = lr.fit(lr_data)
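# the step that produces the "year_vec" feature column is collapsed in this diff; given the
# VectorAssembler import at the top, it is presumably built roughly like this (an assumption):
# assembler = VectorAssembler(inputCols=["year"], outputCol="year_vec")
# lr_data = assembler.transform(medians_per_year)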
# schema for the linear regression results
lr_schema = StructType([
StructField("coefficient", FloatType(), nullable=True),
StructField("intercept", FloatType(), nullable=True)
])
# save the linear regression results (coefficient and intercept)
lr_df = spark.createDataFrame([(lr_model.coefficients[0].item(), lr_model.intercept)], schema=lr_schema) # since lr only has one coefficient, lr_model.coefficients[0] is fine
lr_df.write.format("delta").mode("overwrite").save(lr_dir)
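# a small usage sketch, assuming the Delta table written above: read the parameters back and
# extrapolate a yearly median for an arbitrary year
row = spark.read.format("delta").load(lr_dir).first()
predicted_median = row["coefficient"] * 2030 + row["intercept"] # 2030 is just an example year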
@@ -5,6 +5,7 @@ import shutil
dataset_years = "/dataset/gsod_all_years/*"
dataset_output = "/dataset_rearranged"
# saving all stations in one file per year, instead of one file per station per year
year_folders = glob.glob(dataset_years)
year_folders.sort()
for folder in year_folders:
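    # a minimal sketch of the collapsed loop body, assuming os is imported in this script and
    # that the per-station .op files of a year are simply concatenated into one file per year
    # (an illustration, not necessarily the exact implementation)
    year = os.path.basename(folder)
    with open(os.path.join(dataset_output, year + ".op"), "wb") as out_file:
        for station_file in sorted(glob.glob(os.path.join(folder, "*.op"))):
            with open(station_file, "rb") as in_file:
                shutil.copyfileobj(in_file, out_file)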