一个读取excel数据处理完成后读入数据库的例子
最近收集了一批数据,各地根据问题数据做出反馈,但是各地在反馈的时候字段都进行了创新,好在下发的数据内容并没有改变,开始写的单进程的,由于时间较长,耗时380 秒,又改成多进程的,时间缩短为80-秒。现在把程序发出来,请各位大神进行指正。
import multiprocessing
import os
import time
import pandas as pd
from sqlalchemy import create_engine
import asyncio
import warnings
# warnings.simplefilter("ignore")
ywstrList=['经办机构', '原子业务编号', '原子业务名称', '风险名称','风险描述',
'校验规则结果', '创建时间', '风险提示信息', '业务日志号']
ywstrListMemo=['经办机构', '原子业务编号', '原子业务名称', '风险名称','风险描述',
'校验规则结果', '创建时间', '风险提示信息', '业务日志号','memo','time']
szlist = ['省本', '成都', '自贡', '攀枝', '泸州', '德阳', '绵阳', '广元',
'遂宁', '内江', '乐山', '南充', '眉山', '宜宾', '广安', '达州',
'雅安', '巴中', '资阳', '阿坝', '甘孜', '凉山']
# connect = create_engine('mysql pymysql://root:@127.0.0.1:3306/ywgk?charset=utf8')
connect = create_engine('mysql mysqlconnector://root:@127.0.0.1:3306/ywgk?charset=utf8')
# engine = create_engine('mysql mysqlconnector://scott:tiger@localhost/foo')
def findSZ(filename):
for sz in szlist:
if filename.find(sz) != -1:
return sz
return None
def ReadExcel(filename):
xlsdf = ''
xlsdf = pd.read_excel(filename)
""""
remove columns='sz' or '市'
"""
if "市" in list(xlsdf.keys().to_list()):
xlsdf.drop(columns='市', axis=1, inplace=True)
if "sz" in list(xlsdf.keys().to_list()):
xlsdf.drop(columns='sz', axis=1, inplace=True)
xlsdf = xlsdf.fillna("").astype('string')
return xlsdf
def filterDataOfSz(filename, xlsdf):
sz = findSZ(filename)
print(sz)
"""
筛选出包含对应市州的数据。
"""
if sz != None:
xlsdf = xlsdf[xlsdf['经办机构'].str[:2] == sz] # 筛出本市州数据
return xlsdf
def ConCatRestCols(xlsdf):
"""
去掉业务部分字段,保留市州反馈意见。
"""
print(xlsdf)
# if xlsdf==None:
#
print(filename "为空,需要处理")
#
return
xlfdf_keys_set = set(xlsdf.keys().to_list())
xlsdf_restkeys_set = xlfdf_keys_set - set(ywstrList)
xlsdf_restkeys_list = list(xlsdf_restkeys_set)
xls_rest_df = xlsdf.loc[:, xlsdf_restkeys_list] # 可以正确操作
xlsdf['memo'] = '#'
for col in xlsdf_restkeys_list:
xlsdf['memo'] = xlsdf[col]
#
return xlsdf
def SetTimeStamp(filename, xlsdf):
xlsdf['time'] = os.stat(str(filename)).st_mtime
return xlsdf
async def ProcessExcelAndtosql(filename, table):
df = ReadExcel(filename)
df = filterDataOfSz(filename=filename, xlsdf=df)
df = ConCatRestCols(df)
df = SetTimeStamp(filename=filename, xlsdf=df)
df = df.loc[:, ywstrListMemo]
print(filename)
print(df)
df.to_sql(name=table, con=connect, if_exists='append', index=False, chunksize=1000, method='multi')
def profile(func):
def wrapers(*args,**kwargs):
print("测试开始")
begin=time.time()
func(*args,**kwargs)
end=time.time()
print(f"耗时{end-begin}秒")
return wrapers
# async def getmsg(msg):
#
print(f'#{msg}')
#
await asyncio.sleep(1)
def getFiles(src:str):
import pathlib
files=[]
for file in pathlib.Path(src).rglob("*.xls?"):
files.append(str(file))
return files
def process_asyncio(files,table):
loop=asyncio.new_event_loop()
tasks=[loop.create_task(ProcessExcelAndtosql(filename,table)) for filename in files]
loop.run_until_complete(asyncio.wait(tasks))
@profile
def run(iterable,table):
process_count = multiprocessing.cpu_count()
# print(process_count)
pool = multiprocessing.Pool(process_count-2)
iterable=get_chunks(iterable, process_count)
for lst in iterable:
pool.apply_async(process_asyncio, args=(lst,table))
pool.close()
pool.join()
def main():
files = getFiles(r"e:\市州返回")
run(files, 'ywgk3')
def get_chunks(iterable,num):
# global iterable
import numpy as np
return np.array_split(iterable, num)
# import profile
if __name__=="__main__":
main()
本人只是编程的业余爱好者,只是把技术用于辅助工作,并没有深入研究技术理论,都是野路子,还请批评指正。
- 0000
- 0000
- 0001
- 0000
- 0000