Incremental Loading Using Spark Partitioning
We can achieve incremental loading using Spark data partitioning. I will demonstrate with a simple example here, and you can adapt it to your business needs.
Let's consider the following data frame. (This could be your CSV, or any other formatted file, and it is the first batch of data you will be processing.) Read it using Spark.
# read csv file
df=spark.read.format("csv").option("header",True).load("s3-file-location")
Here we will partition the data by a date key; in this case, I will derive it from the "date" column. Since id=105 has a null date, we will keep its partition value as "NULL_DATE".
df = df.withColumn("date_key",
when(col("date").isNull(), lit("NULL_DATE")).otherwise(date_format(col("date"),'yyyyMM')))
Now write the df data to an S3 location:
# Let's partition the data by date_key and write it
df.write.format("parquet") \
.partitionBy("date_key") \
.mode("overwrite") \
.save("your-output-s3-location")
The output will be partitioned by date_key:
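The exact partitions depend on your data; with the hypothetical sample above, the output prefix would look roughly like this (paths are illustrative):
# your-output-s3-location/
#   date_key=202305/part-0000-....snappy.parquet
#   date_key=202306/part-0000-....snappy.parquet
#   date_key=202307/part-0000-....snappy.parquet
#   date_key=NULL_DATE/part-0000-....snappy.parquet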
Now, let's read the delta file; say it has the following data:
In the delta file, we received one record that updates the previous file's data and two new records; id = 101 is the updated record. We will update id = 101 by overwriting its old record with the new one from the delta file.
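The post does not show how the delta frame is loaded; assuming it arrives as another CSV (the location below is a placeholder), it can be read the same way. For an in-memory walkthrough, a hypothetical delta consistent with the description (id = 101 updated, plus two made-up new records) would be:
# read the delta csv (location is a placeholder)
delta = spark.read.format("csv").option("header", True).load("s3-delta-file-location")

# or build a hypothetical delta in-memory: id = 101 is the updated record,
# ids 106 and 107 stand in for the two new records
delta = spark.createDataFrame(
    [
        (101, "A-updated", "2023-06-05"),
        (106, "F", "2023-06-21"),
        (107, "G", None),
    ],
    schema="id INT, name STRING, date STRING",
)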
First, create the same partition column "date_key" in the delta dataframe.
# add the same date_key column to the delta frame and register it as a temp view
delta = delta.withColumn("date_key",
    when(col("date").isNull(), lit("NULL_DATE")).otherwise(date_format(col("date"), 'yyyyMM')))
delta.createOrReplaceTempView("delta")
Now read the parquet output of the main file we just wrote, and filter its records to the "date_key" values present in the delta dataframe.
# read the parquet output of the main file and keep only the date_key values present in delta
dfmain = spark.read.format("parquet").load("your-output-s3-location") \
    .filter(col("date_key").isin(delta.select("date_key").rdd.flatMap(lambda x: x).collect()))
dfmain.createOrReplaceTempView("dfmain")
If the delta df contains records from already partitioned keys, dfmain will show those rows. Since the delta df consists of date_key = ['202306', 'NULL_DATE'], we get three records in the dfmain frame.
Now we will remove from dfmain the old records that need to be updated, and union in the records from the delta frame.
uniondf = spark.sql("""
(select * from dfmain
minus
select dfmain.* from dfmain as dfmain
join delta as dlt on dfmain.date_key = dlt.date_key and dfmain.id = dlt.id
)
union
select * from delta
""")
uniondf.createOrReplaceTempView("uniondf")
Now we will write the data to the same location where we wrote the main file's parquet output.
# set partitionOverwriteMode to dynamic
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
# verify the setting
spark.conf.get("spark.sql.sources.partitionOverwriteMode")
# Let's write to the same location using the partition key
uniondf.write.format("parquet") \
.partitionBy("date_key") \
.mode("overwrite") \
.save("your-output-s3-location")
Note that this time we need to set partitionOverwriteMode to "dynamic" so that only the partitions containing updated or new data are overwritten; the other partitions remain untouched.
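As a quick sanity check (not part of the original post), you can read the output back and confirm that the partitions touched by the delta now contain the updated and new records:
# read the final output and inspect the partitions touched by the delta
result = spark.read.format("parquet").load("your-output-s3-location")
result.filter(col("date_key").isin("202306", "NULL_DATE")).orderBy("id").show()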
This way, we can achieve incremental (delta) loading of data using Spark partitioning.