1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125 | # coding: utf-8
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2017, 11, 17),
'email': ['fodorsz@mapsolutions.hu'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
dag = DAG('penzugyi_adatok_toltese', default_args=default_args,schedule_interval="0 5 * * *")
t1 = BashOperator(
task_id='AMA_szamlak',
bash_command='python /var/local/erp-data-loader/tajolo_admin/amango_szamlak.py',
dag=dag)
t2 = BashOperator(
task_id='MAPS_szamlak',
bash_command='python /var/local/erp-data-loader/dolphin/maps_szamlak.py',
dag=dag)
t3 = BashOperator(
task_id='szamla_union',
bash_command='PGPASSWORD=oe5Su4Aaghae psql worker -h 127.0.0.1 -d dwh -f /var/local/erp-data-loader/erp_data_union/szamlak_union.sql',
dag=dag)
t4 = BashOperator(
task_id='AMA_szerzodesek',
bash_command='python /var/local/erp-data-loader/tajolo_admin/amango_szerzodesek.py',
dag=dag)
t5 = BashOperator(
task_id='MAPS_szerzodesek',
bash_command='python /var/local/erp-data-loader/dolphin/maps_szerzodesek.py',
dag=dag)
t7 = BashOperator(
task_id='szerzodesek_union',
bash_command='PGPASSWORD=oe5Su4Aaghae psql worker -h 127.0.0.1 -d dwh -f /var/local/erp-data-loader/erp_data_union/szerzodesek_union.sql',
dag=dag)
t10 = BashOperator(
task_id='Szamla_chk',
bash_command='python /var/local/erp-data-loader/erp_other/szamlak_nincs_szerz_azon.py',
dag=dag)
t11 = BashOperator(
task_id='AMA_partnerek',
bash_command='python /var/local/erp-data-loader/tajolo_admin/amango_partnerek.py',
dag=dag)
t12 = BashOperator(
task_id='MAPS_partnerek',
bash_command='python /var/local/erp-data-loader/dolphin/maps_partnerek.py',
dag=dag)
t13 = BashOperator(
task_id='partnerek_union',
bash_command='PGPASSWORD=oe5Su4Aaghae psql worker -h 127.0.0.1 -d dwh -f /var/local/erp-data-loader/erp_data_union/partnerek_union.sql',
dag=dag)
t14 = BashOperator(
task_id='MAPS_szerz_cf',
bash_command='python /var/local/erp-data-loader/dolphin/maps_szerz_cf.py',
dag=dag)
t15 = BashOperator(
task_id='AMA_tetelek',
bash_command='python /var/local/erp-data-loader/tajolo_admin/amango_szerz_cf.py',
dag=dag)
t16 = BashOperator(
task_id='AMA_szerz_cf',
bash_command='PGPASSWORD=oe5Su4Aaghae psql worker -h 127.0.0.1 -d dwh -f /var/local/erp-data-loader/erp_data_union/szerzodes_tetel_eloiras_ama.sql',
dag=dag)
t17 = BashOperator(
task_id='Szerz_tetel_eloirasok',
bash_command='PGPASSWORD=oe5Su4Aaghae psql worker -h 127.0.0.1 -d dwh -f /var/local/erp-data-loader/erp_data_union/szerzodes_tetel_eloiras_union.sql',
dag=dag)
t18 = BashOperator(
task_id='Szerz_tetel_havi_eloirasok',
bash_command='PGPASSWORD=oe5Su4Aaghae psql worker -h 127.0.0.1 -d dwh -f /var/local/erp-data-loader/erp_data_union/szerzodes_tetel_havi_eloiras.sql',
dag=dag)
t19 = BashOperator(
task_id='Bevetel_fokonyv',
bash_command='PGPASSWORD=oe5Su4Aaghae psql worker -h 127.0.0.1 -d dwh -f /var/local/erp-data-loader/erp_data_union/fokonyv_bevetel_union.sql',
dag=dag)
t20 = BashOperator(
task_id='Dijbekerok_attolt',
bash_command='python //var/local/erp-data-loader/tajolo_admin/amango_dijbekerok.py',
dag=dag)
t21 = BashOperator(
task_id='Dijbekerok_update',
bash_command='PGPASSWORD=oe5Su4Aaghae psql worker -h 127.0.0.1 -d dwh -f /var/local/erp-data-loader/erp_data_union/dijbekerok_insert.sql',
dag=dag)
t3.set_upstream([t1,t2])
t7.set_upstream([t5,t4])
t10.set_upstream(t3)
t13.set_upstream([t3,t7,t11,t12])
t16.set_upstream(t15)
t17.set_upstream([t14,t16])
t18.set_upstream(t17)
t19.set_upstream([t3,t7,t13])
t21.set_upstream([t20])
|