一、Airflow简介
Airflow是一个编排、调度和监控workflow的平台。Airflow的核心概念有五个:
DAGs:即有向无环图(DirectedAcyclicGraph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序。
Operators:可以简单理解为一个class,描述了DAG中一个具体的task具体要做的事。其中,airflow内置了很多operators,如BashOperator执行一个bash命令,PythonOperator调用任意的Python函数,EmailOperator用于发送邮件,HTTPOperator用于发送HTTP请求,SqlOperator用于执行SQL命令...同时,用户可以自定义Operator,这给用户提供了极大的便利性。
Tasks:Task是Operator的一个实例,也就是DAGs中的一个node。
TaskInstance:task的一次运行。taskinstance有自己的状态,包括running,success,failed,skipped,upforretry等。
TaskRelationships:DAGs中的不同Tasks之间可以有依赖关系,如TaskATaskB,表明TaskB依赖于TaskA。
二、我们用Airflow做些什么?
只要符合定时任务流的工作,都可以用Airflow来实现。我们主要用Airflow来实现定时的ETL处理。
三、为什么要用Airflow?
既然是主要使用其定时器的功能,为什么不直接写一个定时器而要用Airflow这种很占用资源的工具呢?Airflow除了可以很方便的组织任务流中每个环节的处理之外,还有一个非常重要的优势在于任务流的管理。特别是我们现在的ETL处理中,除了可能依赖外部的数据来源之外,还经常要调用模型的执行API(在IT层面也当成外部接口使用)。这些接口未必可控(比如外部数据来源,可能出现API服务掉线或者断网情况),当出现各种不可控问题导致任务流断裂的情况的时候,自己写的定时器就难以方便的、可视化的操作数据数据补回了。还有就是任务之间的依赖情况,在任务流的实现过程中,如果自己写定时器,这方面也需要有大量的考虑,而这些考虑Airflow已经帮你完成了。所以我们在使用Airflow的时候,也需要注意利用他在这两方面的优势,写出我们灵活,容错率高的工作流。毕竟没有bug的代码是难以实现的,每当写一段代码的时候,都要考虑这段代码会不会出bug,他所调用的方法和接口会不会出bug,当出bug的时候怎样弥补。
四、基础配置airflow.cfg
安装好Airflow,第一次运行airflowinitdb之后,会在Airflow文件夹下面产生一个airflow.cfg文件,这个就是基础配置文件。我们以这个基础文件作为模板来修改成为我们需要的配置文件。以下的操作都是找到对应的配置字段,修改其字段内容。
修改默认时区:default_timezone=Asia/Shanghai,说明:修改时区之后,Airflow前端页面仍旧会使用UTC时区显示,但是配合主机/容器的时区,这样我们在写dag任务执行时间的时候就不需要转换时区了。
修改执行器类型:executor=CeleryExecutor
不加载范例dag:load_example=False
不让同个dag并行操作:max_active_runs_per_dag=1,说明:在ETL过程中,还是线性执行会比较好控制,如果里面需要批量操作,可以在ETL的具体处理过程中加入多线程或者多进程方式执行,不要在dag中体现
最高的dag并发数量:dag_concurrency=16,说明:一般配置成服务器的CPU核数,默认16也没问题。
最高的任务并发数量:worker_concurrency=16,说明:CeleryExecutor在Airflow的worker线程中执行的,这里配置的是启动多少个worker
数据库配置:sql_alchemy_conn=mysql://airflow:airflow
.0.0.1:/airflow?charset=utf8,说明:我们一般是用MySQL来配合Airflow的运行CeleryBroker:broker_url=redis://:password
.0.0.1:/0,说明:默认配置中两个redis配置被分到两个redis区,我们也照做吧。CeleryResultbackend:result_backend=redis://:password
.0.0.1:/1,说明:默认配置中两个redis配置被分到两个redis区,我们也照做吧。五、MySQL需要注意的地方
mysql的配置中需要加入以下内容,不然执行会报错。需要在initdb之前加入并重启。
[mysqld]innodb_large_prefix=onexplicit_defaults_for_timestamp=1六、运行
由于使用的是CeleryExecutor,需要顺序执行三个进程:airflowwebserver-Dairflowscheduler-Dairflowworker-D
七、一些技巧分享7.1利用provide_context在任务间传递信息
在default_args里面配置#x27;provide_context#x27;:True,这样在每个任务执行完之后都可以返回一个信息(当你需要的时候)。这样每个任务都可以获取到之前任务执行返回的信息,以进行自身的处理操作。以下是一个简单的例子:
#-------------------------------------------------------------------------------#任务1,获得数据并保存到文件中,返回文件名defjob_get_datas(**kwargs):filename=get_datas()#数据获取的函数,返回的是存储数据的文件名returnfilenameoperator_get_datas=PythonOperator(task_id=#x27;task_get_datas#x27;,python_callable=job_get_datas,dag=dag)#-------------------------------------------------------------------------------#把存储文件的数据导入数据库defjob_data_2_mysql(**kwargs):filename=kwargs[#x27;task_instance#x27;].x