知识点
相关文章
更多最近更新
更多(二) storm的基本使用
2019-03-02 23:40|来源: 网路
SimpleTopology.java
View Code
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.TopologyBuilder; import backtype.storm.tuple.Fields; /** * Hello world! * */ public class SimpleTopology { public static void main( String[] args ) throws Exception { TopologyBuilder topologyBuilder=new TopologyBuilder(); topologyBuilder.setSpout("simple-spout", new SimpleSpout(),1); topologyBuilder.setBolt("simple-bilt",new SimpleBolt1(), 3).shuffleGrouping("simple-spout"); topologyBuilder.setBolt("wordcounter", new SimpleBolt2(), 3).fieldsGrouping("simple-bilt", new Fields("info")); topologyBuilder.setBolt("word-to-upper", new SimpleBolt4(),5).shuffleGrouping("simple-spout"); topologyBuilder.setBolt("store", new SimpleBolt3(),10).shuffleGrouping("word-to-upper"); Config config=new Config(); config.setDebug(true); if(null!=args&&args.length>0){ //使用集群模式运行 config.setNumWorkers(1); StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology()); } else{ //使用本地模式运行 config.setMaxTaskParallelism(1); LocalCluster cluster=new LocalCluster(); cluster.submitTopology("simple", config, topologyBuilder.createTopology()); } } }
SimpleSpout.java
View Code
import java.util.List; import java.util.Map; import redis.clients.jedis.Jedis; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; public class SimpleSpout extends BaseRichSpout{ /** * */ private static final long serialVersionUID = -6335251364034714629L; private SpoutOutputCollector collector; private Jedis jedis; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("source")); } @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector=collector; jedis=new Jedis("192.168.180.101", 6379); } public void nextTuple() { List<String> messages=jedis.brpop(3600,"msg_queue"); if(!messages.isEmpty()){ for (String msg : messages) { collector.emit(new Values(msg)); } } } }
SimpleBolt1.java
View Code
import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class SimpleBolt1 extends BaseBasicBolt { /** * */ private static final long serialVersionUID = -5266922733759958473L; public void execute(Tuple input, BasicOutputCollector collector) { String message=input.getString(0); if(null!=message.trim()){ collector.emit(new Values(message)); } } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("info")); } }
SimpleBolt2.java
View Code
import java.util.HashMap; import java.util.Map; import backtype.storm.task.TopologyContext; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class SimpleBolt2 extends BaseBasicBolt { /** * */ private static final long serialVersionUID = 2246728833921545676L; Integer id; String name; Map<String, Integer> counters; @SuppressWarnings("rawtypes") public void prepare(Map stormConf, TopologyContext context) { this.counters=new HashMap<String, Integer>(); this.name=context.getThisComponentId(); this.id=context.getThisTaskId(); System.out.println(String.format("componentId:%s",this.name)); } public void execute(Tuple input, BasicOutputCollector collector) { String word=input.getString(0); if(counters.containsKey(word)){ Integer c=counters.get(word); counters.put(word, c+1); } else{ counters.put(word, 1); } collector.emit(new Values(word,counters.get(word))); System.out.println(String.format("stats result is:%s:%s", word,counters.get(word))); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word","count")); } }
SimpleBolt3.java
View Code
import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; public class SimpleBolt3 extends BaseBasicBolt{ /** * */ private static final long serialVersionUID = 9140971206523366543L; public void execute(Tuple input, BasicOutputCollector collector) { String word=input.getString(0); StoreDatabase.insertRow(word); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } }
StoreDatabase.java
View Code
import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.sql.Statement; public class StoreDatabase { public static Connection connection; public static Statement stmt; static { String dbDriver = "com.mysql.jdbc.Driver"; String dbUrl = "jdbc:mysql://192.168.187.16/blog"; String user = "zhxia"; String password = "admin"; try { Class.forName(dbDriver); } catch (ClassNotFoundException ex) { ex.printStackTrace(); } try { connection = DriverManager.getConnection(dbUrl, user, password); } catch (SQLException e) { e.printStackTrace(); } } public static int insertRow(String word){ int effectRows=0; String sql=String.format("insert into words(word)values('%s')", word); try{ stmt=connection.createStatement(); effectRows=stmt.executeUpdate(sql); stmt.close(); } catch (SQLException e) { e.printStackTrace(); System.err.println("数据插入失败"); } return effectRows; } }
SimpleBolt4.java
View Code
import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; public class SimpleBolt4 extends BaseBasicBolt{ /** * */ private static final long serialVersionUID = -8025390241512976224L; public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } public void execute(Tuple input, BasicOutputCollector collector) { String word=input.getString(0); if(null!=word&&word.trim()!=""){ String upper=word.trim().toUpperCase(); System.out.println(String.format("upper word is:%s", upper)); collector.emit(new Values(upper)); } } }
pom.xml
View Code
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.haozu.app</groupId> <artifactId>app-storm</artifactId> <packaging>jar</packaging> <version>1.0</version> <name>app-storm</name> <url>http://maven.apache.org</url> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>storm</groupId> <artifactId>storm</artifactId> <scope>provided</scope> <version>0.8.1</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.21</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.0.0</version> <type>jar</type> <scope>compile</scope> </dependency> </dependencies> <repositories> <repository> <id>haozu</id> <name>haozu repositories</name> <url>http://nexus.dev.haozu.com:10010/nexus/content/groups/public</url> <layout>default</layout> </repository> </repositories> <build> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass>com.haozu.app.SimpleTopology</mainClass> </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>
打包:
mvn clean assembly:assembly -Dmaven.test.skip=true
提交作业并运行:
本地模式运行:storm jar target/app-storm-1.0-jar-with-dependencies.jar com.haozu.app.SimpleTopology
集群模式运行:storm jar target/app-storm-1.0-jar-with-dependencies.jar com.haozu.app.SimpleTopology "test"
转自:http://www.cnblogs.com/xiazh/archive/2013/03/04/2827732
相关问答
更多-
请问dance up a storm具体怎么翻译?[2022-06-05]
Hi: 下列还有三个短语相同意思: cook up a storm烹饪上露一手 dance up a storm舞姿翩翩 talk up a storm 侃侃而谈 [MAINLY US INFORMAL:主要用于美国,非正式用语] to do something with a lot of energy and often skill:[以非凡的能力和技能做某事] Rob was in the kitchen cooking up a storm. Rob在厨房里大显身手. (from Cambridge ... -
when the rain,the storm all is up怎么翻译[2022-01-20]
当下雨的时候,风暴都到了 -
Storm是什么文件[2023-06-19]
Storm译为汉语即‘暴风雨’、“暴风雪”,是暴风影音软件的英文名,是一种媒体播放器。 Storm还是一个分布式的、容错的实时计算系统,由BackType开发,广泛用于进行实时日志处理,实时统计、实时风控、实时推荐等场景中,目前最新版本是Storm 0.8.0。 Storm还是外文歌曲的名字,具体可在百度音乐中搜索。 -
好好利用风暴?(Good use of storm?)[2021-12-04]
在风暴谷歌集团找到答案。 似乎DRCP拓扑将发出一个元组,其中包含由DRCP spout作为流接收的参数,然后在处理完成时指示回(使用称为请求ID的唯一标识)。 在同一个线程中,hadoop可能最适合这些情况,除非数据不够大,并且可以一直处理。 Found the answer in the storm google group. Seems that DRCP topologies will emit a tuple with parameters that is received by DRCP spo ... -
回答你的问题有点难,因为Zookeeper,Maven和Nginx都做了很多不同的事情,但我会尽我所能。 Zookeeper是运行Storm的一部分。 它会跟踪通过Storm拓扑运行的当前事务。 Maven是JVM生态系统中常见的构建管理工具。 你需要这个来构建东西。 在某些情况下,Nginx可能会使用Storm拓扑,但对于所有用例来说肯定不是必需的或共同的。 因此,您可能需要项目中的所有三个,您肯定需要Zookeeper,并且几乎肯定会以某种方式与Maven交互,即使您在其他地方使用Ant或Leineng ...
-
关于加缪,是的,启动工作的调度员应该工作。 他们在LinkedIn上使用的是Azkaban,你也可以看一下。 如果在另一个完成之前启动,则会读取一些数据量两次。 由于第二个作业将从第一个作业使用的相同偏移开始读取。 关于加缪与S3,目前我不认为这已经到位。 Kafka actually retains events for a configurable period of time -- events are not purged immediately upon consumption like othe ...
-
好吧,没有办法像我想要的那样快速压缩。 但是我找到了解决办法,如果有人需要,我可以在这里分享。 这个问题不仅与Storm相关,而且是一个更一般的Hadoop问题。 我的所有数据都是使用HdfsBolt写入的: RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|"); //Synchronize data buffer with the filesystem every 1000 tuples ...
-
让我们说你的风暴在/ home / your / download / storm解压缩 我将其定义为env变量 STORM_HOME="/home/your/download/storm" 因此您可以通过以下方式导出此变量 export STORM_HOME 不要忘记导入路径: export PATH=$PATH:$STORM_HOME/bin 然后你可以在其中使用storm命令。 Let us say your storm is unpacked at /home/your/download/storm ...
-
您必须在所有螺栓中ack所有传入的元组,即将collector.ack(input)添加到TransformerBolt.execute(Tuple input) 。 您看到的日志消息是正确的:您的代码调用collector.ack(...)并记录此调用。 拓扑中对ack的调用不是对Spout.ack(...)的调用:每次Spout发出带有消息ID的元组时,此ID都会由拓扑的正在运行的ackers注册。 那些ackers会在Bolt的每个ack上收到一条消息,收集这些消息并在收到元组的所有ack时通知Spo ...
-
关于消息ID是通用的:在内部它可能是一个64位值,但是这个64位值是作为Spout内emit()提供的msgID对象的散列来计算的。 因此,您可以将任何对象作为消息标识(两个对象散列到相同值的概率接近于零)。 关于使用str :我认为在这个例子中, str包含一行(而不是一个单词),并且文档包含两次完全相同的行(如果没有空行可能很多)是不太可能的。 关于计数器作为消息ID:对于你的观察你是绝对正确的 - 如果多个喷嘴并行运行,这会给消息ID冲突并破坏容错。 如果你想“修复”计数器方法,每个计数器应该以不同的 ...