博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
在Python中使用多进程快速处理数据
阅读量:2350 次
发布时间:2019-05-10

本文共 2063 字,大约阅读时间需要 6 分钟。

 

数据分片:可以将数据分片处理的任务适合用多进程代码处理,核心思路是将data分片,对每一片数据处理返回结果(可能是无序的),然后合并。应用场景:多进程爬虫,类mapreduce任务。缺点是子进程会拷贝父进程所有状态,内存浪费严重。

import mathfrom multiprocessing import Pooldef run(data, index, size):  # data 传入数据,index 数据分片索引,size进程数    size = math.ceil(len(data) / size)    start = size * index    end = (index + 1) * size if (index + 1) * size < len(data) else len(data)    temp_data = data[start:end]    # do something    return data  # 可以返回数据,在后面收集起来processor = 40res = []p = Pool(processor)for i in range(processor):    res.append(p.apply_async(run, args=(data, i, processor,)))    print(str(i) + ' processor started !')p.close()p.join()for i in res:    print(i.get())  # 使用get获得多进程处理的结果

 

分文件处理:当内存受限时,不能再继续使用数据分片,因为子进程会拷贝父进程的所有状态,导致内存的浪费。这时候可以考虑先把大文件分片保存到磁盘,然后del 释放掉数据,接着在多进程处理的函数里面分别读取,这样子进程就会分别读取需要处理的数据,而不会占用大量内存。

from multiprocessing import Poolimport pandas as pdimport mathdata=pd.DataFrame({'user_id':[1,2,3,4],'item_id':[6,7,8,9]})users=pd.DataFrame(data['user_id'].unique(),columns=['user_id'])processor=4p=Pool(processor)l_data = len(users)size = math.ceil(l_data / processor)res = []def run(i):    data=pd.read_csv('../data/user_'+str(i)+'.csv')    #todo    return datafor i in range(processor):    start = size * i    end = (i + 1) * size if (i + 1) * size < l_data else l_data    user = users[start:end]    t_data = pd.merge(data, user, on='user_id').reset_index(drop=True)    t_data.to_csv('../data/user_'+str(i)+'.csv',index=False)    print(len(t_data))del data,l_data,usersfor i in range(processor):    res.append(p.apply_async(run, args=(i,)))    print(str(i) + ' processor started !')p.close()p.join()data = pd.concat([i.get() for i in res])
 

多进程数据共享:当需要修改共享的数据时,那么这个时候可以使用数据共享:

from multiprocessing import Process, Manager# 每个子进程执行的函数# 参数中,传递了一个用于多进程之间数据共享的特殊字典def func(i, d):    d[i] = i + 100    print(d.values())# 在主进程中创建特殊字典m = Manager()d = m.dict()for i in range(5):    # 让子进程去修改主进程的特殊字典    p = Process(target=func, args=(i, d))    p.start()p.join()------------[100][100, 101][100, 101, 102, 103][100, 101, 102, 103][100, 101, 102, 103, 104]
 

转载地址:http://ymmvb.baihongyu.com/

你可能感兴趣的文章
设二叉树结点的先根序列、中根序列和后根序列中,所有叶子结点的先后顺序____。
查看>>
将整数序列(7-2-4-6-3-1-5)按所示顺序构建一棵二叉排序树a(亦称二叉搜索树),之后将整数8按照二叉排序树规则插入树a中,请问插入之后的树a中序遍历结果是____。
查看>>
IP地址、子网掩码、网络号、主机号、网络地址、主机地址
查看>>
已知int a[]={1,2,3,4,5};int*p[]={a,a+1,a+2,a+3};int **q=p;表达式*(p[0]+1)+**(q+2)的值是____。
查看>>
CPU输出数据的速度远远高于打印机的打印速度,为了解决这一矛盾,可采用()
查看>>
整型字符常量和字符字面量的区别 sizeof(char) 和 sizeof('a')
查看>>
表的主键特点中,说法不正确的是()
查看>>
用变量a给出下面的定义:一个有10个指针的数组,该指针指向一个函数,该函数有一个整形参数并返回一个整型数
查看>>
冯诺依曼工作方式的基本特点是____
查看>>
下列关于文件索引结构的叙述中,哪些是正确的?
查看>>
Java异常处理
查看>>
JQueryUI实现对话框
查看>>
Java流(Stream)/文件(File)/IO
查看>>
文件处理(压缩与解压)
查看>>
Java中的目录
查看>>
JQuery实现对select选择框的赋值
查看>>
SweetAlert插件
查看>>
JSON学习
查看>>
有关项目的基础知识
查看>>
创建Java工程实现发送邮件(163邮箱)
查看>>