๐ฅ
[Airflow] Nifi Trigger Operator ๋ง๋ค๊ธฐ ๋ณธ๋ฌธ
airflow์์ nifi processor๋ก trigger๋ฅผ ๋ณด๋ด์ผ ํ๋ค.
์ฌ์ค nifi์์ restAPI๋ฅผ ์ง์ํด์ฃผ๊ธฐ ๋๋ฌธ์ NIFI ETL ์์ฒด๋ฅผ ์์ํ๋ ๊ฒ์ ํฐ ๋ฌธ์ ๊ฐ ์๋์ง๋ง nifi ์์ ์ดํ์ airflow์์ ํ์ task๊ฐ ์๋ค๋ฉด ETL์ด ์ด๋ ์์ ์ ๋๋๋ ์ง๋ฅผ ํ์ธํ๋ ๊ฒ ์ด๋ ค์ ๋ค.
NIFI๋ processor ๋จ์๋ก ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๋ฉฐ, flowfile์ด processor์ ์ธํ๋๋ฉด ๋ฐ๋ก ์ฒ๋ฆฌ ํ output stream์ผ๋ก ๋ด๋ณด๋ด๊ธฐ ๋๋ฌธ์ ์ข ๋ฃ ์์ ์ ๋ช ํํ ์ ์๊ฐ ์์๋ค.
์๋ ๋ธ๋ก๊ทธ์์ airflow์์ nifi ๋ฅผ ํธ๋ฆฌ๊ฑฐํ ์ ์๋ ๋ฐฉ๋ฒ์ ์๊ฐํ์ง๋ง ์์ ๋ ธ๋๋ฅผ `GenerateFlowFile`๋ก ์ค์ ํด์ผ ํ๋ ๋ฑ ์ ์ฝ์กฐ๊ฑด์ด ์์๋ค.
https://towardsdatascience.com/interconnecting-airflow-with-a-nifi-etl-pipeline-8abea0667b8a
์๋ฅผ ๋ค๋ฉด ListSFTP ๋ก ์์ํ๋ ์์ ์ด ์๋ค๋ฉด, ListSFTP๋ incoming relationship์ ํ์ฉํ์ง ์์ผ๋ฏ๋ก ๋ธ๋ก๊ทธ์์ ์๊ฐํ๋ ๋ฐฉ๋ฒ์ ์ฌ์ฉํ ์ ์์๋ค.
Input requirement:
This component does not allow an incoming relationship.
์ถ์ฒ: ๋ํ๋จผํธ
๊ทธ๋ฆฌ๊ณ ์์์์๋ Nifi cluster๊ฐ single node๋ฅผ ๊ฐ์ ํ ๊ฒ์ด๋ผ ์ฌ๋ฌ๊ฐ์ ๋ ธ๋์ผ ๊ฒฝ์ฐ failover๋ฅผ ์ถ๊ฐ๋ก ๊ณ ๋ คํด์ผ ํ๊ณ , flowfile์ด ํ ๊ฐ์ธ ๊ฒฝ์ฐ์๋ง ์ฌ์ฉํ ์ ์์๋ค ใ
๊ทธ๋๋ ๋ฐฉ๋ฒ์ ์ดํด๊ฐ ๊ฐ์ ์กฐ๊ธ ๋ ๋ฒ์ฉ์ ์ผ๋ก ์ธ ์ ์๋๋ก ๋ธ๋ก๊ทธ ์ฝ๋๋ฅผ ์ฐธ๊ณ ํด์ ์ด์ง ๋ฐ๊ฟจ๋ค.
1. ์ปจ์
2. Airflow Connection ์ค์
HttpHook์ ์์ํด์ ์ฌ์ฉํ ๊ฒ์ด๊ธฐ ๋๋ฌธ์ Connection Type์ HTTP๋ก ์ค์ ํด์ฃผ์๋ค.
`Connection Id`: nifi1_api
`Connection Type`: HTTP
`Host`: http://x.x.x.x:8080
์ด๋ฐ์์ผ๋ก ๋ง๋ค์ด์ค๋ค.
๋ ธ๋๊ฐ ์ฌ๋ฌ๊ฐ๋ผ๋ฉด ์ฌ๋ฌ ๊ฐ์ ์ปค๋ฅ์ ์ ๋ง๋ ๋ค.
3. Nifi ETL ์ค์
ํ์์กฐ๊ฑด
๋ง์ง๋ง End Node๋ก `UpdateAttribute`๋ฅผ ์ฌ์ฉํด์ผ ํ๋ค.
clusterNode์ ์ํ๋ณํ๋ฅผ ํ์ธํ๊ธฐ ์ํจ์ด๋ค.
์์๋ก ์๋์ ๊ฐ์ด ๊ฐ๋จํ ์์ ์ ๋ง๋ค์ด ๋ณด์๋ค.
์๊ฒฉ์ง์์ List -> Fetch ํ HDFS์ ๋ฃ๊ณ End Node๋ก `UpdateAttribute`๋ฅผ ์ฌ์ฉํ๋ ๋ชจ์ต์ด๋ค.
Start Node๋ง ์ผฐ๋ค ๊ป๋ค ํ๋ฉด์ ์ฌ์ฉํ ๊ฑฐ๋ผ Start Node๋ ์ค์ง์ํ, ๋ค๋ฅธ ๋๋จธ์ง๋ Running ์ํ๋ก ๋๋ฉด ๋๋ค.
Start Node ์ ์ค์ผ์ฅด๋ง์ timer driven์ผ๋ก ๋๋, penalty ducation์ ์ถฉ๋ถํ ํฐ ๊ฐ์ผ๋ก ๋์ด running -> stopped ์ํ๊ฐ ๋ ๋ ๋ฌด์ํ ๋ง์ flowfile์ ์์ฑํ์ง ๋ชปํ๋๋ก (ํ ๋ฒ๋ง ์คํ๋๋๋ก) ํด์ฃผ์ด์ผ ํ๋ค.
End Node ์ properties๋ ์๋์ ๊ฐ์ด ์ค์ ํด์ค๋ค.
Store State๋ "Store state locally"๋ก ์ค์ ํ๊ณ ,
initial value๋ ์๋ฌด๊ฑฐ๋ ๋๊ณ `last_tms`๋ผ๋ properties๋ฅผ ์ถ๊ฐํด์ ๊ฐ์ ํ์ฌ ์๊ฐ `${now()}`์ผ๋ก ์์ฑํด์ค๋ค.
์ด๋ ๊ฒ ๋๋ฉด Flowfile์ด processor๋ฅผ ์ง๋ ๋๋ง๋ค ClusterNode ๋ด์ ์๋ ํค ๊ฐ์ธ last_tms์ value๋ flowfile์ด ๋๊ฐ ์์ ์ผ๋ก ์ ๋ฐ์ดํธ ๋๋ค.
2. ๊ตฌํ: nifiutil
nifi restAPI๋ ๊ณต์๋ฌธ์์ ์ ๋ฆฌ๋์ด ์๋ค.
ํ๋ก์ธ์ ์ ๋ณด ๊ฐ์ ธ์ค๊ธฐ
def get_processor(url_nifi_api, processor_id):
header = {"Content-Type": "application/json"}
response = requests.get(url_nifi_api + f"processors/{processor_id}", headers=header)
return json.loads(response.content)
def get_processor_state(url_nifi_api, processor_id, token):
header = {"Content-Type": "application/json"}
response = requests.get(url_nifi_api + f"processors/{processor_id}/state", headers=header)
return json.loads(response.content)
`get_processor`: ํ๋ก์ธ์๋ฅผ ๊ฐ์ ธ์จ๋ค.
`get_processor_state`: ํ๋ก์ธ์์ ํ์ฌ ์ํ๋ฅผ ๊ฐ์ ธ์จ๋ค.
ํ๋ก์ธ์ ์ํ ์ ๋ณด ํ์ฑ
๋ ธ๋๊ฐ ๋ ๊ฐ์ธ nifi ์ ํ๋ก์ธ์์ ๋ํด `get_processor_state`๋ก ์ํ๋ฅผ ๊ฐ์ ธ์ค๋ฉด ์๋์ ๊ฐ์ json์ ๋ฐํํ๋ค.
{
"componentState": {
"componentId": "...",
"stateDescription": "...",
"clusterState": {},
"localState": {
"scope": "LOCAL",
"state": [
{
"key": "last_tms",
"value": "0",
"clusterNodeId": "...",
"clusterNodeAddress": "nifinode1:8080"
},
{
"key": "last_tms",
"value": "0",
"clusterNodeId": "...",
"clusterNodeAddress": "nifinode2:8080"
}
],
"totalEntryCount": 2
}
}
}
1์์ stateful variable inital value๋ฅผ 0์ผ๋ก ์ค์ ํ๋ค๋ฉด last_tms ์ value๋ฅผ 0์ผ๋ก ์ค ๊ฒ์ด๋ค.
def parse_state(json_obj, state_key):
result = {}
states = json_obj["componentState"]["localState"]["state"]
for state in states:
if state["key"] == state_key:
result[state["clusterNodeId"]] = state["value"]
return result
last_tms ์ clusterNodeId๋ง ํ์ํ๊ธฐ ๋๋ฌธ์ ์ ํจ์์ ๊ฐ์ด ํ์ฑํ์ฌ {clusterNodeId:value} ํํ๋ก ๋งต์ ๋ง๋ ๋ค.
๊ฒฐ๊ณผ:
{"node1์ nodeid":"0","node2์ nodeid":"0"}
ํ๋ก์ธ์ ์ํ ๋ณ๊ฒฝ
def update_processor_status(processor_id, new_state, url_nifi_api):
processor = get_processor(url_nifi_api, processor_id)
put_dict = {
"revision": processor["revision"],
"state": new_state,
"disconnectedNodeAcknowledged": True,
}
payload = json.dumps(put_dict).encode("utf8")
header = {
"Content-Type": "application/json"
}
response = requests.put(
url_nifi_api + f"processors/{processor_id}/run-status",
headers=header,
data=payload,
)
return response
ํ๋ก์ธ์ ์ํ ๋ณ๊ฒฝ ๊ฒ์ฌ
def wait_for_update(url_nifi_api, processor_id, state_key, initial_state):
processor_state = get_processor_state(url_nifi_api, processor_id, None)
while True:
processor_state = get_processor_state(url_nifi_api, processor_id, None)
value_current = parse_state(processor_state, state_key)
if initial_state == value_current:
pause(1)
else:
pause(10)
processor_state = get_processor_state(url_nifi_api, processor_id, None)
second_current = parse_state(processor_state, state_key)
if second_current == value_current:
break
else:
continue
End Node๋ฅผ ํตํด ETL์ด ์ข ๋ฃ๋์๋์ง ์ข ๋ฃ๋์ง ์์๋์ง ํ๋จํ ๋ ์ฌ์ฉํ ํจ์์ด๋ค.
1์ด์ ํ ๋ฒ์ฉ ํ๋ก์ธ์์ state๋ฅผ ํ์ธํด ์ด๊ธฐ ๊ฐ์ last_tms์ ํ์ฌ์ last_tms๊ฐ์ ๋ณ๊ฒฝ์ด ์๋์ง ๊ฒ์ฌํ๋ค.
๋ง์ฝ ๋ณ๊ฒฝ์ด ์๋ค๋ฉด ๋ฐ๋ก ์ข ๋ฃํ์ง ์๊ณ 10์ด๊ฐ ๋ ๊ธฐ๋ค๋ฆฐ ํ์๋ ์ํ๊ฐ ๋์ผํ์ง ๊ฒ์ฌํ๋ค (๋๋ฒ์งธ if ๋ถ๋ถ) .
๋ ๋ฒ ๊ฒ์ฌํ๋ ์ด์ ๋:
์๋ฅผ๋ค์ด 100๊ฐ์ flowfile์ด round robin์ผ๋ก ๋ ๊ฐ์ ๋ ธ๋์์ ์ฒ๋ฆฌ๋๋ค๊ณ ๊ฐ์ ํ๋ฉด, ๋ ๊ฐ์ ๋ ธ๋ ์ค ๊ฐ์ฅ ๋จผ์ ์ฒซ ๋ฒ์งธ flowfile์ ์ฒ๋ฆฌํ ๋ ธ๋๋ local state๋ฅผ ์ ๋ฐ์ดํธ ํ๊ฒ๋๋ค.
last_tms ๋ณ๊ฒฝ์ด ๊ฐ์ง๋์์ง๋ง ์ฌ์ค์ ETL์ด ๋๋ ๊ฒ์ด ์๋๋ค (98๊ฐ์ flowfile์ด ๋จ์์์).
์ด๋ฌํ ์ค์๋์ ํผํ๊ธฐ ์ํด ์ผ์ ์๊ฐ ์ดํ์๋ last_tms ๊ฐ์ ๋ณ๊ฒฝ์ด ์์๋์ง ํ์ธํ๊ณ , ์์์ ๋ ETL์ด ์ข ๋ฃ๋๋ ๊ฒ์ผ๋ก ์์ฑํ๋ค.
3. ๊ตฌํ: nifihookHook
2์์ ๋ง๋ nifiutl์ importํด์ ์ฌ์ฉํ๋ค.
import requests
from airflow.exceptions import AirflowException
from airflow.providers.http.hooks.http import HttpHook
from some.nifi.path.nifiutil import update_processor_status, wait_for_update, get_processor_state, parse_state
class NifihookHook(HttpHook):
def __init__(
self,
http_conn_ids: list,
start_processor_id = '',
end_processor_id = '',
state_key = '',
*args,
**kwargs,
):
super().__init__(*args, **kwargs)
self.http_conn_ids = http_conn_ids,
self.conn = None,
self.start_processor_id = start_processor_id
self.end_processor_id = end_processor_id
self.state_key = state_key
def _find_stable_nodes(self) -> None:
for conn in self.http_conn_ids:
conn_url = self.get_connection(conn).host
r = requests.post(conn_url)
if r.status_code == 200:
if conn_url[-1] == '/':
self.nifi_api = conn_url + "nifi-api/"
else:
self.nifi_api = conn_url + "/nifi-api/"
break
def execute(self) -> None:
self._find_stable_nodes()
initial_state = parse_state(get_processor_state(self.nifi_api, self.end_processor_id, None), self.state_key)
r = update_processor_status(self.start_processor_id, "RUNNING", self.nifi_api)
pause(15)
r = update_processor_status(self.start_processor_id, "STOPPED", self.nifi_api)
if length(self.end_processor_id)>0:
wait_for_update(self.nifi_api, self.end_processor_id, self.state_key, initial_state)
`http_conn_ids` ๋ list[string] ํํ๋ก ๋ฐ์์ nifi ์ฌ๋ฌ ๋ ธ๋์ ์ ๋ณด๋ฅผ ๋ฐ๋๋ค.
`_find_stable_nodes()`์์๋ list๋ก ๋ฐ์์จ nifi ๋ ธ๋์ ์์ฒญ์ ๋ณด๋ด๋ฉด์ ์๋ต์ฝ๋ 200์ ๋ฆฌํดํด์ฃผ๋ ๋ ธ๋๋ฅผ ์ฐพ๋๋ค.
๋ง์ฝ ์ ์ ์๋ต์ ๋ฐ๋ conn_id๊ฐ ์๋ค๋ฉด `self.nifi_api` ๋ ์ ์๋์ง ์๊ณ NifihookHook์ ์คํจ๋๋ค.
`execute()`์์๋ ์๋์ ์์๋ก ์คํํ๋ค.
- End node์ ํ์ฌ ์ํ๋ฅผ ๊ฐ์ ธ์จ๋ค.
- Start node๋ฅผ ์์ํ๋ค.
- ์ผ์ ์๊ฐ ์ดํ Start node๋ฅผ ์ค์งํ๋ค.
- ๋ง์ฝ end_processor_id ๊ฐ์ด ์๋ค๋ฉด End Node๋ฅผ ๊ฒ์ฌํ๋ฉฐ ๋๋ฌ๋ค๊ณ ํ๋จ๋ ๋๊น์ง ํ์ธํ๋ค. ์๋ค๋ฉด ์ด๋ถ๋ถ์ ์คํตํ๋ค.
4. ๊ตฌํ: NifiTriggerOperator
from typing import Any, Dict, Optional
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.utils.decorators import apply_defaults
from some.nifi.path.nifihookHook import NifihookHook
class NifiTriggerOperator(SimpleHttpOperator):
@apply_defaults
def __init__(
self,
*,
http_conn_ids: list,
start_processor_id: str='',
end_processor_id: str='',
state_key: str='',
**kwargs,
) -> None:
super().__init__(**kwargs)
self.http_conn_ids = http_conn_ids
self.start_processor_id = start_processor_id
self.end_processor_id = end_processor_id
self.state_key = state_key
self.hook: Optional[NifihookHook] = None
def execute(self, context: Dict[str, Any]) -> None:
self.hook = NifihookHook(
self.http_conn_ids,
self.start_processor_id,
self.end_processor_id,
self.state_key
)
self.hook.execute()
๊ทธ๋ฅ ์ธ์๊ฐ ๋ฐ์์ NifihookHook initํ๊ณ execute ํจ์ ์คํํ๋ ๋ถ๋ถ์ด๋ค.
5. ์ฌ์ฉ
test = NifiTriggerOperator(
task_id = "test",
http_conn_ids = ["nifi1_api", "nifi2_api"],
start_processor_id = f"{start_node_id}",
end_processor_id = f"{end_node_id}",
state_key = f"{my_state_key}"
)
์์์ ์ ์ํด์ค ๋ณ์๊ฐ๋ค์ ์ธ์๋ก ๋ฃ์ด์ฃผ๋ฉด ๋๋ค.
state_key ๊ฐ์ ๊ฒฝ์ฐ๋ 3์์์ฒ๋ผ property ์ด๋ฆ์ last_tms๋ก ์ค์ ํ๋ค๋ฉด `state_key = "last_tms"`๋ก ์์ฑํ๋ฉด ๋๋ค.
์ถ๊ฐ ๊ณ ๋ ค ํ์ ์ฌํญ
1, ETL์ด ๋๋ฌด ์ค๋ ๊ฑธ๋ฆฌ๋ฉด `wait_for_update`์์ ๊ณ์ while loop๋ฅผ ๋ ์ ์๋ค.
๋ฉ๋๊ฐ๋ฅํ ์๊ฐ์ ๋๊ณ ์ฐ๊ฒฐ์ ๋๋ ๋ถ๋ถ์ ์ถ๊ฐํด์ผํ ๊ฒ ๊ฐ๋ค.
2. start node running -> stopped ์ํ๋ณํ์ ํ ์ ์ข ๊ณ ๋ฏผํด๋ด์ผํ ๊ฒ ๊ฐ๋ค.
์ผ๋ก๋ก ์์ ์ listsftp๋ฅผ ์ฐ๋๋ฐ ํ์ผ์ด ๋๋ฌด ๋ง์์ ๋ฆฌ์คํ ํ๋๋ฐ๋ง 1๋ถ์ด ๊ฑธ๋ฆฐ ์ ์ด ์์๋ค..;
operator ์ฌ์ฉ ์์ ํ ์ ๋ช์ด๋ก ๋ ๊ฑด์ง ๊ฐ์ ๋ฐ๋๊ฒ๋ ์๊ฐํด๋ดค๋๋ฐ ์ฌ์ฉ์๊ฐ ์ด ์คํผ๋ ์ดํฐ๋ฅผ ์ฌ์ฉํ ๋ ๊ทธ๋ฐ๊ฒ๊น์ง ๊ณ ๋ คํด์ผ ํ๋.. ์ถ๊ธฐ๋ ํ๋ค.
3. ์คํจ์ ๋ํ ์ผ์ด์ค๋ฅผ ๋ง๋ค์ด์ task๊ฐ ์คํจํ์ ๋ ๋ก๊ทธ๋ฅผ ๋ณด๊ณ ์ง๊ด์ ์ผ๋ก ์ ์ ์๊ฒ ์ถ๊ฐ๋๋ฉด ์ข์ ๊ฒ ๊ฐ๋ค.
'๋ฐ์ดํฐ' ์นดํ ๊ณ ๋ฆฌ์ ๋ค๋ฅธ ๊ธ
[Airflow] Telegram ์๋ฆผ operator ๋ง๋ค๊ธฐ (requests ์ฌ์ฉ) (0) | 2024.03.29 |
---|---|
[Kafka] kafka produce ํ๊ธฐ (with C) (0) | 2023.08.31 |
Python์ ํตํด trino ์ ์ (0) | 2023.06.16 |
TRINO -> Hive metastore ์ฌ์ฉ ์ HIVE_METASTORE_ERROR ์ค๋ฅ ์กฐ์น (0) | 2023.04.21 |
parquet ํ์ผ ๊ตฌ์กฐ (0) | 2022.11.09 |