Storm-kafka中如何理解ZkCoordinator的過(guò)程,針對(duì)這個(gè)問(wèn)題,這篇文章詳細(xì)介紹了相對(duì)應(yīng)的分析和解答,希望可以幫助更多想解決這個(gè)問(wèn)題的小伙伴找到更簡(jiǎn)單易行的方法。
為剛察等地區(qū)用戶提供了全套網(wǎng)頁(yè)設(shè)計(jì)制作服務(wù),及剛察網(wǎng)站建設(shè)行業(yè)解決方案。主營(yíng)業(yè)務(wù)為網(wǎng)站制作、成都做網(wǎng)站、剛察網(wǎng)站設(shè)計(jì),以傳統(tǒng)方式定制建設(shè)網(wǎng)站,并提供域名空間備案等一條龍服務(wù),秉承以專業(yè)、用心的態(tài)度為用戶提供真誠(chéng)的服務(wù)。我們深信只要達(dá)到每一位用戶的要求,就會(huì)得到認(rèn)可,從而選擇與我們長(zhǎng)期合作。這樣,我們也可以走得更遠(yuǎn)!
梳理ZkCoordinator的過(guò)程
package com.mixbox.storm.kafka; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.mixbox.storm.kafka.trident.GlobalPartitionInformation; import java.util.*; import static com.mixbox.storm.kafka.KafkaUtils.taskId; /** * * * ZKCoordinator 協(xié)調(diào)器 * * @author Yin Shuai */ public class ZkCoordinator implements PartitionCoordinator { public static final Logger LOG = LoggerFactory .getLogger(ZkCoordinator.class); SpoutConfig _spoutConfig; int _taskIndex; int _totalTasks; String _topologyInstanceId; // 每一個(gè)分區(qū)對(duì)應(yīng)著一個(gè)分區(qū)管理器 Map<Partition, PartitionManager> _managers = new HashMap(); //緩存的List List<PartitionManager> _cachedList; //上次刷新的時(shí)間 Long _lastRefreshTime = null; //刷新頻率 毫秒 int _refreshFreqMs; //動(dòng)態(tài)分區(qū)連接 DynamicPartitionConnections _connections; //動(dòng)態(tài)BrokersReader DynamicBrokersReader _reader; ZkState _state; Map _stormConf; /** * * @param connections * 動(dòng)態(tài)的 分區(qū)連接 * @param stormConf * Storm的配置文件 * @param spoutConfig * Storm sput的配置文件 * @param state * 對(duì)于ZKState的連接 * @param taskIndex * 任務(wù) * @param totalTasks * 總共的任務(wù) * @param topologyInstanceId * 拓?fù)涞膶?shí)例ID */ public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) { this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig)); } public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) { _spoutConfig = spoutConfig; _connections = connections; _taskIndex = taskIndex; _totalTasks = totalTasks; _topologyInstanceId = topologyInstanceId; _stormConf = stormConf; _state = state; ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts; _refreshFreqMs = brokerConf.refreshFreqSecs * 1000; _reader = reader; } /** * @param stormConf * @param spoutConfig * @return */ private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) { ZkHosts hosts = (ZkHosts) spoutConfig.hosts; return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic); } @Override public List<PartitionManager> getMyManagedPartitions() { if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) { refresh(); _lastRefreshTime = System.currentTimeMillis(); } return _cachedList; } /** * 簡(jiǎn)單的刷新的行為 * */ void refresh() { try { LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections"); // 拿到所有的分區(qū)信息 GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo(); // 拿到自己任務(wù)的所有分區(qū) List<Partition> mine = KafkaUtils.calculatePartitionsForTask( brokerInfo, _totalTasks, _taskIndex); // 拿到當(dāng)前任務(wù)的分區(qū) Set<Partition> curr = _managers.keySet(); // 構(gòu)造一個(gè)集合 Set<Partition> newPartitions = new HashSet<Partition>(mine); // 在new分區(qū)中,移除掉所有 自己擁有的分區(qū) newPartitions.removeAll(curr); // 要?jiǎng)h除的分區(qū) Set<Partition> deletedPartitions = new HashSet<Partition>(curr); // deletedPartitions.removeAll(mine); LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString()); for (Partition id : deletedPartitions) { PartitionManager man = _managers.remove(id); man.close(); } LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString()); for (Partition id : newPartitions) { PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id); _managers.put(id, man); } } catch (Exception e) { throw new RuntimeException(e); } _cachedList = new ArrayList<PartitionManager>(_managers.values()); LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing"); } @Override public PartitionManager getManager(Partition partition) { return _managers.get(partition); } }
1 : 首先 ZKCoorDinator 實(shí)現(xiàn) PartitionCoordinator的接口
package com.mixbox.storm.kafka; import java.util.List; /** * @author Yin Shuai */ public interface PartitionCoordinator { /** * 拿到我管理的分區(qū)列表 List{PartitionManager} * @return */ List<PartitionManager> getMyManagedPartitions(); /** * @param 依據(jù)制定的分區(qū)partition,去getManager * @return */ PartitionManager getManager(Partition partition); }
第一個(gè)方法拿到所有的 PartitionManager
第二個(gè)方法依據(jù)特定的 Partition去得到一個(gè)分區(qū)管理器
關(guān)于 Storm-kafka中如何理解ZkCoordinator的過(guò)程問(wèn)題的解答就分享到這里了,希望以上內(nèi)容可以對(duì)大家有一定的幫助,如果你還有很多疑惑沒(méi)有解開(kāi),可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識(shí)。
本文題目:Storm-kafka中如何理解ZkCoordinator的過(guò)程
網(wǎng)頁(yè)URL:http://bm7419.com/article6/jjdjog.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供服務(wù)器托管、品牌網(wǎng)站建設(shè)、App設(shè)計(jì)、外貿(mào)建站、全網(wǎng)營(yíng)銷推廣、自適應(yīng)網(wǎng)站
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)