加入收藏 | 设为首页 | 会员中心 | 我要投稿 惠州站长网 (https://www.0752zz.com.cn/)- 办公协同、云通信、物联设备、操作系统、高性能计算!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列(11) - Temporal Table JOIN

发布时间:2018-12-12 23:19:28 所属栏目:教程 来源:孙金城
导读:副标题#e# 一、什么是Temporal Table 在《Apache Flink 漫谈系列 - JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。 在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先

Temporal Table的概念旨在简化此类查询,加速它们的执行。Temporal Table是Append Only表上的参数化视图,它把Append Only的表变化解释为表的Changelog,并在特定时间点提供该表的版本(时间版本)。将Applend Only表解释为changelog需要指定主键属性和时间戳属性。主键确定覆盖哪些行,,时间戳确定行有效的时间,也就是数据版本,与上面SQL Server示例的有效期的概念一致。

在上面的示例中,currency是RatesHistory表的主键,而rowtime是timestamp属性。

2. 如何定义Temporal Table

在Apache Flink中扩展了TableFunction的接口,在TableFunction接口的基础上添加了时间属性和pk属性。

(1) 内部TemporalTableFunction定义如下:

  1. class TemporalTableFunction private( 
  2. @transient private val underlyingHistoryTable: Table, 
  3. // 时间属性,相当于版本信息 
  4. private val timeAttribute: Expression, 
  5. // 主键定义 
  6. private val primaryKey: String, 
  7. private val resultType: RowTypeInfo) 
  8. extends TableFunction[Row] { 
  9. ...} 

(2) 用户创建TemporalTableFunction方式

在Table中添加了createTemporalTableFunction方法,该方法需要传入时间属性和主键,接口定义如下:

  1. // Creates TemporalTableFunction backed up by this table as a history table. 
  2.  
  3. def createTemporalTableFunction( 
  4. timeAttribute: Expression, 
  5. primaryKey: Expression): TemporalTableFunction = { 
  6. ...} 

用户通过如下方式调用就可以得到一个TemporalTableFunction的实例,代码如下:

  1. val tab = ... 
  2. val temporalTableFunction = tab.createTemporalTableFunction('time, 'pk) 
  3. ... 

3. 案例代码

(1) 需求描述

假设我们有一张订单表Orders和一张汇率表Rates,那么订单来自于不同的地区,所以支付的币种各不一样,那么假设需要统计每个订单在下单时候Yen币种对应的金额。

(编辑:惠州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章
    热点阅读