From 53ab65186479b766ff85864147320f53781f458f Mon Sep 17 00:00:00 2001 From: Cizz22 Date: Tue, 3 Dec 2024 04:28:23 +0000 Subject: [PATCH] init --- .env | 1 + .gitignore | 1 + airflow.sh | 39 +++ .../efficiency_auto_simulate.cpython-312.pyc | Bin 0 -> 1168 bytes .../oh_wo_incremental_data.cpython-312.pyc | Bin 0 -> 3858 bytes .../__pycache__/oh_wo_staging.cpython-312.pyc | Bin 0 -> 6485 bytes dags/efficiency_auto_simulate.py | 46 +++ dags/oh_wo_incremental_data.py | 103 +++++++ dags/oh_wo_staging.py | 161 ++++++++++ docker-compose.yaml | 290 ++++++++++++++++++ 10 files changed, 641 insertions(+) create mode 100644 .env create mode 100644 .gitignore create mode 100755 airflow.sh create mode 100644 dags/__pycache__/efficiency_auto_simulate.cpython-312.pyc create mode 100644 dags/__pycache__/oh_wo_incremental_data.cpython-312.pyc create mode 100644 dags/__pycache__/oh_wo_staging.cpython-312.pyc create mode 100644 dags/efficiency_auto_simulate.py create mode 100644 dags/oh_wo_incremental_data.py create mode 100644 dags/oh_wo_staging.py create mode 100644 docker-compose.yaml diff --git a/.env b/.env new file mode 100644 index 0000000..1e5948a --- /dev/null +++ b/.env @@ -0,0 +1 @@ +AIRFLOW_UID=1000 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..333c1e9 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +logs/ diff --git a/airflow.sh b/airflow.sh new file mode 100755 index 0000000..2e0721c --- /dev/null +++ b/airflow.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# Run airflow command in container +# + +PROJECT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +set -euo pipefail + +# check is there a docker-compose command, if not, use "docker compose" instead. 
+if [ -x "$(command -v docker-compose)" ]; then + dc=docker-compose +else + dc="docker compose" +fi + +export COMPOSE_FILE="${PROJECT_DIR}/docker-compose.yaml" +if [ $# -gt 0 ]; then + exec $dc run --rm airflow-cli "${@}" +else + exec $dc run --rm airflow-cli +fi diff --git a/dags/__pycache__/efficiency_auto_simulate.cpython-312.pyc b/dags/__pycache__/efficiency_auto_simulate.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d3fa2a8372855abdd8d03e38785f334a06ecd54a GIT binary patch literal 1168 zcmZWnJ8xV?6rQ=y-Ss}MU%QKO;1US-MvgZTvg86qvP1}J0tlL{gsw(Av+EJ}rI}fr zHBEY&bWl)0iR_jikeVh1#mz_-1(iFm0*RC=XV&X=q|A}dob#Q>oX7lCt5p!3-yh!_ ze&-_euTd^ec_uu60m9FSB8p>_;2y?Cwqh%>d-hbeV<&NXPU805sqVyH;`e-zUFuEA zw-9q_>9~A^&$H6XuU@YND5eXnM60w0vQPcFy-piL>o_>_&i!{#uM8eGU8GBNnXVkK z9y#ZhUS*0m-@+HBc3}7K$3`8%JrYdU=ZIiRjJ0nr&ZJ(L z>JdCn#(Ir{`#55Q0q=8`_Qw&)dDJIyEDt}vGe^U{ukM9YG$bmNIqP!>2u&06u<^;w z@aER;)~%iJx*=^t4qds-*>QE1vH=;zDk5SCB~T{&g6E27sa}-*eMUzyi+HM-_<_W_ zA{7xTGSkwIA`lTInMip$oROO2F z>St+cv=YoP&)`|@B~0ySR4aC)+UbL?i&3B7N@N9Ygx3B+R-k8+lggfk42^N zc>QVlz5i>zXPwo-Y2u;w^&+@htX?Y?H;Tr}e`UArJ#|(l0b1KCf{)%btbw5khUR7w kgl`&}-lO|3cOd6W1Z-m1D_MogpWthIUHeJb-P`qF18^){JOBUy literal 0 HcmV?d00001 diff --git a/dags/__pycache__/oh_wo_incremental_data.cpython-312.pyc b/dags/__pycache__/oh_wo_incremental_data.cpython-312.pyc new file mode 100644 index 0000000000000000000000000000000000000000..dea07fe26280a4512c78ade7a05005fb2d77c076 GIT binary patch literal 3858 zcmcH+OKcm*b(Xs%cloC%S^lQe#J0qkwdBUGYa=yEi<&x9il0*=B_BpOD?m! zj4XmIWT0qlpg8Wp)+$HHW$f4~J zeDikZ&6_uG-f#Zw^|}zW^A{3%j~Ai8kq@)EyU5;sK<*#`36zA2D1(YrhN8%~O|li~ z3~k`F#1!oryMZ&3qv*^y4csoV85U^m5|`lw$FEVwB{%`P1y?L@9 zPiH(ZN()}mBlrYA;7AAvoWSkSU7FzT(2k>nXS^_P7eb<2^zIIu6dB)!Jv{P)kc4eI zEAX1A6^f$H5h#d~#)m1Ljzwpn^DS&>t8yt-78PET6&Ud>$f}lCM0Hk{ug~W?8+9O& zfc*E;0PY~ID>VYm?ApvT)KLwsb!F`=>p+3FD5x4GFo$4Rv76Wu5|||vwpUNhh#J;b zMO@@>6pEE1R^^J46SD;YuPAa6YrK*t6#O+w)cu{WY`H7)=ae2W@oY{mm0(%*k_&g+ zQal--O5?fc)p3h!7+cfRi>Wy*uV&ZfY>`(rQ88u|8weV>e={^|t)o9ORDuQey5kB2UAzSHs^eByomq4)JRqK~tCi1u-t z(`^)_W?k+M zVGXTLYxWL?&|23t&2Q2ROpX2w-L*`FLN#iQ>!;}8g1ttsP`i#DqC{wQrpBlTzJs^Z zG%E^iur=G7>9!pf1_bBnnkt3zsBW(@G4%cw;P0q8egXGdch=cDS9jIjbx+-^K_KX~ zHJ^#~dh5)-ntd(U8#gh*Rr7s@1b58|&{JapdTX2kT?TX;&|^R^K;PY9PrBbG8C0_? 
z=f33}ej%_HGI#3rHM_|H_yzA;js|-E&-xfL=_#a*n0#uCIa>FBht5MaKkPKzztaXA z9;u#8%DjO4OdJyv$JaJ+PA-;trBKS_L`>EFUH3D6RUuJWAr?^$CBYA{R4QLBgMWn2>1u_ESVMNmOP^PKZM1_QO&OfANv={Qcu=N51}dN~=#npiAn z&2{4zA<+uRwY;2&MFs|~@P;oZW|rcMiD+_=z*HH2H@Y}A8(kbf7yc%Jp-GG4tw|u^ z$d-eu#nZ`DG<|k(c~9FootTR+rK58TgA~0x=uuc_2;QDvA0e~&SSVtzhw^08W%V}olHQ+?N7I{6kcA4fqi#u zX(^tz{1^WO$FEK$mt*nRY12SEM#FLByJL^95Ov($*qY;BvCKZw!5NclpAq)a_C~D} zRb2~aYQ|O8bm6euC<}~Ctve+-PvVBo6iO?y?j}jS>#{nnR7yHisT2g=PT*PH!I#To zNzj=!xlqz+Ni2l}x)W*^O}wG$?yi}V(n2Bm2)L8_0MXWuEoQntccIAAjUKu{raMaetJ&nQVqmZm(|_n!(A( z%$YXK9)99G@z8gojT~ca+k*}q+q%;BBLB%f`eXHri_b!% zcP@T%@zb|&u`PDw348P*d$i>XY#rQkHQs2fG_Gzh-8*>Cb^rMN8~4{9Fc02)@Xi-n zGkvYOlxybl%~hqDSDWg38=(zq$_CvHDn`1PZJKo68QVW;8|P{}QRwhXA7V$oVnZMM zZ~NQGc5L{o;n6#@pUgJie>8lm8H%(K^#iVLrw@7d5zTqw)ZB1~mLo$P=iNVmMh`c} zK9x38UA=8xY{<=Hr|+#cgXjOk{8%BH*PY#J^aZJfUXX$f@{_YzC{;934b!?^UN4CX zu`f+2fI+zlvXO-c0}mzYH>+1sld3;lDiq6-IFRz==_IbGB-QuV>X5E6BvwtVdfuEk za}K}R`4#>F_1ARRu6qP=g@*z<3-vwhEr@DPDU`K>T+)MTZdDX25PO6J;*c=aeQ#t9|0FL1|Hrez$E}WN1Px_1p6Er zx6$zv_U)of4t7bI0xEJZ>W2W@G(}N=MdQ!W=yUYeb9DA8az91BuNfy5{(9)A6m_9J zYeT-#mOt=i=;-#(9*4$WI+=iN)AQ1UoURsgq!m2Y3dC9mj(_PNZ7>afJNU@|M#~f2 g_d1=nP5R{!@*ZhZ?W!uC*!r<N}Go$?L&a{fsR#}*~qAi<{^G4z*r7oxIo)~ z_MoVvIobg@GduI|KeM~v|INq$!;%s=g7V_GJBOGGg#J!0?7~}}r8WwoD@Z~T6+>~< zk0{coV^o~(r;R=nW8!Q-YxLO|7kBhKj6N50_B+9ri}C$Dw2qi7?(TQTJ^db-cfN@F zy%G;~iR6-fk{f;=*)4l&VBtZBKUmtiZgRyeB7km=0#yYfA4x<%P1?;1eMlvmt8c9K^N~)S+%y%GhQngeg)eh0K zbu*kf)*l2rR;rgP<>2fF+p@oEoC|HdO-w?R&P$>uYtgt2edb{JFf_j7Jddb-4mj9wkxV?I@dQg8j}ai#mt)QhWwUlp}K-J_%3FVs9Ac( zy6QzCrbqXy$`~GyBcqmYSf|wPM+THcA`+Fb1Ma?xmWYr04Lg-lZKoK;gE8goPDvb6 zcPhh?vr0tO#GzBpT})OoeB)B+nOm}~jV)Ees9{}Z8|pkf0@mPxb@#KPHZUAfqe)qJ zU|Ac(i2)ky?Baxu@q%ZeQ~ZKwEQ>TPQEA47QuGM1TUAm_3ZHL{9PM(hx?2cVT0&0}AtSo@>Cie4*xU<$o}BsR^SXU|e+(K+T6I!lGP$%L59QRACS+al!m>0zguHR#fHWKiq;!phiao1iJ07z6yMS|l!>YZC^r3<#GZqShvijS{qze!=mflgGLR zDOLod!qn48I!<=LlEP!(5}MS)aJc7SVP&DSN7&kOyt}1MXgOfi9Y%fpjMZdS2gV8(Wb?%V$c>lN zF7$*P2DEh#0ipF0byvr^0eMu5Dhb^gQ-(-T=$uSEfM*SdI2?ozhpV8{Jw;KXbI*;* zcwDDsl~9NA&Q0)^SSv@24~DnEEQuO*5UL6EY1!uMWsMV0`QWSQlD9PYm%-OwOI7`+E6Z#v3Ng*`W2^0D}y&_Du9F{df8EUc}&jP=<2vF6q-lh6&=qj}h&A7sK%-ap&ATuB2plkClp`d}n;RQDk!X{6# z)C{9n&=3N!po8dpT?nOVfEG1UYz>B_Xygu~Xx5R8&Y zlC>rh;m|Oh5ICA#C)}qA2|9I$I65jPBm;UnJ2nPKC`amtR$(*5;%(qk_Y|DTlFne| ztO-!EFrhu`$SxRJy!Qx9?}DHDYp5pBt+I-VCqD7lTwwCF&sBNLTXn;`@tSwzVrlu$ z4qiF>^3hkiu9t4km6pF0e%bN$n|#0}_`X1{5`W%_N^A21@>XTp;Jv$WRRidDRW5qV zFLz(;&ereFsB^pCJd+uod-hM|@16K#@I5|T+qvL9y6CMS#ObQMuM(VJ47r9LQ@1&j1^>tFs2e< zX>xoNrhr=oF(H9gP-U$3=w4G}#zhql1d3 z-OF-BcQ4D45TkR((mD%}RP_=|FAhfKn4~5v%s7a_Baxkw&($t~0w79K_e2w9rx9}F zD$pBI5n^z=VOOj{8)0M>D--V)+>gTSAz~pyHG%%-smt-c<-^|1`EK#$H~3B0_)WPo zAs5(uhemZ_8c^#JzCbKF;k|24__o>JOkn1z=_A?T!wdZGB0?r7{N-RX`W0Z6`AtJO zz|03Y=&lgHM8p%IE&U6s;uC(PSRTV1DH4$L%9GZGmMNG>U|asC=@bp%hT5c9lW>6~ z3lcb0fE@^9vJ0au3ttMQ7=vbw|EXjWXtp)YEr-jxG|Q45N}NXIlu=UL`UDG7+L7kY zb4A4JOm04ol>u2*1yLa21KIjbvBv+Zr=h!m@p(JxG^DEkOP3yg61?CI1qzSidYK)`G5)__%wN# z&_3Z|n~6&-I}4b{VEPpN)So~F`jhqisO$NzZ27~P;Y@!v@c057&aCh4(P? 
zdpPJ1z_7 zwxw(|VMtlRtY0c?FS*UnZ`QWP?d%~Mw@t9c?h2!Il<$i9Z!CigcN_76x5HOTR<9Jr zBoxHNkPIROH^V&M3e`$N^%9vIl_G{6J^}Lzv3UWiqJK`bt4QeP5xdp9gxInYkh<;_ zQuoK>8F79vd*bQrGp9Z}(LWb_k9wyj>j&*CW`7`>owcxi;O+9viMN6oJ{x>wfp0GY zERlVI3v#T_E8!on}&Wnw97@ zmts@QX~+TLeL%c~wv7U#%f#zQIVActyf;iGLMcw7k|p4SgHa|4)49$Aao zR+zAU8h+|KP=NvuR8AaStla#{_n+^XICSAqE>JabG#9LzICjfjG3ofoy)jRFTxE;G z_UVzi*6Tt?*1G|?eJcEszbWUhxaF;z)UJ6OFC5F&v`(M6RKyvxjm z|KP}+%ho==;0@=z0Yj6Q-B*osA`sqzW*ecJ-{IQ75l)DP{WZ+{H5~Ny+U0y^WEhgH z3XTpyK8r}c=x1z+J- z<;eq6<&#HVR+bQ4-l?yodZ^DIKu&j#6+ZS?O|p~XRN%V5F&C@_kmcOLNpIHGbl2l> c(i6<*rN|4zHMxpSIo@;E$$IJMc_T6XAL7a!82|tP literal 0 HcmV?d00001 diff --git a/dags/efficiency_auto_simulate.py b/dags/efficiency_auto_simulate.py new file mode 100644 index 0000000..49ef887 --- /dev/null +++ b/dags/efficiency_auto_simulate.py @@ -0,0 +1,46 @@ +from airflow import DAG +from airflow.operators.http_operator import SimpleHttpOperator +from datetime import datetime, timedelta +from airflow.utils.dates import days_ago + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +with DAG( + 'daily_efficiency_api_calls', + default_args=default_args, + description='Efficiency API calls at specific times', + schedule_interval='30 0,8,15 * * *', # 07:30, 15:30, 22:30 GMT+7 + start_date=days_ago(1), + tags=['api', 'efficiency'], +) as dag: + + # Morning call at 07:30 GMT+7 (00:30 UTC) + morning_api_call = SimpleHttpOperator( + task_id='morning_api_call', + http_conn_id='efficiency_api', + endpoint='efficiency/data/auto', # No need to include the base URL + method='GET', + ) + + # Afternoon call at 15:30 GMT+7 (08:30 UTC) + afternoon_api_call = SimpleHttpOperator( + task_id='afternoon_api_call', + http_conn_id='efficiency_api', + endpoint='efficiency/data/auto', + method='GET', + ) + + # Night call at 22:30 GMT+7 (15:30 UTC) + night_api_call = SimpleHttpOperator( + task_id='night_api_call', + http_conn_id='efficiency_api', + endpoint='efficiency/data/auto', + method='GET', + ) diff --git a/dags/oh_wo_incremental_data.py b/dags/oh_wo_incremental_data.py new file mode 100644 index 0000000..943e8c3 --- /dev/null +++ b/dags/oh_wo_incremental_data.py @@ -0,0 +1,103 @@ +from datetime import datetime, timedelta +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.hooks.postgres_hook import PostgresHook +import uuid +import logging + +def get_last_processed_id(): + """Get the maximum source_id from target table""" + target_hook = PostgresHook(postgres_conn_id='target_postgres') + result = target_hook.get_first(""" + SELECT MAX(source_id) + FROM oh_wo_master + """) + return result[0] if result[0] else 0 + +def incremental_load(**context): + """Load incremental data by comparing IDs""" + source_hook = PostgresHook(postgres_conn_id='source_postgres') + target_hook = PostgresHook(postgres_conn_id='target_postgres') + + # Get last processed ID + last_id = get_last_processed_id() + logging.info(f"Last processed ID: {last_id}") + + # Fetch new records + records = source_hook.get_records(""" + SELECT id, assetnum, worktype, workgroup, total_cost_max, created_at, updated_at + FROM dl_wo_staging + WHERE id > %s + ORDER BY id + LIMIT 1000 + """, parameters=(last_id,)) + + if not records: + logging.info("No new records to process") + return + + # Transform and load records + target_hook.run(""" + CREATE TEMP TABLE temp_records ( + id UUID, + source_id BIGSERIAL, + worktype VARCHAR(5), + 
workgroup VARCHAR(30), + total_cost_max FLOAT4, + created_at TIMESTAMP, + updated_at TIMESTAMP + ) ON COMMIT DROP + """) + + # Prepare batch insert + insert_values = [] + for record in records: + old_id, assetnum, worktype, workgroup, total_cost_max, created_at, updated_at = record + new_uuid = uuid.uuid4() + insert_values.append( + f"('{new_uuid}', {old_id}, '{assetnum}', '{worktype}', '{workgroup}', '{total_cost_max}', '{created_at}', '{updated_at}')" + ) + + if insert_values: + # Bulk insert into temp table + target_hook.run(f""" + INSERT INTO temp_records (id, source_id, worktype, workgroup, total_cost_max, created_at, updated_at) + VALUES {','.join(insert_values)} + """) + + # Upsert from temp table to target table + target_hook.run(""" + INSERT INTO oh_wo_master (id, source_id, worktype, workgroup, total_cost_max, created_at, updated_at) + SELECT id, source_id, worktype, workgroup, total_cost_max, created_at, updated_at + FROM temp_records + ON CONFLICT (source_id) DO UPDATE + SET worktype = EXCLUDED.worktype, + workgroup = EXCLUDED.workgroup, + total_cost_max = EXCLUDED.total_cost_max, + updated_at = EXCLUDED.updated_at + """) + + processed_count = len(records) + logging.info(f"Processed {processed_count} records") + +# Create DAG +default_args = { + 'owner': 'airflow', + 'retries': 1, + 'retry_delay': timedelta(minutes=5), +} + +dag = DAG( + 'oh_wo_incremental_etl', + default_args=default_args, + description='Simple incremental ETL using target table as state', + schedule_interval='*/15 * * * *', # Run every 15 minutes + start_date=datetime(2024, 1, 1), + catchup=False +) + +load_task = PythonOperator( + task_id='incremental_load', + python_callable=incremental_load, + dag=dag +) diff --git a/dags/oh_wo_staging.py b/dags/oh_wo_staging.py new file mode 100644 index 0000000..d33137e --- /dev/null +++ b/dags/oh_wo_staging.py @@ -0,0 +1,161 @@ +from datetime import datetime +from airflow import DAG +from airflow.operators.python import PythonOperator +from airflow.hooks.postgres_hook import PostgresHook +import uuid +import logging +from typing import List, Tuple + +def get_source_hook(): + return PostgresHook(postgres_conn_id='source_postgres') + +def get_target_hook(): + return PostgresHook(postgres_conn_id='target_postgres') + +def extract_batch(start_id: int, batch_size: int) -> List[Tuple]: + """Extract a batch of records from source database""" + source_hook = get_source_hook() + + query = """ + SELECT id, assetnum, worktype, workgroup, total_cost_max, created_at, updated_at + FROM dl_wo_staging + WHERE id >= %s + AND worktype IN ('PM', 'CM', 'EM', 'PROACTIVE') + ORDER BY id + LIMIT %s + """ + + try: + return source_hook.get_records( + sql=query, + parameters=(start_id, batch_size) + ) + except Exception as e: + logging.error(f"Error extracting data: {str(e)}") + raise + +def get_max_id() -> int: + """Get the maximum ID from source table""" + source_hook = get_source_hook() + try: + result = source_hook.get_first( + "SELECT MAX(id) FROM dl_wo_staging WHERE worktype IN ('PM', 'CM', 'EM', 'PROACTIVE')" + ) + return result[0] if result and result[0] else 0 + except Exception as e: + logging.error(f"Error getting max ID: {str(e)}") + raise + +def transform_batch(batch_data: List[Tuple]) -> List[Tuple]: + """Transform batch of records""" + transformed = [] + for row in batch_data: + try: + # Create temporary table for batch processing + transformed.append(( + str(uuid.uuid4()), # id (UUID) + row[0], # source_id + row[1], # assetnum + row[2], # worktype + row[3], # workgroup + row[4], 
# total_cost_max + row[5], # created_at + row[6] # updated_at + )) + except Exception as e: + logging.error(f"Error transforming record {row[0]}: {str(e)}") + continue + return transformed + +def load_batch(transformed_data: List[Tuple]) -> None: + """Load transformed data into target database using temp table and upsert""" + if not transformed_data: + return + + target_hook = get_target_hook() + + try: + # Bulk insert into temp table + target_hook.insert_rows( + table='oh_wo_master', + rows=transformed_data, + target_fields=['id', 'source_id', 'assetnum', 'worktype', 'workgroup', 'total_cost_max', 'created_at', 'updated_at'] + ) + + except Exception as e: + logging.error(f"Error loading data: {str(e)}") + raise + +def process_batch(start_id: int, batch_size: int) -> int: + """Process a single batch of records""" + logging.info(f"Processing batch starting from ID {start_id}") + + try: + # Extract + batch_data = extract_batch(start_id, batch_size) + if not batch_data: + return 0 + last_processed_id = batch_data[-1][0] + + # Transform + transformed_data = transform_batch(batch_data) + + # Load + load_batch(transformed_data) + + return last_processed_id + except Exception as e: + logging.error(f"Error processing batch starting at ID {start_id}: {str(e)}") + raise + +def get_id_range() -> Tuple[int, int]: + """Get the minimum and maximum ID from source table""" + source_hook = get_source_hook() + try: + result = source_hook.get_first(""" + SELECT MIN(id), MAX(id) + FROM dl_wo_staging + WHERE worktype IN ('PM', 'CM', 'EM', 'PROACTIVE') + """) + return result[0] if result and result[0] else 0, result[1] if result and result[1] else 0 + except Exception as e: + logging.error(f"Error getting ID range: {str(e)}") + raise + +def etl_process(**context): + """Main ETL process with batch processing""" + batch_size = 1000 + min_id, max_id = get_id_range() + current_id = min_id + total_processed = 0 + + while current_id <= max_id: + try: + records_processed = process_batch(current_id, batch_size) + + if records_processed == 0: + break + + total_processed += 1000 + current_id = records_processed + 1 + logging.info(f"Processed {total_processed} records so far") + except Exception as e: + logging.error(f"Error in batch processing: {str(e)}") + raise + + logging.info(f"ETL complete. Total records processed: {total_processed}") + +# DAG definition +dag = DAG( + 'wo_staging_historical_data', + start_date=datetime(2024, 1, 1), + schedule_interval=None, # Manual trigger for historical data + catchup=False, + tags=['historical', 'work_order'] +) + +etl_task = PythonOperator( + task_id='etl_process', + python_callable=etl_process, + dag=dag +) diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..fb50223 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,290 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL. +# +# WARNING: This configuration is for local development. Do not use it in a production deployment. +# +# This configuration supports basic configuration using environment variables or an .env file +# The following variables are supported: +# +# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow. +# Default: apache/airflow:2.10.3 +# AIRFLOW_UID - User ID in Airflow containers +# Default: 50000 +# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed. +# Default: . +# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode +# +# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested). +# Default: airflow +# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested). +# Default: airflow +# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers. +# Use this option ONLY for quick checks. Installing requirements at container +# startup is done EVERY TIME the service is started. +# A better way is to build a custom image or extend the official image +# as described in https://airflow.apache.org/docs/docker-stack/build.html. +# Default: '' +# +# Feel free to modify this file to suit your needs. +--- +x-airflow-common: + &airflow-common + # In order to add custom dependencies or upgrade provider packages you can use your extended image. + # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml + # and uncomment the "build" line below, Then run `docker-compose build` to build the images. + image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.3} + # build: . + environment: + &airflow-common-env + AIRFLOW__CORE__EXECUTOR: CeleryExecutor + AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow + AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 + AIRFLOW__CORE__FERNET_KEY: '' + AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true' + AIRFLOW__CORE__LOAD_EXAMPLES: 'true' + AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session' + # AIRFLOW__WEBSERVER__BASE_URL: 'http://192.168.1.82:8000/airflow' + # AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX: True + # yamllint disable rule:line-length + # Use simple http server on scheduler for health checks + # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server + # yamllint enable rule:line-length + AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true' + # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks + # for other purpose (development, test and especially production usage) build/extend Airflow image. 
+ _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-} + # The following line can be used to set a custom config file, stored in the local config folder + # If you want to use it, outcomment it and replace airflow.cfg with the name of your config file + # AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg' + volumes: + - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags + - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs + - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config + - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins + user: "${AIRFLOW_UID:-50000}:0" + depends_on: + &airflow-common-depends-on + redis: + condition: service_healthy + postgres: + condition: service_healthy + +services: + postgres: + image: postgres:13 + environment: + POSTGRES_USER: airflow + POSTGRES_PASSWORD: airflow + POSTGRES_DB: airflow + volumes: + - postgres-db-volume:/var/lib/postgresql/data + healthcheck: + test: ["CMD", "pg_isready", "-U", "airflow"] + interval: 10s + retries: 5 + start_period: 5s + restart: always + + redis: + # Redis is limited to 7.2-bookworm due to licencing change + # https://redis.io/blog/redis-adopts-dual-source-available-licensing/ + image: redis:7.2-bookworm + expose: + - 6379 + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 30s + retries: 50 + start_period: 30s + restart: always + + airflow-webserver: + <<: *airflow-common + command: webserver + ports: + - "8080:8080" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8080/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-scheduler: + <<: *airflow-common + command: scheduler + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:8974/health"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-worker: + <<: *airflow-common + command: celery worker + healthcheck: + # yamllint disable rule:line-length + test: + - "CMD-SHELL" + - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"' + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + environment: + <<: *airflow-common-env + # Required to handle warm shutdown of the celery workers properly + # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation + DUMB_INIT_SETSID: "0" + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-triggerer: + <<: *airflow-common + command: triggerer + healthcheck: + test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"'] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-init: + <<: *airflow-common + entrypoint: /bin/bash + # yamllint disable rule:line-length + command: + - -c + - | + if [[ -z "${AIRFLOW_UID}" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m" + echo "If you are on Linux, you SHOULD follow the instructions below to set " + echo "AIRFLOW_UID environment variable, otherwise files will be owned by root." 
+ echo "For other operating systems you can get rid of the warning with manually created .env file:" + echo " See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user" + echo + fi + one_meg=1048576 + mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg)) + cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat) + disk_available=$$(df / | tail -1 | awk '{print $$4}') + warning_resources="false" + if (( mem_available < 4000 )) ; then + echo + echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m" + echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))" + echo + warning_resources="true" + fi + if (( cpus_available < 2 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m" + echo "At least 2 CPUs recommended. You have $${cpus_available}" + echo + warning_resources="true" + fi + if (( disk_available < one_meg * 10 )); then + echo + echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m" + echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))" + echo + warning_resources="true" + fi + if [[ $${warning_resources} == "true" ]]; then + echo + echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m" + echo "Please follow the instructions to increase amount of resources available:" + echo " https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin" + echo + fi + mkdir -p /sources/logs /sources/dags /sources/plugins + chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins} + exec /entrypoint airflow version + # yamllint enable rule:line-length + environment: + <<: *airflow-common-env + _AIRFLOW_DB_MIGRATE: 'true' + _AIRFLOW_WWW_USER_CREATE: 'true' + _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} + _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow} + _PIP_ADDITIONAL_REQUIREMENTS: '' + user: "0:0" + volumes: + - ${AIRFLOW_PROJ_DIR:-.}:/sources + + airflow-cli: + <<: *airflow-common + profiles: + - debug + environment: + <<: *airflow-common-env + CONNECTION_CHECK_MAX_COUNT: "0" + # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252 + command: + - bash + - -c + - airflow + + # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up + # or by explicitly targeted on the command line e.g. docker-compose up flower. + # See: https://docs.docker.com/compose/profiles/ + flower: + <<: *airflow-common + command: celery flower + profiles: + - flower + ports: + - "5555:5555" + healthcheck: + test: ["CMD", "curl", "--fail", "http://localhost:5555/"] + interval: 30s + timeout: 10s + retries: 5 + start_period: 30s + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + +volumes: + postgres-db-volume: