前言
我们接触一个新的框架总会有许多新概念,这些概念虽然比较重要,但我想它的功能才是我们初步接触最想体验的,所以我更偏向于在使用的过程中去逐渐了解。
本文将从实际需求出发完成一个DAG的编写,并且我会写些BUG,以达到避坑的目的。
需求
现在我们有一个需求,每天定时请求下方接口,并把数据保存成文本文件。
https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US
分析
一共两个步骤:
- 请求接口;
- 存储响应内容;
我们可以分别定义两个函数,来完成这两个步骤。
请求接口
def request():
import requests
resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
return resp
保存数据
def save(text: str):
with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
f.write(text)
DAG
在上一步,我们分别完成了两个函数,如果将它们组合起来就可以完成需求了。所以,现在需要一个家伙来“组合”它们。
DAG(Directed Acyclic Graph)即,有向无环图,关于它的含义,官方介绍比我清楚的多。
通俗点理解,DAG是一张地图,Airflow将根据这张地图,按照上面的路线抵达终点。
现在,我们就需要完成这张地图的路线绘制。
对于我们的需求,执行的路线就是,先下载再保存 ,所以路线图就是:
Operator
我们知道DAG是用来绘制路线图的,那么我们的任务怎么执行,谁来执行呢?
所以,需要一个家伙来将我们写的代码跑起来,这个家伙就是Operator。
列车有乘务,飞机有空姐,轮船有水手... Operator也是一样,根据不同的任务类型,可以选择不同的Operator。
例如:
- BashOperator - 执行Bash命令行
- PythonOperator - 调用任意Python函数
- EmailOperator - 发送邮件
- 更多Operator
我们的需求都是执行Python函数,所以我们可以只使用PythonOperator
不同Operator可以互相依赖,就好像海陆空协同作战。
编写DAG
了解完上面的两个基本概念,我们就可以编写DAG了。
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG(
dag_id='download_bing',
schedule_interval='0 0 * * *', # 定时执行,Cron表达式
start_date=datetime(2021, 1, 1), # 开始时间
tags=['test'], # DAG的tag标签,用于分类
) as dag:
def request():
import requests
resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
return resp
def save(text: str):
with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
f.write(text)
# 使用PythonOperator执行Python函数
request_task = PythonOperator(
task_id="request_task",
python_callable=request
)
save_task = PythonOperator(
task_id="save_task",
python_callable=save
)
上方就是我们编写DAG,仔细看,request和save之间没有先后关系(依赖关系)。
Airflow实现了语法糖,使用>>
、 <<
定义任务之间的依赖关系,则在本例中的依赖关系可以定义如下:
request_task >> save_task
完整DAG
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG(
dag_id='download_bing',
schedule_interval='0 0 * * *',
start_date=datetime(2021, 1, 1),
tags=['test'],
) as dag:
def request():
import requests
resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
return resp
def save(text: str):
with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
f.write(text)
request_task = PythonOperator(
task_id="request_task",
python_callable=request
)
save_task = PythonOperator(
task_id="save_task",
python_callable=save
)
request_task >> save_task
激活DAG
“激活”很简单,我们只要放置到/dags
目录下,等待Airflow自动读取即可。
在[入门Airflow] 使用Docker在本地快速搭建Airflow一文中,我们已映射了/dags /logs /plugins
这三个文件,所以我们将刚编写的DAG放置到/dags
目录下,然后打开Web端,等待一会儿刷新即可。
如果DAG有错误,Web端则会包导入错误的异常:
可以输入框,输入test
标签快速找到DAG
执行DAG
点击右侧的执行按钮
好吧,不出意外,你会看到下图的一片红色(doge🤣)
查看DAG执行日志
我理解你急于解决这个问题的心情,不妨来看看如何是哪里出了问题。
返回值类型错误
报错信息:
TypeError: Object of type Response is not JSON serializable
这个报错说Response
类型不能被JSON序列化。
详细内容:
[2021-12-16, 21:24:37 ] {python.py:152} INFO - Done. Returned value was: <Response [200]>
[2021-12-16, 21:24:37 ] {xcom.py:334} ERROR - Could not serialize the XCom value into JSON. If you are using pickle instead of JSON for XCom, then you need to enable pickle support for XCom in your *** config.
不卖关子,request
函数的返回值就是Response
,Airflow推荐的返回值是能够JSON序列化的数据。
所以,我们可以返回Response.text
即可。
DAG执行多次的问题
上面的报错解决了,但你可以先不用去改代码,我们进入DAG详情页面,选择Calendar
选项
What Fuck! 怎么全是红的?
这其实是因为我们配置DAG的原因
with DAG(
dag_id='download_bing',
schedule_interval='0 0 * * *',
start_date=datetime(2021, 1, 1),
tags=['test'],
) as dag:
...
我们指定了start_date
,所以Airflow就从2021-01-01
执行到当前时间。
如果想让过去的时间,归为当前一次执行,则应该指定参数catchup
为False
,我理解的意思是不捕获错过的时间 。
with DAG(
dag_id='download_bing',
schedule_interval='0 0 * * *',
start_date=datetime(2021, 1, 1),
tags=['test'],
catchup=False # 新增
) as dag:
...
任务之间的值传递
还有一件事。有没有发现,request
返回值,如何交给save
呢?
详细的可以了解Xcoms
的概念,这里我就简单描述下任务之间值传递的逻辑。
save
并不是被动接收来自request
的返回值,而是主动去询问。
所以save
函数这样写:
def save(**kwargs):
# 获取任务实例
ti = kwargs['ti']
# 拉取任务ID为request_task的返回值
text = ti.xcom_pull(task_ids="request_task")
with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
f.write(text)
通过ti.xcom_pull
,我们就可以拿到request_task
的返回值。
对于值传递,可以在Xcoms查看详情,我也会在后续的文章中,详细解释Xcoms工作逻辑。
完整DAG代码:
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
with DAG(
dag_id='download_bing',
schedule_interval='0 0 * * *',
start_date=datetime(2021, 1, 1),
tags=['test'],
catchup=False,
) as dag:
def request():
import requests
resp = requests.get("https://www.bing.com/HPImageArchive.aspx?format=js&idx=1&n=10&mkt=en-US")
return resp.text
def save(**kwargs):
# 获取任务实例
ti = kwargs['ti']
# 拉取任务ID为request_task的返回值
text = ti.xcom_pull(task_ids="request_task")
with open("/opt/airflow/results.txt", "a+", encoding="utf-8") as f:
f.write(text)
request_task = PythonOperator(
task_id="request_task",
python_callable=request
)
save_task = PythonOperator(
task_id="save_task",
python_callable=save
)
request_task >> save_task
再次执行DAG
再次执行,可以看到变绿了🤡
执行下方命令,查看Worker容器:
docker exec -it airflow_airflow-worker_1 bash
ls -la
至此,文件已成功保存了。