# Initialize the input/output bucket variables. Use the name of the S3 output bucket that was created during the setup step (e.g. athena-spark-datastore).

In [None]:
# S3 bucket holding both the CSV input and the Parquet output.
NOAA_OUTPUT_BUCKET = "athena-spark-datastore"

# Input (CSV) and output (Parquet) prefixes inside that bucket.
noaa_csv_prefix = "s3://{}/noaa_data_csv/".format(NOAA_OUTPUT_BUCKET)
output_prefix = "s3://{}/noaa_data_parquet/".format(NOAA_OUTPUT_BUCKET)

# sparkmagic SQL configs - do not modify
# The %%sql cell at the end of the notebook refers to these two keys.
spark.conf.set("table.name", "noaa_data_parquet")
spark.conf.set("table.location", output_prefix)


In [None]:
import sys
from datetime import datetime

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import concat

# Load data from the S3 path

In [None]:
# Read the CSV data under the input prefix: the first row is treated as the
# header and column types are inferred from the data.
df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load(noaa_csv_prefix)
)

In [None]:
# Inspect the schema of the raw data and preview the first 10 rows.
# show() prints the rows itself and returns None, so it must not be wrapped
# in print() (that would emit a stray "None" line after the table).
df.printSchema()
df.show(10)

In [None]:
# Keep only the columns used downstream and drop rows whose fourth
# comma-separated WND element is '9999' (treated as a missing value).
subset_df = (
    df.select("DATE", "STATION", "WND")
    .where(F.split(F.col("WND"), ",").getItem(3) != "9999")
)

In [None]:
# Inspect the schema of the filtered subset and preview the first 10 rows.
# show() prints the rows itself and returns None, so it must not be wrapped
# in print() (that would emit a stray "None" line after the table).
subset_df.printSchema()
subset_df.show(10)

In [None]:
# Parse the measurement year from DATE and the wind speed from the fourth
# WND element; the raw speed is divided by 10 to scale it back.
wind_date_df = subset_df.select(
    F.col("station"),
    F.year(F.col("DATE")).alias("measurement_year"),
    (F.split(F.col("WND"), ",").getItem(3).cast(DoubleType()) / 10).alias("wind_speed"),
)

In [None]:
# Inspect the schema of the parsed year/wind-speed data and preview 10 rows.
# show() prints the rows itself and returns None, so it must not be wrapped
# in print() (that would emit a stray "None" line after the table).
wind_date_df.printSchema()
wind_date_df.show(10)

In [None]:
# Per station and measurement year, compute the minimum, average and maximum
# wind speed.
agg_wind_df = wind_date_df.groupBy("station", "measurement_year").agg(
    F.min("wind_speed").alias("min_wind_speed"),
    F.avg("wind_speed").alias("avg_wind_speed"),
    F.max("wind_speed").alias("max_wind_speed"),
)

In [None]:
# Inspect the schema of the yearly aggregates and preview the first 10 rows.
# show() prints the rows itself and returns None, so it must not be wrapped
# in print() (that would emit a stray "None" line after the table).
agg_wind_df.printSchema()
agg_wind_df.show(10)

In [None]:
# Write the aggregated result to the output prefix in your S3 bucket.
# Run timestamp at minute resolution; the write below does not use it.
current_time = datetime.now().strftime('%Y-%m-%d-%H-%M')

# Overwrite mode is used for the save, and the Parquet output is partitioned
# by station.
(
    agg_wind_df.write
    .format("parquet")
    .mode("overwrite")
    .partitionBy("station")
    .save(output_prefix)
)

print("Finished writing NOAA data out to: ", output_prefix)


# Create a table in the AWS Glue Data Catalog so we can also query the data using the Athena Query Editor.

# NOTE: Remember to load the partitions in the Athena Query Editor (e.g. `MSCK REPAIR TABLE noaa_data_parquet`) before querying the table.


In [None]:
%%sql
-- Declare a table in the `default` database (only if it does not already exist)
-- over the Parquet output written by the previous cell.
-- The table name and location placeholders are presumably substituted from the
-- table.name / table.location values set via spark.conf.set in the first cell
-- (confirm against the sparkmagic configuration).
-- The partition column mirrors the partitionBy("station") used when writing.
-- NOTE(review): station is declared bigint; confirm this matches the type
-- inferred for STATION when the CSV data was read.
create table if not exists default.${table.name}(
          measurement_year int,
          min_wind_speed double,
          avg_wind_speed double,
          max_wind_speed double)
    partitioned by (station bigint)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    location '${table.location}'
