DAG: Site_Analytics ROOT: MongoDB_Processes

schedule: 30 * * * *


Site_Analytics

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# coding: utf-8


from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


# Default operator arguments for the Site_Analytics DAG; passed to DAG(...)
# as default_args, so every task created with dag=dag inherits them unless it
# overrides a key explicitly.
default_args = dict(
    owner='airflow',
    # Each run stands alone: a task does not wait on its previous run's result.
    depends_on_past=False,
    start_date=datetime(2017, 10, 10),
    email=['fodorsz@mapsolutions.hu'],
    email_on_failure=True,
    # With retries=5 below, a persistently failing task can send an email on
    # every retry in addition to the final failure email.
    email_on_retry=True,
    retries=5,
    retry_delay=timedelta(minutes=5),
)

# The Site_Analytics DAG. The cron expression "30 * * * *" fires at minute 30
# of every hour.
# NOTE(review): `catchup` is not set, so Airflow's catchup_by_default setting
# applies; with a 2017 start_date this can schedule runs for any missed
# intervals -- confirm that is intended.
dag = DAG(
    dag_id='Site_Analytics',
    schedule_interval='30 * * * *',
    default_args=default_args,
)

# Root of the ot-analytics-aggregator checkout that all task scripts live in.
AGGREGATOR_HOME = '/var/local/ot-analytics-aggregator'


def _python_script_task(task_id, relative_script):
    """Return a BashOperator on ``dag`` that runs one aggregator script.

    :param task_id: Airflow task id for the new task.
    :param relative_script: path of the script relative to AGGREGATOR_HOME.
    :returns: the BashOperator, which runs ``python <AGGREGATOR_HOME>/<script>``.
    """
    # str.format (not an f-string) keeps the DAG file importable on the older
    # Python versions this Airflow 1.x-style file may be deployed with.
    return BashOperator(
        task_id=task_id,
        bash_command='python {}/{}'.format(AGGREGATOR_HOME, relative_script),
        dag=dag)


# NOTE(review): "fiter" looks like a typo for "filter", but it must match the
# script's actual filename on disk -- confirm before renaming either side.
t1 = _python_script_task('IP_filtering', 'OnMongodbProc/IP_bot_fiter.py')

t2 = _python_script_task('MongoDB_Processes', 'OnMongodbProc/mongo_proc.py')

# NOTE: the Sessions_mongoToPostgres task (OT_aggregations/ot_sessions.py,
# formerly t3, downstream of MongoDB_Processes) is currently disabled.

t4 = _python_script_task('Conversions_mongoToPostgres',
                         'OT_aggregations/ot_conversion.py')

t5 = _python_script_task('Property_mongoToPostgres',
                         'OT_aggregations/ot_property_pageview.py')

# Dependency graph:
#   IP_filtering -> MongoDB_Processes -> {Conversions_mongoToPostgres,
#                                         Property_mongoToPostgres}
t2.set_upstream(t1)
t4.set_upstream(t2)
t5.set_upstream(t2)