网站建设化妆品的目录今天的国内新闻
前段时间有位客户问: 你们的程序能不能给我们生成个 txt 文件,把新增的员工都放进来,字段也不需要太多,就要 员工姓名/卡号/员工编号/员工职位/公司 这些字段就行了,然后我们的程序会去读取这个 txt 文件,拿里面的内容,读完之后会这个文件删掉
我: 可以接受延迟吗?可能没办法实时生成,写个脚本,用定时任务去跑还是可以实现的
客户: 可以呀,那个脚本 30mins 跑一次就行
这篇文章就记录一下大概实现思路,后续如果遇到类似需求,就可以直接 copy 代码了
实现思路:
- 定义全局变量 update_time
- 判断 update_time 是否为空
- 为空,说明是第一次查询数据,取查到的最后一条记录的 create_time ,赋值给 update_time
- 不为空,说明不是第一次查询数据,查询数据时, create_time > update_time ,同时更新 update_time
- 判断 txt 文件是否存在
- 存在,则 txt 表头不需要重新生成
- 不存在, txt 表头需要重新生成
其实逻辑很简单, Python 类库也很丰富,接下来就上代码:
import pymysql
import os
import argparse
import schedule
import datetimeupdate_time = ""def handle_variables(server_ip,password):global real_ip real_ip = server_ipglobal root_password root_password = passworddef get_person_info():connection = pymysql.connect(host = real_ip,user = 'root',password = root_password,db = 'xxx')try:global update_timecur = connection.cursor()if "" == update_time :sql = "select `name`,`card_num`,`code`,`postion`,`company_id`,`gmt_create` from person"else :sql = "select `name`,`card_num`,`code`,`postion`,`company_id`,`gmt_create` from person where `gmt_create` > '" + update_time + "'"cur.execute(sql)query_person_info_list = cur.fetchall()# get the time of the last recordif len(query_person_info_list) != 0:temp_list = query_person_info_list[-1]update_time = temp_list[-1]if isinstance(update_time, datetime.datetime):update_time = update_time.strftime('%Y-%m-%d %H:%M:%S')# if the txt not exists, add new textif not os.path.exists('add_person.txt') :with open('add_person.txt', 'w') as f:f.write('Lastname, Firstname, CardNumber, Employee ID, Designation, Department \n')for person_info in query_person_info_list :person_name = person_info[0]# the name is generally first + last, which needs to be convertedperson_name_list = person_name.split()person_first_name = person_name_list[0]start_length = len(person_first_name) + 1person_last_name = person_name[start_length:]f.write(str(person_last_name) + ', ' + str(person_first_name) + ', ')card_number = person_info[1]employee_id = person_info[2]designation = person_info[3]department = person_info[4]if card_number is not None :f.write(str(card_number) + ', ')else :f.write(', ')if employee_id is not None :f.write(str(employee_id) + ', ')else :f.write(', ')if designation is not None :f.write(str(designation) + ', ')else :f.write(', ')if department is not None :# get department namedepartment_sql = "select `name` from zone where id = " + str(department)cur.execute(department_sql)department_result = cur.fetchone()department_name = department_result[0]f.write(str(department_name))else :f.write(', ')f.write('\n')# if the txt exists, update the textelse :# get original datawith open('add_person.txt', 'r') as e:data = e.readlines()with open('add_person.txt', 'w') as f:# first save the original data for write_data in data :f.write(write_data)# then save new datafor person_info in query_person_info_list :person_name = person_info[0]# the name is generally first + last, which needs to be convertedperson_name_list = person_name.split()person_first_name = person_name_list[0]start_length = len(person_first_name) + 1person_last_name = person_name[start_length:]f.write(str(person_last_name) + ', ' + str(person_first_name) + ', ')card_number = person_info[1]employee_id = person_info[2]designation = person_info[3]department = person_info[4]if card_number is not None :f.write(str(card_number) + ', ')else :f.write(', ')if employee_id is not None :f.write(str(employee_id) + ', ')else :f.write(', ')if designation is not None :f.write(str(designation) + ', ')else :f.write(', ')if department is not None :# get department namedepartment_sql = "select `name` from zone where id = " + str(department)cur.execute(department_sql)department_result = cur.fetchone()department_name = department_result[0]f.write(str(department_name))else :f.write(', ')f.write('\n')except IOError:print('Error: get update person info fail')finally:cur.close()connection.close()if __name__ == '__main__':parser = argparse.ArgumentParser(description="please enter")parser.add_argument("--server_ip", "-server", help="server ip", default="", type=str, required=False)parser.add_argument("--password", "-p", help="devops infra password", default="", type=str, required=False)args = parser.parse_args()# let the variable become a global variable, no need to pass ithandle_variables(args.server_ip, args.password)# execute the method every 30mins#schedule.every(30).minutes.do(get_person_info)# for test -- execute the method every 30 secondsschedule.every(30).seconds.do(get_person_info)while True:# run task schedule.run_pending()
以上~