@@ -14,6 +14,8 @@ package com.snowplowanalytics.s3.loader
14
14
15
15
// Scala
16
16
import scala .collection .JavaConverters ._
17
+ import scala .util .Try
18
+ import scala .util .{Success => TrySuccess }
17
19
18
20
// Java libs
19
21
import java .util .Calendar
@@ -32,10 +34,21 @@ import scala.collection.JavaConversions._
32
34
// Tracker
33
35
import com .snowplowanalytics .snowplow .scalatracker .Tracker
34
36
37
+ // Json4s
38
+ import org .json4s .jackson .JsonMethods .parse
39
+
40
+ // Iglu Core
41
+ import com .snowplowanalytics .iglu .core ._
42
+ import com .snowplowanalytics .iglu .core .json4s .implicits ._
43
+
44
+ // Scalaz
45
+ import scalaz ._
46
+
35
47
// This project
36
48
import sinks ._
37
49
import serializers ._
38
50
import model ._
51
+ import KinesisS3Emitter ._
39
52
40
53
/**
41
54
* Emitter for flushing Kinesis event data to S3.
@@ -48,19 +61,12 @@ class KinesisS3Emitter(
48
61
badSink : ISink ,
49
62
serializer : ISerializer ,
50
63
maxConnectionTime : Long ,
51
- tracker : Option [Tracker ]
64
+ tracker : Option [Tracker ],
65
+ partition : Boolean ,
66
+ partitionErrorDir : String
52
67
) extends IEmitter [EmitterInput ] {
53
68
54
69
val s3Emitter = new S3Emitter (s3Config, provider, badSink, maxConnectionTime, tracker)
55
- val dateFormat = new SimpleDateFormat (" yyyy-MM-dd" );
56
-
57
- /**
58
- * Determines the filename in S3, which is the corresponding
59
- * Kinesis sequence range of records in the file.
60
- */
61
- protected def getBaseFilename (firstSeq : String , lastSeq : String ): String =
62
- dateFormat.format(Calendar .getInstance().getTime()) +
63
- " -" + firstSeq + " -" + lastSeq
64
70
65
71
/**
66
72
* Reads items from a buffer and saves them to s3.
@@ -77,7 +83,19 @@ class KinesisS3Emitter(
77
83
s3Emitter.log.info(s " Flushing buffer with ${buffer.getRecords.size} records. " )
78
84
79
85
val records = buffer.getRecords().asScala.toList
80
- val baseFilename = getBaseFilename(buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber)
86
+ if (partition) {
87
+ partitionWithSchemaKey(records, partitionErrorDir).foldLeft(List [EmitterInput ]()) {
88
+ case (acc, (prefix, l)) =>
89
+ val baseFilename = getBaseFilename(buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber, Some (prefix.getName))
90
+ acc ::: emitRecords(l, baseFilename)
91
+ }
92
+ } else {
93
+ val baseFilename = getBaseFilename(buffer.getFirstSequenceNumber, buffer.getLastSequenceNumber)
94
+ emitRecords(records, baseFilename)
95
+ }
96
+ }
97
+
98
+ private def emitRecords (records : List [EmitterInput ], baseFilename : String ) = {
81
99
val serializationResults = serializer.serialize(records, baseFilename)
82
100
val (successes, failures) = serializationResults.results.partition(_.isSuccess)
83
101
@@ -108,3 +126,65 @@ class KinesisS3Emitter(
108
126
override def fail (records : java.util.List [EmitterInput ]): Unit =
109
127
s3Emitter.sendFailures(records)
110
128
}
129
+
130
object KinesisS3Emitter {

  /** Type of a row, determined from the schema key of its self-describing data. */
  sealed trait RowType extends Product with Serializable {
    def getName: String
  }

  object RowType {
    /** Row whose type could not be determined because it is either
      * not valid JSON or not a self-describing JSON.
      */
    case class PartitionError(errorDir: String) extends RowType {
      override def getName: String = errorDir
    }

    /** Row whose type was determined successfully,
      * i.e. it carries a proper schema key.
      */
    case class SelfDescribing(rowType: String) extends RowType {
      override def getName: String = rowType
    }

    /** Row that arrived in the buffer already failed (non-Success record). */
    case object UnexpectedError extends RowType {
      override def getName: String = "unexpected_error"
    }
  }

  // NOTE(review): SimpleDateFormat is NOT thread-safe. Sharing this single
  // instance is only safe if one emitter thread calls getBaseFilename at a
  // time — confirm the KCL emitter threading model before relying on this.
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd")

  /**
   * Determines the filename in S3, which is the corresponding
   * Kinesis sequence range of records in the file, optionally
   * prefixed with a partition directory.
   *
   * @param firstSeq sequence number of the first record in the buffer
   * @param lastSeq  sequence number of the last record in the buffer
   * @param prefix   optional partition directory; empty string means no prefix
   */
  private def getBaseFilename(firstSeq: String, lastSeq: String, prefix: Option[String] = None): String =
    prefix.map(p => if (p.isEmpty) "" else p + "/").getOrElse("") +
      dateFormat.format(Calendar.getInstance().getTime()) +
      "-" + firstSeq + "-" + lastSeq

  /**
   * Assume records are self-describing data and group them according
   * to their schema key. Records which are not self-describing data
   * (invalid JSON or missing schema key) are grouped under the
   * partition error directory; already-failed records are grouped
   * under [[RowType.UnexpectedError]].
   *
   * @param records  buffered records paired with their validation status
   * @param errorDir directory name used for rows that cannot be partitioned
   */
  private[loader] def partitionWithSchemaKey(records: List[EmitterInput], errorDir: String) = {
    records.groupBy {
      case Success(byteRecord) =>
        val strRecord = new String(byteRecord, "UTF-8")
        // Parse exactly once and reuse the result; the previous version
        // re-parsed the same string inside the success branch and left the
        // bound value unused.
        Try(parse(strRecord)) match {
          case TrySuccess(json) =>
            SchemaKey.extract(json).fold(
              _ => RowType.PartitionError(errorDir),
              k => RowType.SelfDescribing(s"${k.vendor}.${k.name}")
            )
          case _ => RowType.PartitionError(errorDir)
        }
      case _ => RowType.UnexpectedError
    }
  }
}
0 commit comments