RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
使用AirFlow调度MaxCompute

背景

成都创新互联公司主营通州网站建设的网络公司,主营网站建设方案,app开发定制,通州h5成都微信小程序搭建,通州网站营销推广欢迎通州等地区企业咨询

airflow是Airbnb开源的一个用python编写的调度工具,基于有向无环图(DAG),airflow可以定义一组有依赖的任务,按照依赖依次执行,通过python代码定义子任务,并支持各种Operate操作器,灵活性大,能满足用户的各种需求。本文主要介绍使用Airflow的python Operator调度MaxCompute 任务

一、环境准备

Python 2.7.5 PyODPS支持Python2.6以上版本
Airflow apache-airflow-1.10.7

1.安装MaxCompute需要的包

pip install setuptools>=3.0

pip install requests>=2.4.0

pip install greenlet>=0.4.10 # 可选,安装后能加速Tunnel上传。

pip install cython>=0.19.0 # 可选,不建议Windows用户安装。

pip install pyodps

注意:如果requests包冲突,先卸载再安装对应的版本

2.执行如下命令检查安装是否成功

python -c "from odps import ODPS"

二、开发步骤

1.在Airflow家目录编写python调度脚本Airiflow_MC.py

 
 
 
 
  1. # -*- coding: UTF-8 -*-
  2. import sys
  3. import os
  4. from odps import ODPS
  5. from odps import options
  6. from airflow import DAG
  7. from airflow.operators.python_operator import PythonOperator
  8. from datetime import datetime, timedelta
  9. from configparser import ConfigParser
  10. import time
  11. reload(sys)
  12. sys.setdefaultencoding('utf8')
  13. #修改系统默认编码。
  14. # MaxCompute参数设置
  15. options.sql.settings = {'options.tunnel.limit_instance_tunnel': False, 'odps.sql.allow.fullscan': True}
  16. cfg = ConfigParser()
  17. cfg.read("odps.ini")
  18. print(cfg.items())
  19. odps = ODPS(cfg.get("odps","access_id"),cfg.get("odps","secret_access_key"),cfg.get("odps","project"),cfg.get("odps","endpoint"))

 
 
 
 
  1. default_args = {
  2. 'owner': 'airflow',
  3. 'depends_on_past': False,
  4. 'retry_delay': timedelta(minutes=5),
  5. 'start_date':datetime(2020,1,15)
  6. # 'email': ['airflow@cdxwcx.com'],
  7. # 'email_on_failure': False,
  8. # 'email_on_retry': False,
  9. # 'retries': 1,
  10. # 'queue': 'bash_queue',
  11. # 'pool': 'backfill',
  12. # 'priority_weight': 10,
  13. # 'end_date': datetime(2016, 1, 1),
  14. }
  15. dag = DAG(
  16. 'Airiflow_MC', default_args=default_args, schedule_interval=timedelta(seconds=30))
  17. def read_sql(sqlfile):
  18. with io.open(sqlfile, encoding='utf-8', mode='r') as f:
  19. sql=f.read()
  20. f.closed
  21. return sql
  22. def get_time():
  23. print '当前时间是{}'.format(time.time())
  24. return time.time()
  25. def mc_job ():
  26. project = odps.get_project() # 取到默认项目。
  27. instance=odps.run_sql("select * from long_chinese;")
  28. print(instance.get_logview_address())
  29. instance.wait_for_success()
  30. with instance.open_reader() as reader:
  31. count = reader.count
  32. print("查询表数据条数:{}".format(count))
  33. for record in reader:
  34. print record
  35. return count
  36. t1 = PythonOperator (
  37. task_id = 'get_time' ,
  38. provide_context = False ,
  39. python_callable = get_time,
  40. dag = dag )
  41. t2 = PythonOperator (
  42. task_id = 'mc_job' ,
  43. provide_context = False ,
  44. python_callable = mc_job ,
  45. dag = dag )
  46. t2.set_upstream(t1)

2.提交

 
 
 
 
  1. python Airiflow_MC.py

3.进行测试

 
 
 
 
  1. # print the list of active DAGs
  2. airflow list_dags
  3. # prints the list of tasks the "tutorial" dag_id
  4. airflow list_tasks Airiflow_MC
  5. # prints the hierarchy of tasks in the tutorial DAG
  6. airflow list_tasks Airiflow_MC --tree
  7. #测试task
  8. airflow test Airiflow_MC get_time 2010-01-16
  9. airflow test Airiflow_MC mc_job 2010-01-16

4.运行调度任务

登录到web界面点击按钮运行

5.查看任务运行结果

1.点击view log

2.查看结果


本文名称:使用AirFlow调度MaxCompute
文章地址:http://www.jxjierui.cn/article/dpidcgd.html