@@ -18,34 +18,47 @@ import (
18
18
const (
19
19
solaceExtMetricType = "External"
20
20
solaceScalerID = "solace"
21
+
21
22
// REST ENDPOINT String Patterns
22
- solaceSempEndpointURLTemplate = "%s/%s/%s/monitor/msgVpns/%s/%ss/%s"
23
+ solaceSempQueryFieldURLSuffix = "?select=msgs,msgSpoolUsage,averageRxMsgRate"
24
+ solaceSempEndpointURLTemplate = "%s/%s/%s/monitor/msgVpns/%s/%ss/%s" + solaceSempQueryFieldURLSuffix
25
+
23
26
// SEMP REST API Context
24
27
solaceAPIName = "SEMP"
25
28
solaceAPIVersion = "v2"
26
29
solaceAPIObjectTypeQueue = "queue"
30
+
27
31
// Log Message Templates
28
32
solaceFoundMetaFalse = "required Field %s NOT FOUND in Solace Metadata"
33
+
29
34
// YAML Configuration Metadata Field Names
30
35
// Broker Identifiers
31
36
solaceMetaSempBaseURL = "solaceSempBaseURL"
37
+
32
38
// Credential Identifiers
33
39
solaceMetaUsername = "username"
34
40
solaceMetaPassword = "password"
35
41
solaceMetaUsernameFromEnv = "usernameFromEnv"
36
42
solaceMetaPasswordFromEnv = "passwordFromEnv"
43
+
37
44
// Target Object Identifiers
38
45
solaceMetaMsgVpn = "messageVpn"
39
46
solaceMetaQueueName = "queueName"
47
+
40
48
// Metric Targets
41
49
solaceMetaMsgCountTarget = "messageCountTarget"
42
50
solaceMetaMsgSpoolUsageTarget = "messageSpoolUsageTarget"
51
+ solaceMetaMsgRxRateTarget = "messageReceiveRateTarget"
52
+
43
53
// Metric Activation Targets
44
54
solaceMetaActivationMsgCountTarget = "activationMessageCountTarget"
45
55
solaceMetaActivationMsgSpoolUsageTarget = "activationMessageSpoolUsageTarget"
56
+ solaceMetaActivationMsgRxRateTarget = "activationMessageReceiveRateTarget"
57
+
46
58
// Trigger type identifiers
47
59
solaceTriggermsgcount = "msgcount"
48
60
solaceTriggermsgspoolusage = "msgspoolusage"
61
+ solaceTriggermsgrxrate = "msgrcvrate"
49
62
)
50
63
51
64
// Struct for Observed Metric Values
@@ -54,6 +67,8 @@ type SolaceMetricValues struct {
54
67
msgCount int
55
68
// Observed Message Spool Usage
56
69
msgSpoolUsage int
70
+ // Observed Message Received Rate
71
+ msgRcvRate int
57
72
}
58
73
59
74
type SolaceScaler struct {
@@ -67,19 +82,25 @@ type SolaceMetadata struct {
67
82
// Full SEMP URL to target queue (CONSTRUCTED IN CODE)
68
83
endpointURL string
69
84
solaceSempURL string
85
+
70
86
// Solace Message VPN
71
87
messageVpn string
72
88
queueName string
89
+
73
90
// Basic Auth Username
74
91
username string
75
92
// Basic Auth Password
76
93
password string
94
+
77
95
// Target Message Count
78
96
msgCountTarget int64
79
97
msgSpoolUsageTarget int64 // Spool Use Target in Megabytes
98
+ msgRxRateTarget int64 // Ingress Rate Target per consumer in msgs/second
99
+
80
100
// Activation Target Message Count
81
101
activationMsgCountTarget int
82
102
activationMsgSpoolUsageTarget int // Spool Use Target in Megabytes
103
+ activationMsgRxRateTarget int // Ingress Rate Target per consumer in msgs/second
83
104
// Scaler index
84
105
scalerIndex int
85
106
}
@@ -99,6 +120,7 @@ type solaceSEMPCollections struct {
99
120
// SEMP API Response Queue Data Struct
100
121
type solaceSEMPData struct {
101
122
MsgSpoolUsage int `json:"msgSpoolUsage"`
123
+ MsgRcvRate int `json:"averageRxMsgRate"`
102
124
}
103
125
104
126
// SEMP API Messages Struct
@@ -162,6 +184,7 @@ func parseSolaceMetadata(config *ScalerConfig) (*SolaceMetadata, error) {
162
184
163
185
// GET METRIC TARGET VALUES
164
186
// GET msgCountTarget
187
+ meta .msgCountTarget = 0
165
188
if val , ok := config .TriggerMetadata [solaceMetaMsgCountTarget ]; ok && val != "" {
166
189
if msgCount , err := strconv .ParseInt (val , 10 , 64 ); err == nil {
167
190
meta .msgCountTarget = msgCount
@@ -170,16 +193,26 @@ func parseSolaceMetadata(config *ScalerConfig) (*SolaceMetadata, error) {
170
193
}
171
194
}
172
195
// GET msgSpoolUsageTarget
196
+ meta .msgSpoolUsageTarget = 0
173
197
if val , ok := config .TriggerMetadata [solaceMetaMsgSpoolUsageTarget ]; ok && val != "" {
174
198
if msgSpoolUsage , err := strconv .ParseInt (val , 10 , 64 ); err == nil {
175
199
meta .msgSpoolUsageTarget = msgSpoolUsage * 1024 * 1024
176
200
} else {
177
201
return nil , fmt .Errorf ("can't parse [%s], not a valid integer: %w" , solaceMetaMsgSpoolUsageTarget , err )
178
202
}
179
203
}
204
+ // GET msgRcvRateTarget
205
+ meta .msgRxRateTarget = 0
206
+ if val , ok := config .TriggerMetadata [solaceMetaMsgRxRateTarget ]; ok && val != "" {
207
+ if msgRcvRate , err := strconv .ParseInt (val , 10 , 64 ); err == nil {
208
+ meta .msgRxRateTarget = msgRcvRate
209
+ } else {
210
+ return nil , fmt .Errorf ("can't parse [%s], not a valid integer: %w" , solaceMetaMsgRxRateTarget , err )
211
+ }
212
+ }
180
213
181
214
// Check that we have at least one positive target value for the scaler
182
- if meta .msgCountTarget < 1 && meta .msgSpoolUsageTarget < 1 {
215
+ if meta .msgCountTarget < 1 && meta .msgSpoolUsageTarget < 1 && meta . msgRxRateTarget < 1 {
183
216
return nil , fmt .Errorf ("no target value found in the scaler configuration" )
184
217
}
185
218
@@ -202,6 +235,14 @@ func parseSolaceMetadata(config *ScalerConfig) (*SolaceMetadata, error) {
202
235
return nil , fmt .Errorf ("can't parse [%s], not a valid integer: %w" , solaceMetaActivationMsgSpoolUsageTarget , err )
203
236
}
204
237
}
238
+ meta .activationMsgRxRateTarget = 0
239
+ if val , ok := config .TriggerMetadata [solaceMetaActivationMsgRxRateTarget ]; ok && val != "" {
240
+ if activationMsgRxRateTarget , err := strconv .Atoi (val ); err == nil {
241
+ meta .activationMsgRxRateTarget = activationMsgRxRateTarget
242
+ } else {
243
+ return nil , fmt .Errorf ("can't parse [%s], not a valid integer: %w" , solaceMetaActivationMsgRxRateTarget , err )
244
+ }
245
+ }
205
246
206
247
// Format Solace SEMP Queue Endpoint (REST URL)
207
248
meta .endpointURL = fmt .Sprintf (
@@ -296,6 +337,18 @@ func (s *SolaceScaler) GetMetricSpecForScaling(context.Context) []v2.MetricSpec
296
337
metricSpec := v2.MetricSpec {External : externalMetric , Type : solaceExtMetricType }
297
338
metricSpecList = append (metricSpecList , metricSpec )
298
339
}
340
+ // Message Receive Rate Target Spec
341
+ if s .metadata .msgRxRateTarget > 0 {
342
+ metricName := kedautil .NormalizeString (fmt .Sprintf ("solace-%s-%s" , s .metadata .queueName , solaceTriggermsgrxrate ))
343
+ externalMetric := & v2.ExternalMetricSource {
344
+ Metric : v2.MetricIdentifier {
345
+ Name : GenerateMetricNameWithIndex (s .metadata .scalerIndex , metricName ),
346
+ },
347
+ Target : GetMetricTarget (s .metricType , s .metadata .msgRxRateTarget ),
348
+ }
349
+ metricSpec := v2.MetricSpec {External : externalMetric , Type : solaceExtMetricType }
350
+ metricSpecList = append (metricSpecList , metricSpec )
351
+ }
299
352
return metricSpecList
300
353
}
301
354
@@ -341,6 +394,7 @@ func (s *SolaceScaler) getSolaceQueueMetricsFromSEMP(ctx context.Context) (Solac
341
394
// Set Return Values
342
395
metricValues .msgCount = sempResponse .Collections .Msgs .Count
343
396
metricValues .msgSpoolUsage = sempResponse .Data .MsgSpoolUsage
397
+ metricValues .msgRcvRate = sempResponse .Data .MsgRcvRate
344
398
return metricValues , nil
345
399
}
346
400
@@ -363,13 +417,19 @@ func (s *SolaceScaler) GetMetricsAndActivity(ctx context.Context, metricName str
363
417
metric = GenerateMetricInMili (metricName , float64 (metricValues .msgCount ))
364
418
case strings .HasSuffix (metricName , solaceTriggermsgspoolusage ):
365
419
metric = GenerateMetricInMili (metricName , float64 (metricValues .msgSpoolUsage ))
420
+ case strings .HasSuffix (metricName , solaceTriggermsgrxrate ):
421
+ metric = GenerateMetricInMili (metricName , float64 (metricValues .msgRcvRate ))
366
422
default :
367
423
// Should never end up here
368
424
err := fmt .Errorf ("unidentified metric: %s" , metricName )
369
425
s .logger .Error (err , "returning error to calling app" )
370
426
return []external_metrics.ExternalMetricValue {}, false , err
371
427
}
372
- return []external_metrics.ExternalMetricValue {metric }, (metricValues .msgCount > s .metadata .activationMsgCountTarget || metricValues .msgSpoolUsage > s .metadata .activationMsgSpoolUsageTarget ), nil
428
+ return []external_metrics.ExternalMetricValue {metric },
429
+ (metricValues .msgCount > s .metadata .activationMsgCountTarget ||
430
+ metricValues .msgSpoolUsage > s .metadata .activationMsgSpoolUsageTarget ||
431
+ metricValues .msgRcvRate > s .metadata .activationMsgRxRateTarget ),
432
+ nil
373
433
}
374
434
375
435
// Do Nothing - Satisfies Interface
0 commit comments