2.假设我们的增量数据存储在下表中(非Hudi格式,可以是Hive) 。
+---------+-------------+-----------------+---------------+-------------------+-------------------+|seller_id|prod_category|product_name |product_package|discount_percentage|eff_start_ts |+---------+-------------+-----------------+---------------+-------------------+-------------------+|1234 |Detergent |Tide 5L |6 |25 |2022-01-31 10:00:30||4565 |Gourmet |Dairy Milk Almond|12 |45 |2022-06-12 20:30:40||3345 |Stationary |Sticky Notes |4 |12 |2022-07-09 21:30:45|+---------+-------------+-----------------+---------------+-------------------+-------------------+
- 现在让我们通过对目标表进行Left Anti Join过滤掉增量表中的所有 Insert only 记录 。
val updFileDf = spark.read.option("header",true).csv("gs://target_bucket/hudi_product_catalog/hudi_product_update.csv")val tgtHudiDf = spark.sql("select * from hudi_product_catalog")hudiTableData.createOrReplaceTempView("hudiTable")//Cast as neededval stgDf = updFileDf.withColumn("eff_start_ts",to_timestamp(col("eff_start_ts"))).withColumn("seller_id",col("seller_id").cast("int"))//Prepare an insert DF from incremental temp DFval instmpDf = stgDf.as("stg") .join(tgtHudiDf.as("tgt"), col("stg.seller_id") === col("tgt.seller_id") && col("stg.prod_category") === col("tgt.prod_category"),"left_anti").select("stg.*")val insDf = instmpDf.withColumn("eff_end_ts",to_timestamp(lit("9999-12-31 23:59:59"))).withColumn("actv_ind",lit(1))insDf.show(false)+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+|seller_id|prod_category|product_name|product_package|discount_percentage| eff_start_ts| eff_end_ts|actv_ind|+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+| 3345| Stationary|Sticky Notes| 4| 12|2022-07-09 21:30:45|9999-12-31 23:59:59| 1|+---------+-------------+------------+---------------+-------------------+-------------------+-------------------+--------+
- 我们有一个只插入记录的DataFrame 。接下来让我们创建一个DataFrame,其中将包含来自 delta 表和目标表的属性,并在目标上使用内连接,它将获取需要更新的记录 。
经验总结扩展阅读
-
-
-
-
202010月10日流星雨几点 是什麽星座的流星雨
-
不入虎穴焉得虎子是什么意思 不入虎穴焉得虎子的解释
-
仲念念|#情感#观点系列插图:电视剧《微微一笑很倾城》剧照01.|你爱的人和爱你的人,你更愿意和谁结婚?
-
保湿|这些保湿修复乳液补水保湿、细致毛孔、提亮肤色,你喜欢哪款?
-
-
-
-
-
-
-
-
李渊驾崩前留给李世民的遗言是什么,为什么李世民要当耳旁风呢?
-
-
-
-
-
哥哥按时打钱给老人养老|哥哥按时打钱给老人养老,弟弟赖在家不出门,老人却埋怨哥哥不孝