airflow使用oracle
Airflow是一个用于调度、监控和编写多步任务的平台,因为最近在工作中需要处理oracle相关任务,所以在此分享如何在Airflow中使用Oracle数据库。
首先需要安装Oracle驱动程序包cx_Oracle,在Airflow的DAG中可以直接使用PythonOperator调用Oracle进行数据读取、数据清洗、数据加载等操作。假设我们有一个需求是从一个Oracle表中读取数据,清洗后再写入另一个Oracle表中,那么数据清洗的Python代码可以如下所示:
import cx_Oracle
def cleaning_data():
1. 连接源Oracle数据库
connection_src = cx_Oracle.connect("user/password@database_src")
cursor_src = connection_src.cursor()
1. 查询数据
cursor_src.execute("SELECT * FROM TABLE_SRC")
1. 清理数据,并写入目标Oracle数据库
connection_target = cx_Oracle.connect("user/password@database_target")
cursor_target = connection_target.cursor()
for row in cursor_src:
1. 数据清洗
row_cleaned = row[0] + ' cleaned'
1. 写入目标数据库
cursor_target.execute("INSERT INTO TABLE_TARGET VALUES (:1)", (row_cleaned,))
connection_target.commit()
1. 关闭数据库连接
cursor_src.close()
cursor_target.close()
connection_src.close()
connection_target.close()