Skip to content

Commit 1362c0b

Browse files
committed
STORM-1723 Introduce ClusterMetricsConsumer
* ClusterMetricsConsumer publishes cluster-side metrics to consumers, * like MetricsConsumer does for topology metrics * Users can implement IClusterMetricsConsumer and register it in the cluster configuration file for it to take effect * Please refer to conf/storm.yaml.example for more details on configuring it * Nimbus should be launched with any additional jars needed by the IClusterMetricsConsumer implementations * Also did some refactoring of nimbus.clj
1 parent a9ef86d commit 1362c0b

11 files changed

+449
-65
lines changed

conf/defaults.yaml

+3
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,9 @@ pacemaker.kerberos.users: []
287287
storm.daemon.metrics.reporter.plugins:
288288
- "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter"
289289

290+
# configuration of cluster metrics consumer
291+
storm.cluster.metrics.consumer.publish.interval.secs: 60
292+
290293
storm.resource.isolation.plugin: "org.apache.storm.container.cgroup.CgroupManager"
291294
# Also determines whether the unit tests for cgroup runs.
292295
# If storm.resource.isolation.plugin.enable is set to false the unit tests for cgroups will not run

conf/storm.yaml.example

+9
Original file line numberDiff line numberDiff line change
@@ -46,3 +46,12 @@
4646
# parallelism.hint: 1
4747
# argument:
4848
# - endpoint: "metrics-collector.mycompany.org"
49+
50+
## Cluster Metrics Consumers
51+
# storm.cluster.metrics.consumer.register:
52+
# - class: "org.apache.storm.metric.LoggingClusterMetricsConsumer"
53+
# - class: "org.mycompany.MyMetricsConsumer"
54+
# argument:
55+
# - endpoint: "metrics-collector.mycompany.org"
56+
#
57+
# storm.cluster.metrics.consumer.publish.interval.secs: 60

log4j2/cluster.xml

+14
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,17 @@
5454
</Policies>
5555
<DefaultRolloverStrategy max="9"/>
5656
</RollingFile>
57+
<RollingFile name="METRICS"
58+
fileName="${sys:storm.log.dir}/${sys:logfile.name}.metrics"
59+
filePattern="${sys:storm.log.dir}/${sys:logfile.name}.metrics.%i.gz">
60+
<PatternLayout>
61+
<pattern>${patternMetrics}</pattern>
62+
</PatternLayout>
63+
<Policies>
64+
<SizeBasedTriggeringPolicy size="2 MB"/>
65+
</Policies>
66+
<DefaultRolloverStrategy max="9"/>
67+
</RollingFile>
5768
<Syslog name="syslog" format="RFC5424" charset="UTF-8" host="localhost" port="514"
5869
protocol="UDP" appName="[${sys:daemon.name}]" mdcId="mdc" includeMDC="true"
5970
facility="LOCAL5" enterpriseNumber="18060" newLine="true" exceptionPattern="%rEx{full}"
@@ -69,6 +80,9 @@
6980
<AppenderRef ref="THRIFT-ACCESS"/>
7081
<AppenderRef ref="syslog"/>
7182
</Logger>
83+
<Logger name="org.apache.storm.metric.LoggingClusterMetricsConsumer" level="info" additivity="false">
84+
<appender-ref ref="METRICS"/>
85+
</Logger>
7286
<root level="info"> <!-- We log everything -->
7387
<appender-ref ref="A1"/>
7488
<appender-ref ref="syslog"/>

storm-core/src/clj/org/apache/storm/daemon/nimbus.clj

+127-65
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,16 @@
5454
(:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils])
5555
(:use [org.apache.storm util config log converter])
5656
(:require [org.apache.storm [converter :as converter]])
57+
(:require [org.apache.storm.ui.core :as ui])
5758
(:require [clojure.set :as set])
5859
(:import [org.apache.storm.daemon.common StormBase Assignment])
5960
(:import [org.apache.storm.zookeeper Zookeeper])
6061
(:use [org.apache.storm.daemon common])
6162
(:use [org.apache.storm config])
6263
(:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
64+
(:import [org.apache.storm.metric ClusterMetricsConsumerExecutor]
65+
[org.apache.storm.metric.api IClusterMetricsConsumer$ClusterInfo DataPoint IClusterMetricsConsumer$SupervisorInfo]
66+
[org.apache.storm Config])
6367
(:import [org.apache.storm.utils VersionInfo LocalState]
6468
[org.json.simple JSONValue])
6569
(:require [clj-time.core :as time])
@@ -167,6 +171,13 @@
167171
(catch Exception e
168172
(log-warn-error e "Ingoring exception, Could not initialize " (conf NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN)))))))
169173

