Flink中Connectors如何连接RabbitMq
这篇文章给大家分享的是有关Flink中Connectors如何连接RabbitMq的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
创新互联服务紧随时代发展步伐,进行技术革新和技术进步,经过十多年的发展和积累,已经汇集了一批资深网站策划师、设计师、专业的网站实施团队以及高素质售后服务人员,并且完全形成了一套成熟的业务流程,能够完全依照客户要求对网站进行成都网站制作、网站建设、外贸网站建设、建设、维护、更新和改版,实现客户网站对外宣传展示的首要目的,并为客户企业品牌互联网化提供全面的解决方案。
通过使用Flink DataStream Connectors 数据流连接器连接到RabbitMq消息队列中间件,并提供数据流输入与输出操作;
示例环境
java.version: 1.8.xflink.version: 1.11.1rabbitMq:3.5.7
示例数据源 (项目码云下载)
Flink 系例 之 搭建开发环境与数据
示例模块 (pom.xml)
Flink 系例 之 DataStream Connectors 与 示例模块
数据流输入
DataStreamSource.java
package com.flink.examples.rabbitmq;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
/**
* @Description 从MQ中获取数据并输出到DataStream流中
*/
public class DataStreamSource {
/**
* 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html
*/
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("127.0.0.1")
.setPort(5672)
.setUserName("admin")
.setPassword("admin")
.setVirtualHost("datastream")
.build();
final DataStream stream = env
.addSource(new RMQSource( connectionConfig, "test", true, new SimpleStringSchema()))
.setParallelism(1);
stream.print();
env.execute("flink rabbitMq source");
}
} 数据流输出
DataStreamSink.java
package com.flink.examples.rabbitmq;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSink;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
/**
* @Description 将DataStream流中的数据输出到rabbitMq队列中
*/
public class DataStreamSink {
/**
* 官方文档:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/connectors/rabbitmq.html
*/
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
.setHost("127.0.0.1")
.setPort(5672)
.setUserName("admin")
.setPassword("admin")
.setVirtualHost("datastream")
.build();
String [] words = new String[]{"props","student","build","name","execute"};
final DataStream stream = env.fromElements(words);
stream.addSink(new RMQSink(connectionConfig,"test",new SimpleStringSchema()));
env.execute("flink rabbitMq sink");
}
} 数据展示

感谢各位的阅读!关于“Flink中Connectors如何连接RabbitMq”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
网站栏目:Flink中Connectors如何连接RabbitMq
标题路径:http://www.jxjierui.cn/article/jogdpe.html


咨询
建站咨询
