Storm中怎么使用Direct Grouping分组策略
这篇文章主要介绍“Storm中怎么使用Direct Grouping分组策略”。在日常操作中,相信很多人在“Storm中怎么使用Direct Grouping分组策略”这个问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答“Storm中怎么使用Direct Grouping分组策略”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

创新互联公司于2013年开始,是专业互联网技术服务公司,拥有项目网站设计、网站建设网站策划,项目实施与项目整合能力。我们以让每一个梦想脱颖而出为使命,1280元隆化做网站,已为上家服务,为隆化各地企业和个人服务,联系电话:13518219792
使用 Direct Grouping 分组策略,将首字母相同的单词发送给同一个task计数
数据源spout
package com.zhch.v3;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Spout that reads a text file line by line and emits every line as a
 * single-field tuple ("sentence").
 *
 * <p>The file path is read from the "wordsFile" entry of the topology
 * configuration. Each emitted tuple is tracked under a random UUID message id
 * until it is acked; failed tuples are re-emitted with the same id.
 */
public class SentenceSpout extends BaseRichSpout {
    /** Reader over the input file; created once in {@link #open}. */
    private BufferedReader reader = null;
    /** Set once the file is exhausted or reading has failed. */
    private boolean completed = false;
    /** Emitted tuples that have not been acked yet, keyed by message id. */
    private ConcurrentHashMap<UUID, Values> pending;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("sentence"));
    }

    /**
     * Stores the collector and opens the file named by the "wordsFile" config entry.
     *
     * @throws RuntimeException if the file cannot be opened (the cause is preserved)
     */
    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.collector = spoutOutputCollector;
        this.pending = new ConcurrentHashMap<UUID, Values>();
        try {
            this.reader = new BufferedReader(new FileReader(map.get("wordsFile").toString()));
        } catch (Exception e) {
            throw new RuntimeException("Error reading file [" + map.get("wordsFile") + "]", e);
        }
    }

    /**
     * Emits at most one line per call so the method returns quickly instead of
     * draining the whole file in a single invocation. After the file is
     * exhausted, each call just sleeps for one second.
     *
     * @throws RuntimeException if reading the file fails
     */
    @Override
    public void nextTuple() {
        if (completed) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it.
                Thread.currentThread().interrupt();
            }
            return;
        }
        try {
            String line = reader.readLine();
            if (line == null) {
                completed = true;
                closeReader();
                return;
            }
            Values values = new Values(line);
            UUID msgId = UUID.randomUUID();
            this.pending.put(msgId, values);
            this.collector.emit(values, msgId);
        } catch (Exception e) {
            // Stop reading after a failure, as the original implementation did.
            completed = true;
            throw new RuntimeException("Error reading tuple", e);
        }
    }

    /** Drops the tuple from the pending map once it has been fully processed. */
    @Override
    public void ack(Object msgId) {
        this.pending.remove(msgId);
    }

    /** Re-emits a failed tuple under the same message id, if it is still pending. */
    @Override
    public void fail(Object msgId) {
        Values values = this.pending.get(msgId);
        if (values != null) {
            this.collector.emit(values, msgId);
        }
    }

    /** Closes the input file; a failure here is reported but not fatal. */
    private void closeReader() {
        try {
            reader.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
实现语句分割bolt
package com.zhch.v3;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.List;
import java.util.Map;
/**
 * Bolt that splits each incoming sentence into words and sends every word
 * directly to one task of the counting bolt. The target task is chosen from
 * the word's upper-cased first character, so words that start with the same
 * letter always reach the same task.
 */
public class SplitSentenceBolt extends BaseRichBolt {
    private OutputCollector collector;
    /** Task ids of the downstream counting bolt. */
    private List<Integer> counterTaskIds;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        // Fetch the task id list of the downstream bolt.
        this.counterTaskIds = topologyContext.getComponentTasks(WordCountTopology.COUNT_BOLT_ID);
    }

    /**
     * Splits the "sentence" field on single spaces and emits each non-empty
     * word, anchored to the input tuple, to the task selected by
     * {@link #getWordCountIndex(String)}; then acks the input tuple.
     */
    @Override
    public void execute(Tuple tuple) {
        String sentence = tuple.getStringByField("sentence");
        for (String word : sentence.split(" ")) {
            // Blank lines and repeated spaces produce empty tokens; they are not words.
            if (word.isEmpty()) {
                continue;
            }
            Integer taskId = this.counterTaskIds.get(this.getWordCountIndex(word));
            collector.emitDirect(taskId, tuple, new Values(word));
        }
        this.collector.ack(tuple);
    }

    /**
     * Maps a word to an index into the downstream task id list.
     *
     * @param word the word to route
     * @return 0 for a blank word, otherwise the upper-cased first character
     *         modulo the number of downstream tasks
     */
    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if (word.isEmpty()) {
            return 0;
        }
        // First letter of the word modulo the length of the downstream task id list.
        return word.charAt(0) % counterTaskIds.size();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // The stream is consumed through directGrouping and written with
        // emitDirect, so it is declared as a direct stream.
        outputFieldsDeclarer.declare(true, new Fields("word"));
    }
}
实现单词计数bolt
package com.zhch.v3;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
/**
 * Bolt that keeps an in-memory count per word and, after every tuple,
 * rewrites the full set of counts to /home/grid/stormData/result.txt.
 */