174+
(defn mk-cluster-metrics-consumer-executors
  "Builds one ClusterMetricsConsumerExecutor for every entry found under
  STORM-CLUSTER-METRICS-CONSUMER-REGISTER in storm-conf. Each entry's
  \"class\" and \"argument\" values are handed to the executor constructor.
  The result is a lazy sequence (empty when nothing is registered)."
  [storm-conf]
  (for [registration (get storm-conf STORM-CLUSTER-METRICS-CONSUMER-REGISTER)]
    (ClusterMetricsConsumerExecutor. (get registration "class")
                                     (get registration "argument"))))
180+
170181
(defn nimbus-data [conf inimbus]
171182
(let [forced-scheduler (.getForcedScheduler inimbus)]
172183
{:conf conf
@@ -209,6 +220,7 @@
209220
:topo-history-state (ConfigUtils/nimbusTopoHistoryState conf)
210221
:nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
211222
:nimbus-topology-action-notifier (create-tology-action-notifier conf)
223+
:cluster-consumer-executors (mk-cluster-metrics-consumer-executors conf)
212224
}))
213225

214226
(defn inbox [nimbus]
@@ -1358,13 +1370,42 @@
13581370
(and (>= val lower)
13591371
(<= val upper)))
13601372
1361-
;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
1362-
(defserverfn service-handler [conf inimbus]
1363-
(.prepare inimbus conf (ConfigUtils/masterInimbusDir conf))
1364-
(log-message "Starting Nimbus with conf " conf)
1365-
(let [nimbus (nimbus-data conf inimbus)
1366-
blob-store (:blob-store nimbus)
1367-
principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
1373+
(defn extract-cluster-metrics
  "Turns a ClusterSummary into a map with two entries:
    :cluster-info - an IClusterMetricsConsumer$ClusterInfo stamped with the
                    current wall-clock time in milliseconds
    :data-points  - a lazy sequence of DataPoint, one per selected key that is
                    present in the UI cluster summary"
  [^ClusterSummary summ]
  (let [metric-keys ["supervisors" "topologies" "slotsTotal" "slotsUsed" "slotsFree"
                     "executorsTotal" "tasksTotal"]
        summary (ui/cluster-summary summ "nimbus")]
    {:cluster-info (IClusterMetricsConsumer$ClusterInfo. (System/currentTimeMillis))
     :data-points (for [[metric-name metric-value] (select-keys summary metric-keys)]
                    (DataPoint. metric-name metric-value))}))
1380+
(defn extract-supervisors-metrics
  "Turns the supervisors of a ClusterSummary into a lazy sequence of maps, one
  per supervisor, each holding:
    :supervisor-info - an IClusterMetricsConsumer$SupervisorInfo built from the
                       supervisor's \"host\" and \"id\" plus the current
                       wall-clock time in milliseconds
    :data-points     - a lazy sequence of DataPoint, one per selected key that
                       is present in that supervisor's UI summary"
  [^ClusterSummary summ]
  (let [metric-keys ["slotsTotal" "slotsUsed" "totalMem" "totalCpu"
                     "usedMem" "usedCpu"]
        summaries ((ui/supervisor-summary (.get_supervisors summ)) "supervisors")]
    (for [sup summaries]
      {:supervisor-info (IClusterMetricsConsumer$SupervisorInfo.
                          (sup "host")
                          (sup "id")
                          (System/currentTimeMillis))
       :data-points (for [[metric-name metric-value] (select-keys sup metric-keys)]
                      (DataPoint. metric-name metric-value))})))
1393+
1394+
(defn send-cluster-metrics-to-executors
  "Fetches the current cluster summary from nimbus-service and pushes it to
  every executor stored under :cluster-consumer-executors in nimbus.

  Each executor first receives the cluster-level data points, then one call per
  supervisor with that supervisor's data points. The metrics are extracted once
  and shared by all executors. Called purely for its side effects; returns nil."
  [nimbus-service nimbus]
  (let [cluster-summary (.getClusterInfo nimbus-service)
        cluster-metrics (extract-cluster-metrics cluster-summary)
        supervisors-metrics (extract-supervisors-metrics cluster-summary)]
    ;; doseq rather than a list comprehension: nothing here produces a value
    ;; worth keeping, so there is no point building nested result sequences.
    (doseq [consumer-executor (:cluster-consumer-executors nimbus)]
      (.handleDataPoints consumer-executor (:cluster-info cluster-metrics) (:data-points cluster-metrics))
      (doseq [supervisor-metrics supervisors-metrics]
        (.handleDataPoints consumer-executor (:supervisor-info supervisor-metrics) (:data-points supervisor-metrics))))))
1406+
1407+
(defn mk-reified-nimbus [nimbus conf blob-store]
1408+
(let [principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)
13681409
admin-users (or (.get conf NIMBUS-ADMINS) [])
13691410
get-common-topo-info
13701411
(fn [^String storm-id operation]
@@ -1400,64 +1441,6 @@
14001441
(doto (ErrorInfo. (:error e) (:time-secs e))
14011442
(.set_host (:host e))
14021443
(.set_port (:port e)))))]
1403-
(.prepare ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
1404-
1405-
;add to nimbuses
1406-
(.addNimbusHost (:storm-cluster-state nimbus) (.toHostPortString (:nimbus-host-port-info nimbus))
1407-
(NimbusSummary.
1408-
(.getHost (:nimbus-host-port-info nimbus))
1409-
(.getPort (:nimbus-host-port-info nimbus))
1410-
(Time/currentTimeSecs)
1411-
false ;is-leader
1412-
STORM-VERSION))
1413-
1414-
(.addToLeaderLockQueue (:leader-elector nimbus))
1415-
(cleanup-corrupt-topologies! nimbus)
1416-
(when (instance? LocalFsBlobStore blob-store)
1417-
;register call back for blob-store
1418-
(.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus)))
1419-
(setup-blobstore nimbus))
1420-
1421-
(when (is-leader nimbus :throw-exception false)
1422-
(doseq [storm-id (.activeStorms (:storm-cluster-state nimbus))]
1423-
(transition! nimbus storm-id :startup)))
1424-
1425-
(.scheduleRecurring (:timer nimbus)
1426-
0
1427-
(conf NIMBUS-MONITOR-FREQ-SECS)
1428-
(fn []
1429-
(when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
1430-
(locking (:submit-lock nimbus)
1431-
(mk-assignments nimbus)))
1432-
(do-cleanup nimbus)))
1433-
;; Schedule Nimbus inbox cleaner
1434-
(.scheduleRecurring (:timer nimbus)
1435-
0
1436-
(conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
1437-
(fn [] (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
1438-
;; Schedule nimbus code sync thread to sync code from other nimbuses.
1439-
(if (instance? LocalFsBlobStore blob-store)
1440-
(.scheduleRecurring (:timer nimbus)
1441-
0
1442-
(conf NIMBUS-CODE-SYNC-FREQ-SECS)
1443-
(fn [] (blob-sync conf nimbus))))
1444-
;; Schedule topology history cleaner
1445-
(when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
1446-
(.scheduleRecurring (:timer nimbus)
1447-
0
1448-
(conf LOGVIEWER-CLEANUP-INTERVAL-SECS)
1449-
(fn [] (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus))))
1450-
(.scheduleRecurring (:timer nimbus)
1451-
0
1452-
(conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
1453-
(fn []
1454-
(renew-credentials nimbus)))
1455-
1456-
(def nimbus:num-supervisors (StormMetricsRegistry/registerGauge "nimbus:num-supervisors"
1457-
(fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil)))))
1458-
1459-
(StormMetricsRegistry/startMetricsReporters conf)
1460-
14611444
(reify Nimbus$Iface
14621445
(^void submitTopologyWithOpts
14631446
[this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
@@ -2195,6 +2178,85 @@
21952178
(isWaiting [this]
21962179
(.isTimerWaiting (:timer nimbus))))))
21972180
2181+
;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
2182+
;; Builds the Nimbus service: prepares the INimbus and the topology validator,
;; registers this host in the nimbus list, joins leader election, prepares the
;; cluster metrics consumers, schedules the recurring maintenance tasks, and
;; finally returns the reified Nimbus handler from mk-reified-nimbus.
(defserverfn service-handler [conf inimbus]
  (.prepare inimbus conf (ConfigUtils/masterInimbusDir conf))
  (log-message "Starting Nimbus with conf " conf)
  (let [nimbus (nimbus-data conf inimbus)
        blob-store (:blob-store nimbus)]
    (.prepare ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) conf)

    ;add to nimbuses
    (.addNimbusHost (:storm-cluster-state nimbus) (.toHostPortString (:nimbus-host-port-info nimbus))
                    (NimbusSummary.
                      (.getHost (:nimbus-host-port-info nimbus))
                      (.getPort (:nimbus-host-port-info nimbus))
                      (Time/currentTimeSecs)
                      false ;is-leader
                      STORM-VERSION))

    (.addToLeaderLockQueue (:leader-elector nimbus))
    (cleanup-corrupt-topologies! nimbus)
    (when (instance? LocalFsBlobStore blob-store)
      ;register call back for blob-store
      (.blobstore (:storm-cluster-state nimbus) (fn [] (blob-sync conf nimbus)))
      (setup-blobstore nimbus))

    ;; Instantiate and prepare every registered cluster metrics consumer.
    (doseq [consumer (:cluster-consumer-executors nimbus)]
      (.prepare consumer))

    (when (is-leader nimbus :throw-exception false)
      (doseq [storm-id (.activeStorms (:storm-cluster-state nimbus))]
        (transition! nimbus storm-id :startup)))

    (.scheduleRecurring (:timer nimbus)
                        0
                        (conf NIMBUS-MONITOR-FREQ-SECS)
                        (fn []
                          (when-not (conf ConfigUtils/NIMBUS_DO_NOT_REASSIGN)
                            (locking (:submit-lock nimbus)
                              (mk-assignments nimbus)))
                          (do-cleanup nimbus)))
    ;; Schedule Nimbus inbox cleaner
    (.scheduleRecurring (:timer nimbus)
                        0
                        (conf NIMBUS-CLEANUP-INBOX-FREQ-SECS)
                        (fn [] (clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))))
    ;; Schedule nimbus code sync thread to sync code from other nimbuses.
    (when (instance? LocalFsBlobStore blob-store)
      (.scheduleRecurring (:timer nimbus)
                          0
                          (conf NIMBUS-CODE-SYNC-FREQ-SECS)
                          (fn [] (blob-sync conf nimbus))))
    ;; Schedule topology history cleaner
    (when-let [interval (conf LOGVIEWER-CLEANUP-INTERVAL-SECS)]
      (.scheduleRecurring (:timer nimbus)
                          0
                          interval
                          (fn [] (clean-topology-history (conf LOGVIEWER-CLEANUP-AGE-MINS) nimbus))))
    (.scheduleRecurring (:timer nimbus)
                        0
                        (conf NIMBUS-CREDENTIAL-RENEW-FREQ-SECS)
                        (fn []
                          (renew-credentials nimbus)))

    (def nimbus:num-supervisors (StormMetricsRegistry/registerGauge "nimbus:num-supervisors"
                                  (fn [] (.size (.supervisors (:storm-cluster-state nimbus) nil)))))

    (StormMetricsRegistry/startMetricsReporters conf)

    (let [reified-inimbus (mk-reified-nimbus nimbus conf blob-store)]
      ;; The executors collection is a (possibly empty) sequence, which is
      ;; always truthy; test it with seq so the publisher is only scheduled
      ;; when at least one consumer is actually registered.
      (when (seq (:cluster-consumer-executors nimbus))
        (.scheduleRecurring (:timer nimbus)
                            0
                            (conf STORM-CLUSTER-METRICS-CONSUMER-PUBLISH-INTERVAL-SECS)
                            (fn []
                              ;; Only the leader publishes cluster metrics.
                              (when (is-leader nimbus :throw-exception false)
                                (send-cluster-metrics-to-executors reified-inimbus nimbus)))))
      reified-inimbus)))
2258+
2259+
21982260
(defn validate-port-available[conf]
21992261
(try
22002262
(let [socket (ServerSocket. (conf NIMBUS-THRIFT-PORT))]

storm-core/src/jvm/org/apache/storm/Config.java

+16
Original file line numberDiff line numberDiff line change
@@ -1519,6 +1519,22 @@ public class Config extends HashMap<String, Object> {
15191519
@isPositiveNumber
15201520
public static final String BACKPRESSURE_DISRUPTOR_LOW_WATERMARK="backpressure.disruptor.low.watermark";
15211521

1522+
/**
1523+
* A list of classes implementing IClusterMetricsConsumer (See storm.yaml.example for exact config format).
1524+
* Each listed class will be routed cluster related metrics data.
1525+
* Each listed class maps 1:1 to a ClusterMetricsConsumerExecutor and they're executed in Nimbus.
1526+
* Only consumers which run in the leader Nimbus receive metrics data.
1527+
*/
1528+
@isListEntryCustom(entryValidatorClasses = {ClusterMetricRegistryValidator.class})
1529+
public static final String STORM_CLUSTER_METRICS_CONSUMER_REGISTER = "storm.cluster.metrics.consumer.register";
1530+
1531+
/**
1532+
* How often cluster metrics data is published to metrics consumer.
1533+
*/
1534+
@NotNull
1535+
@isPositiveNumber
1536+
public static final String STORM_CLUSTER_METRICS_CONSUMER_PUBLISH_INTERVAL_SECS = "storm.cluster.metrics.consumer.publish.interval.secs";
1537+
15221538
/**
15231539
* A list of users that are allowed to interact with the topology. To use this set
15241540
* nimbus.authorizer to org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.storm.metric;
19+
20+
import org.apache.storm.Config;
21+
import org.apache.storm.metric.api.DataPoint;
22+
import org.apache.storm.metric.api.IClusterMetricsConsumer;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
import java.util.Collection;
27+
28+
public class ClusterMetricsConsumerExecutor {
29+
public static final Logger LOG = LoggerFactory.getLogger(ClusterMetricsConsumerExecutor.class);
30+
31+
private IClusterMetricsConsumer metricsConsumer;
32+
private String consumerClassName;
33+
private Object registrationArgument;
34+
35+
public ClusterMetricsConsumerExecutor(String consumerClassName, Object registrationArgument) {
36+
this.consumerClassName = consumerClassName;
37+
this.registrationArgument = registrationArgument;
38+
}
39+
40+
public void prepare() {
41+
try {
42+
metricsConsumer = (IClusterMetricsConsumer)Class.forName(consumerClassName).newInstance();
43+
} catch (Exception e) {
44+
throw new RuntimeException("Could not instantiate a class listed in config under section " +
45+
Config.STORM_CLUSTER_METRICS_CONSUMER_REGISTER + " with fully qualified name " + consumerClassName, e);
46+
}
47+
48+
metricsConsumer.prepare(registrationArgument);
49+
}
50+
51+
public void handleDataPoints(final IClusterMetricsConsumer.ClusterInfo clusterInfo, final Collection<DataPoint> dataPoints) {
52+
try {
53+
metricsConsumer.handleDataPoints(clusterInfo, dataPoints);
54+
} catch (Throwable e) {
55+
LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
56+
}
57+
}
58+
59+
public void handleDataPoints(final IClusterMetricsConsumer.SupervisorInfo supervisorInfo, final Collection<DataPoint> dataPoints) {
60+
try {
61+
metricsConsumer.handleDataPoints(supervisorInfo, dataPoints);
62+
} catch (Throwable e) {
63+
LOG.error("Error while handling cluster data points, consumer class: " + consumerClassName, e);
64+
}
65+
}
66+
67+
public void cleanup() {
68+
metricsConsumer.cleanup();
69+
}
70+
}

0 commit comments

Comments
 (0)