Storm Source Code Analysis: Topology Submit (Client)
Published: 2019-06-24


1 Storm Client

A topology is started with the storm command, for example:

storm jar storm-starter-0.0.1-SNAPSHOT-standalone.jar storm.starter.WordCountTopology

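The storm command is implemented in Python. Its jar function is simple: it calls exec_storm_class with jvmtype="-client".

exec_storm_class just assembles a java command line and runs it with os.system(command). Why Python? Presumably because it is simple and lets the storm command be run directly as a script.
Here klass is the topology class, so the java command simply invokes the main function of the topology class.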

def jar(jarfile, klass, *args):
    """Syntax: [storm jar topology-jar-path class ...]

    Runs the main method of class with the specified arguments.
    The storm jars and configs in ~/.storm are put on the classpath.
    The process is configured so that StormSubmitter
    (http://nathanmarz.github.com/storm/doc/backtype/storm/StormSubmitter.html)
    will upload the jar at topology-jar-path when the topology is submitted.
    """
    exec_storm_class(
        klass,
        jvmtype="-client",
        extrajars=[jarfile, CONF_DIR, STORM_DIR + "/bin"],
        args=args,
        childopts="-Dstorm.jar=" + jarfile)

def exec_storm_class(klass, jvmtype="-server", childopts="", extrajars=[], args=[]):
    nativepath = confvalue("java.library.path", extrajars)
    # Quote every argument so that values containing spaces survive the shell
    args_str = " ".join(map(lambda s: "\"" + s + "\"", args))
    command = "java " + jvmtype + " -Dstorm.home=" + STORM_DIR + " " + get_config_opts() + " -Djava.library.path=" + nativepath + " " + childopts + " -cp " + get_classpath(extrajars) + " " + klass + " " + args_str
    print "Running: " + command
    os.system(command)
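To make the command assembly more concrete, here is a minimal Python 3 sketch that builds the same kind of java invocation as an argument list instead of one shell string. The function name build_storm_command, its parameters, and the default /opt/storm path are hypothetical and are not part of the storm script. The real script also adds get_config_opts() and -Djava.library.path, which are left out here.

```python
import os


def build_storm_command(klass, jarfile, args=(), storm_dir="/opt/storm",
                        classpath_entries=(), jvmtype="-client"):
    """Build a java command line similar to the one exec_storm_class runs.

    All parameter names and defaults are illustrative assumptions.
    """
    # The topology jar goes on the classpath next to the other entries.
    classpath = os.pathsep.join(list(classpath_entries) + [jarfile])
    return [
        "java", jvmtype,
        "-Dstorm.home=" + storm_dir,
        # Same childopts as in jar(): tells the JVM which jar to upload.
        "-Dstorm.jar=" + jarfile,
        "-cp", classpath,
        klass,  # the topology class whose main() will run
    ] + list(args)


if __name__ == "__main__":
    cmd = build_storm_command(
        "storm.starter.WordCountTopology",
        "storm-starter-0.0.1-SNAPSHOT-standalone.jar",
        args=["word-count"],
    )
    print(" ".join(cmd))
```

A list like this can be handed to subprocess.call(cmd), which avoids the manual quoting of arguments that the os.system string needs.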

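Next, let's see what the main function of the WordCountTopology example actually does.

Besides defining the topology, when a topology name is passed on the command line it ends by calling StormSubmitter.submitTopology(args[0], conf, builder.createTopology()) to submit the topology. Without arguments it runs the topology in a LocalCluster instead.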

public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("spout", new RandomSentenceSpout(), 5);

    builder.setBolt("split", new SplitSentence(), 8)
             .shuffleGrouping("spout");
    builder.setBolt("count", new WordCount(), 12)
             .fieldsGrouping("split", new Fields("word"));

    Config conf = new Config();
    conf.setDebug(true);

    if(args!=null && args.length > 0) {
        conf.setNumWorkers(3);

        StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
        conf.setMaxTaskParallelism(3);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", conf, builder.createTopology());

        Thread.sleep(10000);

        cluster.shutdown();
    }
}

StormSubmitter

Let's go straight to submitTopology.

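1. Configuration parameters
   Command-line options are put into stormConf. The settings from conf/storm.yaml are read into conf, and stormConf is then put into conf as well, so command-line options take precedence.
   stormConf is converted to JSON, because this configuration has to be sent to the server.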

2. Submit Jar 

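    StormSubmitter is essentially a Thrift client and Nimbus is the Thrift server, so every operation is carried out through Thrift RPC.
    It first calls topologyNameExists, which uses the Thrift client to fetch the status of the topologies currently running and checks whether the name is already in use.
    It then submits the jar in the following three steps: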
    client.getClient().beginFileUpload(); 
    client.getClient().uploadChunk(uploadLocation, ByteBuffer.wrap(toSubmit)); 
    client.getClient().finishFileUpload(uploadLocation); 
    The data is sent over RPC; how it is stored is up to Nimbus's own logic.

3. Submit Topology 

    This is nothing more than a plain RPC call:
    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
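The merge order in step 1 is easy to get backwards, so here is a minimal Python sketch of it using plain dicts. The function name build_confs and its parameters are hypothetical; it only mirrors the order of the putAll calls described above and is not Storm code.

```python
import json


def build_confs(topology_conf, command_line_opts, storm_yaml_conf):
    """Mirror the merge order described in step 1 (names are illustrative).

    Returns (conf, ser_conf): the client-side conf and the JSON string
    that would be sent to the server.
    """
    # Copy first so the caller's map is not mutated, then let
    # command-line options override the topology-level settings.
    storm_conf = dict(topology_conf)
    storm_conf.update(command_line_opts)

    # Values read from storm.yaml are the base; storm_conf overrides them.
    conf = dict(storm_yaml_conf)
    conf.update(storm_conf)

    # Only storm_conf (not the merged conf) is serialized for the server.
    ser_conf = json.dumps(storm_conf, sort_keys=True)
    return conf, ser_conf


if __name__ == "__main__":
    conf, ser_conf = build_confs(
        topology_conf={"topology.workers": 3, "topology.debug": True},
        command_line_opts={"topology.workers": 5},
        storm_yaml_conf={"nimbus.host": "localhost", "topology.workers": 1},
    )
    print(conf["topology.workers"])
    print(ser_conf)
```

In this sketch the merged conf is only used on the client side (for example to find Nimbus), while the JSON string carries just the topology-specific settings plus the command-line overrides.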

/**
 * Submits a topology to run on the cluster. A topology runs forever or until
 * explicitly killed.
 *
 *
 * @param name the name of the storm.
 * @param stormConf the topology-specific configuration. See {@link Config}.
 * @param topology the processing to execute.
 * @param options to manipulate the starting of the topology
 * @throws AlreadyAliveException if a topology with this name is already running
 * @throws InvalidTopologyException if an invalid topology was submitted
 */
public static void submitTopology(String name, Map stormConf, StormTopology topology, SubmitOptions opts) throws AlreadyAliveException, InvalidTopologyException {
    if(!Utils.isValidConf(stormConf)) {
        throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
    }
    stormConf = new HashMap(stormConf);
    stormConf.putAll(Utils.readCommandLineOpts());
    Map conf = Utils.readStormConfig();
    conf.putAll(stormConf);
    try {
        String serConf = JSONValue.toJSONString(stormConf);
        if(localNimbus!=null) {
            LOG.info("Submitting topology " + name + " in local mode");
            localNimbus.submitTopology(name, null, serConf, topology);
        } else {
            NimbusClient client = NimbusClient.getConfiguredClient(conf);
            if(topologyNameExists(conf, name)) {
                throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
            }
            submitJar(conf);
            try {
                LOG.info("Submitting topology " +  name + " in distributed mode with conf " + serConf);
                if(opts!=null) {
                    client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
                } else {
                    // this is for backwards compatibility
                    client.getClient().submitTopology(name, submittedJar, serConf, topology);
                }
            } catch(InvalidTopologyException e) {
                LOG.warn("Topology submission exception", e);
                throw e;
            } catch(AlreadyAliveException e) {
                LOG.warn("Topology already alive exception", e);
                throw e;
            } finally {
                client.close();
            }
        }
        LOG.info("Finished submitting topology: " +  name);
    } catch(TException e) {
        throw new RuntimeException(e);
    }
}
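The three upload calls listed under Submit Jar follow a begin / upload chunks / finish pattern. The Python sketch below imitates that sequence against an in-memory stand-in. FakeNimbus, submit_jar, and the chunk size are all hypothetical, and the sketch assumes (as the uploadLocation argument suggests) that the begin call returns the location the later calls refer to.

```python
import io


class FakeNimbus:
    """In-memory stand-in for the Nimbus Thrift service (hypothetical)."""

    def __init__(self):
        self._uploads = {}   # location -> bytes received so far
        self.finished = {}   # location -> completed upload

    def begin_file_upload(self):
        # The server picks the upload location and returns it to the client.
        location = "upload-%d" % (len(self._uploads) + len(self.finished))
        self._uploads[location] = bytearray()
        return location

    def upload_chunk(self, location, chunk):
        self._uploads[location].extend(chunk)

    def finish_file_upload(self, location):
        self.finished[location] = bytes(self._uploads.pop(location))


def submit_jar(client, jar_stream, chunk_size=1024):
    """Upload a jar in chunks using the begin / upload / finish sequence."""
    location = client.begin_file_upload()
    while True:
        chunk = jar_stream.read(chunk_size)
        if not chunk:  # an empty read means end of file
            break
        client.upload_chunk(location, chunk)
    client.finish_file_upload(location)
    return location


if __name__ == "__main__":
    nimbus = FakeNimbus()
    loc = submit_jar(nimbus, io.BytesIO(b"x" * 2500))
    print(loc, len(nimbus.finished[loc]))
```

Sending the file in bounded chunks keeps each RPC message small regardless of the jar size, and the explicit finish call lets the server know the upload is complete.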
 
This article is excerpted from cnblogs (博客园). Original publication date: 2013-06-05

Reposted from: http://pquio.baihongyu.com/
