commit 53ab65186479b766ff85864147320f53781f458f
Author: Cizz22
Date:   Tue Dec 3 04:28:23 2024 +0000

    init

diff --git a/.env b/.env
new file mode 100644
index 0000000..1e5948a
--- /dev/null
+++ b/.env
@@ -0,0 +1 @@
+AIRFLOW_UID=1000
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..333c1e9
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+logs/
diff --git a/airflow.sh b/airflow.sh
new file mode 100755
index 0000000..2e0721c
--- /dev/null
+++ b/airflow.sh
@@ -0,0 +1,39 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#
+# Run airflow command in container
+#
+
+PROJECT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+
+set -euo pipefail
+
+# check is there a docker-compose command, if not, use "docker compose" instead.
+if [ -x "$(command -v docker-compose)" ]; then
+  dc=docker-compose
+else
+  dc="docker compose"
+fi
+
+export COMPOSE_FILE="${PROJECT_DIR}/docker-compose.yaml"
+if [ $# -gt 0 ]; then
+  exec $dc run --rm airflow-cli "${@}"
+else
+  exec $dc run --rm airflow-cli
+fi
diff --git a/dags/__pycache__/efficiency_auto_simulate.cpython-312.pyc b/dags/__pycache__/efficiency_auto_simulate.cpython-312.pyc
new file mode 100644
index 0000000..d3fa2a8
Binary files /dev/null and b/dags/__pycache__/efficiency_auto_simulate.cpython-312.pyc differ
diff --git a/dags/__pycache__/oh_wo_incremental_data.cpython-312.pyc b/dags/__pycache__/oh_wo_incremental_data.cpython-312.pyc
new file mode 100644
index 0000000..dea07fe
Binary files /dev/null and b/dags/__pycache__/oh_wo_incremental_data.cpython-312.pyc differ
diff --git a/dags/__pycache__/oh_wo_staging.cpython-312.pyc b/dags/__pycache__/oh_wo_staging.cpython-312.pyc
new file mode 100644
index 0000000..f87a343
Binary files /dev/null and b/dags/__pycache__/oh_wo_staging.cpython-312.pyc differ
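
The airflow.sh wrapper above forwards its arguments to a throwaway airflow-cli container, so once the stack is up it can stand in for the Airflow CLI. Typical invocations, assuming the compose file sits next to the script as it does here:

    ./airflow.sh info        # run `airflow info` inside the image
    ./airflow.sh dags list   # list the DAGs mounted from ./dags
    ./airflow.sh bash        # interactive shell in the Airflow image

Note that the compiled __pycache__/*.pyc files and the .env file are part of this commit; extending the one-line .gitignore with __pycache__/ (and .env, if it ever carries more than AIRFLOW_UID) would keep them out of future commits.
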
diff --git a/dags/efficiency_auto_simulate.py b/dags/efficiency_auto_simulate.py
new file mode 100644
index 0000000..49ef887
--- /dev/null
+++ b/dags/efficiency_auto_simulate.py
@@ -0,0 +1,46 @@
+from airflow import DAG
+from airflow.operators.http_operator import SimpleHttpOperator
+from datetime import datetime, timedelta
+from airflow.utils.dates import days_ago
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'email_on_failure': False,
+    'email_on_retry': False,
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+with DAG(
+    'daily_efficiency_api_calls',
+    default_args=default_args,
+    description='Efficiency API calls at specific times',
+    schedule_interval='30 0,8,15 * * *',  # 07:30, 15:30, 22:30 GMT+7
+    start_date=days_ago(1),
+    tags=['api', 'efficiency'],
+) as dag:
+
+    # Morning call at 07:30 GMT+7 (00:30 UTC)
+    morning_api_call = SimpleHttpOperator(
+        task_id='morning_api_call',
+        http_conn_id='efficiency_api',
+        endpoint='efficiency/data/auto',  # No need to include the base URL
+        method='GET',
+    )
+
+    # Afternoon call at 15:30 GMT+7 (08:30 UTC)
+    afternoon_api_call = SimpleHttpOperator(
+        task_id='afternoon_api_call',
+        http_conn_id='efficiency_api',
+        endpoint='efficiency/data/auto',
+        method='GET',
+    )
+
+    # Night call at 22:30 GMT+7 (15:30 UTC)
+    night_api_call = SimpleHttpOperator(
+        task_id='night_api_call',
+        http_conn_id='efficiency_api',
+        endpoint='efficiency/data/auto',
+        method='GET',
+    )
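
The cron expression above already fires this DAG three times a day, but the DAG also defines three separate tasks, so every run issues all three HTTP calls and the endpoint is hit nine times per day instead of three. The airflow.operators.http_operator and airflow.utils.dates.days_ago import paths are deprecated shims on Airflow 2.10; the operator lives in the HTTP provider that ships with the official image. A minimal sketch of the apparent intent, one task driven by a timezone-aware schedule (the Asia/Jakarta timezone and the task name are assumptions based on the GMT+7 comments):

    import pendulum
    from airflow import DAG
    from airflow.providers.http.operators.http import SimpleHttpOperator

    with DAG(
        'daily_efficiency_api_calls',
        description='Efficiency API calls at specific times',
        schedule_interval='30 7,15,22 * * *',  # interpreted in the DAG timezone below
        start_date=pendulum.datetime(2024, 1, 1, tz='Asia/Jakarta'),
        catchup=False,
        tags=['api', 'efficiency'],
    ) as dag:
        trigger_auto_simulation = SimpleHttpOperator(  # illustrative task name
            task_id='trigger_auto_simulation',
            http_conn_id='efficiency_api',
            endpoint='efficiency/data/auto',
            method='GET',
        )

If the three calls really are meant to be distinct tasks, they would need distinct schedules (for example three small DAGs) rather than one cron expression covering all three times.
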
diff --git a/dags/oh_wo_incremental_data.py b/dags/oh_wo_incremental_data.py
new file mode 100644
index 0000000..943e8c3
--- /dev/null
+++ b/dags/oh_wo_incremental_data.py
@@ -0,0 +1,103 @@
+from datetime import datetime, timedelta
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.hooks.postgres_hook import PostgresHook
+import uuid
+import logging
+
+def get_last_processed_id():
+    """Get the maximum source_id from target table"""
+    target_hook = PostgresHook(postgres_conn_id='target_postgres')
+    result = target_hook.get_first("""
+        SELECT MAX(source_id)
+        FROM oh_wo_master
+    """)
+    return result[0] if result[0] else 0
+
+def incremental_load(**context):
+    """Load incremental data by comparing IDs"""
+    source_hook = PostgresHook(postgres_conn_id='source_postgres')
+    target_hook = PostgresHook(postgres_conn_id='target_postgres')
+
+    # Get last processed ID
+    last_id = get_last_processed_id()
+    logging.info(f"Last processed ID: {last_id}")
+
+    # Fetch new records
+    records = source_hook.get_records("""
+        SELECT id, assetnum, worktype, workgroup, total_cost_max, created_at, updated_at
+        FROM dl_wo_staging
+        WHERE id > %s
+        ORDER BY id
+        LIMIT 1000
+    """, parameters=(last_id,))
+
+    if not records:
+        logging.info("No new records to process")
+        return
+
+    # Transform and load records
+    target_hook.run("""
+        CREATE TEMP TABLE temp_records (
+            id UUID,
+            source_id BIGSERIAL,
+            worktype VARCHAR(5),
+            workgroup VARCHAR(30),
+            total_cost_max FLOAT4,
+            created_at TIMESTAMP,
+            updated_at TIMESTAMP
+        ) ON COMMIT DROP
+    """)
+
+    # Prepare batch insert
+    insert_values = []
+    for record in records:
+        old_id, assetnum, worktype, workgroup, total_cost_max, created_at, updated_at = record
+        new_uuid = uuid.uuid4()
+        insert_values.append(
+            f"('{new_uuid}', {old_id}, '{assetnum}', '{worktype}', '{workgroup}', '{total_cost_max}', '{created_at}', '{updated_at}')"
+        )
+
+    if insert_values:
+        # Bulk insert into temp table
+        target_hook.run(f"""
+            INSERT INTO temp_records (id, source_id, worktype, workgroup, total_cost_max, created_at, updated_at)
+            VALUES {','.join(insert_values)}
+        """)
+
+        # Upsert from temp table to target table
+        target_hook.run("""
+            INSERT INTO oh_wo_master (id, source_id, worktype, workgroup, total_cost_max, created_at, updated_at)
+            SELECT id, source_id, worktype, workgroup, total_cost_max, created_at, updated_at
+            FROM temp_records
+            ON CONFLICT (source_id) DO UPDATE
+            SET worktype = EXCLUDED.worktype,
+                workgroup = EXCLUDED.workgroup,
+                total_cost_max = EXCLUDED.total_cost_max,
+                updated_at = EXCLUDED.updated_at
+        """)
+
+    processed_count = len(records)
+    logging.info(f"Processed {processed_count} records")
+
+# Create DAG
+default_args = {
+    'owner': 'airflow',
+    'retries': 1,
+    'retry_delay': timedelta(minutes=5),
+}
+
+dag = DAG(
+    'oh_wo_incremental_etl',
+    default_args=default_args,
+    description='Simple incremental ETL using target table as state',
+    schedule_interval='*/15 * * * *',  # Run every 15 minutes
+    start_date=datetime(2024, 1, 1),
+    catchup=False
+)
+
+load_task = PythonOperator(
+    task_id='incremental_load',
+    python_callable=incremental_load,
+    dag=dag
+)
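
Two problems in incremental_load will surface at runtime: temp_records has no assetnum column, yet each tuple appended to insert_values carries eight values (including assetnum) against the seven columns named in the INSERT; and because the values are spliced in with an f-string, any apostrophe in a text field breaks the statement and a NULL becomes the literal string 'None'. A parameterized sketch of the load step, assuming oh_wo_master has an assetnum column (the staging DAG loads one); the load_records helper name is illustrative, the assetnum TEXT type is assumed, and source_id becomes plain BIGINT since the value is always supplied:

    import uuid
    from psycopg2.extras import execute_values

    def load_records(target_hook, records):
        # Prepend a fresh UUID to each source row.
        rows = [
            (str(uuid.uuid4()), old_id, assetnum, worktype, workgroup,
             total_cost_max, created_at, updated_at)
            for old_id, assetnum, worktype, workgroup,
                total_cost_max, created_at, updated_at in records
        ]
        conn = target_hook.get_conn()
        try:
            with conn.cursor() as cur:
                cur.execute("""
                    CREATE TEMP TABLE temp_records (
                        id UUID,
                        source_id BIGINT,
                        assetnum TEXT,
                        worktype VARCHAR(5),
                        workgroup VARCHAR(30),
                        total_cost_max FLOAT4,
                        created_at TIMESTAMP,
                        updated_at TIMESTAMP
                    ) ON COMMIT DROP
                """)
                # Server-side parameter binding: quoting, NULLs and injection are handled for us.
                execute_values(cur, """
                    INSERT INTO temp_records (id, source_id, assetnum, worktype, workgroup,
                                              total_cost_max, created_at, updated_at)
                    VALUES %s
                """, rows)
                cur.execute("""
                    INSERT INTO oh_wo_master (id, source_id, assetnum, worktype, workgroup,
                                              total_cost_max, created_at, updated_at)
                    SELECT id, source_id, assetnum, worktype, workgroup,
                           total_cost_max, created_at, updated_at
                    FROM temp_records
                    ON CONFLICT (source_id) DO UPDATE
                    SET assetnum       = EXCLUDED.assetnum,
                        worktype       = EXCLUDED.worktype,
                        workgroup      = EXCLUDED.workgroup,
                        total_cost_max = EXCLUDED.total_cost_max,
                        updated_at     = EXCLUDED.updated_at
                """)
            conn.commit()
        finally:
            conn.close()
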
diff --git a/dags/oh_wo_staging.py b/dags/oh_wo_staging.py
new file mode 100644
index 0000000..d33137e
--- /dev/null
+++ b/dags/oh_wo_staging.py
@@ -0,0 +1,161 @@
+from datetime import datetime
+from airflow import DAG
+from airflow.operators.python import PythonOperator
+from airflow.hooks.postgres_hook import PostgresHook
+import uuid
+import logging
+from typing import List, Tuple
+
+def get_source_hook():
+    return PostgresHook(postgres_conn_id='source_postgres')
+
+def get_target_hook():
+    return PostgresHook(postgres_conn_id='target_postgres')
+
+def extract_batch(start_id: int, batch_size: int) -> List[Tuple]:
+    """Extract a batch of records from source database"""
+    source_hook = get_source_hook()
+
+    query = """
+        SELECT id, assetnum, worktype, workgroup, total_cost_max, created_at, updated_at
+        FROM dl_wo_staging
+        WHERE id >= %s
+        AND worktype IN ('PM', 'CM', 'EM', 'PROACTIVE')
+        ORDER BY id
+        LIMIT %s
+    """
+
+    try:
+        return source_hook.get_records(
+            sql=query,
+            parameters=(start_id, batch_size)
+        )
+    except Exception as e:
+        logging.error(f"Error extracting data: {str(e)}")
+        raise
+
+def get_max_id() -> int:
+    """Get the maximum ID from source table"""
+    source_hook = get_source_hook()
+    try:
+        result = source_hook.get_first(
+            "SELECT MAX(id) FROM dl_wo_staging WHERE worktype IN ('PM', 'CM', 'EM', 'PROACTIVE')"
+        )
+        return result[0] if result and result[0] else 0
+    except Exception as e:
+        logging.error(f"Error getting max ID: {str(e)}")
+        raise
+
+def transform_batch(batch_data: List[Tuple]) -> List[Tuple]:
+    """Transform batch of records"""
+    transformed = []
+    for row in batch_data:
+        try:
+            # Create temporary table for batch processing
+            transformed.append((
+                str(uuid.uuid4()),  # id (UUID)
+                row[0],  # source_id
+                row[1],  # assetnum
+                row[2],  # worktype
+                row[3],  # workgroup
+                row[4],  # total_cost_max
+                row[5],  # created_at
+                row[6]   # updated_at
+            ))
+        except Exception as e:
+            logging.error(f"Error transforming record {row[0]}: {str(e)}")
+            continue
+    return transformed
+
+def load_batch(transformed_data: List[Tuple]) -> None:
+    """Load transformed data into target database using temp table and upsert"""
+    if not transformed_data:
+        return
+
+    target_hook = get_target_hook()
+
+    try:
+        # Bulk insert into temp table
+        target_hook.insert_rows(
+            table='oh_wo_master',
+            rows=transformed_data,
+            target_fields=['id', 'source_id', 'assetnum', 'worktype', 'workgroup', 'total_cost_max', 'created_at', 'updated_at']
+        )
+
+    except Exception as e:
+        logging.error(f"Error loading data: {str(e)}")
+        raise
+
+def process_batch(start_id: int, batch_size: int) -> int:
+    """Process a single batch of records"""
+    logging.info(f"Processing batch starting from ID {start_id}")
+
+    try:
+        # Extract
+        batch_data = extract_batch(start_id, batch_size)
+        if not batch_data:
+            return 0
+        last_processed_id = batch_data[-1][0]
+
+        # Transform
+        transformed_data = transform_batch(batch_data)
+
+        # Load
+        load_batch(transformed_data)
+
+        return last_processed_id
+    except Exception as e:
+        logging.error(f"Error processing batch starting at ID {start_id}: {str(e)}")
+        raise
+
+def get_id_range() -> Tuple[int, int]:
+    """Get the minimum and maximum ID from source table"""
+    source_hook = get_source_hook()
+    try:
+        result = source_hook.get_first("""
+            SELECT MIN(id), MAX(id)
+            FROM dl_wo_staging
+            WHERE worktype IN ('PM', 'CM', 'EM', 'PROACTIVE')
+        """)
+        return result[0] if result and result[0] else 0, result[1] if result and result[1] else 0
+    except Exception as e:
+        logging.error(f"Error getting ID range: {str(e)}")
+        raise
+
+def etl_process(**context):
+    """Main ETL process with batch processing"""
+    batch_size = 1000
+    min_id, max_id = get_id_range()
+    current_id = min_id
+    total_processed = 0
+
+    while current_id <= max_id:
+        try:
+            records_processed = process_batch(current_id, batch_size)
+
+            if records_processed == 0:
+                break
+
+            total_processed += 1000
+            current_id = records_processed + 1
+            logging.info(f"Processed {total_processed} records so far")
+        except Exception as e:
+            logging.error(f"Error in batch processing: {str(e)}")
+            raise
+
+    logging.info(f"ETL complete. Total records processed: {total_processed}")
+
+# DAG definition
+dag = DAG(
+    'wo_staging_historical_data',
+    start_date=datetime(2024, 1, 1),
+    schedule_interval=None,  # Manual trigger for historical data
+    catchup=False,
+    tags=['historical', 'work_order']
+)
+
+etl_task = PythonOperator(
+    task_id='etl_process',
+    python_callable=etl_process,
+    dag=dag
+)
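
In etl_process, total_processed is incremented by the batch limit (1000) rather than by the number of rows actually fetched, so the final partial batch is over-counted. Separately, load_batch's docstring promises a temp-table upsert while the body does a plain insert_rows, so re-running the historical load will either raise duplicate-key errors or silently duplicate rows in oh_wo_master, depending on whether source_id is constrained unique; the "Create temporary table" comment in transform_batch looks like a leftover from that earlier design. A sketch of the driver loop with accurate accounting, assuming the module's existing helpers and a changed process_batch that returns (last_processed_id, len(batch_data)):

    import logging

    def etl_process(**context):
        batch_size = 1000
        min_id, max_id = get_id_range()
        current_id = min_id
        total_processed = 0

        while current_id <= max_id:
            last_id, row_count = process_batch(current_id, batch_size)
            if row_count == 0:
                break
            total_processed += row_count  # rows actually fetched, not the batch limit
            current_id = last_id + 1
            logging.info(f"Processed {total_processed} records so far")

        logging.info(f"ETL complete. Total records processed: {total_processed}")
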
diff --git a/docker-compose.yaml b/docker-compose.yaml
new file mode 100644
index 0000000..fb50223
--- /dev/null
+++ b/docker-compose.yaml
@@ -0,0 +1,290 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
+#
+# WARNING: This configuration is for local development. Do not use it in a production deployment.
+#
+# This configuration supports basic configuration using environment variables or an .env file
+# The following variables are supported:
+#
+# AIRFLOW_IMAGE_NAME           - Docker image name used to run Airflow.
+#                                Default: apache/airflow:2.10.3
+# AIRFLOW_UID                  - User ID in Airflow containers
+#                                Default: 50000
+# AIRFLOW_PROJ_DIR             - Base path to which all the files will be volumed.
+#                                Default: .
+# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
+#
+# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
+#                                Default: airflow
+# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
+#                                Default: airflow
+# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
+#                                Use this option ONLY for quick checks. Installing requirements at container
+#                                startup is done EVERY TIME the service is started.
+#                                A better way is to build a custom image or extend the official image
+#                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
+#                                Default: ''
+#
+# Feel free to modify this file to suit your needs.
+---
+x-airflow-common:
+  &airflow-common
+  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
+  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
+  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
+  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.3}
+  # build: .
+  environment:
+    &airflow-common-env
+    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
+    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
+    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
+    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
+    AIRFLOW__CORE__FERNET_KEY: ''
+    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
+    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
+    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
+    # AIRFLOW__WEBSERVER__BASE_URL: 'http://192.168.1.82:8000/airflow'
+    # AIRFLOW__WEBSERVER__ENABLE_PROXY_FIX: True
+    # yamllint disable rule:line-length
+    # Use simple http server on scheduler for health checks
+    # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
+    # yamllint enable rule:line-length
+    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
+    # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
+    # for other purpose (development, test and especially production usage) build/extend Airflow image.
+    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
+    # The following line can be used to set a custom config file, stored in the local config folder
+    # If you want to use it, outcomment it and replace airflow.cfg with the name of your config file
+    # AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
+  volumes:
+    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
+    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
+    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
+    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
+  user: "${AIRFLOW_UID:-50000}:0"
+  depends_on:
+    &airflow-common-depends-on
+    redis:
+      condition: service_healthy
+    postgres:
+      condition: service_healthy
+
+services:
+  postgres:
+    image: postgres:13
+    environment:
+      POSTGRES_USER: airflow
+      POSTGRES_PASSWORD: airflow
+      POSTGRES_DB: airflow
+    volumes:
+      - postgres-db-volume:/var/lib/postgresql/data
+    healthcheck:
+      test: ["CMD", "pg_isready", "-U", "airflow"]
+      interval: 10s
+      retries: 5
+      start_period: 5s
+    restart: always
+
+  redis:
+    # Redis is limited to 7.2-bookworm due to licencing change
+    # https://redis.io/blog/redis-adopts-dual-source-available-licensing/
+    image: redis:7.2-bookworm
+    expose:
+      - 6379
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+      interval: 10s
+      timeout: 30s
+      retries: 50
+      start_period: 30s
+    restart: always
+
+  airflow-webserver:
+    <<: *airflow-common
+    command: webserver
+    ports:
+      - "8080:8080"
+    healthcheck:
+      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-scheduler:
+    <<: *airflow-common
+    command: scheduler
+    healthcheck:
+      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-worker:
+    <<: *airflow-common
+    command: celery worker
+    healthcheck:
+      # yamllint disable rule:line-length
+      test:
+        - "CMD-SHELL"
+        - 'celery --app airflow.providers.celery.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}" || celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    environment:
+      <<: *airflow-common-env
+      # Required to handle warm shutdown of the celery workers properly
+      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
+      DUMB_INIT_SETSID: "0"
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-triggerer:
+    <<: *airflow-common
+    command: triggerer
+    healthcheck:
+      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+  airflow-init:
+    <<: *airflow-common
+    entrypoint: /bin/bash
+    # yamllint disable rule:line-length
+    command:
+      - -c
+      - |
+        if [[ -z "${AIRFLOW_UID}" ]]; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
+          echo "If you are on Linux, you SHOULD follow the instructions below to set "
+          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
+          echo "For other operating systems you can get rid of the warning with manually created .env file:"
+          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
+          echo
+        fi
+        one_meg=1048576
+        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
+        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
+        disk_available=$$(df / | tail -1 | awk '{print $$4}')
+        warning_resources="false"
+        if (( mem_available < 4000 )) ; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
+          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
+          echo
+          warning_resources="true"
+        fi
+        if (( cpus_available < 2 )); then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
+          echo "At least 2 CPUs recommended. You have $${cpus_available}"
+          echo
+          warning_resources="true"
+        fi
+        if (( disk_available < one_meg * 10 )); then
+          echo
+          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
+          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
+          echo
+          warning_resources="true"
+        fi
+        if [[ $${warning_resources} == "true" ]]; then
+          echo
+          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
+          echo "Please follow the instructions to increase amount of resources available:"
+          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
+          echo
+        fi
+        mkdir -p /sources/logs /sources/dags /sources/plugins
+        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
+        exec /entrypoint airflow version
+    # yamllint enable rule:line-length
+    environment:
+      <<: *airflow-common-env
+      _AIRFLOW_DB_MIGRATE: 'true'
+      _AIRFLOW_WWW_USER_CREATE: 'true'
+      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
+      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
+      _PIP_ADDITIONAL_REQUIREMENTS: ''
+    user: "0:0"
+    volumes:
+      - ${AIRFLOW_PROJ_DIR:-.}:/sources
+
+  airflow-cli:
+    <<: *airflow-common
+    profiles:
+      - debug
+    environment:
+      <<: *airflow-common-env
+      CONNECTION_CHECK_MAX_COUNT: "0"
+    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
+    command:
+      - bash
+      - -c
+      - airflow
+
+  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
+  # or by explicitly targeted on the command line e.g. docker-compose up flower.
+  # See: https://docs.docker.com/compose/profiles/
+  flower:
+    <<: *airflow-common
+    command: celery flower
+    profiles:
+      - flower
+    ports:
+      - "5555:5555"
+    healthcheck:
+      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
+      interval: 30s
+      timeout: 10s
+      retries: 5
+      start_period: 30s
+    restart: always
+    depends_on:
+      <<: *airflow-common-depends-on
+      airflow-init:
+        condition: service_completed_successfully
+
+volumes:
+  postgres-db-volume:
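
Nothing in this commit defines the connections the DAGs rely on (efficiency_api, source_postgres, target_postgres), so they have to be created before the DAGs can run, either as AIRFLOW_CONN_* environment variables in the compose file or via the CLI. For example, through the airflow.sh wrapper, with placeholder hosts and credentials:

    ./airflow.sh connections add efficiency_api \
        --conn-uri 'http://efficiency-api.example.internal'             # placeholder base URL
    ./airflow.sh connections add source_postgres \
        --conn-uri 'postgres://user:pass@source-db.example:5432/source'  # placeholder DSN
    ./airflow.sh connections add target_postgres \
        --conn-uri 'postgres://user:pass@target-db.example:5432/target'  # placeholder DSN
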