2.假设我们的增量数据存储在下表中(非Hudi格式,可以是Hive) 。
+---------+-------------+-----------------+---------------+-------------------+-------------------+|seller_id|prod_category|product_name |product_package|discount_percentage|eff_start_ts |+---------+-------------+-----------------+---------------+-------------------+-------------------+|1234 |Detergent |Tide 5L |6 |25 |2022-01-31 10:00:30||4565 |Gourmet |Dairy Milk Almond|12 |45 |2022-06-12 20:30:40||3345 |Stationary |Sticky Notes |4 |12 |2022-07-09 21:30:45|+---------+-------------+-----------------+---------------+-------------------+-------------------+
- 现在让我们通过对目标表进行Left Anti Join过滤掉增量表中的所有 Insert only 记录 。
val updFileDf = spark.read.option("header",true).csv("gs://target_bucket/hudi_product_catalog/hudi_product_update.csv")val tgtHudiDf = spark.sql("select * from hudi_product_catalog")hudiTableData.createOrReplaceTempView("hudiTable")//Cast as neededval stgDf = updFileDf.withColumn("eff_start_ts",to_timestamp(col("eff_start_ts"))).withColumn("seller_id",col("seller_id").cast("int"))//Prepare an insert DF from incremental temp DFval instmpDf = stgDf.as("stg") .join(tgtHudiDf.as("tgt"), col("stg.seller_id") === col("tgt.seller_id") && col("stg.prod_category") === col("tgt.prod_category"),"left_anti").select("stg.*")val insDf = instmpDf.withColumn("eff_end_ts",to_timestamp(lit("9999-12-31 23:59:59"))).withColumn("actv_ind",lit(1))insDf.show(false)+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+|seller_id|prod_category|product_name|product_package|discount_percentage| eff_start_ts| eff_end_ts|actv_ind|+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+| 3345| Stationary|Sticky Notes| 4| 12|2022-07-09 21:30:45|9999-12-31 23:59:59| 1|+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
- 我们有一个只插入记录的DataFrame 。接下来让我们创建一个DataFrame,其中将包含来自 delta 表和目标表的属性,并在目标上使用内连接,它将获取需要更新的记录 。
经验总结扩展阅读
-
-
泡沫说情感|心理学:凭感觉选一朵蘑菇,测你未来会靠什么发家致富?
-
-
熊猫医学社|40岁男子转氨酶280,确诊肝硬化,医生咆哮:3个习惯,是“祸根”
-
马丽|马丽也逃不过“中年发福”?穿白衬衫身材圆润不少,连气质都变了
-
2022年农历九月廿三祭祀灶神吉日 2022年10月18日适合祭祀灶神吗
-
2023年农历八月初四宜认干儿子吗 2023年9月18日认干儿子好不好
-
-
-
-
鲍里斯·约翰逊|乌克兰本已准备好和谈,却遭英前首相约翰逊竭力阻拦
-
-
-
-
2022年12月3日求神拜佛好吗 2022年12月3日求神拜佛好不好
-
-
-
-
-
我是文食肆|人生不顺时,默念三句话,你会越来越强大!