(二) storm的基本使用

2019-03-02 23:40|来源: 网路

SimpleTopology.java

View Code
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;

/**
 * Hello world!
 *
 */
public class SimpleTopology
{
    public static void main( String[] args ) throws Exception
    {
        TopologyBuilder topologyBuilder=new TopologyBuilder();
        topologyBuilder.setSpout("simple-spout", new SimpleSpout(),1);
        topologyBuilder.setBolt("simple-bilt",new SimpleBolt1(), 3).shuffleGrouping("simple-spout");
        topologyBuilder.setBolt("wordcounter", new SimpleBolt2(), 3).fieldsGrouping("simple-bilt", new Fields("info"));
        topologyBuilder.setBolt("word-to-upper", new SimpleBolt4(),5).shuffleGrouping("simple-spout");
        topologyBuilder.setBolt("store", new SimpleBolt3(),10).shuffleGrouping("word-to-upper");
        Config config=new Config();
        config.setDebug(true);
        if(null!=args&&args.length>0){
            //使用集群模式运行
            config.setNumWorkers(1);
            StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
        }
        else{
            //使用本地模式运行
            config.setMaxTaskParallelism(1);
            LocalCluster cluster=new LocalCluster();
            cluster.submitTopology("simple", config, topologyBuilder.createTopology());
        }
    }
}

 

SimpleSpout.java

View Code
import java.util.List;
import java.util.Map;

import redis.clients.jedis.Jedis;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class SimpleSpout extends BaseRichSpout{

    /**
     *
     */
    private static final long serialVersionUID = -6335251364034714629L;
    private SpoutOutputCollector collector;
    private Jedis jedis;
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("source"));
    }

    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context,
            SpoutOutputCollector collector) {
            this.collector=collector;
            jedis=new Jedis("192.168.180.101", 6379);
    }

    public void nextTuple() {
          List<String> messages=jedis.brpop(3600,"msg_queue");
          if(!messages.isEmpty()){
              for (String msg : messages) {
                  collector.emit(new Values(msg));
            }
          }
    }

}

 

SimpleBolt1.java

View Code
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SimpleBolt1 extends BaseBasicBolt {

    /**
     *
     */
    private static final long serialVersionUID = -5266922733759958473L;

    public void execute(Tuple input, BasicOutputCollector collector) {
        String message=input.getString(0);
        if(null!=message.trim()){
            collector.emit(new Values(message));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("info"));
    }

}

 

SimpleBolt2.java

View Code
import java.util.HashMap;
import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SimpleBolt2 extends BaseBasicBolt {

    /**
     *
     */
    private static final long serialVersionUID = 2246728833921545676L;
    Integer id;
    String name;
    Map<String, Integer> counters;

    @SuppressWarnings("rawtypes")
    public void prepare(Map stormConf, TopologyContext context) {
        this.counters=new HashMap<String, Integer>();
        this.name=context.getThisComponentId();
        this.id=context.getThisTaskId();
        System.out.println(String.format("componentId:%s",this.name));
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String word=input.getString(0);
        if(counters.containsKey(word)){
            Integer c=counters.get(word);
            counters.put(word, c+1);
        }
        else{
            counters.put(word, 1);
        }
        collector.emit(new Values(word,counters.get(word)));
        System.out.println(String.format("stats result is:%s:%s", word,counters.get(word)));
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word","count"));
    }

}

 

SimpleBolt3.java

View Code
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;

public class SimpleBolt3 extends BaseBasicBolt{

    /**
     *
     */
    private static final long serialVersionUID = 9140971206523366543L;

    public void execute(Tuple input, BasicOutputCollector collector) {
        String word=input.getString(0);
        StoreDatabase.insertRow(word);
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
    }

}

 

StoreDatabase.java

