知识点
相关文章
更多最近更新
更多顶 Storm【实践系列-如何写一个爬虫- Metric 系列】1
2019-03-02 23:57|来源: 网路
package com.digitalpebble.storm.crawler; import backtype.storm.Config; import backtype.storm.metric.MetricsConsumerBolt; import backtype.storm.metric.api.IMetricsConsumer; import backtype.storm.task.IErrorReporter; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.tuple.Tuple; import backtype.storm.utils.Utils; import com.google.common.base.Joiner; import com.google.common.base.Supplier; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSortedMap; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectWriter; import org.mortbay.jetty.Server; import org.mortbay.jetty.servlet.Context; import org.mortbay.jetty.servlet.ServletHolder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.servlet.ServletException; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicLong; /** * @author Enno Shioji (enno.shioji@peerindex.com) */ public class DebugMetricConsumer implements IMetricsConsumer { private static final Logger log = LoggerFactory .getLogger(DebugMetricConsumer.class); private IErrorReporter errorReporter; private Server server; // Make visible to servlet threads private volatile TopologyContext context; private volatile ConcurrentMap<String, Number> metrics; private volatile ConcurrentMap<String, Map<String, Object>> metrics_metadata; public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { this.context = context; this.errorReporter = errorReporter; this.metrics = new ConcurrentHashMap<String, Number>(); this.metrics_metadata = new ConcurrentHashMap<String, Map<String, Object>>(); try { // TODO Config file not tested final String PORT_CONFIG_STRING = "topology.metrics.consumers.debug.servlet.port"; Integer port = (Integer) stormConf.get(PORT_CONFIG_STRING); if (port == null) { log.warn("Metrics debug servlet's port not specified, defaulting to 7070. You can specify it via " + PORT_CONFIG_STRING + " in storm.yaml"); port = 7070; } server = startServlet(port); } catch (Exception e) { log.error("Failed to start metrics server", e); throw new AssertionError(e); } } private static final Joiner ON_COLONS = Joiner.on("::"); public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { // In order String componentId = taskInfo.srcComponentId; Integer taskId = taskInfo.srcTaskId; Integer updateInterval = taskInfo.updateIntervalSecs; Long timestamp = taskInfo.timestamp; for (DataPoint point : dataPoints) { String metric_name = point.name; try { Map<String, Number> metric = (Map<String, Number>) point.value; for (Map.Entry<String, Number> entry : metric.entrySet()) { String metricId = ON_COLONS.join(componentId, taskId, metric_name, entry.getKey()); Number val = entry.getValue(); metrics.put(metricId, val); metrics_metadata.put(metricId, ImmutableMap .<String, Object> of("updateInterval", updateInterval, "lastreported", timestamp)); } } catch (RuntimeException e) { // One can easily send something else than a Map<String,Number> // down the __metrics stream and make this part break. // If you ask me either the message should carry type // information or there should be different stream per message // type // This is one of the reasons why I want to write a further // abstraction on this facility errorReporter.reportError(e); metrics_metadata .putIfAbsent("ERROR_METRIC_CONSUMER_" + e.getClass().getSimpleName(), ImmutableMap .of("offending_message_sample", point.value)); } } } private static final ObjectMapper OM = new ObjectMapper(); private Server startServlet(int serverPort) throws Exception { // Setup HTTP server Server server = new Server(serverPort); Context root = new Context(server, "/"); server.start(); HttpServlet servlet = new HttpServlet() { @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { SortedMap<String, Number> metrics = ImmutableSortedMap .copyOf(DebugMetricConsumer.this.metrics); SortedMap<String, Map<String, Object>> metrics_metadata = ImmutableSortedMap .copyOf(DebugMetricConsumer.this.metrics_metadata); Map<String, Object> toplevel = ImmutableMap .of("retrieved", new Date(), // TODO this call fails with mysterious // exception // "java.lang.IllegalArgumentException: Could not find component common for __metrics" // Mailing list suggests it's a library version // issue but couldn't find anything suspicious // Need to eventually investigate // "sources", // context.getThisSources().toString(), "metrics", metrics, "metric_metadata", metrics_metadata); ObjectWriter prettyPrinter = OM .writerWithDefaultPrettyPrinter(); prettyPrinter.writeValue(resp.getWriter(), toplevel); } }; root.addServlet(new ServletHolder(servlet), "/metrics"); log.info("Started metric server..."); return server; } public void cleanup() { try { server.stop(); } catch (Exception e) { throw new AssertionError(e); } } }
前提说明:
storm从0.9.0开始,增加了指标统计框架,用来收集应用程序的特定指标,并将其输出到外部系统。
一般来说,您只需要去实现 LoggingMetricsConsumer,统计将指标值输出到metric.log日志文件之中。
当然,您也可以自定义一个监听的类:只需要去实现IMetricsConsumer接口就可以了。这些类可以在代码里注册(registerMetricsConsumer),也可以在 storm.yaml配置文件中注册:
转自:http://my.oschina.net/u/1791874/blog/305197
相关问答
更多-
谁有关于Drwamweaver的系列教程么?[2022-06-17]
www.78soft.com -
电脑基础.实例.上机系列教程[2022-06-23]
针对这门课程的附带,当然是教程盘啦 -
谁有这个系列的全部Breaking教程[2022-01-20]
一个不错的教学篇: http://cook.163.cn/163cn_consume/sports/page/dance/show/3.asx 跳转+风车+直升机+托马斯教程: http://szhiphop.com/guide_temp/toprock+uprock+footwork.wmv 韩国的BREAKING教学: http://211.239.154.86/data/break2.wmv http://www.gzyp.cn/consume/sports/page/dance/show/1.rm ... -
矩阵必须是正定的 ,这与具有正的输入不一样。 在对称矩阵的情况下,这意味着所有的特征值都必须是正的。 但在你的情况下矩阵只有1级。如果你计算特征值,你可以看到这个特征值,两个特征值是零: eig(covu) ans = -0.0000 0.0000 27.0000 The matrix must be positive definite which is not the same as having positive entries. In the case of a symmet ...
-
联想 有什么系列?[2023-10-07]
笔记本:IdeaPad系列 3000系列 典藏系列 IdeapadV系列 IdeapadU系列 ThinkPad系列 昭阳系列 加固笔记本系列 一体机:IdeaCentre A6系列 Lenovo C3系列 Lenovo C1系列 台式机:IdeaCentre A6系列 Lenovo C3系列 Lenovo C1系列 扬天系列 ThinkCentre 系列 启天系列 ThinkStation工作站 -
Grafana - 通过公制选择更新了仪表板上的所有图表(Grafana - Updated all graphs on dashboard from metric selection)[2022-10-18]
是! 您可以在grafana中使用标签值进行模板查询。 文档在这里: http : //docs.grafana.org/features/datasources/prometheus/#templated-queries 我们做类似的事情,在仪表板中选择一个命名空间,以查看该命名空间中服务的内存和CPU消耗。 我们的仪表板如下所示: 查询看起来像: namespace_name:container_cpu_usage_seconds_total:sum_rate{namespace=\"$namespac ... -
flex-wrap: wrap-reverse只flex-wrap: wrap-reverse包裹横向轴上的内容。 您还需要添加: flex-direction: row-reverse以flex-direction: row-reverse布局沿主轴的内容。 当项目数不是四的倍数时,这仍然会给您带来问题。 要解决这个问题,我们可以使用nth-last-child和first-child css选择器,根据项目数量为最后一个元素添加一个边距。 :nth-last-child(4n+3):first-ch ...
-
如果仔细观察这一行: h = (document.getElementById("number").value * 453.592; 有一个不必要的( 。在你有这个代码的所有地方摆脱它,你的代码将起作用。 工作代码片段: function calc() { var h; var ee = isNaN(document.getElementById("number")).value; if (ee == "true") { document.writel ...
-
我认为你需要与使用index s创建的DataFrame merge left连接: df = pd.DataFrame({'prod_id':s.index}).merge(df, how='left') print (df) prod_id text rating 0 AD777 some text 2 1 AD777 some text 5 2 AD777 some text 5 3 AD777 some text ...
-
写请求指标(Write Request metric)[2023-12-05]
第一张图(20分钟)使用平均值。 1h图表将有3行 - 每个样本的最小值,平均值和每个样本的最大值。 您可能会看到的是某些东西(可能是opscenter本身)正在进行大量写入,大约700秒/秒几秒钟,而在20分钟图形上它被平均掉,但是使用最小/最大行,你会看到异常值。 The first graph (20 minute) uses an average. The 1h graph will have 3 lines - min per sample, average, and max per sample ...