airflow使用oracle

Airflow是一个用于调度、监控和编写多步任务的平台,因为最近在工作中需要处理oracle相关任务,所以在此分享如何在Airflow中使用Oracle数据库。

首先需要安装Oracle驱动程序包cx_Oracle,在Airflow的DAG中可以直接使用PythonOperator调用Oracle进行数据读取、数据清洗、数据加载等操作。假设我们有一个需求是从一个Oracle表中读取数据,清洗后再写入另一个Oracle表中,那么数据清洗的Python代码可以如下所示:

import cx_Oracle def cleaning_data(): 1. 连接源Oracle数据库 connection_src = cx_Oracle.connect("user/password@database_src") cursor_src = connection_src.cursor() 1. 查询数据 cursor_src.execute("SELECT * FROM TABLE_SRC") 1. 清理数据,并写入目标Oracle数据库 connection_target = cx_Oracle.connect("user/password@database_target") cursor_target = connection_target.cursor() for row in cursor_src: 1. 数据清洗 row_cleaned = row[0] + ' cleaned' 1. 写入目标数据库 cursor_target.execute("INSERT INTO TABLE_TARGET VALUES (:1)", (row_cleaned,)) connection_target.commit() 1. 关闭数据库连接 cursor_src.close() cursor_target.close() connection_src.close() connection_target.close()