Connection
Airflow UI ํ๋ฉด์์ ๋ฑ๋กํ ์ปค๋ฅ์ ์ ๋ณด
๋ฑ๋ก
| key | value |
|---|---|
| Connection id | conn=db-postgres-custom |
| Connection Type | Postgres |
| Host | 172.28.0.3 |
| Database | haejun |
| Login | haejun |
| Password | 4780 |
| Port | 5432 |
Hook
๊ฐ์ ๋ฐ ํน์ง
- ๊ฐ์: Airflow ์ธ๋ถ ์๋ฃจ์ ๊ธฐ๋ฅ์ ์ฌ์ฉํ ์ ์๋๋ก ๋ฏธ๋ฆฌ ๊ตฌํ๋ ๋ฉ์๋๋ฅผ ๊ฐ์ง ํด๋์ค
- ํน์ง
- Connection ์ ๋ณด๋ฅผ ํตํด ์์ฑ๋จ(์ฆ, Connection์ด ์์ฑ๋ ํ ์๋)
- ์ ์์ ๋ณด๋ฅผ Connection ํตํด ๋ฐ์์ค๋ฏ๋ก ์ฝ๋์ ๋ ธ์ถ๋์ง ์์
- ํน์ ์๋ฃจ์ ๋ค๋ฃฐ ์ ์๊ฒ ๋ฉ์๋๊ฐ ๋ฏธ๋ฆฌ ๊ตฌํ๋จ
- task๋ฅผ ๋ง๋ค์ง ๋ชปํจ
- ์ปค์คํ ์คํผ๋ ์ดํฐ ๋๋ python ์คํผ๋ ์ดํฐ ๋ด ์ฌ์ฉ๋๋ ํจ์ ๊ฐ์ฒด
1. get_conn()
Summary
get_conn()์ ํตํด Airflow UI์ ์ ๋ ฅํ Connection ์ ๋ณด๋ฅผ ๊ฐ์ ธ์ ์๋ ๋ณ์๋ฅผ ํ ๋นํ๊ณ , DB์ ์ฐ๊ฒฐํจ
def get_conn(self) -> connection:
"""Establish a connection to a postgres database."""
conn_id = getattr(self, self.conn_name_attr)
conn = deepcopy(self.connection or self.get_connection(conn_id))
# check for authentication via AWS IAM
if conn.extra_dejson.get("iam", False):
conn.login, conn.password, conn.port = self.get_iam_token(conn)
conn_args = {
"host": conn.host,
"user": conn.login,
"password": conn.password,
"dbname": self.database or conn.schema,
"port": conn.port,
}
...
self.conn = psycopg2.connect(**conn_args)
return self.conn2. Bulk_load
bulk_load(table,tmp_file)table: DB ํ ์ด๋ธ๋ชtmp_file: ์ฌ๋ฆด ํ์ผ๋ช
copy_expert๋ฉ์๋๋ฅผ ํ์ฉ
๋ฌธ์ ์
- Load ๊ฐ๋ฅํ ๊ตฌ๋ถ์๊ฐ
\t์ผ๋ก ๊ณ ์ Header๊น์ง ํฌํจ๋์ด ์ ๋ก๋- ํน์๋ฌธ์๋ก ์ธํด ํ์ฑ ์๋ ๊ฒฝ์ฐ ์๋ฌ ๋ฐ์
def bulk_load(self, table: str, tmp_file: str) -> None:
"""Load a tab-delimited file into a database table."""
self.copy_expert(f"COPY {table} FROM STDIN", tmp_file)
def bulk_dump(self, table: str, tmp_file: str) -> None:
"""Dump a database table into a tab-delimited file."""
self.copy_expert(f"COPY {table} TO STDOUT", tmp_file)
2-1) copy_expert
copy_exper(sql, filename)sql:bulk_load์ ์ ์ ๋ ฅํ query๋ฌธfilename:bulk_load์์ ์ ๋ ฅํtmp_file
def copy_expert(self, sql: str, filename: str) -> None:
"""Execute SQL using psycopg2's ``copy_expert`` method. Necessary to execute COPY command without access to a superuser.
Note: if this method is called with a "COPY FROM" statement and the specified input file does not exist, it creates an empty file and no data is loaded, but the operation succeeds. So if users want to be aware when the input file does not exist, they have to check its existence by themselves. """
self.log.info("Running copy expert: %s, filename: %s", sql, filename)
if not os.path.isfile(filename):
with open(filename, "w"):
pass
with open(filename, "r+") as file, closing(self.get_conn()) as conn, closing(conn.cursor()) as cur:
cur.copy_expert(sql, file)
file.truncate(file.tell())
conn.commit()3. Custom Hook
๊ธฐ์กด Hook ๋จ์ ๊ฐ์
BaseHook์ ํด๋์ค๋ฅผ ์์ ๋ฐ์ ์์ฑํด์ผํจ!!- ๊ธฐ์กด Hook ๋จ์ ์์
- Custom Hook์ ์์ฑํ์ฌ ๊ตฌ๋ถ์ ์ ํ์ ์ ๋ ฅ๋ฐ์
Header์ฌ๋ถ ์ ํ- ํน์๋ฌธ์ ์ ๊ฑฐ ๋ก์ง ์ถ๊ฐ
sqlalchemy์ด์ฉ๋ฐฉ๋ฒ ๊ณ ๋ ค1
3-1) BaseHook ์ฃผ์ ํจ์
get_connection(conn_id)
conn์ ๋ฐํ- ๋จ, Airflow UI ํ๋ฉด connection์ ์ ๋ ฅํ ์ ๋ณด๋ฅผ ๋ด์ ๊ฐ์ฒด๋ฅผ ๋ฐํ
get_conn(conn_id)์ ์ฐจ์ด์ ๊ตฌ๋ณ!!
get_hook()
- Hook ๊ฐ์ฒด๋ฅผ ๋ฐํ
get_conn(conn_id)
- get_conn()์ ์ ๊ตฌํ ํด๋๊ณ ์์ ์ด ํ์
conn๋ฐํ- Postgres DB์์ ์ฐ๊ฒฐ์ด ๋ด๊ธด ๊ฐ์ฒด๋ฅผ ๋ฐํ
@classmethod
- ๊ฐ์ฒด๋ฅผ ๋ฐ๋ก ์ง์ ํ์ง ์๊ณ ๋ฐ๋ก ์ฌ์ฉ ๊ฐ๋ฅ
# 1. ๊ธฐ์กด class
import BaseHook
a = BaseHook()
a.function()
# 2. classmethod
import BaseHook
b = BaseHook.get_connections(conn_id)3-2) Custom Hook ๊ฐ๋ฐ
๋ชฉ์
get_conn๋ฉ์๋ ๊ตฌํ- DB์ ์ฐ๊ฒฐ ์ธ์ ๊ฐ์ฒด์ธ
conn์ ๋ฐํ
- ์ฃผ์! Airflow UI์ ๋ฑ๋กํ Connectino ์ ๋ณด๋ฅผ ๋ด์ conn์ด ์๋!!!
- BaseHook์ ์ถ์ ๋ฉ์๋ ๋ฐ ์์ ํด๋์ค ๊ตฌํ
bulk_load๋ฉ์๋ ๊ตฌํ- ์ฌ์ฉ์ ์ ์