23
23
CLOUDWATCH = boto3 .client ("cloudwatch" )
24
24
METRICS = ADFMetrics (CLOUDWATCH , "PIPELINE_MANAGEMENT/RULE" )
25
25
26
- _CACHE = None
26
+ _CACHE_S3 = None
27
+ _CACHE_CODECOMMIT = None
27
28
28
29
29
30
def lambda_handler (event , _ ):
@@ -38,31 +39,52 @@ def lambda_handler(event, _):
38
39
event (dict): The ADF Pipeline Management State Machine execution
39
40
input object.
40
41
"""
41
-
42
42
# pylint: disable=W0603
43
43
# Global variable here to cache across lambda execution runtimes.
44
- global _CACHE
45
- if not _CACHE :
46
- _CACHE = Cache ()
44
+ global _CACHE_S3 , _CACHE_CODECOMMIT
45
+
46
+ if not _CACHE_S3 :
47
+ _CACHE_S3 = Cache ()
48
+ METRICS .put_metric_data (
49
+ {"MetricName" : "S3CacheInitialized" , "Value" : 1 , "Unit" : "Count" }
50
+ )
51
+
52
+ if not _CACHE_CODECOMMIT :
53
+ _CACHE_CODECOMMIT = Cache ()
47
54
METRICS .put_metric_data (
48
- {"MetricName" : "CacheInitialized " , "Value" : 1 , "Unit" : "Count" }
55
+ {"MetricName" : "CodeCommitCacheInitialized " , "Value" : 1 , "Unit" : "Count" }
49
56
)
50
57
51
58
LOGGER .info (event )
52
59
53
60
pipeline = event ['pipeline_definition' ]
54
61
55
- source_provider = (
56
- pipeline .get ("default_providers" , {})
57
- .get ("source" , {})
58
- .get ("provider" , "codecommit" )
59
- )
60
- source_account_id = (
61
- pipeline .get ("default_providers" , {})
62
- .get ("source" , {})
63
- .get ("properties" , {})
64
- .get ("account_id" )
65
- )
62
+ default_source_provider = pipeline .get ("default_providers" , {}).get ("source" , {})
63
+ source_provider = default_source_provider .get ("provider" , "codecommit" )
64
+ source_provider_properties = default_source_provider .get ("properties" , {})
65
+ source_account_id = source_provider_properties .get ("account_id" )
66
+ source_bucket_name = source_provider_properties .get ("bucket_name" )
67
+ if source_provider == "s3" :
68
+ if not source_account_id :
69
+ source_account_id = DEPLOYMENT_ACCOUNT_ID
70
+ pipeline ["default_providers" ]["source" ].setdefault ("properties" , {})["account_id" ] = source_account_id
71
+ if not source_bucket_name :
72
+ try :
73
+ parameter_store = ParameterStore (DEPLOYMENT_ACCOUNT_REGION , boto3 )
74
+ default_s3_source_bucket_name = parameter_store .fetch_parameter (
75
+ "/adf/scm/default-s3-source-bucket-name"
76
+ )
77
+ except ParameterNotFoundError :
78
+ default_s3_source_bucket_name = os .environ ["S3_BUCKET_NAME" ]
79
+ LOGGER .debug ("default_s3_source_bucket_name not found in SSM - Fall back to s3_bucket_name." )
80
+ pipeline ["default_providers" ]["source" ].setdefault ("properties" , {})["bucket_name" ] = default_s3_source_bucket_name
81
+ source_bucket_name = default_s3_source_bucket_name
82
+ event_params = {
83
+ "SourceS3BucketName" : source_bucket_name
84
+ }
85
+ else :
86
+ event_params = {}
87
+
66
88
67
89
# Resolve codecommit source_account_id in case it is not set
68
90
if source_provider == "codecommit" and not source_account_id :
@@ -98,25 +120,36 @@ def lambda_handler(event, _):
98
120
)
99
121
100
122
if (
101
- source_provider == "codecommit"
102
- and source_account_id
123
+ source_account_id
103
124
and int (source_account_id ) != int (DEPLOYMENT_ACCOUNT_ID )
104
- and not _CACHE .exists (source_account_id )
125
+ and (
126
+ (source_provider == "codecommit" and not _CACHE_CODECOMMIT .exists (source_account_id ))
127
+ or (source_provider == "s3" and not _CACHE_S3 .exists (source_account_id ))
128
+ )
105
129
):
106
130
LOGGER .info (
107
- "Source is CodeCommit and the repository is hosted in the %s "
131
+ "Source is %s and the repository/bucket is hosted in the %s "
108
132
"account instead of the deployment account (%s). Creating or "
109
133
"updating EventBridge forward rule to forward change events "
110
134
"from the source account to the deployment account in "
111
135
"EventBridge." ,
136
+ source_provider ,
112
137
source_account_id ,
113
138
DEPLOYMENT_ACCOUNT_ID ,
114
139
)
115
- rule = Rule (source_account_id )
140
+
141
+ rule = Rule (source_account_id , source_provider , event_params )
116
142
rule .create_update ()
117
- _CACHE .add (source_account_id , True )
118
- METRICS .put_metric_data (
119
- {"MetricName" : "CreateOrUpdate" , "Value" : 1 , "Unit" : "Count" }
120
- )
143
+
144
+ if source_provider == "codecommit" :
145
+ _CACHE_CODECOMMIT .add (source_account_id , True )
146
+ METRICS .put_metric_data (
147
+ {"MetricName" : "CodeCommitCreateOrUpdate" , "Value" : 1 , "Unit" : "Count" }
148
+ )
149
+ elif source_provider == "s3" :
150
+ _CACHE_S3 .add (source_account_id , True )
151
+ METRICS .put_metric_data (
152
+ {"MetricName" : "S3CreateOrUpdate" , "Value" : 1 , "Unit" : "Count" }
153
+ )
121
154
122
155
return event
0 commit comments