前言
在很多时候,我们都在和数据是打交道,所以经常会去做一些数据库连接,比如Mysql、Mongo等等。
如果直接把数据库连接写在DAG中,这虽然是可行的,但这会暴露数据库连接等敏感信息,并且当数据库连接信息发生变化时,维护成本也将大大提升。
了解Connection
Airflow提供了Connections
帮助用户管理各种连接(不仅限于数据库连接)。
打开Airflow Web,选中Admin Connections即可管理所有连接。
我们使用docker-compose快速搭建的Airflow默认已支持大部分Connection。
为了更加详细的说明和演示,我选择没有默认安装的MongoDB Connection Type。为了连接MongoDB,那我们则需要安装Mongo Provider。
https://airflow.apache.org/docs/apache-airflow-providers-mongo/stable/index.html
新的Connection
对于比较常用的Python包,Airflow官方建议根据自身的实际需求自定义镜像。
现在,我们需要让Airflow调度和执行支持Mongo连接的任务,则需要安装apache-airflow-providers-mongo
自定义镜像
新建Dockerfile
文件
FROM apache/airflow:2.2.2
COPY requirements.txt .
RUN pip install -r requirements.txt --no-cache-dir
同目录下新建requirements.txt
apache-airflow-providers-mongo
pymongo=-3.10.1
安装apache-airflow-providers-mongo时,默认会安装pymongo,这个版本是4以上。对于低版本的mongo数据库,可以像上方一样在requirements.txt中主动指定pymongo的版本。
构建镜像
docker build -t mkdir700/airflow:2.2.2 .
构建好之后,我们就可以使用自己的镜像运行Airflow服务了。
还记得.env
文件吗?打开.env
,新增以下配置:
AIRFLOW_IMAGE_NAME=mkdir700/airflow:2.2.2
最后,重启Airflow服务
docker-compose up -d
添加Connection
在新建Connection时,就可以看到已经支持Mongo连接了。
填写Mongo的连接信息
我用Docker在本地起了一个Mongo服务,没有帐号密码就全留空。
注意:对于有帐号密码的连接,密码只会在首次创建时显示,后续再去编辑这个连接信息时,密码输入框中的内容会是空的。
填写好连接之后,我们可以测试这个连接的连通性。
这个需要方法支持,mongo这个包没有实现测试方法,所以点击就会报错。
如果是Mysql是可以测试的。
使用Connection
接下来,我们就可以在DAG中使用mongo连接了。
在DAG中,我们可以用MongoHook
拿到mongo数据库连接对象,指定我们刚才填写的连接id即可。
from airflow.providers.mongo.sensors.mongo import MongoHook
mongo_hook = MongoHook(conn_id="local_mongo")
获取collection对象,然后就可以操作数据了。
coll = mongo_hook.get_collection(<集合名>, <数据库名>)
完整DAG:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
# DAG开发环境安装 pip install apache-airflow-providers-mongo
from airflow.providers.mongo.sensors.mongo import MongoHook
default_args = {
'owner': 'airflow',
}
with DAG(
'test_mogno_dag',
default_args=default_args,
description='connect MongoDB tutorial',
schedule_interval=None,
start_date=datetime(2021, 1, 1),
catchup=False,
tags=['test'],
) as dag:
def read():
# 读取指定的连接id,并与数据库建立连接
conn = MongoHook(conn_id="local_mongo")
# 获取集合(获得表),之后的使用就和Pymongo一模一样了
coll = conn.get_collection("test_db", "test_coll")
# 返回查询结果
return list(coll.find({}, {"_id": 0}))
run_this = PythonOperator(
task_id='test_mogno_dag',
python_callable=read,
)
执行DAG,查看日志: