Skip to content

Commit 6c164d8

Browse files
author
yi.wu
committed
add sample topo
1 parent 3bc7343 commit 6c164d8

File tree

11 files changed

+653
-0
lines changed

11 files changed

+653
-0
lines changed

storm_logcount_demo/README.md

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# storm logcount demo
2+
This is a demo storm topo run on k8s cluster
3+
read data from kafka DS.Input.Item topic and record the msg count in log file

storm_logcount_demo/bin/start.sh

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
source /etc/profile
2+
cd /opt/sys/topology/stormdemo
3+
/opt/sys/apache-storm-0.9.4/bin/storm jar stormdemo-1.0-jar-with-dependencies.jar storm.real_time_info.StromDemo 1>/tmp/stormdemo.log 2>/tmp/stormdemo.err
4+

storm_logcount_demo/bin/stop.py

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
#coding: utf8
2+
3+
import time
4+
import os
5+
import datetime
6+
7+
def kill_topology_yestoday(topo_name):
8+
yesterday = (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
9+
yesterday_topology = '%s%s'%(topo_name, yesterday)
10+
cmd = "PATH=/opt/java/bin /opt/sys/storm/bin/storm kill -w 5 %s"%yesterday_topology
11+
os.system(cmd)
12+
13+
kill_topology_yestoday("stormdemo_")
+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# 配置spout。
2+
kafka_zk_addr = 172.24.2.35:2181,172.24.2.36:2181,172.24.2.37:2181/kafka08
3+
kafka_zk_path = /brokers
4+
kafka_offset_zk_rootpath =/stormdemo/spout
5+
kafka_startoffset=-1
6+
7+
# 配置拓扑
8+
topology_name = stormdemo
9+
spout_name = stormdemo
10+
spout_kafka_parl=4
11+
bolt_parl=32
12+
max_spout_pending=4
13+
num_workers=4
14+
num_ackers=4
15+
debug=false
16+
17+
# other configurations
18+
something_component_need = something

storm_logcount_demo/conf/crontab.conf

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
1 0 * * * /bin/bash -x /opt/sys/topology/stormdemo/start.sh
2+
2 0 * * * python /opt/sys/topology/stormdemo/stop.py
+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
依赖的组件:
2+
3+
数据流:
4+
从线上kafka读取按天的topic
5+
结果存入XXXX集群
6+
7+
数据位置:
8+
kafka集群:
9+
zk:
10+
11+
监控是否就绪:否,这是个统计类拓扑,监控可以择时加
12+
13+
需要达到的性能指标:消息时延最大可接受1分钟, 吞吐量必须能跟上线上的按天数据(10000/s)
14+
15+
是否可以再平衡/重启:可以
16+
17+
测试报告:
18+
测试环境:
19+
测试结果:
20+
测试性能:
21+
22+
部署过程:
23+
cp stormdemo.tgz /opt/sys/topology
24+
cd /opt/sys/topology
25+
tar xvf stormdemo.tgz
26+
修改 stormdemo/conf/conf.properties以下配置项:
27+
kafka_zk_addr: 线上kafka对应的zk集群
28+
用crontab -e把conf/crontab.conf中的内容加入
29+
运行sh bin/start.sh 启动拓扑
30+
31+
特别注意:

storm_logcount_demo/pom.xml

+100
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
5+
<groupId>storm.bfd</groupId>
6+
<artifactId>stormdemo</artifactId>
7+
<version>1.0</version>
8+
<packaging>jar</packaging>
9+
10+
<properties>
11+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
12+
</properties>
13+
<repositories>
14+
<repository>
15+
<id>clojars.org</id>
16+
<url>http://clojars.org/repo</url>
17+
</repository>
18+
<repository>
19+
<id>maven-central-repo</id>
20+
<url>http://repo1.maven.org/maven2/</url>
21+
</repository>
22+
</repositories>
23+
24+
<dependencies>
25+
<dependency>
26+
<groupId>org.apache.storm</groupId>
27+
<artifactId>storm-core</artifactId>
28+
<version>0.9.4</version>
29+
<scope>provided</scope>
30+
</dependency>
31+
<dependency>
32+
<groupId>org.slf4j</groupId>
33+
<artifactId>slf4j-api</artifactId>
34+
<version>1.7.13</version>
35+
</dependency>
36+
<dependency>
37+
<groupId>org.apache.storm</groupId>
38+
<artifactId>storm-kafka</artifactId>
39+
<version>0.9.4</version>
40+
</dependency>
41+
<dependency>
42+
<groupId>org.apache.kafka</groupId>
43+
<artifactId>kafka_2.10</artifactId>
44+
<version>0.8.2.1</version>
45+
<exclusions>
46+
<exclusion>
47+
<groupId>org.apache.zookeeper</groupId>
48+
<artifactId>zookeeper</artifactId>
49+
</exclusion>
50+
<exclusion>
51+
<groupId>log4j</groupId>
52+
<artifactId>log4j</artifactId>
53+
</exclusion>
54+
</exclusions>
55+
</dependency>
56+
</dependencies>
57+
58+
<build>
59+
<sourceDirectory>src</sourceDirectory>
60+
<resources>
61+
<resource>
62+
<directory>src/python</directory>
63+
</resource>
64+
</resources>
65+
<plugins>
66+
<plugin>
67+
<artifactId>maven-assembly-plugin</artifactId>
68+
<configuration>
69+
<descriptorRefs>
70+
<descriptorRef>jar-with-dependencies</descriptorRef>
71+
</descriptorRefs>
72+
<archive>
73+
<manifest>
74+
<mainClass></mainClass>
75+
</manifest>
76+
</archive>
77+
</configuration>
78+
<executions>
79+
<execution>
80+
<id>make-assembly</id>
81+
<phase>package</phase>
82+
<goals>
83+
<goal>single</goal>
84+
</goals>
85+
</execution>
86+
</executions>
87+
88+
</plugin>
89+
90+
<plugin>
91+
<groupId>org.apache.maven.plugins</groupId>
92+
<artifactId>maven-compiler-plugin</artifactId>
93+
<configuration>
94+
<source>1.7</source>
95+
<target>1.7</target>
96+
</configuration>
97+
</plugin>
98+
</plugins>
99+
</build>
100+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# encoding:utf-8
2+
3+
import storm
4+
import codis_client
5+
import zk_connect
6+
import traceback
7+
import datetime
8+
import json
9+
import config_filter_tool
10+
import threading
11+
import time
12+
13+
14+
15+
class MultiDimensionAnalyse(storm.BasicBolt):
16+
def initialize(self, stormconf, context):
17+
try:
18+
something_component_need = stormconf.get("something_component_need")
19+
20+
except Exception as e:
21+
storm.log('init shell bolt error: %s'%traceback.format_exc())
22+
23+
def _do_real_process(self, doc):
24+
try:
25+
pass
26+
except Exception as e:
27+
storm.logError('can not del doc:%s, error: %s'%(doc, traceback.format_exc()))
28+
29+
def process(self, tup):
30+
try:
31+
# 把kafka一条消息转换为一个dict
32+
with zk_connect.conf_lock:
33+
data_msg = tup.values[0]
34+
index = data_msg.find('}')
35+
data_obj = None
36+
if index != -1:
37+
topic = data_msg[1:index]
38+
doc = data_msg[index+1:]
39+
data_obj = json.loads(doc)
40+
data_obj['topic'] = topic
41+
42+
if not data_obj:
43+
return
44+
45+
self._do_real_process(data_obj)
46+
47+
except Exception as e:
48+
storm.logError('process message error: %s'%traceback.format_exc())
49+
50+
51+
MultiDimensionAnalyse().run()

0 commit comments

Comments
 (0)