To keep things simple, we write the computed results to CsvTableSink, a CSV sink that ships with Apache Flink. The sink is defined as follows:
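```scala
import java.io.File
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.sinks.{CsvTableSink, TableSink}
import org.apache.flink.types.Row

def getCsvTableSink: TableSink[Row] = {
  val tempFile = File.createTempFile("csv_sink_", "tem")
  // Print the sink's file path so we can find and inspect the results later
  println("Sink path : " + tempFile)
  // Remove the empty placeholder so the sink does not fail on an already-existing file
  if (tempFile.exists()) {
    tempFile.delete()
  }
  new CsvTableSink(tempFile.getAbsolutePath).configure(
    Array[String]("region", "winStart", "winEnd", "pv"),
    Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
}
```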
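Each row that reaches this sink becomes one line of text. As a simplified model (a hypothetical sketch, not Flink's own code), the default formatting writes each field with `toString`, writes nothing for a null field, and joins the fields with the default `,` delimiter. Because `winStart` and `winEnd` are `java.sql.Timestamp` values, they show up in the file in `Timestamp.toString` form. The `CsvLineSketch` object, its `formatCsvLine` helper, and the sample values are made up for illustration:

```scala
import java.sql.Timestamp

// Hypothetical helper that mimics the default line format of CsvTableSink:
// null fields become empty strings, other fields use toString, joined by ",".
object CsvLineSketch {
  def formatCsvLine(fields: Seq[Any], fieldDelim: String = ","): String =
    fields.map(f => if (f == null) "" else f.toString).mkString(fieldDelim)

  def main(args: Array[String]): Unit = {
    // Illustrative row shaped like (region, winStart, winEnd, pv)
    val line = formatCsvLine(Seq(
      "region-1",
      Timestamp.valueOf("2017-11-11 02:00:00"),
      Timestamp.valueOf("2017-11-11 02:02:00"),
      3L))
    println(line)
  }
}
```

With these sample values the printed line has the timestamps rendered with a trailing `.0`, which is what to expect when opening the sink file.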
3. Build the main program
The main program sets up the execution environment, registers the source and the sink, and runs the aggregation SQL query:
```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment

def main(args: Array[String]): Unit = {
  // Streaming execution environment and its table environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)

  // Compute windows on event time rather than processing time
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Parallelism 1 so the sink writes a single file that is easy to inspect
  env.setParallelism(1)

  val sourceTableName = "mySource"
  // Custom table source (MyTableSource)
  val tableSource = new MyTableSource

  val sinkTableName = "csvSink"
  // CSV table sink defined above
  val tableSink = getCsvTableSink

  // Register the source
  tEnv.registerTableSource(sourceTableName, tableSource)
  // Register the sink
  tEnv.registerTableSink(sinkTableName, tableSink)

  // Page views (pv) per region in 2-minute tumbling windows
  val sql =
    "SELECT " +
    "  region, " +
    "  TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart, " +
    "  TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, " +
    "  COUNT(region) AS pv " +
    s"FROM $sourceTableName " +
    "GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"

  tEnv.sqlQuery(sql).insertInto(sinkTableName)
  env.execute()
}
```
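To see what the query computes, the following dependency-free Scala sketch models `GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region` on plain collections. It assumes `accessTime` is given as non-negative epoch milliseconds; the `TumbleSketch` object, the `Access` case class, and the sample records are hypothetical. A timestamp `t` belongs to the window starting at `t - t % size`, and `winEnd` is `winStart + size`, an exclusive bound as with `TUMBLE_END`. Unlike the Flink job, it works on a finite batch and ignores watermarks and late data:

```scala
// Hypothetical, dependency-free model of:
//   GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region
object TumbleSketch {
  // One access record: region name plus event time in epoch milliseconds
  final case class Access(region: String, accessTime: Long)

  val WindowSizeMs: Long = 2 * 60 * 1000L // INTERVAL '2' MINUTE

  // Start of the tumbling window containing t (assumes t >= 0)
  def windowStart(t: Long, size: Long = WindowSizeMs): Long = t - t % size

  // Returns (region, winStart, winEnd, pv); winEnd is exclusive, like TUMBLE_END
  def countPerWindow(records: Seq[Access]): Seq[(String, Long, Long, Long)] =
    records
      .groupBy(r => (r.region, windowStart(r.accessTime)))
      .toSeq
      .map { case ((region, start), rs) =>
        (region, start, start + WindowSizeMs, rs.size.toLong)
      }
      .sortBy(r => (r._2, r._1))

  def main(args: Array[String]): Unit = {
    val records = Seq(
      Access("region-a", 0L),
      Access("region-a", 60000L),
      Access("region-b", 90000L),
      Access("region-a", 130000L))
    countPerWindow(records).foreach(println)
  }
}
```

Here the first three records fall into the window [0, 120000) and the last one into [120000, 240000). In the real job, Flink emits a window's row once the watermark passes the window end, rather than all at once.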
4. Run the job and inspect the results