flinksqlenv的定义
本篇内容介绍了“flinksql env的定义”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

创新互联公司主要从事成都网站设计、做网站、成都外贸网站建设公司、网页设计、企业做网站、公司建网站等业务。立足成都服务六安,10多年网站建设经验,价格优惠、服务专业,欢迎来电咨询建站服务:18982081108
1、编写 pom
4.0.0 org.example flinksqldemo 1.0-SNAPSHOT UTF-8 UTF-8 2.11 2.11.8 0.10.2.1 1.12.0 2.7.3 compile org.apache.maven.plugins maven-compiler-plugin 8 8 org.apache.flink flink-table-planner-blink_2.11 1.12.0 org.apache.flink flink-java ${flink.version} ${setting.scope} org.apache.flink flink-streaming-java_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-clients_2.11 ${flink.version} ${setting.scope} org.apache.flink flink-connector-kafka-0.10_${scala.binary.version} ${flink.version} org.apache.flink flink-streaming-scala_${scala.binary.version} ${flink.version} ${setting.scope} org.apache.flink flink-connector-filesystem_${scala.binary.version} ${flink.version} org.apache.kafka kafka_${scala.binary.version} ${kafka.version} ${setting.scope} org.apache.hadoop hadoop-common ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-hdfs ${hadoop.version} ${setting.scope} org.apache.hadoop hadoop-client ${hadoop.version} ${setting.scope} org.slf4j slf4j-api 1.7.25 com.alibaba fastjson 1.2.72 redis.clients jedis 2.7.3 com.google.guava guava 29.0-jre
2、编写代码
package com.jd.data;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class FlinkTableApiDemo {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStreamSource stream = env.readTextFile("/Users/liuhaijing/Desktop/flinktestword/aaa.txt");
// 1、创建表执行环节
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// ==============================================
// 1.1 老版本planner的流式查询
EnvironmentSettings set = EnvironmentSettings.newInstance()
.useOldPlanner() //用老版本
.inStreamingMode() //流式处理
.build();
// 老版本的流式处理执行环境
StreamTableEnvironment oldStreamingEnv = StreamTableEnvironment.create(env, set);
// 1.2 老版本批处理环境
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment batchTableEnvironment = BatchTableEnvironment.create(executionEnvironment);
// =========================================================
// 1.3 blink 版本的流式查询
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build();
StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, settings);
// 1.4 blink 版本的批处理查询
EnvironmentSettings bsettings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment blinkBatchTableEnvironment = TableEnvironment.create(settings);
}
} “flinksql env的定义”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!
当前标题:flinksqlenv的定义
URL分享:http://www.jxjierui.cn/article/pjeeoh.html


咨询
建站咨询
