
How To Create a Modified Date Column on a PySpark Delta Table?

I need to create a Delta table in PySpark with a 'modifiedDate' column. When a row is updated, modifiedDate's value should change to the current time. I thought about creating a trigger, but couldn't find any information about using triggers with Delta tables in PySpark. How should I go about this?

Answer

There is no such thing as a trigger in Delta tables. All modifications are done through code, so you need to use code to set this column when doing the MERGE operation. The overall workflow may look as follows:

  • your incoming data doesn't have a modifiedDate column
  • you add this column, setting it to the current timestamp
  • you perform an UPSERT (update of existing rows & insert of new ones) using the MERGE operation. For that you need to define a primary key that is used to identify unique records.
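The upsert-plus-timestamp logic in the steps above can be sketched in plain Python (hypothetical dict-based "tables", purely to illustrate the semantics; the column names match the question):

```python
from datetime import datetime, timezone

def upsert(target, updates):
    """Sketch of MERGE semantics: `target` and `updates` are dicts keyed
    by the primary key, each value being a dict of column -> value."""
    now = datetime.now(timezone.utc)
    for key, row in updates.items():
        stamped = {**row, "modifiedDate": now}  # stamp every incoming row
        target[key] = stamped  # update if the key exists, insert otherwise
    return target

table = {1: {"name": "old", "modifiedDate": None}}
upsert(table, {1: {"name": "new"}, 2: {"name": "fresh"}})
# row 1 is updated, row 2 is inserted, and both carry the current timestamp
```

In the real pipeline the same three things happen at once: matched rows are overwritten, unmatched rows are inserted, and every written row carries the fresh modifiedDate.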

Something like this (not tested):

import pyspark.sql.functions as F
from delta.tables import DeltaTable

# this is your input data
df = ...
# add the modifiedDate column, set to the current timestamp
updates_df = df.withColumn("modifiedDate", F.current_timestamp())

# the target Delta table to upsert into
destination = DeltaTable.forPath(spark, '<path-to-data>')
destination.alias('dest') \
  .merge(
    updates_df.alias('updates'),
    # join on the primary key; use 'and' if the key has multiple columns
    'dest.id = updates.id'
  ) \
  .whenMatchedUpdateAll() \
  .whenNotMatchedInsertAll() \
  .execute()
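As the comment above notes, a multi-column primary key just means joining the per-column equalities with AND. A small hypothetical helper (the `dest`/`updates` names match the aliases used above) that builds such a condition string:

```python
def merge_condition(key_columns, left="dest", right="updates"):
    # builds e.g. "dest.id = updates.id AND dest.region = updates.region"
    return " AND ".join(f"{left}.{c} = {right}.{c}" for c in key_columns)

merge_condition(["id", "region"])
# -> 'dest.id = updates.id AND dest.region = updates.region'
```

The resulting string can be passed directly as the second argument to `.merge(...)`.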
source: stackoverflow.com