/ 我的程序人生 / 1252浏览

【入门Airflow】Connection管理|如何连接数据库?

前言

在很多时候,我们都在和数据是打交道,所以经常会去做一些数据库连接,比如Mysql、Mongo等等。

如果直接把数据库连接写在DAG中,这虽然是可行的,但这会暴露数据库连接等敏感信息,并且当数据库连接信息发生变化时,维护成本也将大大提升。

了解Connection

Airflow提供了Connections 帮助用户管理各种连接(不仅限于数据库连接)。

打开Airflow Web,选中Admin Connections即可管理所有连接。

查看Connections

我们使用docker-compose快速搭建的Airflow默认已支持大部分Connection。

为了更加详细的说明和演示,我选择没有默认安装的MongoDB Connection Type。为了连接MongoDB,那我们则需要安装Mongo Provider。

https://airflow.apache.org/docs/apache-airflow-providers-mongo/stable/index.html

新的Connection

对于比较常用的Python包,Airflow官方建议根据自身的实际需求自定义镜像。

现在,我们需要让Airflow调度和执行支持Mongo连接的任务,则需要安装apache-airflow-providers-mongo

自定义镜像

新建Dockerfile文件

FROM apache/airflow:2.2.2

COPY requirements.txt .

RUN pip install -r requirements.txt --no-cache-dir

同目录下新建requirements.txt

apache-airflow-providers-mongo
pymongo=-3.10.1

安装apache-airflow-providers-mongo时,默认会安装pymongo,这个版本是4以上。对于低版本的mongo数据库,可以像上方一样在requirements.txt中主动指定pymongo的版本。

构建镜像

docker build -t mkdir700/airflow:2.2.2 .

构建好之后,我们就可以使用自己的镜像运行Airflow服务了。

还记得.env文件吗?打开.env,新增以下配置:

AIRFLOW_IMAGE_NAME=mkdir700/airflow:2.2.2

最后,重启Airflow服务

docker-compose up -d

添加Connection

在新建Connection时,就可以看到已经支持Mongo连接了。

填写Mongo的连接信息

我用Docker在本地起了一个Mongo服务,没有帐号密码就全留空。

注意:对于有帐号密码的连接,密码只会在首次创建时显示,后续再去编辑这个连接信息时,密码输入框中的内容会是空的。

填写好连接之后,我们可以测试这个连接的连通性。

这个需要方法支持,mongo这个包没有实现测试方法,所以点击就会报错。

如果是Mysql是可以测试的。

使用Connection

接下来,我们就可以在DAG中使用mongo连接了。

在DAG中,我们可以用MongoHook拿到mongo数据库连接对象,指定我们刚才填写的连接id即可。

from airflow.providers.mongo.sensors.mongo import MongoHook


mongo_hook = MongoHook(conn_id="local_mongo")

获取collection对象,然后就可以操作数据了。

coll = mongo_hook.get_collection(<集合名>, <数据库名>)

完整DAG:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
# DAG开发环境安装 pip install apache-airflow-providers-mongo
from airflow.providers.mongo.sensors.mongo import MongoHook

default_args = {
    'owner': 'airflow',
}

with DAG(
    'test_mogno_dag',
    default_args=default_args,
    description='connect MongoDB tutorial',
    schedule_interval=None,
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=['test'],
) as dag:
    def read():
        # 读取指定的连接id,并与数据库建立连接
        conn = MongoHook(conn_id="local_mongo")
        # 获取集合(获得表),之后的使用就和Pymongo一模一样了
        coll = conn.get_collection("test_db", "test_coll")
        # 返回查询结果
        return list(coll.find({}, {"_id": 0}))
        
    
    
    run_this = PythonOperator(
        task_id='test_mogno_dag',
        python_callable=read,
    )

执行DAG,查看日志:

SQLite 锁机制:读锁策略与并发事务分析
SQLite 锁机制:读锁策略与并发事务分析
使用 GoReleaser 发布 Rust 二进制文件
使用 GoReleaser 发布 Rust 二进制文件
解决在 Windows 上 openssl-sys 构建失败的问题
解决 Rust 测试中的并行执行冲突:保护共享资源的策略
解决 Rust 测试中的并行执行冲突:保护共享资源的策略
Rust 中的跨平台开发:处理平台特定代码和未使用代码警告
Rust 中的跨平台开发:处理平台特定代码和未使用代码警告
ncdu:高效的磁盘使用分析工具
ncdu:高效的磁盘使用分析工具