View Code
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class StoreDatabase {
    public static Connection connection;
    public static Statement stmt;
    static {
        String dbDriver = "com.mysql.jdbc.Driver";
        String dbUrl = "jdbc:mysql://192.168.187.16/blog";
        String user = "zhxia";
        String password = "admin";
        try {
            Class.forName(dbDriver);

        } catch (ClassNotFoundException ex) {
            ex.printStackTrace();
        }
        try {
            connection = DriverManager.getConnection(dbUrl, user, password);
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    public static int insertRow(String word){
        int effectRows=0;
        String sql=String.format("insert into words(word)values('%s')", word);
        try{
            stmt=connection.createStatement();
            effectRows=stmt.executeUpdate(sql);
            stmt.close();
        }
        catch (SQLException e) {
            e.printStackTrace();
            System.err.println("数据插入失败");
        }
        return effectRows;
    }
}

 

SimpleBolt4.java

View Code
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class SimpleBolt4  extends BaseBasicBolt{

    /**
     *
     */
    private static final long serialVersionUID = -8025390241512976224L;

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public void execute(Tuple input, BasicOutputCollector collector) {
        String word=input.getString(0);
        if(null!=word&&word.trim()!=""){
            String upper=word.trim().toUpperCase();
            System.out.println(String.format("upper word is:%s", upper));
            collector.emit(new Values(upper));
        }
    }

}

pom.xml

View Code
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.haozu.app</groupId>
    <artifactId>app-storm</artifactId>
    <packaging>jar</packaging>
    <version>1.0</version>
    <name>app-storm</name>
    <url>http://maven.apache.org</url>
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>storm</groupId>
            <artifactId>storm</artifactId>
            <scope>provided</scope>
            <version>0.8.1</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.21</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.0.0</version>
            <type>jar</type>
            <scope>compile</scope>
        </dependency>
    </dependencies>
    <repositories>
        <repository>
            <id>haozu</id>
            <name>haozu repositories</name>
            <url>http://nexus.dev.haozu.com:10010/nexus/content/groups/public</url>
            <layout>default</layout>
        </repository>
    </repositories>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.haozu.app.SimpleTopology</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

打包:

mvn clean assembly:assembly -Dmaven.test.skip=true

提交作业并运行:

本地模式运行:storm jar target/app-storm-1.0-jar-with-dependencies.jar  com.haozu.app.SimpleTopology 

集群模式运行:storm jar target/app-storm-1.0-jar-with-dependencies.jar  com.haozu.app.SimpleTopology "test"

 


转自:http://www.cnblogs.com/xiazh/archive/2013/03/04/2827732

相关问答

更多
  • Hi: 下列还有三个短语相同意思: cook up a storm烹饪上露一手 dance up a storm舞姿翩翩 talk up a storm 侃侃而谈 [MAINLY US INFORMAL:主要用于美国,非正式用语] to do something with a lot of energy and often skill:[以非凡的能力和技能做某事] Rob was in the kitchen cooking up a storm. Rob在厨房里大显身手. (from Cambridge ...
  • 当下雨的时候,风暴都到了
  • Storm是什么文件[2023-06-19]

    Storm译为汉语即‘暴风雨’、“暴风雪”,是暴风影音软件的英文名,是一种媒体播放器。   Storm还是一个分布式的、容错的实时计算系统,由BackType开发,广泛用于进行实时日志处理,实时统计、实时风控、实时推荐等场景中,目前最新版本是Storm 0.8.0。   Storm还是外文歌曲的名字,具体可在百度音乐中搜索。
  • 在风暴谷歌集团找到答案。 似乎DRCP拓扑将发出一个元组,其中包含由DRCP spout作为流接收的参数,然后在处理完成时指示回(使用称为请求ID的唯一标识)。 在同一个线程中,hadoop可能最适合这些情况,除非数据不够大,并且可以一直处理。 Found the answer in the storm google group. Seems that DRCP topologies will emit a tuple with parameters that is received by DRCP spo ...
  • 回答你的问题有点难,因为Zookeeper,Maven和Nginx都做了很多不同的事情,但我会尽我所能。 Zookeeper是运行Storm的一部分。 它会跟踪通过Storm拓扑运行的当前事务。 Maven是JVM生态系统中常见的构建管理工具。 你需要这个来构建东西。 在某些情况下,Nginx可能会使用Storm拓扑,但对于所有用例来说肯定不是必需的或共同的。 因此,您可能需要项目中的所有三个,您肯定需要Zookeeper,并且几乎肯定会以某种方式与Maven交互,即使您在其他地方使用Ant或Leineng ...
  • 关于加缪,是的,启动工作的调度员应该工作。 他们在LinkedIn上使用的是Azkaban,你也可以看一下。 如果在另一个完成之前启动,则会读取一些数据量两次。 由于第二个作业将从第一个作业使用的相同偏移开始读取。 关于加缪与S3,目前我不认为这已经到位。 Kafka actually retains events for a configurable period of time -- events are not purged immediately upon consumption like othe ...
  • 好吧,没有办法像我想要的那样快速压缩。 但是我找到了解决办法,如果有人需要,我可以在这里分享。 这个问题不仅与Storm相关,而且是一个更一般的Hadoop问题。 我的所有数据都是使用HdfsBolt写入的: RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|"); //Synchronize data buffer with the filesystem every 1000 tuples ...
  • 让我们说你的风暴在/ home / your / download / storm解压缩 我将其定义为env变量 STORM_HOME="/home/your/download/storm" 因此您可以通过以下方式导出此变量 export STORM_HOME 不要忘记导入路径: export PATH=$PATH:$STORM_HOME/bin 然后你可以在其中使用storm命令。 Let us say your storm is unpacked at /home/your/download/storm ...
  • 您必须在所有螺栓中ack所有传入的元组,即将collector.ack(input)添加到TransformerBolt.execute(Tuple input) 。 您看到的日志消息是正确的:您的代码调用collector.ack(...)并记录此调用。 拓扑中对ack的调用不是对Spout.ack(...)的调用:每次Spout发出带有消息ID的元组时,此ID都会由拓扑的正在运行的ackers注册。 那些ackers会在Bolt的每个ack上收到一条消息,收集这些消息并在收到元组的所有ack时通知Spo ...
  • 关于消息ID是通用的:在内部它可能是一个64位值,但是这个64位值是作为Spout内emit()提供的msgID对象的散列来计算的。 因此,您可以将任何对象作为消息标识(两个对象散列到相同值的概率接近于零)。 关于使用str :我认为在这个例子中, str包含一行(而不是一个单词),并且文档包含两次完全相同的行(如果没有空行可能很多)是不太可能的。 关于计数器作为消息ID:对于你的观察你是绝对正确的 - 如果多个喷嘴并行运行,这会给消息ID冲突并破坏容错。 如果你想“修复”计数器方法,每个计数器应该以不同的 ...