这篇文章给大家分享的是有关Spark2.x中如何实现SparkStreaming消费Kafka实例的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
在成都做网站、成都网站设计、成都外贸网站建设中从网站色彩、结构布局、栏目设置、关键词群组等细微处着手,突出企业的产品/服务/品牌,帮助企业锁定精准用户,提高在线咨询和转化,使成都网站营销成为有效果、有回报的无锡营销推广。创新互联专业成都网站建设十余年了,客户满意度97.8%,欢迎成都创新互联客户联系。
软件软件:
spark版本是apache spark2.2.0
kafka版本是kafka0.10.0
采用Direct Approach的方式来融合Spark Streaming和Kafka。没有采用Receiver-Based的方式。后续我会专门整理一篇文章分析两种融合方式不同。
1.kafka数据准备:
创建kafka的topic命令:
/usr/hdp/2.6.3.0-235/kafka/bin/kafka-topics.sh --zookeeper salver158.hadoop.unicom:2181,salver31.hadoop.unicom:2181,salver32.hadoop.unicom:2181 -topic kafkawordcount -replication-factor 2 -partitions 2 -create

发送数据命令:
/usr/hdp/2.6.3.0-235/kafka/bin/kafka-console-producer.sh --zookeeper salver158.hadoop.unicom:2181,salver31.hadoop.unicom:2181,salver32.hadoop.unicom:2181 -topic kafkawordcount

2.代码实例:
package com.unicom.ljs.spark220.study.streaming;import org.apache.kafka.clients.consumer.ConsumerRecord;import org.apache.kafka.common.TopicPartition;import org.apache.spark.SparkConf;import org.apache.spark.streaming.Durations;import org.apache.spark.streaming.api.java.JavaInputDStream;import org.apache.spark.streaming.api.java.JavaPairDStream;import org.apache.spark.streaming.api.java.JavaStreamingContext;import org.apache.spark.streaming.kafka010.ConsumerStrategies;import org.apache.spark.streaming.kafka010.KafkaUtils;import org.apache.spark.streaming.kafka010.LocationStrategies;import scala.Tuple2;import java.util.*;/*** @author: Created By lujisen* @company ChinaUnicom Software JiNan* @date: 2020-01-31 20:30* @version: v1.0* @description: com.unicom.ljs.spark220.study.streaming*/public class KafkaStreamingWordCount {public static void main(String[] args) throws InterruptedException {SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("KafkaStreamingWordCount");JavaStreamingContext ssc=new JavaStreamingContext(sparkConf, Durations.seconds(5));String topic="kafkawordcount";Collectiontopics = new HashSet<>(); topics.add(topic);//kafka相关参数,其他参数可自行百度String brokerList = "10.124.165.31:6667,10.124.165.32:6667";Mapprops = new HashMap<>(); props.put("bootstrap.servers", brokerList);props.put("group.id", "groupLjs1");props.put("auto.offset.reset", "earliest");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");/*指定kafka中topic的消费分区*/Mapoffsets = new HashMap<>(); offsets.put(new TopicPartition(topic, 0), 0L);offsets.put(new TopicPartition(topic, 1), 0L);//通过KafkaUtils.createDirectStream指定kafka数据源// 三个参数 1 sparkcontext 2.LocationStrategies.PreferConsistent,如上所示。这将在可用执行程序之间均匀分配分区 3,订阅kafka 的配置JavaInputDStream> lines = KafkaUtils.createDirectStream( ssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, props, offsets));JavaPairDStreamcounts =lines.flatMap( x -> Arrays.asList(x.value().toString().split(" ")).iterator()).mapToPair(x -> new Tuple2(x, 1)).reduceByKey((x, y) -> x + y); /*打印结果*/counts.print();/*启动*/ssc.start();ssc.awaitTermination();/*停止*/ssc.close();}}
3.数据统计展示:

感谢各位的阅读!关于“Spark2.x中如何实现SparkStreaming消费Kafka实例”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!
网站名称:Spark2.x中如何实现SparkStreaming消费Kafka实例
链接分享:http://www.jxjierui.cn/article/pecdgo.html


咨询
建站咨询
