知识点
相关文章
更多最近更新
更多Storm【实践系列-如何写一个爬虫- Metric 系列】 - 2 Librato
2019-03-02 23:57|来源: 网路
本章主题: 辐射性质介绍一个Librato的Metric度量的实现
package com.digitalpebble.storm.crawler; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.StringUtils; import org.codehaus.jackson.map.ObjectMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import backtype.storm.metric.api.IMetricsConsumer; import backtype.storm.task.IErrorReporter; import backtype.storm.task.TopologyContext; import com.librato.metrics.HttpPoster; import com.librato.metrics.HttpPoster.Response; import com.librato.metrics.LibratoBatch; import com.librato.metrics.NingHttpPoster; import com.librato.metrics.Sanitizer; import com.librato.metrics.Versions; /** Sends the metrics to Librato **/ public class LibratoMetricsConsumer implements IMetricsConsumer { public static final int DEFAULT_BATCH_SIZE = 500; private static final Logger LOG = LoggerFactory .getLogger(LibratoMetricsConsumer.class); private static final String LIB_VERSION = Versions.getVersion( "META-INF/maven/com.librato.metrics/librato-java/pom.properties", LibratoBatch.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); private final Sanitizer sanitizer = new Sanitizer() { public String apply(String name) { return Sanitizer.LAST_PASS.apply(name); } }; private int postBatchSize = DEFAULT_BATCH_SIZE; private long timeout = 30; private final TimeUnit timeoutUnit = TimeUnit.SECONDS; private String userAgent = null; private HttpPoster httpPoster; private Set<String> metricsToKeep = new HashSet<String>(); public void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter) { // TODO configure timeouts // this.timeout = timeout; // this.timeoutUnit = timeoutUnit; // this.postBatchSize = postBatchSize; String agentIdentifier = (String) stormConf.get("librato.agent"); if (agentIdentifier == null) agentIdentifier = "storm"; String token = (String) stormConf.get("librato.token"); String username = (String) stormConf.get("librato.username"); String apiUrl = (String) stormConf.get("librato.api.url"); if (apiUrl == null) apiUrl = "https://metrics-api.librato.com/v1/metrics"; // check that the values are not null if (StringUtils.isBlank(token)) throw new RuntimeException("librato.token not set"); if (StringUtils.isBlank(username)) throw new RuntimeException("librato.username not set"); this.userAgent = String.format("%s librato-java/%s", agentIdentifier, LIB_VERSION); this.httpPoster = NingHttpPoster.newPoster(username, token, apiUrl); // get the list of metrics names to keep if any String metrics2keep = (String) stormConf.get("librato.metrics.to.keep"); if (metrics2keep != null) { String[] mets = metrics2keep.split(","); for (String m : mets) metricsToKeep.add(m.trim().toLowerCase()); } } // post(String source, long epoch) public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) { final Map<String, Object> payloadMap = new HashMap<String, Object>(); payloadMap.put("source", taskInfo.srcComponentId + "_" + taskInfo.srcWorkerHost + "_" + taskInfo.srcTaskId); payloadMap.put("measure_time", taskInfo.timestamp); final List<Map<String, Object>> gaugeData = new ArrayList<Map<String, Object>>(); final List<Map<String, Object>> counterData = new ArrayList<Map<String, Object>>(); int counter = 0; final Iterator<DataPoint> datapointsIterator = dataPoints.iterator(); while (datapointsIterator.hasNext()) { final DataPoint dataPoint = datapointsIterator.next(); // ignore datapoint with a value which is not a map if (!(dataPoint.value instanceof Map)) continue; // a counter or a gauge // convention if its name contains '_counter' // then treat it as a counter boolean isCounter = false; if (dataPoint.name.contains("_counter")) { isCounter = true; dataPoint.name = dataPoint.name.replaceFirst("_counter", ""); } if (!metricsToKeep.isEmpty()) { if (!metricsToKeep.contains(dataPoint.name.toLowerCase())) { continue; } } try { Map<String, Number> metric = (Map<String, Number>) dataPoint.value; for (Map.Entry<String, Number> entry : metric.entrySet()) { String metricId = entry.getKey(); Number val = entry.getValue(); final Map<String, Object> data = new HashMap<String, Object>(); data.put("name", sanitizer.apply(dataPoint.name + "_" + metricId)); data.put("value", val); if (isCounter) counterData.add(data); else // use as gauge gaugeData.add(data); counter++; if (counter % postBatchSize == 0 || (!datapointsIterator.hasNext() && (!counterData .isEmpty() || !gaugeData.isEmpty()))) { final String countersKey = "counters"; final String gaugesKey = "gauges"; payloadMap.put(countersKey, counterData); payloadMap.put(gaugesKey, gaugeData); postPortion(payloadMap); payloadMap.remove(gaugesKey); payloadMap.remove(countersKey); gaugeData.clear(); counterData.clear(); } } } catch (RuntimeException e) { LOG.error(e.getMessage()); } } LOG.debug("Posted {} measurements", counter); } public void cleanup() { } private void postPortion(Map<String, Object> chunk) { try { final String payload = OBJECT_MAPPER.writeValueAsString(chunk); final Future<Response> future = httpPoster.post(userAgent, payload); final Response response = future.get(timeout, timeoutUnit); final int statusCode = response.getStatusCode(); if (statusCode < 200 || statusCode >= 300) { LOG.error( "Received an error from Librato API. Code : {}, Message: {}", statusCode, response.getBody()); } } catch (Exception e) { LOG.error("Unable to post to Librato API", e); } } }
Metirc代码带有一点介绍的性质。
转自:http://my.oschina.net/u/1791874/blog/305221
相关问答
更多-
谁有关于Drwamweaver的系列教程么?[2022-06-17]
www.78soft.com -
电脑基础.实例.上机系列教程[2022-06-23]
针对这门课程的附带,当然是教程盘啦 -
谁有这个系列的全部Breaking教程[2022-01-20]
一个不错的教学篇: http://cook.163.cn/163cn_consume/sports/page/dance/show/3.asx 跳转+风车+直升机+托马斯教程: http://szhiphop.com/guide_temp/toprock+uprock+footwork.wmv 韩国的BREAKING教学: http://211.239.154.86/data/break2.wmv http://www.gzyp.cn/consume/sports/page/dance/show/1.rm ... -
head first 系列电子书的下载[2022-06-05]
我加你qq吧~ 已加~~~ 已经将下载地址 ftp://csdn:csdn@61.132.59.166/计算机类/shubulo/Head.First设计模式_PDF_shubulo.com.rar 给您了!是pdf版本的! 补充:要装pdf浏览器~~~呵呵~ 见笑~~~ -
矩阵必须是正定的 ,这与具有正的输入不一样。 在对称矩阵的情况下,这意味着所有的特征值都必须是正的。 但在你的情况下矩阵只有1级。如果你计算特征值,你可以看到这个特征值,两个特征值是零: eig(covu) ans = -0.0000 0.0000 27.0000 The matrix must be positive definite which is not the same as having positive entries. In the case of a symmet ...
-
联想 有什么系列?[2023-10-07]
笔记本:IdeaPad系列 3000系列 典藏系列 IdeapadV系列 IdeapadU系列 ThinkPad系列 昭阳系列 加固笔记本系列 一体机:IdeaCentre A6系列 Lenovo C3系列 Lenovo C1系列 台式机:IdeaCentre A6系列 Lenovo C3系列 Lenovo C1系列 扬天系列 ThinkCentre 系列 启天系列 ThinkStation工作站 -
Grafana - 通过公制选择更新了仪表板上的所有图表(Grafana - Updated all graphs on dashboard from metric selection)[2022-10-18]
是! 您可以在grafana中使用标签值进行模板查询。 文档在这里: http : //docs.grafana.org/features/datasources/prometheus/#templated-queries 我们做类似的事情,在仪表板中选择一个命名空间,以查看该命名空间中服务的内存和CPU消耗。 我们的仪表板如下所示: 查询看起来像: namespace_name:container_cpu_usage_seconds_total:sum_rate{namespace=\"$namespac ... -
如果仔细观察这一行: h = (document.getElementById("number").value * 453.592; 有一个不必要的( 。在你有这个代码的所有地方摆脱它,你的代码将起作用。 工作代码片段: function calc() { var h; var ee = isNaN(document.getElementById("number")).value; if (ee == "true") { document.writel ...
-
系列算法(Algorithm for series)[2022-05-10]
将这些字符串视为基数为26的数字, A=0 。 这不是一个精确的翻译,因为在实际基数26 A=AA=AAA=0 ,所以你必须根据需要做一些调整。 这是一个Java实现: static String convert(int n) { int digits = 1; for (int j = 26; j <= n; j *= 26) { digits++; n -= j; } String s = ""; for (; digits --> ... -
写请求指标(Write Request metric)[2023-12-05]
第一张图(20分钟)使用平均值。 1h图表将有3行 - 每个样本的最小值,平均值和每个样本的最大值。 您可能会看到的是某些东西(可能是opscenter本身)正在进行大量写入,大约700秒/秒几秒钟,而在20分钟图形上它被平均掉,但是使用最小/最大行,你会看到异常值。 The first graph (20 minute) uses an average. The 1h graph will have 3 lines - min per sample, average, and max per sample ...