
Commit d2845d8 (committed Sep 13, 2017)
Parent: d98fcf2

Add NSQ as a stream source (closes #64)

File tree

6 files changed: +357 -96 lines


build.sbt (+2)
@@ -30,6 +30,8 @@ lazy val root = project.in(file("."))
     Dependencies.Libraries.elephantbird,
     Dependencies.Libraries.hadoopLZO,
     Dependencies.Libraries.jodaTime,
+    Dependencies.Libraries.nsqClient,
+    Dependencies.Libraries.jacksonCbor,
     // Scala
     Dependencies.Libraries.scopt,
     Dependencies.Libraries.config,

project/Dependencies.scala (+9 -2)
@@ -14,6 +14,7 @@ import sbt._
 
 object Dependencies {
   val resolvers = Seq(
+    Resolver.jcenterRepo,
     "Snowplow Analytics Maven releases repo" at "http://maven.snplow.com/releases/",
     "Twitter maven repo" at "http://maven.twttr.com/"
   )
@@ -28,6 +29,8 @@
     val hadoopLZO = "0.4.20"
     val jodaTime = "2.9.9"
     val config = "1.3.1"
+    val nsqClient = "1.1.0-rc1"
+    val jacksonCbor = "2.8.8"
     // Thrift (test only)
     val collectorPayload = "0.0.0"
     // Scala
@@ -42,8 +45,11 @@
   object Libraries {
     // Java
     val slf4j = "org.slf4j" % "slf4j-simple" % V.slf4j
-    val kinesisClient = "com.amazonaws" % "amazon-kinesis-client" % V.kinesisClient
-    val kinesisConnector = "com.amazonaws" % "amazon-kinesis-connectors" % V.kinesisConnector
+    val kinesisClient = ("com.amazonaws" % "amazon-kinesis-client" % V.kinesisClient)
+      .exclude("com.fasterxml.jackson.dataformat", "jackson-dataformat-cbor")
+    val kinesisConnector = ("com.amazonaws" % "amazon-kinesis-connectors" % V.kinesisConnector)
+      .exclude("com.fasterxml.jackson.dataformat", "jackson-dataformat-cbor")
+    val jacksonCbor = "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % V.jacksonCbor
     val hadoop = ("org.apache.hadoop" % "hadoop-common" % V.hadoop)
       .exclude("org.slf4j", "slf4j-log4j12")
       .exclude("commons-beanutils", "commons-beanutils")
@@ -56,6 +62,7 @@
     val hadoopLZO = "com.hadoop.gplcompression" % "hadoop-lzo" % V.hadoopLZO
     val jodaTime = "joda-time" % "joda-time" % V.jodaTime
     val config = "com.typesafe" % "config" % V.config
+    val nsqClient = "com.snowplowanalytics" % "nsq-java-client_2.10" % V.nsqClient
     // Thrift (test only)
     val collectorPayload = "com.snowplowanalytics" % "collector-payload-1" % V.collectorPayload % "test"
     // Scala
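Note the exclude-and-pin pattern above: both Kinesis artifacts pull in their own transitive jackson-dataformat-cbor, so the build excludes it from each and declares one explicit version, presumably so the NSQ client and the Kinesis SDK agree on a single CBOR artifact. A minimal standalone build.sbt sketch of the same idiom (the Kinesis client version below is a placeholder, not the project's V.kinesisClient):

// Sketch only: exclude the transitive CBOR module from the AWS artifact,
// then pin one jackson-dataformat-cbor version for the whole build.
libraryDependencies ++= Seq(
  ("com.amazonaws" % "amazon-kinesis-client" % "1.x.y") // placeholder version
    .exclude("com.fasterxml.jackson.dataformat", "jackson-dataformat-cbor"),
  "com.fasterxml.jackson.dataformat" % "jackson-dataformat-cbor" % "2.8.8"
)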
Configuration sample (+82 -62)

@@ -1,86 +1,106 @@
 # Default configuration for kinesis-lzo-s3-sink
 
-sink {
-
-  # The following are used to authenticate for the Amazon Kinesis sink.
-  #
-  # If both are set to 'default', the default provider chain is used
-  # (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
-  #
-  # If both are set to 'iam', use AWS IAM Roles to provision credentials.
-  #
-  # If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
-  aws {
-    access-key: "iam"
-    secret-key: "iam"
-  }
+# Sources currently supported are:
+# 'kinesis' for reading records from a Kinesis stream
+# 'nsq' for reading records from an NSQ topic
+source: "{{source}}"
 
-  kinesis {
-    in {
-      # Kinesis input stream name
-      stream-name: "{{sinkKinesisInStreamName}}"
+# Sinks currently supported are:
+# 'kinesis' for writing records to a Kinesis stream
+# 'nsq' for writing records to an NSQ topic
+sink: "{{sink}}"
 
-      # LATEST: most recent data.
-      # TRIM_HORIZON: oldest available data.
-      # Note: This only affects the first run of this application
-      # on a stream.
-      initial-position: "TRIM_HORIZON"
+# The following are used to authenticate for the Amazon Kinesis sink.
+#
+# If both are set to 'default', the default provider chain is used
+# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
+#
+# If both are set to 'iam', use AWS IAM Roles to provision credentials.
+#
+# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
+aws {
+  access-key: "iam"
+  secret-key: "iam"
+}
 
-      # Maximum number of records to read per GetRecords call
-      max-records: {{sinkKinesisMaxRecords}}
-    }
+# Config for NSQ
+nsq {
+  # Channel name for NSQ source
+  channel-name: "{{NsqSourceChannelName}}"
+
+  # Host name for NSQ tools
+  host: "{{NsqHost}}"
 
-    out {
-      # Stream for events for which the storage process fails
-      stream-name: "{{sinkKinesisOutStreamName}}"
-    }
+  # Port for nsqd
+  port: "{{NsqdPort}}"
 
-    region: "{{sinkKinesisRegion}}"
+  # Port for nsqlookupd
+  lookup-port: {{NsqlookupdPort}}
+}
 
-    # "app-name" is used for a DynamoDB table to maintain stream state.
-    # You can set it automatically using: "SnowplowLzoS3Sink-$\\{sink.kinesis.in.stream-name\\}"
-    app-name: "{{sinkKinesisAppName}}"
-  }
+kinesis {
+  # LATEST: most recent data.
+  # TRIM_HORIZON: oldest available data.
+  # Note: This only affects the first run of this application
+  # on a stream.
+  initial-position: "TRIM_HORIZON"
 
-  s3 {
-    # If using us-east-1, then endpoint should be "http://s3.amazonaws.com".
-    # Otherwise "http://s3-<<region>>.s3.amazonaws.com", e.g.
-    # http://s3-eu-west-1.amazonaws.com
-    region: "{{sinkKinesisS3Region}}"
-    bucket: "{{sinkKinesisS3Bucket}}"
+  # Maximum number of records to read per GetRecords call
+  max-records: {{sinkKinesisMaxRecords}}
 
-    # Format is one of lzo or gzip
-    # Note, that you can use gzip only for enriched data stream.
-    format: "{{sinkKinesisFormat}}"
+  region: "{{sinkKinesisRegion}}"
 
-    # Maximum Timeout that the application is allowed to fail for
-    max-timeout: {{sinkKinesisMaxTimeout}}
-  }
+  # "app-name" is used for a DynamoDB table to maintain stream state.
+  # You can set it automatically using: "SnowplowLzoS3Sink-$\\{sink.kinesis.in.stream-name\\}"
+  app-name: "{{sinkKinesisAppName}}"
+}
+
+streams {
+  # Input stream name
+  stream-name-in = "{{InStreamName}}"
+
+  # Stream for events for which the storage process fails
+  stream-name-out = "{{OutStreamName}}"
 
   # Events are accumulated in a buffer before being sent to S3.
   # The buffer is emptied whenever:
   # - the combined size of the stored records exceeds byte-limit or
   # - the number of stored records exceeds record-limit or
   # - the time in milliseconds since it was last emptied exceeds time-limit
   buffer {
-    byte-limit: {{sinkLzoBufferByteThreshold}}
+    byte-limit: {{sinkLzoBufferByteThreshold}} # Not supported by NSQ; will be ignored
     record-limit: {{sinkLzoBufferRecordThreshold}}
-    time-limit: {{sinkLzoBufferTimeThreshold}}
+    time-limit: {{sinkLzoBufferTimeThreshold}} # Not supported by NSQ; will be ignored
   }
+}
 
-  # Set the Logging Level for the S3 Sink
-  # Options: ERROR, WARN, INFO, DEBUG, TRACE
-  logging {
-    level: "{{sinkLzoLogLevel}}"
-  }
+s3 {
+  # If using us-east-1, then endpoint should be "http://s3.amazonaws.com".
+  # Otherwise "http://s3-<<region>>.s3.amazonaws.com", e.g.
+  # http://s3-eu-west-1.amazonaws.com
+  region: "{{sinkKinesisS3Region}}"
+  bucket: "{{sinkKinesisS3Bucket}}"
+
+  # Format is one of lzo or gzip
+  # Note that you can use gzip only for the enriched data stream.
+  format: "{{sinkKinesisFormat}}"
+
+  # Maximum timeout that the application is allowed to fail for
+  max-timeout: {{sinkKinesisMaxTimeout}}
+}
+
+# Set the Logging Level for the S3 Sink
+# Options: ERROR, WARN, INFO, DEBUG, TRACE
+logging {
+  level: "{{sinkLzoLogLevel}}"
+}
 
-  # Optional section for tracking endpoints
-  monitoring {
-    snowplow {
-      collector-uri: "{{collectorUri}}"
-      collector-port: 80
-      app-id: "{{sinkLzoAppName}}"
-      method: "GET"
-    }
+# Optional section for tracking endpoints
+monitoring {
+  snowplow {
+    collector-uri: "{{collectorUri}}"
+    collector-port: 80
+    app-id: "{{sinkLzoAppName}}"
+    method: "GET"
   }
 }
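The sample now selects the input and output transport with the top-level source and sink keys. Below is a minimal sketch of how an application might read those keys with the Typesafe Config library (already a dependency above); the object and value names are illustrative, not the commit's actual wiring code:

import com.typesafe.config.{Config, ConfigFactory}

object ConfigSketch {
  def main(args: Array[String]): Unit = {
    // Parse the HOCON file passed on the command line
    val conf: Config = ConfigFactory.parseFile(new java.io.File(args(0))).resolve()

    val in = conf.getString("streams.stream-name-in")
    conf.getString("source") match {
      case "nsq" =>
        // NSQ reads are discovered through nsqlookupd at host:lookup-port
        println(s"NSQ topic $in via ${conf.getString("nsq.host")}:${conf.getInt("nsq.lookup-port")}")
      case "kinesis" =>
        println(s"Kinesis stream $in in region ${conf.getString("kinesis.region")}")
      case other =>
        sys.error(s"Unsupported source: $other")
    }
  }
}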
NsqSourceExecutor.scala (new file, +151)

@@ -0,0 +1,151 @@
+/**
+ * Copyright (c) 2014-2016 Snowplow Analytics Ltd.
+ * All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache
+ * License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at
+ * http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+ * either express or implied.
+ *
+ * See the Apache License Version 2.0 for the specific language
+ * governing permissions and limitations there under.
+ */
+
+package com.snowplowanalytics.snowplow.storage.kinesis.s3
+
+// AWS Kinesis Connector libs
+import com.amazonaws.services.kinesis.connectors.KinesisConnectorConfiguration
+
+// NSQ
+import com.snowplowanalytics.client.nsq.NSQConsumer
+import com.snowplowanalytics.client.nsq.lookup.DefaultNSQLookup
+import com.snowplowanalytics.client.nsq.NSQMessage
+import com.snowplowanalytics.client.nsq.NSQConfig
+import com.snowplowanalytics.client.nsq.callbacks.NSQMessageCallback
+import com.snowplowanalytics.client.nsq.callbacks.NSQErrorCallback
+import com.snowplowanalytics.client.nsq.exceptions.NSQException
+
+// Scala
+import scala.collection.mutable.ListBuffer
+import scala.collection.JavaConversions._
+import scala.util.Random
+
+// Tracker
+import com.snowplowanalytics.snowplow.scalatracker.Tracker
+
+// Scalaz
+import scalaz._
+import Scalaz._
+
+// Joda-Time
+import org.joda.time.{DateTime, DateTimeZone}
+import org.joda.time.format.DateTimeFormat
+
+// Logging
+import org.slf4j.LoggerFactory
+
+// This project
+import sinks._
+import serializers._
+
+/**
+ * Executor for NSQSource
+ *
+ * @param config the Kinesis connector configuration, reused here for the S3 settings
+ * @param nsqConfig the NSQ configuration
+ * @param badSink the configured BadSink
+ * @param serializer the instance of one of the serializers
+ * @param maxConnectionTime max time for trying to connect to the S3 instance
+ */
+class NsqSourceExecutor(
+  config: KinesisConnectorConfiguration,
+  nsqConfig: S3LoaderNsqConfig,
+  badSink: ISink,
+  serializer: ISerializer,
+  maxConnectionTime: Long,
+  tracker: Option[Tracker]
+) extends Runnable {
+
+  lazy val log = LoggerFactory.getLogger(getClass())
+
+  // NSQ messages are buffered in msgBuffer until its size reaches nsqBufferSize
+  val msgBuffer = new ListBuffer[EmitterInput]()
+
+  val s3Emitter = new S3Emitter(config, badSink, maxConnectionTime, tracker)
+  private val TimeFormat = DateTimeFormat.forPattern("HH:mm:ss.SSS").withZone(DateTimeZone.UTC)
+  private val DateFormat = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.UTC)
+
+  // e.g. "2017-09-13-10:15:30.123-10:15:32.456-1234567890"
+  private def getBaseFilename(startTime: Long, endTime: Long): String = {
+    def abs(e: Int): Int = if (e > 0) e else -e
+    val currentTimeObject = new DateTime(System.currentTimeMillis())
+    val startTimeObject = new DateTime(startTime)
+    val endTimeObject = new DateTime(endTime)
+    val randNum = Random.nextInt
+
+    DateFormat.print(currentTimeObject) + "-" +
+      TimeFormat.print(startTimeObject) + "-" +
+      TimeFormat.print(endTimeObject) + "-" + abs(randNum)
+  }
+
+  override def run: Unit = {
+
+    val nsqCallback = new NSQMessageCallback {
+      // start time of filling the buffer
+      var bufferStartTime = System.currentTimeMillis()
+      val nsqBufferSize = config.BUFFER_RECORD_COUNT_LIMIT
+
+      override def message(msg: NSQMessage): Unit = {
+        val validMsg = msg.getMessage.success
+        msgBuffer.synchronized {
+          msgBuffer += validMsg
+          msg.finished()
+          if (msgBuffer.size >= nsqBufferSize) {
+            // finish time of filling the buffer
+            val bufferEndTime = System.currentTimeMillis()
+            val baseFilename = getBaseFilename(bufferStartTime, bufferEndTime)
+            val serializationResults = serializer.serialize(msgBuffer.toList, baseFilename)
+            val (successes, failures) = serializationResults.results.partition(_.isSuccess)
+
+            if (successes.size > 0) {
+              serializationResults.namedStreams.foreach { stream =>
+                val connectionAttemptStartTime = System.currentTimeMillis()
+                s3Emitter.attemptEmit(stream, connectionAttemptStartTime) match {
+                  case false => log.error("Error while sending to S3")
+                  case true =>
+                }
+              }
+            }
+
+            if (failures.size > 0) {
+              s3Emitter.sendFailures(failures)
+            }
+
+            msgBuffer.clear()
+            // the next buffer's start time is the current buffer's finish time
+            bufferStartTime = bufferEndTime
+          }
+        }
+      }
+    }
+
+    val errorCallback = new NSQErrorCallback {
+      override def error(e: NSQException) =
+        log.error(s"Exception while consuming topic ${nsqConfig.nsqSourceTopicName}", e)
+    }
+
+    val lookup = new DefaultNSQLookup
+    // use nsqlookupd to discover the nsqd instances serving the topic
+    lookup.addLookupAddress(nsqConfig.nsqHost, nsqConfig.nsqlookupPort)
+    val consumer = new NSQConsumer(lookup,
+                                   nsqConfig.nsqSourceTopicName,
+                                   nsqConfig.nsqSourceChannelName,
+                                   nsqCallback,
+                                   new NSQConfig(),
+                                   errorCallback)
+    consumer.start()
+  }
+}
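For context, the executor is a Runnable, so the caller is expected to construct it and hand it to a thread. A sketch of that wiring follows; every value name below is illustrative, since the commit's main-class changes are not part of this diff:

// Sketch only: construct the executor and run it on its own thread.
val executor = new NsqSourceExecutor(
  connectorConfig,   // KinesisConnectorConfiguration carrying the S3/buffer settings
  nsqConfig,         // S3LoaderNsqConfig built from the nsq { ... } block
  badSink,           // ISink receiving records that fail serialization
  serializer,        // ISerializer, e.g. the LZO or GZIP one per the "format" key
  60000L,            // maxConnectionTime: ms allowed for S3 connection attempts
  None               // tracker: Option[Tracker] for the monitoring section
)
new Thread(executor).start()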
