门户型网站建设,招商局网站建设方案,网站开发工程师月薪,如何做一款app使用DS接口实现批量导入工作量并上线脚本
前面实现了批量生成DS的任务#xff0c;当导入时发现只能逐个导入#xff0c;因此通过接口实现会更方便。
DS接口文档
DS是有接口文档的地址是
http://IP:12345/dolphinscheduler/swagger-ui/index.html?languagezh_CNlang…使用DS接口实现批量导入工作量并上线脚本
前面实现了批量生成DS的任务当导入时发现只能逐个导入因此通过接口实现会更方便。
DS接口文档
DS是有接口文档的地址是
http://IP:12345/dolphinscheduler/swagger-ui/index.html?languagezh_CNlangcn不过这文档写的比较简略不太能懂那么只能自己去找了。
token
所有的接口都需要用到token 在安全中心-令牌管理 创建一个token 。记住这个token后面所有的接口都需要用到 。
header
根据上面的token组成请求要用的header
token
headers {Accept: application/json,token: token
}项目ID project_id 可以在查看项目工作流时在url中找到。
DS导入任务接口
导入任务的接口是
import_url http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition/import知道接口 就可以导入了
def import_job(file_path):
# 打开文件并读取为二进制数据with open(file_path, rb) as file:files {file: file}# 导入工作流response requests.post(import_url, headersheaders, filesfiles)print(response.status_code)if response.status_code ! 200:print(上传失败 file_path)需要注意的是导入任务时 只支持二进制 。 file_path 是工作流文件具体实现 可以工作流中导出一个作为参考。 重复使用上述方法就可以实现批量导入任务。
工作流上线
使用上述方法批量完成任务上传后依旧有问题逐个上线工作量也是个不小的工作量因此继续使用接口。 经过研究发现上线工作流需要先获取工作流的调度ID 。
获取工作流列表 - 获取工作流code - 获取所有工作流的调度ID - 工作流上线
获取工作流列表
这是接口地址
jobs_url http://IP:12345/dolphinscheduler/projects/{project_id}/process-definition不过这个要分页查询稍微有一点点麻烦
def get_jobs_list():# 分页查询# 初始化分页参数pageNo 1pageSize 10url f{jobs_url}?pageSize10pageNo1searchVal# 构建完整的URL# 存储所有结果all_items list()while True:# 构建完整的URLurl f{jobs_url}?pageSize{pageSize}pageNo{pageNo}searchVal# 发送GET请求response requests.get(url, headersheaders)# 检查响应状态码if response.status_code 200:# 请求成功处理响应数据items response.content.decode()total json.loads(items)[data][total]item json.loads(items)[data][totalList]# 将当前页的数据添加到结果列表中for i in item:all_items.append(i)# 如果当前页没有数据退出循环if pageNo * pageSize total:breakif not items:break# 增加页码pageNo 1else:# 请求失败打印错误信息print(请求失败:, response.status_code, response.text)breakreturn all_itemsall_items 是所有工作流的具体内容需要提取一下 all_jobs get_jobs_list()job_codes [job[code] for job in all_jobs]这样就是所有的工作流code
获取调度ID
下面是调度ID的接口因为不想分页直接一页1000个。
schedules_url http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules?pageSize1000pageNo1processDefinitionCode使用这个接口就能拿到所有的调度ID
def schedule_id(job_code):url schedules_urlstr(job_code)response requests.get(url, headersheaders)if response.status_code 200:data response.content.decode()js json.loads(data)if len(js[data][totalList])0 and js[data][totalList][0][releaseState]OFFLINE:return js[data][totalList][0][id]else:return 这里过滤了已经上线的调度ID 。
上线
万事俱备 终于可以上线了
online_url http://36.133.140.132:12345/dolphinscheduler/projects/{project_id}/schedules/{scheduler_id}/online具体实现
def online_job(scheduler_id):url online_url.format(scheduler_idscheduler_id)response requests.post(url, headersheaders)if response.status_code 200:print(success)else:print(online job failed)到此 就可以实现导入-批量全自动了。
打完收工祝你不加班。