Storm-kafka中如何理解ZkCoordinator的过程
Storm-kafka中如何理解ZkCoordinator的过程,针对这个问题,这篇文章详细介绍了相对应的分析和解答,希望可以帮助更多想解决这个问题的小伙伴找到更简单易行的方法。

为刚察等地区用户提供了全套网页设计制作服务,及刚察网站建设行业解决方案。主营业务为网站制作、成都做网站、刚察网站设计,以传统方式定制建设网站,并提供域名空间备案等一条龙服务,秉承以专业、用心的态度为用户提供真诚的服务。我们深信只要达到每一位用户的要求,就会得到认可,从而选择与我们长期合作。这样,我们也可以走得更远!
梳理ZkCoordinator的过程
package com.mixbox.storm.kafka;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.mixbox.storm.kafka.trident.GlobalPartitionInformation;
import java.util.*;
import static com.mixbox.storm.kafka.KafkaUtils.taskId;
/**
*
*
* ZKCoordinator 协调器
*
* @author Yin Shuai
*/
public class ZkCoordinator implements PartitionCoordinator {
public static final Logger LOG = LoggerFactory
.getLogger(ZkCoordinator.class);
SpoutConfig _spoutConfig;
int _taskIndex;
int _totalTasks;
String _topologyInstanceId;
// 每一个分区对应着一个分区管理器
Map _managers = new HashMap();
//缓存的List
List _cachedList;
//上次刷新的时间
Long _lastRefreshTime = null;
//刷新频率 毫秒
int _refreshFreqMs;
//动态分区连接
DynamicPartitionConnections _connections;
//动态BrokersReader
DynamicBrokersReader _reader;
ZkState _state;
Map _stormConf;
/**
*
* @param connections
* 动态的 分区连接
* @param stormConf
* Storm的配置文件
* @param spoutConfig
* Storm sput的配置文件
* @param state
* 对于ZKState的连接
* @param taskIndex
* 任务
* @param totalTasks
* 总共的任务
* @param topologyInstanceId
* 拓扑的实例ID
*/
public ZkCoordinator(DynamicPartitionConnections connections,
Map stormConf, SpoutConfig spoutConfig, ZkState state,
int taskIndex, int totalTasks, String topologyInstanceId) {
this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks,
topologyInstanceId, buildReader(stormConf, spoutConfig));
}
public ZkCoordinator(DynamicPartitionConnections connections,
Map stormConf, SpoutConfig spoutConfig, ZkState state,
int taskIndex, int totalTasks, String topologyInstanceId,
DynamicBrokersReader reader) {
_spoutConfig = spoutConfig;
_connections = connections;
_taskIndex = taskIndex;
_totalTasks = totalTasks;
_topologyInstanceId = topologyInstanceId;
_stormConf = stormConf;
_state = state;
ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
_refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
_reader = reader;
}
/**
* @param stormConf
* @param spoutConfig
* @return
*/
private static DynamicBrokersReader buildReader(Map stormConf,
SpoutConfig spoutConfig) {
ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
return new DynamicBrokersReader(stormConf, hosts.brokerZkStr,
hosts.brokerZkPath, spoutConfig.topic);
}
@Override
public List getMyManagedPartitions() {
if (_lastRefreshTime == null
|| (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
refresh();
_lastRefreshTime = System.currentTimeMillis();
}
return _cachedList;
}
/**
* 简单的刷新的行为
*
*/
void refresh() {
try {
LOG.info(taskId(_taskIndex, _totalTasks)
+ "Refreshing partition manager connections");
// 拿到所有的分区信息
GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
// 拿到自己任务的所有分区
List mine = KafkaUtils.calculatePartitionsForTask(
brokerInfo, _totalTasks, _taskIndex);
// 拿到当前任务的分区
Set curr = _managers.keySet();
// 构造一个集合
Set newPartitions = new HashSet(mine);
// 在new分区中,移除掉所有 自己拥有的分区
newPartitions.removeAll(curr);
// 要删除的分区
Set deletedPartitions = new HashSet(curr);
//
deletedPartitions.removeAll(mine);
LOG.info(taskId(_taskIndex, _totalTasks)
+ "Deleted partition managers: "
+ deletedPartitions.toString());
for (Partition id : deletedPartitions) {
PartitionManager man = _managers.remove(id);
man.close();
}
LOG.info(taskId(_taskIndex, _totalTasks)
+ "New partition managers: " + newPartitions.toString());
for (Partition id : newPartitions) {
PartitionManager man = new PartitionManager(_connections,
_topologyInstanceId, _state, _stormConf, _spoutConfig,
id);
_managers.put(id, man);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
_cachedList = new ArrayList(_managers.values());
LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
}
@Override
public PartitionManager getManager(Partition partition) {
return _managers.get(partition);
}
} 1 : 首先 ZKCoorDinator 实现 PartitionCoordinator的接口
package com.mixbox.storm.kafka;
import java.util.List;
/**
* @author Yin Shuai
*/
public interface PartitionCoordinator {
/**
* 拿到我管理的分区列表 List{PartitionManager}
* @return
*/
List getMyManagedPartitions();
/**
* @param 依据制定的分区partition,去getManager
* @return
*/
PartitionManager getManager(Partition partition);
} 第一个方法拿到所有的 PartitionManager
第二个方法依据特定的 Partition去得到一个分区管理器
关于 Storm-kafka中如何理解ZkCoordinator的过程问题的解答就分享到这里了,希望以上内容可以对大家有一定的帮助,如果你还有很多疑惑没有解开,可以关注创新互联行业资讯频道了解更多相关知识。
本文名称:Storm-kafka中如何理解ZkCoordinator的过程
标题网址:http://www.jxjierui.cn/article/jjdjog.html


咨询
建站咨询
