本文共 2063 字,大约阅读时间需要 6 分钟。
数据分片:可以将数据分片处理的任务适合用多进程代码处理,核心思路是将data分片,对每一片数据处理返回结果(可能是无序的),然后合并。应用场景:多进程爬虫,类mapreduce任务。缺点是子进程会拷贝父进程所有状态,内存浪费严重。
import mathfrom multiprocessing import Pooldef run(data, index, size): # data 传入数据,index 数据分片索引,size进程数 size = math.ceil(len(data) / size) start = size * index end = (index + 1) * size if (index + 1) * size < len(data) else len(data) temp_data = data[start:end] # do something return data # 可以返回数据,在后面收集起来processor = 40res = []p = Pool(processor)for i in range(processor): res.append(p.apply_async(run, args=(data, i, processor,))) print(str(i) + ' processor started !')p.close()p.join()for i in res: print(i.get()) # 使用get获得多进程处理的结果
分文件处理:当内存受限时,不能再继续使用数据分片,因为子进程会拷贝父进程的所有状态,导致内存的浪费。这时候可以考虑先把大文件分片保存到磁盘,然后del 释放掉数据,接着在多进程处理的函数里面分别读取,这样子进程就会分别读取需要处理的数据,而不会占用大量内存。
from multiprocessing import Poolimport pandas as pdimport mathdata=pd.DataFrame({'user_id':[1,2,3,4],'item_id':[6,7,8,9]})users=pd.DataFrame(data['user_id'].unique(),columns=['user_id'])processor=4p=Pool(processor)l_data = len(users)size = math.ceil(l_data / processor)res = []def run(i): data=pd.read_csv('../data/user_'+str(i)+'.csv') #todo return datafor i in range(processor): start = size * i end = (i + 1) * size if (i + 1) * size < l_data else l_data user = users[start:end] t_data = pd.merge(data, user, on='user_id').reset_index(drop=True) t_data.to_csv('../data/user_'+str(i)+'.csv',index=False) print(len(t_data))del data,l_data,usersfor i in range(processor): res.append(p.apply_async(run, args=(i,))) print(str(i) + ' processor started !')p.close()p.join()data = pd.concat([i.get() for i in res])
多进程数据共享:当需要修改共享的数据时,那么这个时候可以使用数据共享:
from multiprocessing import Process, Manager# 每个子进程执行的函数# 参数中,传递了一个用于多进程之间数据共享的特殊字典def func(i, d): d[i] = i + 100 print(d.values())# 在主进程中创建特殊字典m = Manager()d = m.dict()for i in range(5): # 让子进程去修改主进程的特殊字典 p = Process(target=func, args=(i, d)) p.start()p.join()------------[100][100, 101][100, 101, 102, 103][100, 101, 102, 103][100, 101, 102, 103, 104]
转载地址:http://ymmvb.baihongyu.com/