1
+ from datetime import datetime
2
+ import logging
3
+
4
+ from airflow import DAG
5
+ from airflow .contrib .operators .ecs_operator import ECSOperator
6
+ from airflow .models import Variable
7
+
8
+ logger = logging .getLogger (__name__ )
9
+
10
+ default_args = {
11
+ 'owner' : 'andresionek91' ,
12
+ 'start_date' : datetime (2020 , 5 , 20 ),
13
+ 'depends_on_past' : False ,
14
+ 'provide_context' : True
15
+ }
16
+
17
+ execution_date = '{{ ds }}' # Access execution date
18
+
19
+ with DAG ('crypto_extract' ,
20
+ description = 'DAG to extract Daily Summaries from MercadoBitcoin' ,
21
+ schedule_interval = '* 1 * * *' ,
22
+ catchup = True ,
23
+ default_args = default_args ) as dag :
24
+
25
+ t1 = ECSOperator (
26
+ task_id = "crypto_extract" ,
27
+ dag = dag ,
28
+ aws_conn_id = "aws_default" ,
29
+ cluster = "airflow-dev-ecs-cluster" ,
30
+ task_definition = "dev-crypto-extract-image" ,
31
+ launch_type = "FARGATE" ,
32
+ overrides = {
33
+ "containerOverrides" : [
34
+ {
35
+ "name" : "dev-crypto-extract-image" ,
36
+ "command" : ["python" , "main.py" , execution_date ],
37
+ "environment" : [
38
+ {
39
+ 'name' : 'AWS_ACCESS_KEY_ID' ,
40
+ 'value' : 'string'
41
+ }
42
+ ]
43
+ }
44
+ ],
45
+ },
46
+ network_configuration = {
47
+ "awsvpcConfiguration" : {
48
+ "securityGroups" : [Variable .get ("security-group" )],
49
+ "subnets" : [Variable .get ("subnet" )],
50
+ "assignPublicIp" : "ENABLED"
51
+ }
52
+ }
53
+ )
54
+
55
+ t1
0 commit comments