๐ฅ
[Airflow] Telegram ์๋ฆผ operator ๋ง๋ค๊ธฐ (requests ์ฌ์ฉ) ๋ณธ๋ฌธ
๋ฒ์ ์ ๋ณด
Airflow 2.5.3
Python 3.8
์์ธ์ง๋ ๋ชจ๋ฅด๊ฒ ์ง๋ง 2.5.3 ๋ฒ์ ์ ์ฌ์ฉํ๊ณ ์๋ ํ๊ฒฝ์์๋ telegram provider ์ค์น๋ ๋๋๋ฐ connection type ๋ฆฌ์คํธ์๋ ์๋ด๋ค ใ (๋ฒ์ ํธํ์ฑ๋ ๋ค ๋ง์ถค)
2.6.3 ๋ฒ์ ์ผ๋ก ์ฌ๋ฆฌ๊ณ provider ์ฌ์ค์น ํ ๋ค์ ํ์ธํด๋ณด๋ operator๊ฐ ์ ๋์ํ๋ ๊ฑด ํ์ธํ๋๋ฐ.. ์ด์จ๋ 2.5.3 ๋ฒ์ ์์ ์ฌ์ฉํ๊ธฐ ์ํด request ๋ผ์ด๋ธ๋ฌ๋ฆฌ๋ฅผ ์ด์ฉํด์ post ์์ฒญ์ ๋ณด๋ด๋ ๋ฐฉ์์ผ๋ก Webhook์ ๋ง๋ค์๋ค.
1. Telegram Connection ์์ฑ
์ด๊ฑด ๊ทธ๋ฅ hook์์ connection ์ ๋ณด ์ด๋ป๊ฒ ๊ฐ์ ธ์ฌ์ง ์ํฉ์ ๋ง๊ฒ ์ค์ ํ๋ฉด ๋ ๋ฏ ํ๋ค.
๋๊ฐ์ ๊ฒฝ์ฐ๋ ๊ทธ๋ฅ http connection type์ ์ฌ์ฉํ๋ค.
`Host`: https://api.telegram.org/ (ํ ๋ ๊ทธ๋จ api url ์ ๋ ฅ)
`Login`: chat_id ์ ๋ ฅ
`Password`: token๊ฐ ์ ๋ ฅ. ์ ์ง ํ ํฐ๊ฐ์ ์ํธํํด์ค์ผํ ๊ฒ ๊ฐ์์ ์ํธํ๋ ๊ฐ์ ์ ์ฅํ ์ ์๋ password ๋์ ๋ฃ์ด์คฌ๋ค.
2. TelegramhookHook
import requests
from airflow.providers.http.hook.http import HttpHook
class TelegramhookHook(HttpHook):
def __init__(
self,
http_conn_id = None,
message = '',
*args,
**kwargs,
):
super().__init__(http_conn_id=http_conn_id, *args, **kwargs)
self.conn = self.get_connection(http_conn_id)
self.telegram_url = self.conn.host
self.message = message
self.chat_id = self.conn.login
self.token = self.conn.password
def _build_telegram_data(self) -> str:
return {
"chat_id": self.chat_id,
"text": self.message
}
def _build_full_url(self) -> str:
if self.telegram_url[-1] == '/':
return self.telegram_url + 'bot' + self.token + '/sendMessage'
else:
return self.telegram_url + '/bot' + self.token + '/sendMessage'
def execute(self) -> None:
telegram_message = self._build_telegram_data()
url = self._build_full_url()
push = requests.post(
url = url,
data = telegram_message
)
HttpHook์ ์์๋ฐ์.
์ฌ๋ฌ๊ฐ์ง ๊ฐ ์ค์ ํ์ `execute`์์ `requests.post()`๋ก ์์ฒญํ๋ ๋ถ๋ถ์ด๋ค.
1์์ ์ค์ ํ๋ค์ํผ conn์์ host, login, password ๊ฐ์ ๊ฐ์ ธ์์ _build_full_url ์์ ์์ฒญ url์ ๋ง๋ ๋ค.
๊ทผ๋ฐ ํ ๋ ๊ทธ๋จ์ด๋ฉด ์ด์ฐจํผ host๊ฐ์ ํญ์ ๊ฐ์๊ฑฐ๋ผ์.. ๊ทธ๋ฅ ๊ท์ฐฎ๊ฒ ๋งค๋ฒ ์์ฑํ์ง ์๊ณ init์ ์ถ๊ฐํด์ค๋ ๋ ๊ฒ ๊ฐ๋ค.
3. TelegramWebhookOperator
from typing import Any, Dict, Optional
from airflow.providers.http.hooks.http import HttpHook
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults
from TelegramhookHook import TelegramhookHook
class TelegramWebhookOperator(SimpleHttpOperator):
@apply_defaults
def __init__(
self,
*,
http_conn_id: str,
message: str = '',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.http_conn_id = http_conn_id
self.message = message
self.hook: Optional[TelegramhookHook] = None
def execute(self, context: Dict[str, Any]) -> None:
self.hook = TelegramhookHook(
self.http_conn_id,
self.message
)
self.hook.execute()
SimpleHttpOperator ์ ์์๋ฐ์.
http_conn_id, message ๊ฐ์ ๋ฐ์์ TelegramhookHook์ ์ ํด์ฃผ๊ณ ์คํํ๋ค.
๊ทธ๋ฅ Task๋ก ํ ๋ ๊ทธ๋จ ๋ณด๋ด๋ ค๋ฉด ์ฌ๊ธฐ์ TelegramWebhookOperator ์ฌ์ฉํ๋ฉด ๋๋ค.
4. TelegramFailedNotification
ํ ๋ ๊ทธ๋จ์๋ฆผ์ ์ฃผ์ ์ฌ์ฉ์ฒ๋ task ์คํจ ์์ ์๋์ด์๋ค.
๊ทธ๋์ ์๋์ ๊ฐ์ด on_failure ํจ์๋ฅผ ๋ง๋ค์๋ค.
`message_content` ๋ถ๋ถ์ ํ์ํ task_instance ์ ๋ณด๋ฅผ ๋ฃ๊ณ
์ด์ฐจํผ ์คํจ์๋ฆผ์ ๋์ผํ ๋ด/๋์ผํ ์ฑํ ๋ฐฉ์ผ๋ก ๋ณด๋ผ๊ฑฐ๋ผ `telegram_conn`์ 1์์ ์ค์ ํ๋ ์ปค๋ฅ์ id ๊ทธ๋๋ก ๋ฃ์ด๋จ๋ค.
์ถ๊ฐ๋ก airflow ๋ด๋ถ์ ์ผ๋ก๋ utc๋ก ๊ณ์ฐ๋๊ธฐ ๋๋ฌธ์ execution_date๋ utc timezone์ผ๋ก ๋ฆฌํด๋์๋ค.
ํ๊ตญ์๊ฐ๊ณผ ๋ง์ถ๊ธฐ ์ํด 9์๊ฐ์ ๋ํด์คฌ๋ค.
from datetime import timedelta
from TelegramWebhookOperator import TelegramWebhookOperator
def on_failure(context):
task_instance = context.get('task_instance')
dag_id = task_instance.dag_id
task_id = task_instance.task_id
log_id = task_instance.run_id
kst_execute_date = (context.get('execution_date') + timedelta(hours=9)).strftime('%Y%m%d %H%M%S')
kst_next_execute_date = (context.get('next_execution_date') + timedelta(hours=9)).strftime('%Y%m%d %H%M%S')
message_content = f"
ํ์ํ๊ฑฐ ์์ฑ....
ex)
dag_id = {dag_id}
task_id = {task_id}
log_id = {log_id}
์คํ์๊ฐ = {kst_execute_date}
๋ค์์คํ์๊ฐ = {kst_next_execute_date}
"
telegram_conn = "telegram_alert_conn_id"
alert = TelegramWebhookOperator(
task_id = "AirflowFailedTelegramAlert",
http_conn_id = telegram_conn
message = message_content
)
return alert.execute(context=context)
์ฐธ๊ณ : https://dydwnsekd.tistory.com/69
'๋ฐ์ดํฐ' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Airflow] Nifi Trigger Operatorย ๋ง๋ค๊ธฐ (0) | 2024.04.04 |
---|---|
[Kafka] kafka produce ํ๊ธฐ (with C) (0) | 2023.08.31 |
Python์ ํตํด trino ์ ์ (0) | 2023.06.16 |
TRINO -> Hive metastore ์ฌ์ฉ ์ HIVE_METASTORE_ERROR ์ค๋ฅ ์กฐ์น (0) | 2023.04.21 |
parquet ํ์ผ ๊ตฌ์กฐ (0) | 2022.11.09 |