apache-storm mixed topology with python - ModuleNotFoundError: No module named 'storm'
I'm trying to create a mixed Storm topology that uses a Java-based spout and a Python-based bolt.
For the Python-based bolt, I wrote a Java wrapper:
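```java
import java.util.Map;

import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;

// Wraps a Python script as a bolt: ShellBolt launches "python <script>"
// as a subprocess and talks to it over stdin/stdout.
class PythonBolt extends ShellBolt implements IRichBolt {
    public PythonBolt() {
        super("python", "C:\\somepath\\sample.py");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        // This bolt emits nothing, so no output fields are declared.
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}
```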
This is what my sample.py looks like:
```python
import storm


class SplitSentenceBolt(storm.BasicBolt):
    def process(self, tup):
        print("Python rocks!")
        words = tup.values[0].split(" ")
        print(tup.values[0])


SplitSentenceBolt().run()
```
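Some background on what `import storm` is for: `storm` here is Storm's multilang adapter (`storm.py`, which provides `BasicBolt`), not a general-purpose library. ShellBolt starts the script as a subprocess and exchanges JSON messages with it over stdin/stdout; per Storm's multilang protocol documentation, each message is a JSON document terminated by a line containing only `end`. A minimal sketch of just that framing follows. The helper names are hypothetical and this is not `storm.py`'s actual API. It also suggests why stray `print()` output on stdout can interfere with the exchange:

```python
import json
from typing import Any, List


def encode_message(msg: Any) -> str:
    """Frame one message as a multilang component would write it to stdout:
    a JSON document followed by a line containing only 'end'."""
    return json.dumps(msg) + "\nend\n"


def decode_messages(stream: str) -> List[Any]:
    """Split a multilang stream back into JSON messages by accumulating lines
    until an 'end' delimiter line is seen. A trailing partial message
    (no 'end' yet) is ignored."""
    messages: List[Any] = []
    buffer: List[str] = []
    for line in stream.splitlines():
        if line == "end":
            messages.append(json.loads("\n".join(buffer)))
            buffer = []
        else:
            buffer.append(line)
    return messages


# Hypothetical example: acknowledging a tuple whose id is "42".
ack = encode_message({"command": "ack", "id": "42"})
print(repr(ack))

# Text printed to stdout lands in the same stream, so the reader on the
# other side would try to parse "Python rocks!" as part of a JSON message.
try:
    decode_messages("Python rocks!\n" + ack)
except json.JSONDecodeError as err:
    print("stray stdout output breaks framing:", err)
```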
Then I put it all together and tried to run it with the following snippet:
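```java
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.Utils;

public class SampleBolt {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("Hello", new RawDataLevelSpout(), 12);
        // Subscribe the bolt to the spout; without a grouping it receives no tuples.
        builder.setBolt("World", new PythonBolt(), 12).shuffleGrouping("Hello");

        Config config = new Config();
        config.setDebug(true);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Hello-World-BaiJian", config, builder.createTopology());
        Utils.sleep(100000);
        cluster.killTopology("Hello-World-BaiJian");
        cluster.shutdown();
    }
}
```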
Everything boots up correctly; however, I get the following exception:
```
import storm
ModuleNotFoundError: No module named 'storm'
    at org.apache.storm.utils.ShellProcess.launch(ShellProcess.java:94) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.task.ShellBolt.prepare(ShellBolt.java:154) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.executor.bolt.BoltExecutor.init(BoltExecutor.java:84) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.executor.bolt.BoltExecutor.call(BoltExecutor.java:93) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.executor.bolt.BoltExecutor.call(BoltExecutor.java:45) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
    at org.apache.storm.utils.Utils$2.run(Utils.java:329) ~[storm-client-2.0.0-SNAPSHOT.jar:2.0.0-SNAPSHOT]
```
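The `ModuleNotFoundError` itself is raised by the Python interpreter that ShellBolt launches: `import storm` searches `sys.path`, and when a script is run as `python C:\somepath\sample.py`, the script's own directory is `sys.path[0]`. A small sketch for checking which file, if any, `import storm` would load; `where_is` is a hypothetical helper built on the standard `importlib.util.find_spec`:

```python
import importlib.util
import sys
from typing import Optional


def where_is(module_name: str) -> Optional[str]:
    """Return the file Python would load for `import module_name`, searching
    sys.path in order, or None if the import would raise ModuleNotFoundError."""
    spec = importlib.util.find_spec(module_name)
    return spec.origin if spec is not None else None


if __name__ == "__main__":
    # For `python C:\somepath\sample.py`, sys.path[0] is C:\somepath, so a
    # storm.py sitting next to sample.py would be importable.
    print("sys.path[0] =", sys.path[0])
    print("storm resolves to:", where_is("storm"))
```

To be meaningful, the check has to run under the same `python` executable that ShellBolt invokes (the first argument passed to `super(...)` in the wrapper).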
Any hints on how to overcome this? How do I install that Python storm package? Is it possible to install it through Anaconda (I couldn't find the package)?
Source: https://stackoverflow.com/questions/49098352
Best answer
You must use triggers, specifically an AFTER INSERT trigger. Something like this (the syntax depends on the DBMS you use):
```sql
CREATE OR REPLACE TRIGGER trg_transaction
AFTER INSERT ON transaction
DECLARE
    -- variable declarations
BEGIN
    -- trigger code
    UPDATE bankaccount SET ....
END;
```
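To make the trigger idea concrete, here is a runnable sketch that swaps in SQLite through Python's built-in `sqlite3` module (the snippet in this answer looks like Oracle PL/SQL, and SQLite's trigger syntax differs). The columns `balance`, `amount`, and `account_id` are hypothetical, since the answer elides the `SET` clause. `transaction` is an SQL keyword, so it is quoted as an identifier:

```python
import sqlite3


def run_trigger_demo() -> float:
    """Create hypothetical bankaccount/transaction tables with an AFTER INSERT
    trigger, insert two transactions, and return the resulting balance."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.executescript("""
            CREATE TABLE bankaccount (
                id INTEGER PRIMARY KEY,
                balance REAL NOT NULL
            );
            CREATE TABLE "transaction" (
                id INTEGER PRIMARY KEY,
                account_id INTEGER NOT NULL REFERENCES bankaccount(id),
                amount REAL NOT NULL
            );
            -- Fires once per inserted row; NEW refers to that row.
            CREATE TRIGGER trg_transaction AFTER INSERT ON "transaction"
            FOR EACH ROW
            BEGIN
                UPDATE bankaccount
                SET balance = balance + NEW.amount
                WHERE id = NEW.account_id;
            END;
        """)
        conn.execute("INSERT INTO bankaccount (id, balance) VALUES (1, 100.0)")
        conn.execute('INSERT INTO "transaction" (account_id, amount) VALUES (1, 25.0)')
        conn.execute('INSERT INTO "transaction" (account_id, amount) VALUES (1, -10.0)')
        (balance,) = conn.execute(
            "SELECT balance FROM bankaccount WHERE id = 1"
        ).fetchone()
        return balance
    finally:
        conn.close()


if __name__ == "__main__":
    print(run_trigger_demo())
```

The application only inserts into `"transaction"`; keeping `bankaccount.balance` in sync is left entirely to the trigger.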
Hope this helps!