先来个最简单的例子:

把1-10000每个数求平方

服务器server:

用两个队列存储任务、结果

定义两个函数

要实现分布式得继承multiprocessing.managers.BaseManager

在主函数里multiprocessing.freeze_support()开启分布式支持

注册两个函数给客户端调用

创建管理器,设置ip地址和开启端口、链接密码。

用两个队列加任务、收结果。用刚刚注册的函数

把1-10000压入队列,

把结果压入队列

最后完成关闭服务器

客户端client:

也需要继承multiprocessing.managers.BaseManager

定义一个协程处理一个数据,同时把结果压入结果队列

定义一个线程处理10个数据,开启10个协程

定义一个进程,进程驱动10个线程

主函数:同客户端注册两个函数

同客户端创建管理器,设置ip地址和开启端口、链接密码。

链接服务器

同客户端调用注册的函数,两个队列

套四层循环:10个进程、100个线程、1000个协程

循环进程函数

上代码:

服务器server:

#coding:utf-8

import multiprocessing #分布式进程

import multiprocessing.managers #分布式进程管理器

import random,time #随机数,时间

import Queue #队列

task_queue=Queue.Queue() #任务

result_queue=Queue.Queue() #结果

def return_task(): #返回任务队列

returntask_queuedef return_result(): #返回结果队列

returnresult_queueclass QueueManger(multiprocessing.managers.BaseManager):#继承,进程管理共享数据

pass

if __name__=="__main__":

multiprocessing.freeze_support()#开启分布式支持

QueueManger.register("get_task",callable=return_task)#注册函数给客户端调用

QueueManger.register("get_result", callable=return_result)

manger=QueueManger(address=("192.168.112.11",8848),authkey="123456") #创建一个管理器,设置地址与密码

manger.start() #开启

task,result=manger.get_task(),manger.get_result() #任务,结果

for i in range(10000):print "task add data",i

task.put(i)print "waitting for------"

for i in range(10000):

res=result.get(timeout=100)print "get data",res

manger.shutdown()#关闭服务器

客户端client:

#coding:utf-8

import multiprocessing #分布式进程

import multiprocessing.managers #分布式进程管理器

import random,time #随机数,时间

import Queue #队列

importthreadingimportgeventimportgevent.monkeyclass QueueManger(multiprocessing.managers.BaseManager):#继承,进程管理共享数据

pass

def gevetygo(num ,result): #协程处理一个数据

print num*num

result.put(num*num)def threadgo(datalist,result): #线程处理10个数据,开启10个协程

tasklist=[]for data indatalist:

tasklist.append(gevent.spawn(gevetygo, data,result))

gevent.joinall(tasklist)def processgo(ddatalist,result): #[[1,2,3],[4,5,6]] 进程驱动了10个线程

threadlist=[]for datalist inddatalist:

mythread=threading.Thread(target=threadgo,args=(datalist,result))

mythread.start()

threadlist.append(mythread)for mythread inthreadlist:

mythread.join()if __name__=="__main__":

QueueManger.register("get_task") #注册函数调用服务器

QueueManger.register("get_result")

manger=QueueManger(address=("192.168.112.11",8848),authkey="123456")

manger.connect()#链接服务器

task=manger.get_task()

result=manger.get_result() #任务,结果

#1000

#10个进程

#100个线程

#1000个协程

for i in range(10):

cubelist= [] #[[[1],[2]]]

for j in range(10):

arealist=[]for k in range(10):

linelist=[]for l in range(10):

data=task.get()

linelist.append(data)

arealist.append(linelist)

cubelist.append(arealist)

processlist=[]for myarealist incubelist:

process= multiprocessing.Process(target=processgo, args=(myarealist, result))

process.start()

processlist.append(process)for process inprocesslist:

process.join()

遇到的坑:一个月之前弄分布式的时候写ip地址怎么都开启不了,后来换了台电脑就支持了= =。

如果只是在自己电脑上弄的话,写127.0.0.1也可以运行,如果你也遇到ip地址怎么都开启不了的情况

Logo

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。

更多推荐