23
23
import org .joda .time .format .DateTimeFormatter ;
24
24
import org .joda .time .format .ISODateTimeFormat ;
25
25
26
+ import java .util .Map ;
26
27
import java .util .Properties ;
27
28
28
29
public class JsonEventSerde
29
30
extends JsonSerde
30
31
{
31
32
public static final DateTimeFormatter ISO_FORMATTER = ISODateTimeFormat .dateTime ().withZone (DateTimeZone .UTC );
32
33
public static final DateTimeFormatter HIVE_FORMATTER = DateTimeFormat .forPattern ("yyyy-MM-dd' 'HH:mm:ss" ).withZone (DateTimeZone .UTC );
34
+ private Integer uuidColumn ;
35
+ private Integer hostColumn ;
33
36
private Integer timestampColumn ;
34
37
35
38
@ Override
@@ -38,7 +41,10 @@ public void initialize(Configuration configuration, Properties table)
38
41
{
39
42
super .initialize (configuration , table );
40
43
41
- timestampColumn = columnNameMap .getColumnNames (rootTypeInfo ).get ("ts" );
44
+ Map <String , Integer > columnNames = columnNameMap .getColumnNames (rootTypeInfo );
45
+ uuidColumn = columnNames .get ("uuid" );
46
+ hostColumn = columnNames .get ("host" );
47
+ timestampColumn = columnNames .get ("ts" );
42
48
}
43
49
44
50
@ Override
@@ -55,29 +61,41 @@ protected Object[] buildStruct(JsonNode tree)
55
61
56
62
Object [] struct = processFields (dataNode );
57
63
64
+ if (uuidColumn != null ) {
65
+ struct [uuidColumn ] = getTextNode (tree , "uuid" );
66
+ }
67
+ if (hostColumn != null ) {
68
+ struct [hostColumn ] = getTextNode (tree , "host" );
69
+ }
58
70
if (timestampColumn != null ) {
59
- struct [timestampColumn ] = HIVE_FORMATTER .print (parseTimestamp (tree ));
71
+ long ts = parseTimestamp (getTextNode (tree , "timestamp" ));
72
+ struct [timestampColumn ] = HIVE_FORMATTER .print (ts );
60
73
}
61
74
62
75
return struct ;
63
76
}
64
77
65
- private static long parseTimestamp (JsonNode tree )
78
+ private static long parseTimestamp (String timestamp )
66
79
throws SerDeException
67
80
{
68
- if (!tree .has ("timestamp" )) {
69
- throw new SerDeException ("timestamp field is missing" );
70
- }
71
- JsonNode node = tree .get ("timestamp" );
72
- if (!node .isTextual ()) {
73
- throw new SerDeException ("timestamp field is not text" );
74
- }
75
- String timestamp = node .getTextValue ();
76
81
try {
77
82
return ISO_FORMATTER .parseMillis (timestamp );
78
83
}
79
84
catch (Exception e ) {
80
85
throw new SerDeException ("invalid timestamp: " + timestamp );
81
86
}
82
87
}
88
+
89
+ private static String getTextNode (JsonNode tree , String field )
90
+ throws SerDeException
91
+ {
92
+ if (!tree .has (field )) {
93
+ throw new SerDeException (field + " field is missing" );
94
+ }
95
+ JsonNode node = tree .get (field );
96
+ if (!node .isTextual ()) {
97
+ throw new SerDeException (field + " field is not text" );
98
+ }
99
+ return node .getTextValue ();
100
+ }
83
101
}
0 commit comments