A Look at Storm's GraphiteStormReporter

Author: 开心源码 | Published: 2022-05-14

This article takes a look at Storm's GraphiteStormReporter.

GraphiteStormReporter

storm-core-1.2.2-sources.jar!/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java

public class GraphiteStormReporter extends ScheduledStormReporter {
    private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class);

    public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with";
    public static final String GRAPHITE_HOST = "graphite.host";
    public static final String GRAPHITE_PORT = "graphite.port";
    public static final String GRAPHITE_TRANSPORT = "graphite.transport";

    @Override
    public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) {
        LOG.debug("Preparing...");
        GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry);

        TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf);
        if (durationUnit != null) {
            builder.convertDurationsTo(durationUnit);
        }

        TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf);
        if (rateUnit != null) {
            builder.convertRatesTo(rateUnit);
        }

        StormMetricsFilter filter = getMetricsFilter(reporterConf);
        if (filter != null) {
            builder.filter(filter);
        }

        String prefix = getMetricsPrefixedWith(reporterConf);
        if (prefix != null) {
            builder.prefixedWith(prefix);
        }

        //defaults to 10
        reportingPeriod = getReportPeriod(reporterConf);

        //defaults to seconds
        reportingPeriodUnit = getReportPeriodUnit(reporterConf);

        // Not exposed:
        // * withClock(Clock)

        String host = getMetricsTargetHost(reporterConf);
        Integer port = getMetricsTargetPort(reporterConf);
        String transport = getMetricsTargetTransport(reporterConf);
        GraphiteSender sender = null;
        if (transport.equalsIgnoreCase("udp")) {
            sender = new GraphiteUDP(host, port);
        } else {
            sender = new Graphite(host, port);
        }
        reporter = builder.build(sender);
    }

    private static String getMetricsPrefixedWith(Map reporterConf) {
        return Utils.getString(reporterConf.get(GRAPHITE_PREFIXED_WITH), null);
    }

    private static String getMetricsTargetHost(Map reporterConf) {
        return Utils.getString(reporterConf.get(GRAPHITE_HOST), null);
    }

    private static Integer getMetricsTargetPort(Map reporterConf) {
        return Utils.getInt(reporterConf.get(GRAPHITE_PORT), null);
    }

    private static String getMetricsTargetTransport(Map reporterConf) {
        return Utils.getString(reporterConf.get(GRAPHITE_TRANSPORT), "tcp");
    }
}
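  • GraphiteStormReporter extends ScheduledStormReporter and implements the prepare method
  • prepare builds a com.codahale.metrics.graphite.GraphiteSender from the reporter configuration (GraphiteUDP when graphite.transport is "udp", otherwise Graphite, with "tcp" as the default transport) and then uses it to build the com.codahale.metrics.graphite.GraphiteReporter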
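
To make the configuration handling in prepare easier to follow, here is a minimal, JDK-only sketch of the same decision logic. It is not Storm code: the class GraphiteTargetSketch and its fromConf method are hypothetical names invented for illustration, and only the three configuration keys and the "tcp" default come from the source above. One deliberate difference, which is my assumption about what is useful rather than Storm's behavior: the sketch fails fast when graphite.host or graphite.port is missing, whereas the quoted source passes a possibly null Integer port straight to the sender constructor.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Storm): mirrors how prepare() resolves
// the Graphite target from the reporter configuration map.
final class GraphiteTargetSketch {
    final String host;
    final int port;
    final boolean udp; // true -> a UDP sender would be chosen, false -> TCP

    private GraphiteTargetSketch(String host, int port, boolean udp) {
        this.host = host;
        this.port = port;
        this.udp = udp;
    }

    static GraphiteTargetSketch fromConf(Map<String, Object> reporterConf) {
        Object host = reporterConf.get("graphite.host");
        Object port = reporterConf.get("graphite.port");
        Object transport = reporterConf.get("graphite.transport");

        // Assumption of this sketch: reject incomplete configuration early.
        if (host == null || port == null) {
            throw new IllegalArgumentException(
                "graphite.host and graphite.port must both be set");
        }

        // Same default as the quoted source: anything other than "udp" means TCP.
        String transportName = transport == null ? "tcp" : transport.toString();
        return new GraphiteTargetSketch(
            host.toString(),
            Integer.parseInt(port.toString()),
            transportName.equalsIgnoreCase("udp"));
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put("graphite.host", "localhost");
        conf.put("graphite.port", 2003);
        GraphiteTargetSketch target = fromConf(conf);
        System.out.println(target.host + ":" + target.port + " udp=" + target.udp);
    }
}
```

With no graphite.transport entry the sketch resolves to TCP, matching the "tcp" default of getMetricsTargetTransport; setting the value to "udp" in any letter case selects UDP, because the comparison is case-insensitive.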

ScheduledStormReporter

storm-core-1.2.2-sources.jar!/org/apache/storm/metrics2/reporters/ScheduledStormReporter.java

public abstract class ScheduledStormReporter implements StormReporter{
    private static final Logger LOG = LoggerFactory.getLogger(ScheduledStormReporter.class);
    protected ScheduledReporter reporter;
    protected long reportingPeriod;
    protected TimeUnit reportingPeriodUnit;

    @Override
    public void start() {
        if (reporter != null) {
            LOG.debug("Starting...");
            reporter.start(reportingPeriod, reportingPeriodUnit);
        } else {
            throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName());
        }
    }

    @Override
    public void stop() {
        if (reporter != null) {
            LOG.debug("Stopping...");
            reporter.stop();
        } else {
            throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName());
        }
    }

    public static TimeUnit getReportPeriodUnit(Map reporterConf) {
        TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS);
        return unit == null ? TimeUnit.SECONDS : unit;
    }

    private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) {
        String rateUnitString = Utils.getString(reporterConf.get(configName), null);
        if (rateUnitString != null) {
            return TimeUnit.valueOf(rateUnitString);
        }
        return null;
    }

    public static long getReportPeriod(Map reporterConf) {
        return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue();
    }

    public static StormMetricsFilter getMetricsFilter(Map reporterConf) {
        StormMetricsFilter filter = null;
        Map filterConf = (Map) reporterConf.get("filter");
        if (filterConf != null) {
            String clazz = (String) filterConf.get("class");
            if (clazz != null) {
                filter = Utils.newInstance(clazz);
                filter.prepare(filterConf);
            }
        }
        return filter;
    }
}
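  • ScheduledStormReporter encapsulates the reporter's lifecycle: start() delegates to reporter.start(reportingPeriod, reportingPeriodUnit) and stop() delegates to reporter.stop(); both throw IllegalStateException if prepare has not yet set the reporter. The reporting period defaults to 10 and its unit to TimeUnit.SECONDS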
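
The lifecycle guard described above can be sketched with the JDK alone. This is an illustration under stated assumptions, not Storm or Dropwizard code: LifecycleSketch and unitOrDefault are hypothetical names, and a ScheduledExecutorService stands in for the reporter field. The defaults (10, SECONDS) and the exception messages follow the quoted source.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ScheduledStormReporter; names are illustrative only.
class LifecycleSketch {
    private ScheduledExecutorService executor; // plays the role of the `reporter` field
    private long period = 10;                  // same default as getReportPeriod
    private TimeUnit unit = TimeUnit.SECONDS;  // same default as getReportPeriodUnit
    final AtomicInteger reports = new AtomicInteger();

    // Mirrors getTimeUnitForConfig plus the SECONDS fallback.
    // TimeUnit.valueOf is case-sensitive: "SECONDS" works, "seconds" throws.
    static TimeUnit unitOrDefault(String name) {
        return name == null ? TimeUnit.SECONDS : TimeUnit.valueOf(name);
    }

    void prepare(long period, TimeUnit unit) {
        this.period = period;
        this.unit = unit;
        this.executor = Executors.newSingleThreadScheduledExecutor();
    }

    void start() {
        if (executor == null) {
            throw new IllegalStateException(
                "Attempt to start without preparing " + getClass().getSimpleName());
        }
        // Each tick stands in for one metrics report.
        executor.scheduleAtFixedRate(reports::incrementAndGet, period, period, unit);
    }

    void stop() {
        if (executor == null) {
            throw new IllegalStateException(
                "Attempt to stop without preparing " + getClass().getSimpleName());
        }
        executor.shutdownNow();
    }
}
```

One practical consequence of the valueOf call in getTimeUnitForConfig: the configured unit string has to match a TimeUnit constant name exactly (for example "SECONDS" or "MINUTES"), since a lowercase value raises IllegalArgumentException rather than falling back to the default.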

Summary

  • Starting with version 1.2, Storm introduced a new metrics system, metrics2, which is built on Dropwizard Metrics
  • Out of the box it provides Console, CSV, Ganglia, Graphite, and JMX reporters

doc

  • New Metrics Reporting API
  • ubuntu-graphite-grafana