加入收藏 | 设为首页 | 会员中心 | 我要投稿 惠州站长网 (https://www.0752zz.com.cn/)- 办公协同、云通信、物联设备、操作系统、高性能计算!
当前位置: 首页 > 教程 > 正文

Apache Flink 漫谈系列 - SQL概览

发布时间:2018-11-18 23:35:03 所属栏目:教程 来源:孙金城
导读:副标题#e# 一、SQL简述 SQL是Structured Query Language的缩写,最初是由美国计算机科学家Donald D. Chamberlin和Raymond F. Boyce在20世纪70年代早期从 Early History of SQL 中了解关系模型后在IBM开发的。该版本最初称为[SEQUEL: A Structured English Q

我们自定义的Source要携带我们测试的数据,以及对应WaterMark数据,具体如下:

  1. class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes { 
  2.  
  3. val fieldNames = Array("accessTime", "region", "userId") 
  4. val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING)) 
  5. val rowType = new RowTypeInfo( 
  6. Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], 
  7. fieldNames) 
  8.  
  9. // 页面访问表数据 rows with timestamps and watermarks 
  10. val data = Seq( 
  11. Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")), 
  12. Right(1510365660000L), 
  13. Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")), 
  14. Right(1510365660000L), 
  15. Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")), 
  16. Right(1510366200000L), 
  17. Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")), 
  18. Right(1510366260000L), 
  19. Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")), 
  20. Right(1510373400000L) 
  21.  
  22. override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = { 
  23. Collections.singletonList(new RowtimeAttributeDescriptor( 
  24. "accessTime", 
  25. new ExistingField("accessTime"), 
  26. PreserveWatermarks.INSTANCE)) 
  27.  
  28. override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { 
  29. execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType) 
  30.  
  31. override def getReturnType: TypeInformation[Row] = rowType 
  32.  
  33. override def getTableSchema: TableSchema = schema 
  34.  

2. Sink 定义

(编辑:惠州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

推荐文章
    热点阅读