Storm java.io.NotSerializableException when running topology
I finally think I have a topology that writes to a Redis database. I have one bolt that prints and one bolt that inserts into Redis. But when I try to launch the topology, it fails with this error:
...
5333 [main-EventThread] INFO o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
5376 [main] INFO b.s.d.supervisor - Starting supervisor with id 1917ef54-0f16-47b8-86ea-b6722aa33c68 at host amnor-A88XPLUS
5405 [main] ERROR o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Thread Thread[main,5,main] died
java.lang.RuntimeException: java.io.NotSerializableException: Storm.practice.Storm.Prova.ProvaTopology
    at backtype.storm.utils.Utils.javaSerialize(Utils.java:91) ~[storm-core-0.10.0.jar:0.10.0]
    at backtype.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:107) ~[storm-core-0.10.0.jar:0.10.0]
    at Storm.practice.Storm.Prova.ProvaTopology.main(ProvaTopology.java:383) ~[classes/:?]
Caused by: java.io.NotSerializableException: Storm.practice.Storm.Prova.ProvaTopology
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_91]
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[?:1.7.0_91]
    at backtype.storm.utils.Utils.javaSerialize(Utils.java:87) ~[storm-core-0.10.0.jar:0.10.0]
    ... 2 more
I thought it might be the spout, but I tried an example spout from the Storm examples and the same thing happens. My code just adds smiley faces to the names it reads, for example (John :) :) ). I am only trying to actually store streams in a Redis database; it is a small test topology that reads names from a file. After this, I will be building a serious topology for a big-data project at my university. Here is my code (there are many unused imports, because I tried different ways of writing to the database):
package Storm.practice.Storm.Prova;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.testing.TestWordSpout;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.ITuple;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.base.BaseRichSpout;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;
import org.apache.storm.redis.bolt.AbstractRedisBolt;
import org.apache.storm.redis.bolt.RedisStoreBolt;
import org.apache.storm.redis.common.config.JedisClusterConfig;
import org.apache.storm.redis.common.config.JedisPoolConfig;
import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
import org.apache.storm.redis.common.mapper.RedisLookupMapper;
import org.apache.storm.redis.common.mapper.RedisStoreMapper;
import org.apache.storm.redis.trident.state.RedisState;
import org.apache.storm.redis.trident.state.RedisStateQuerier;
import org.apache.storm.redis.trident.state.RedisStateUpdater;
import org.apache.storm.shade.com.google.common.collect.Lists;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisCommands;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
//import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.JedisPubSub;
import storm.trident.Stream;
import storm.trident.TridentState;
import storm.trident.TridentTopology;

/**
 * This is a basic example of a Storm topology.
 */
public class ProvaTopology {

    public static class ProvaBolt extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            _collector.emit(tuple, new Values(tuple.getString(0) + " :-)"));
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("Morts"));
        }
    }

    public class ProvaSpout extends BaseRichSpout {
        SpoutOutputCollector _collector;
        //Random _rand;
        private String fileName;
        //private SpoutOutputCollector _collector;
        private BufferedReader reader;
        private AtomicLong linesRead;

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            try {
                fileName= (String)"/home/prova.txt";
                reader = new BufferedReader(new FileReader(fileName));
                // read and ignore the header if one exists
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            // _rand = new Random();
        }

        public void nextTuple() {
            Utils.sleep(100);
            try {
                String line = reader.readLine();
                if (line != null) {
                    long id = linesRead.incrementAndGet();
                    System.out.println("Finished reading line, " + line);
                    _collector.emit(new Values((String)line));
                } else {
                    System.out.println("Finished reading file, " + linesRead.get() + " lines read");
                    Thread.sleep(10000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void ack(Object id) {
        }

        public void fail(Object id) {
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("Morts"));
        }
    }

    public class RedisBolt implements IRichBolt {
        protected String channel = "Somriures";
        // protected String configChannel;
        protected OutputCollector collector;
        // protected Tuple currentTuple;
        // protected Logger log;
        protected JedisPool pool;
        // protected ConfigListenerThread configListenerThread;

        public RedisBolt(){}

        public RedisBolt(String channel) {
            // log = Logger.getLogger(getClass().getName());
            // setupNonSerializableAttributes();
        }

        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
            pool = new JedisPool("localhost");
        }

        public void execute(Tuple tuple) {
            String current = tuple.getString(0);
            if(current != null) {
                // for(Object obj: result) {
                publish(current);
                collector.emit(tuple, new Values(current));
                // }
                collector.ack(tuple);
            }
        }

        public void cleanup() {
            if(pool != null) {
                pool.destroy();
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(channel));
        }

        public void publish(String msg) {
            Jedis jedis = pool.getResource();
            jedis.publish(channel, msg);
            pool.returnResource(jedis);
        }

        protected void setupNonSerializableAttributes() {
        }

        public Map getComponentConfiguration() {
            return null;
        }
    }

    public class PrinterBolt extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer ofd) {
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        ProvaTopology Pt = new ProvaTopology();
        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
                .setHost("127.0.0.1").setPort(666).build();

        builder.setSpout("Morts", Pt.new ProvaSpout(), 10);//emisorTestWordSpout
        builder.setBolt("happy", new ProvaBolt(), 3).shuffleGrouping("Morts");// where does it read from?
        builder.setBolt("meal", new ProvaBolt(), 2).shuffleGrouping("happy");// where does it read from?
        builder.setBolt("bd", Pt.new RedisBolt(), 2).shuffleGrouping("meal");// where does it read from?
        builder.setBolt("print", Pt.new PrinterBolt(), 2).shuffleGrouping("meal");
        // builder.setBolt("StoreM", (storeMapperS));

        Config conf = new Config();
        conf.setDebug(true);

        if (args != null && args.length > 0) {
            conf.setNumWorkers(5);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); //WithProgressBar
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(10000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }
}
Thanks in advance
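A note on the trace: it names ProvaTopology itself as the class that is not serializable, and ProvaSpout, RedisBolt and PrinterBolt are declared as non-static inner classes and created with Pt.new ...(). An instance of a non-static inner class keeps a hidden reference to its enclosing instance, so Java serialization also tries to write the enclosing object. The sketch below reproduces that effect with plain java.io and no Storm dependency. The names InnerClassSerializationDemo, InnerComponent, NestedComponent and canSerialize are hypothetical and made up for illustration, and it is an assumption that this is the only serialization problem in the topology.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for the topology class: it is NOT Serializable.
public class InnerClassSerializationDemo {

    // Non-static inner class: each instance holds a hidden reference to the
    // enclosing InnerClassSerializationDemo, which is not Serializable.
    public class InnerComponent implements Serializable {
    }

    // Static nested class: no hidden reference to an enclosing instance.
    public static class NestedComponent implements Serializable {
    }

    // Returns true if plain Java serialization of obj succeeds,
    // false if it fails with NotSerializableException.
    public static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            // Not expected for an in-memory stream; surface it unchanged.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        InnerClassSerializationDemo outer = new InnerClassSerializationDemo();
        System.out.println("inner:  " + canSerialize(outer.new InnerComponent()));  // prints "inner:  false"
        System.out.println("nested: " + canSerialize(new NestedComponent()));       // prints "nested: true"
    }
}
```

If that is what is happening here, declaring the spout and the bolts as static nested classes (as ProvaBolt already is) or moving them to top-level classes would be the usual way to stop them from capturing the topology instance.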
Original: https://stackoverflow.com/questions/35070386
Best answer
See the 'non-fast forward' section of 'git push --help' for details.
You can perform "git pull", resolve potential conflicts, and "git push" the result. A "git pull" will create a merge commit C between commits A and B.
Alternatively, you can rebase your change between X and B on top of A, with "git pull --rebase", and push the result back. The rebase will create a new commit D that builds the change between X and B on top of A.