加入收藏 | 设为首页 | 会员中心 | 我要投稿 惠州站长网 (https://www.0752zz.com.cn/)- 办公协同、云通信、物联设备、操作系统、高性能计算!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列 - SQL概览

发布时间:2018-11-18 23:35:03 所属栏目:教程 来源:孙金城
导读:副标题#e# 一、SQL简述 SQL是Structured Query Language的缩写,最初是由美国计算机科学家Donald D. Chamberlin和Raymond F. Boyce在20世纪70年代早期从 Early History of SQL 中了解关系模型后在IBM开发的。该版本最初称为[SEQUEL: A Structured English Q

b. 使用

  1. ... 
  2. val fun = new MyCount() 
  3. tEnv.registerFunction("myCount", fun) 
  4. val sql = "SELECT myCount(c) FROM MyTable GROUP BY a" 
  5. ... 

十三、Source&Sink

上面我们介绍了Apache Flink SQL核心算子的语法及语义,这部分将选取Bounded EventTime Tumble Window为例为大家编写一个完整的包括Source和Sink定义的Apache Flink SQL Job。假设有一张淘宝页面访问表(PageAccess_tab),有地域,用户ID和访问时间。我们需要按不同地域统计每2分钟的淘宝首页的访问量(PV). 具体数据如下:

Apache Flink 漫谈系列 - SQL概览

1. Source 定义

自定义Apache Flink Stream Source需要实现StreamTableSource, StreamTableSource中通过StreamExecutionEnvironment 的addSource方法获取DataStream, 所以我们需要自定义一个 SourceFunction, 并且要支持产生WaterMark,也就是要实现DefinedRowtimeAttributes接口。

(1) Source Function定义

支持接收携带EventTime的数据集合,Either的数据结构,Right表示WaterMark和Left表示数据:

  1. class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) 
  2. extends SourceFunction[T] { 
  3. override def run(ctx: SourceContext[T]): Unit = { 
  4. dataWithTimestampList.foreach { 
  5. case Left(t) => ctx.collectWithTimestamp(t._2, t._1) 
  6. case Right(w) => ctx.emitWatermark(new Watermark(w)) 
  7. override def cancel(): Unit = ???} 

(2) 定义 StreamTableSource

(编辑:惠州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章
    热点阅读