public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    /** Running count for each word seen by this bolt instance. */
    private HashMap<String, Long> counts = null;

    @Override
    public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
        this.collector = outputCollector;
        this.counts = new HashMap<String, Long>();
    }

    /**
     * Increments the count of the tuple's "word" field, dumps all counts to
     * the result file, and acks the tuple.
     */
    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        Long count = this.counts.get(word);
        if (count == null) {
            count = 0L;
        }
        this.counts.put(word, count + 1);
        writeCounts();
        this.collector.ack(tuple);
    }

    /**
     * Overwrites the result file with one "word : count" line per entry.
     * Write failures are printed and otherwise ignored so that the tuple is
     * still acked.
     */
    private void writeCounts() {
        BufferedWriter writer = null;
        try {
            writer = new BufferedWriter(new FileWriter("/home/grid/stormData/result.txt"));
            for (Map.Entry<String, Long> entry : this.counts.entrySet()) {
                writer.write(entry.getKey() + " : " + entry.getValue());
                writer.newLine();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (writer != null) {
                try {
                    // close() flushes the buffered content.
                    writer.close();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "count"));
    }
}
实现单词计数topology
package com.zhch.v3;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
/**
 * Wires the sentence spout, the split bolt and the count bolt into a
 * word-count topology that uses Direct Grouping between the two bolts.
 *
 * <p>Usage: {@code WordCountTopology <wordsFile> [topologyName]}. With a
 * topology name the topology is submitted to the cluster; without one it runs
 * in a local cluster for five seconds and is then shut down.
 */
public class WordCountTopology {
    public static final String SENTENCE_SPOUT_ID = "sentence-spout";
    public static final String SPLIT_BOLT_ID = "split-bolt";
    public static final String COUNT_BOLT_ID = "count-bolt";
    public static final String TOPOLOGY_NAME = "word-count-topology-v3";

    /**
     * @param args {@code args[0]} is the input file path; optional
     *             {@code args[1]} is the topology name for cluster submission
     * @throws IllegalArgumentException if the input file path is missing
     */
    public static void main(String[] args) throws Exception {
        // Validate before touching args[0]; the original read it first and
        // failed with ArrayIndexOutOfBoundsException when no argument was given.
        if (args == null || args.length < 1) {
            throw new IllegalArgumentException("Usage: WordCountTopology <wordsFile> [topologyName]");
        }

        SentenceSpout spout = new SentenceSpout();
        SplitSentenceBolt splitBolt = new SplitSentenceBolt();
        WordCountBolt countBolt = new WordCountBolt();

        TopologyBuilder builder = new TopologyBuilder();
        // NOTE(review): each spout instance opens the wordsFile itself, so with
        // a parallelism of 2 the file may be read twice - confirm this is intended.
        builder.setSpout(SENTENCE_SPOUT_ID, spout, 2);
        builder.setBolt(SPLIT_BOLT_ID, splitBolt, 2).setNumTasks(4)
                .shuffleGrouping(SENTENCE_SPOUT_ID);
        // Use the Direct Grouping strategy between the split bolt and the count bolt.
        builder.setBolt(COUNT_BOLT_ID, countBolt, 2)
                .directGrouping(SPLIT_BOLT_ID);

        Config config = new Config();
        config.put("wordsFile", args[0]);

        if (args.length > 1) {
            config.setNumWorkers(2);
            // Cluster mode.
            StormSubmitter.submitTopology(args[1], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                // Preserve the interrupt status and proceed to shut down.
                Thread.currentThread().interrupt();
            }
            cluster.killTopology(TOPOLOGY_NAME);
            cluster.shutdown();
        }
    }
}
提交到Storm集群
storm jar Storm02-1.0-SNAPSHOT.jar com.zhch.v3.WordCountTopology /home/grid/stormData/input.txt word-count-topology-v3
运行结果:
[grid@hadoop5 stormData]$ cat result.txt
second : 1
can : 1
set : 1
simple : 1
use : 2
unbounded : 1
used : 1
It : 1
Storm : 4
online : 1
cases: : 1
open : 1
Apache : 1
of : 2
over : 1
more : 1
clocked : 1
easy : 2
scalable : 1
any : 1
guarantees : 1
ETL : 1
million : 1
continuous : 1
is : 6
with : 1
it : 2
makes : 1
your : 1
a : 4
at : 1
machine : 1
analytics : 1
up : 1
and : 5
many : 1
system : 1
source : 1
what : 1
operate : 1
will : 1
computation : 2
streams : 1

[grid@hadoop6 stormData]$ cat result.txt
to : 3
for : 2
data : 2
distributed : 2
has : 1
free : 1
programming : 1
reliably : 1
fast: : 1
processing : 2
be : 2
Hadoop : 1
did : 1
fun : 1
learning : 1
torm : 1
process : 1
RPC : 1
node : 1
processed : 2
per : 2
realtime : 3
benchmark : 1
batch : 1
doing : 1
lot : 1
language : 1
tuples : 1
fault-tolerant : 1
到此,关于“Storm中怎么使用Direct Grouping分组策略”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!
当前文章:Storm中怎么使用Direct Grouping分组策略
URL地址:http://www.jxjierui.cn/article/ieccjd.html


咨询
建站咨询
