From 9a4b4dde4b8ee836731b4a7b9935ff4ed2116f2a Mon Sep 17 00:00:00 2001
From: Andri Joos <andri@joos.io>
Date: Sat, 9 Nov 2024 01:16:34 +0100
Subject: [PATCH] split array columns into multiple columns

---
 app/preprocessing/transform_dataset.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/app/preprocessing/transform_dataset.py b/app/preprocessing/transform_dataset.py
index ee8e110..cbc0cb8 100644
--- a/app/preprocessing/transform_dataset.py
+++ b/app/preprocessing/transform_dataset.py
@@ -54,6 +54,18 @@ def _cast_columns(df: pd.DataFrame, column_type_mapping: Dict[str | int, str]) -
 
     return df
 
+def _split_array_column(df: pd.DataFrame) -> pd.DataFrame:
+    array_columns = [col for col in df.columns if isinstance(df[col].values[0], np.ndarray)] # Data is consistent in each row
+    for column in array_columns:
+        array_dtype = df[column].iloc[0].dtype # First row must have a value
+        stacked_arrays = np.stack(df[column].values, dtype=array_dtype) # is faster than df[column].apply(lambda vec: pd.Series(vec, dtype=array_dtype))
+        expanded_columns = pd.DataFrame(stacked_arrays, index=df.index)
+        expanded_columns.columns = [f'{column}_{i}' for i in range(expanded_columns.shape[1])]
+
+        df = df.join(expanded_columns).drop(columns=[column])
+
+    return df
+
 def _drop_non_shared_columns(df: pd.DataFrame, shared_columns: Set[str]) -> pd.DataFrame:
     columns_to_drop = [column for column in df.columns if str(column) not in shared_columns]
     df = df.drop(columns=columns_to_drop)
@@ -87,6 +99,9 @@ def _transform_parquet_file(
     # Parse columns
     df = _cast_columns(df, column_name_type_mapping)
 
+    # Split arrays
+    df = _split_array_column(df)
+
     print(f'Saving {filename}')
     df.to_parquet(out_dir / filename)
     # df.to_csv(out_dir / f'{file.stem}.csv')
-- 
GitLab