Connection

Connection information registered in the Airflow UI.

Registration

key              value
Connection id    conn=db-postgres-custom
Connection Type  Postgres
Host             172.28.0.3
Database         haejun
Login            haejun
Password         4780
Port             5432

Hook

Overview and characteristics

  • Overview: a class with pre-implemented methods that lets Airflow use the features of an external solution
  • Characteristics
    • Created from Connection information (so it works only after the Connection has been registered)
      • Connection details are read from the Connection, so they are not exposed in code
    • Methods for working with a specific solution are already implemented
    • Cannot create a task by itself
      • It is an object used inside a custom operator, or inside the function run by a Python operator
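
As a rough illustration of the last point, the sketch below uses a hook inside a plain function of the kind a Python operator would run. `fetch_row_count`, `count_rows_task`, and the table name `my_table` are hypothetical names introduced here; the Connection id is the one registered above. The Airflow import is placed inside the function so the helper can be tried without Airflow installed.

```python
from contextlib import closing


def fetch_row_count(hook, table: str) -> int:
    """Count rows using any hook-like object that exposes get_conn()."""
    with closing(hook.get_conn()) as conn, closing(conn.cursor()) as cur:
        # The table name is interpolated directly: pass trusted identifiers only.
        cur.execute(f"SELECT COUNT(*) FROM {table}")
        return cur.fetchone()[0]


def count_rows_task(table: str = "my_table") -> int:
    """Callable intended for a Python operator; no credentials appear here."""
    # Imported lazily so this module loads even where Airflow is absent.
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="conn=db-postgres-custom")
    return fetch_row_count(hook, table)
```

The host, login, and password never appear in the function: the hook resolves them from the Connection id at run time.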

1. get_conn()

Summary

get_conn() reads the Connection information entered in the Airflow UI, assigns it to the variables below, and connects to the DB.

def get_conn(self) -> connection:
    """Establish a connection to a postgres database."""
    conn_id = getattr(self, self.conn_name_attr)
    conn = deepcopy(self.connection or self.get_connection(conn_id))

    # check for authentication via AWS IAM
    if conn.extra_dejson.get("iam", False):
        conn.login, conn.password, conn.port = self.get_iam_token(conn)

    conn_args = {
        "host": conn.host,
        "user": conn.login,
        "password": conn.password,
        "dbname": self.database or conn.schema,
        "port": conn.port,
    }
    ...
    self.conn = psycopg2.connect(**conn_args)
    return self.conn
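
To make the mapping concrete, the sketch below builds the same kind of `conn_args` dict from a stand-in for the Connection registered above. `ConnectionStub` and `build_conn_args` are hypothetical names, the password is replaced by a placeholder, and it is an assumption that the "Database" field of the UI is what the hook reads as `conn.schema`.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ConnectionStub:
    """Hypothetical stand-in for an Airflow Connection record."""
    host: str
    login: str
    password: str
    schema: str  # assumed to hold the UI "Database" field
    port: int


def build_conn_args(conn: ConnectionStub, database: Optional[str] = None) -> dict:
    """Mirror the conn_args dict built inside get_conn()."""
    return {
        "host": conn.host,
        "user": conn.login,
        "password": conn.password,
        "dbname": database or conn.schema,  # an explicit database wins
        "port": conn.port,
    }


# Values from the registration table; the password is a placeholder.
registered = ConnectionStub("172.28.0.3", "haejun", "********", "haejun", 5432)
conn_args = build_conn_args(registered)
```

In the real hook this dict is then passed to `psycopg2.connect(**conn_args)`.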

2. bulk_load

  • bulk_load(table, tmp_file)
    • table: name of the DB table
    • tmp_file: name of the file to upload
  • Uses the copy_expert method

Problems

  • The delimiter that can be loaded is fixed to \t (tab)
  • The header row is uploaded along with the data
  • An error occurs when special characters prevent a row from being parsed

def bulk_load(self, table: str, tmp_file: str) -> None: 
	"""Load a tab-delimited file into a database table.""" 
	self.copy_expert(f"COPY {table} FROM STDIN", tmp_file)
 
def bulk_dump(self, table: str, tmp_file: str) -> None: 
	"""Dump a database table into a tab-delimited file.""" 
	self.copy_expert(f"COPY {table} TO STDOUT", tmp_file)
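
One possible workaround for the limitations listed above, while still using the stock `bulk_load`, is to rewrite the input file first. The sketch below converts a comma-separated file with a header into header-less, tab-delimited text, escaping backslashes, tabs, and line breaks the way PostgreSQL's COPY text format expects. `escape_copy_text` and `csv_to_copy_text` are hypothetical helper names, not part of Airflow.

```python
import csv


def escape_copy_text(value: str) -> str:
    """Escape one field for PostgreSQL's COPY text format."""
    return (
        value.replace("\\", "\\\\")  # backslash first, so later escapes survive
        .replace("\t", "\\t")
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )


def csv_to_copy_text(src_path: str, dst_path: str, has_header: bool = True) -> int:
    """Rewrite a comma-separated file as header-less, tab-delimited text.

    Returns the number of data rows written.
    """
    written = 0
    with open(src_path, newline="", encoding="utf-8") as src, \
         open(dst_path, "w", newline="", encoding="utf-8") as dst:
        reader = csv.reader(src)
        if has_header:
            next(reader, None)  # drop the header so it is not loaded as data
        for row in reader:
            dst.write("\t".join(escape_copy_text(field) for field in row) + "\n")
            written += 1
    return written
```

Empty fields are written as empty strings here, not as NULL; the COPY text format marks NULL with `\N`, which this sketch does not attempt to produce.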
 

2-1) copy_expert

  • copy_expert(sql, filename)
    • sql: the query passed in from bulk_load
    • filename: the tmp_file passed in from bulk_load

def copy_expert(self, sql: str, filename: str) -> None:
    """Execute SQL using psycopg2's ``copy_expert`` method.

    Necessary to execute COPY command without access to a superuser.

    Note: if this method is called with a "COPY FROM" statement and the
    specified input file does not exist, it creates an empty file and no
    data is loaded, but the operation succeeds. So if users want to be
    aware when the input file does not exist, they have to check its
    existence by themselves.
    """
    self.log.info("Running copy expert: %s, filename: %s", sql, filename)
    if not os.path.isfile(filename):
        with open(filename, "w"):
            pass

    with open(filename, "r+") as file, closing(self.get_conn()) as conn, closing(conn.cursor()) as cur:
        cur.copy_expert(sql, file)
        file.truncate(file.tell())
        conn.commit()
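
Because a missing input file is silently created and the load still "succeeds", the caller has to check for the file itself, as the docstring notes. A minimal sketch of such a guard follows; `require_input_file` is a hypothetical helper, and rejecting empty files as well is an extra assumption that may not suit every pipeline.

```python
import os


def require_input_file(path: str) -> str:
    """Raise instead of letting COPY FROM quietly load nothing."""
    if not os.path.isfile(path):
        raise FileNotFoundError(f"bulk load input not found: {path}")
    if os.path.getsize(path) == 0:
        # Optional extra check: an empty file would also load zero rows.
        raise ValueError(f"bulk load input is empty: {path}")
    return path
```

It could be used as `hook.bulk_load(table, require_input_file(tmp_file))`, so the task fails before the hook is ever called.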

3. Custom Hook

Improving the shortcomings of the existing Hook

  • Must be written as a class that inherits from BaseHook
  • How the existing Hook's shortcomings are addressed
    • Create a Custom Hook that accepts the delimiter type as input
    • Let the caller choose whether the file has a header
    • Add logic to remove special characters
  • Consider an approach that uses sqlalchemy
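
The "remove special characters" step could look something like the sketch below. The notes do not say which characters are meant, so this assumes ASCII control characters (tab, carriage return, line feed, and so on); `clean_field` and `clean_row` are hypothetical names.

```python
import re

# Assumption: "special characters" means ASCII control characters.
# Adjust the pattern to whatever actually breaks parsing in the real data.
_CONTROL_CHARS = re.compile(r"[\x00-\x1f\x7f]")


def clean_field(value: str, replacement: str = " ") -> str:
    """Replace control characters in one field and trim the result."""
    return _CONTROL_CHARS.sub(replacement, value).strip()


def clean_row(row: list[str]) -> list[str]:
    """Apply clean_field to every field of a parsed row."""
    return [clean_field(field) for field in row]
```

Replacing with a space rather than deleting keeps words from running together; pass `replacement=""` to drop the characters entirely.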

3-1) Main BaseHook functions

get_connection(conn_id)
  • Returns conn
  • Note: the returned object holds the information entered for the connection in the Airflow UI
  • Do not confuse it with get_conn()
get_hook(conn_id)
  • Returns a Hook object
get_conn()
  • Must be implemented in the subclass before it can be used
  • Returns conn
  • Here the returned object holds the live connection to the Postgres DB

@classmethod

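  • Can be called directly on the class, without creating an object first
# 1. Regular class: create an object, then call a method on it
from airflow.hooks.base import BaseHook

a = BaseHook()
a.function()  # placeholder for any instance method

# 2. classmethod: call it on the class itself, no object needed
b = BaseHook.get_connection(conn_id)  # conn_id: the Connection id registered in the UI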
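
The same contrast can be tried without Airflow. In the small sketch below, `Registry` is a hypothetical class used only to show the two call styles side by side.

```python
class Registry:
    """Hypothetical class used only to contrast the two call styles."""

    _items = {"db": "postgres"}

    def lookup(self, key: str) -> str:
        # Instance method: needs an object, which arrives as `self`.
        return self._items[key]

    @classmethod
    def lookup_cls(cls, key: str) -> str:
        # Class method: receives the class as `cls`, no object required.
        return cls._items[key]


via_instance = Registry().lookup("db")   # an object is created first
via_class = Registry.lookup_cls("db")    # called straight on the class
```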

3-2) Developing a Custom Hook

Purpose

  1. Implement the get_conn method
  2. Return conn, the session object connected to the DB
    1. Caution: this is not the conn that holds the Connection information registered in the Airflow UI
  3. Implement BaseHook's abstract method in a child class
  4. Implement the bulk_load method
  5. Add user-defined logic
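
Putting these goals together, a custom hook might be sketched roughly as follows. This is not the stock PostgresHook: `CustomPostgresHook`, its parameters, and the choice of COPY's csv format are assumptions made for illustration, and the import path assumes Airflow 2.x. The fallback `BaseHook` exists only so the sketch can be loaded without Airflow.

```python
from contextlib import closing

try:
    from airflow.hooks.base import BaseHook
except ImportError:
    class BaseHook:  # minimal stand-in so the sketch loads without Airflow
        @classmethod
        def get_connection(cls, conn_id):
            raise RuntimeError("Airflow is required to look up a Connection")


class CustomPostgresHook(BaseHook):
    """Hypothetical custom hook; names and defaults are illustrative."""

    def __init__(self, postgres_conn_id: str, **kwargs):
        super().__init__(**kwargs)
        self.postgres_conn_id = postgres_conn_id

    def get_conn(self):
        """Return a live psycopg2 session, not the Airflow Connection record."""
        import psycopg2  # lazy import: only needed when actually connecting

        info = self.get_connection(self.postgres_conn_id)  # Connection record
        return psycopg2.connect(
            host=info.host, user=info.login, password=info.password,
            dbname=info.schema, port=info.port,
        )

    def bulk_load(self, table: str, file_name: str,
                  delimiter: str = ",", has_header: bool = True) -> None:
        """COPY a delimited file into `table` with caller-chosen options."""
        if len(delimiter) != 1 or delimiter in "'\\\r\n":
            raise ValueError("delimiter must be a single plain character")
        sql = (
            f"COPY {table} FROM STDIN WITH "
            f"(FORMAT csv, DELIMITER '{delimiter}', HEADER {str(has_header).lower()})"
        )
        with open(file_name, encoding="utf-8") as file, \
             closing(self.get_conn()) as conn, closing(conn.cursor()) as cur:
            cur.copy_expert(sql, file)
            conn.commit()
```

Note the two different objects involved: `get_connection` returns the record registered in the UI, while `get_conn` returns the open database session built from it. The table name is interpolated into the SQL as-is, so it should come from trusted code only.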