๐Ÿฅ

[Airflow] Telegram ์•Œ๋ฆผ operator ๋งŒ๋“ค๊ธฐ (requests ์‚ฌ์šฉ) ๋ณธ๋ฌธ

๋ฐ์ดํ„ฐ

[Airflow] Building a Telegram alert operator (using requests)

•8• 2024. 3. 29. 21:49

Version info

Airflow 2.5.3

Python 3.8

 

์™œ์ธ์ง€๋Š” ๋ชจ๋ฅด๊ฒ ์ง€๋งŒ 2.5.3 ๋ฒ„์ „์„ ์‚ฌ์šฉํ•˜๊ณ  ์žˆ๋Š” ํ™˜๊ฒฝ์—์„œ๋Š” telegram provider ์„ค์น˜๋Š” ๋˜๋Š”๋ฐ connection type ๋ฆฌ์ŠคํŠธ์—๋Š” ์•ˆ๋–ด๋‹ค ใ…  (๋ฒ„์ „ํ˜ธํ™˜์„ฑ๋„ ๋‹ค ๋งž์ถค)

2.6.3 ๋ฒ„์ „์œผ๋กœ ์˜ฌ๋ฆฌ๊ณ  provider ์žฌ์„ค์น˜ ํ›„ ๋‹ค์‹œ ํ™•์ธํ•ด๋ณด๋‹ˆ operator๊ฐ€ ์ž˜ ๋™์ž‘ํ•˜๋Š” ๊ฑด ํ™•์ธํ–ˆ๋Š”๋ฐ.. ์–ด์จŒ๋“  2.5.3 ๋ฒ„์ „์—์„œ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด request ๋ผ์ด๋ธŒ๋Ÿฌ๋ฆฌ๋ฅผ ์ด์šฉํ•ด์„œ post ์š”์ฒญ์„ ๋ณด๋‚ด๋Š” ๋ฐฉ์‹์œผ๋กœ Webhook์„ ๋งŒ๋“ค์—ˆ๋‹ค.

 

1. Creating the Telegram Connection

How you set this up depends on how your hook will read the connection info, so adjust it to your situation.

In my case I simply used the HTTP connection type.

Adding the Airflow connection

`Host`: https://api.telegram.org/ (the Telegram API URL)

`Login`: the chat_id

`Password`: the bot token. The token felt like something that should be encrypted, so I put it in the password field, which can store an encrypted value.
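
For reference, the same connection can also be defined outside the UI. The following is a minimal sketch, assuming Airflow 2.3 or later, where an `AIRFLOW_CONN_<CONN_ID>` environment variable may hold a JSON-serialized connection. The chat_id and token are placeholders, `build_connection_env` is a hypothetical helper name, and `telegram_alert_conn_id` is the connection id used in step 4. A connection defined this way lives in the environment rather than in the metadata database, so the password-field encryption mentioned above does not apply to it.

```python
import json


def build_connection_env(conn_id: str, chat_id: str, token: str) -> dict:
    """Return {env var name: JSON value} describing the HTTP connection."""
    conn = {
        "conn_type": "http",
        "host": "https://api.telegram.org/",
        "login": chat_id,    # the hook reads chat_id from login
        "password": token,   # the hook reads the bot token from password
    }
    return {f"AIRFLOW_CONN_{conn_id.upper()}": json.dumps(conn)}


# Placeholder credentials, not real ones.
env = build_connection_env("telegram_alert_conn_id", "123456789", "000000:PLACEHOLDER")
for name, value in env.items():
    print(f"export {name}='{value}'")
```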

 

 

2. TelegramhookHook

import requests
from airflow.providers.http.hooks.http import HttpHook


class TelegramhookHook(HttpHook):
    """Hook that sends a message through the Telegram Bot API."""

    def __init__(
        self,
        http_conn_id=None,
        message='',
        *args,
        **kwargs,
    ):
        super().__init__(http_conn_id=http_conn_id, *args, **kwargs)
        # Read host, chat_id (login), and token (password) from the connection.
        self.conn = self.get_connection(http_conn_id)
        self.telegram_url = self.conn.host
        self.message = message
        self.chat_id = self.conn.login
        self.token = self.conn.password

    def _build_telegram_data(self) -> dict:
        return {
            "chat_id": self.chat_id,
            "text": self.message,
        }

    def _build_full_url(self) -> str:
        # Works whether or not the host ends with a trailing slash.
        return self.telegram_url.rstrip('/') + '/bot' + self.token + '/sendMessage'

    def execute(self) -> None:
        telegram_message = self._build_telegram_data()
        url = self._build_full_url()

        response = requests.post(url=url, data=telegram_message, timeout=10)
        # Raise on an HTTP error instead of failing silently.
        response.raise_for_status()

 

It inherits from HttpHook.

After setting a few values, `execute` sends the request with `requests.post()`.

As configured in step 1, it reads host, login, and password from the connection, and _build_full_url builds the request URL from them.

Since the host is always the same for Telegram, though, it could just be set as a default in __init__ instead of being entered every time.

 

3. TelegramWebhookOperator

from typing import Any, Dict, Optional

from airflow.providers.http.operators.http import SimpleHttpOperator

from TelegramhookHook import TelegramhookHook


class TelegramWebhookOperator(SimpleHttpOperator):
    """Operator that sends a Telegram message via TelegramhookHook."""

    # @apply_defaults is omitted: Airflow 2.1+ applies default_args automatically.
    def __init__(
        self,
        *,
        http_conn_id: str,
        message: str = '',
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.http_conn_id = http_conn_id
        self.message = message
        self.hook: Optional[TelegramhookHook] = None

    def execute(self, context: Dict[str, Any]) -> None:
        # Hand the connection id and message to the hook, then send.
        self.hook = TelegramhookHook(
            http_conn_id=self.http_conn_id,
            message=self.message,
        )
        self.hook.execute()

It inherits from SimpleHttpOperator.

It takes http_conn_id and message, passes them to TelegramhookHook, and runs it.

 

If you just want to send a Telegram message as a task, you can use TelegramWebhookOperator as it is at this point.

 

4. TelegramFailedNotification

My main use for Telegram alerts was notifying on task failure.

So I wrote the on_failure function below.

`message_content` holds whatever task_instance information you need.

Failure alerts always go to the same bot and the same chat room, so `telegram_conn` is simply set to the connection id created in step 1.

Also, because Airflow works in UTC internally, execution_date is returned in the UTC timezone.

I added 9 hours to match Korean time.

from datetime import timedelta

from TelegramWebhookOperator import TelegramWebhookOperator


def on_failure(context):
    task_instance = context.get('task_instance')
    dag_id = task_instance.dag_id
    task_id = task_instance.task_id
    log_id = task_instance.run_id

    # Airflow returns these dates in UTC, so shift them by 9 hours for KST.
    kst_execute_date = (context.get('execution_date') + timedelta(hours=9)).strftime('%Y%m%d %H%M%S')
    kst_next_execute_date = (context.get('next_execution_date') + timedelta(hours=9)).strftime('%Y%m%d %H%M%S')

    # Put whatever you need in the message, for example:
    message_content = (
        f"dag_id = {dag_id}\n"
        f"task_id = {task_id}\n"
        f"log_id = {log_id}\n"
        f"execution time = {kst_execute_date}\n"
        f"next execution time = {kst_next_execute_date}"
    )

    # Connection id created in step 1.
    telegram_conn = "telegram_alert_conn_id"

    alert = TelegramWebhookOperator(
        task_id="AirflowFailedTelegramAlert",
        http_conn_id=telegram_conn,
        message=message_content,
    )

    return alert.execute(context=context)
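
Adding nine hours gives the right clock time, but the resulting value still carries the UTC offset. As an alternative, the timestamp can be converted with a fixed +09:00 timezone. This sketch uses only the standard library, so it also runs on Python 3.8. `to_kst_string` is a hypothetical helper name, and the input is assumed to be a timezone-aware datetime like the one Airflow passes in the context.

```python
from datetime import datetime, timedelta, timezone

# Korea Standard Time has a fixed +09:00 offset with no daylight saving time.
KST = timezone(timedelta(hours=9), name="KST")


def to_kst_string(dt: datetime, fmt: str = "%Y%m%d %H%M%S") -> str:
    """Convert a timezone-aware datetime to KST and format it."""
    return dt.astimezone(KST).strftime(fmt)


utc_time = datetime(2024, 3, 29, 12, 49, 0, tzinfo=timezone.utc)
print(to_kst_string(utc_time))  # 20240329 214900
```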

 

 

Reference: https://dydwnsekd.tistory.com/69