python分布式计算多服务器_分布式计算--(分布式+多进程+多线程+多协程)
先来个最简单的例子:把1-10000每个数求平方服务器server:用两个队列存储任务、结果定义两个函数要实现分布式得继承multiprocessing.managers.BaseManager在主函数里multiprocessing.freeze_support()开启分布式支持注册两个函数给客户端调用创建管理器,设置ip地址和开启端口、链接密码。用两个队列加任务、收结果。用刚刚注册的函数把1-
先来个最简单的例子:
把1-10000每个数求平方
服务器server:
用两个队列存储任务、结果
定义两个函数
要实现分布式得继承multiprocessing.managers.BaseManager
在主函数里multiprocessing.freeze_support()开启分布式支持
注册两个函数给客户端调用
创建管理器,设置ip地址和开启端口、链接密码。
用两个队列加任务、收结果。用刚刚注册的函数
把1-10000压入队列,
把结果压入队列
最后完成关闭服务器
客户端client:
也需要继承multiprocessing.managers.BaseManager
定义一个协程处理一个数据,同时把结果压入结果队列
定义一个线程处理10个数据,开启10个协程
定义一个进程,进程驱动10个线程
主函数:同客户端注册两个函数
同客户端创建管理器,设置ip地址和开启端口、链接密码。
链接服务器
同客户端调用注册的函数,两个队列
套四层循环:10个进程、100个线程、1000个协程
循环进程函数
上代码:
服务器server:
#coding:utf-8
import multiprocessing #分布式进程
import multiprocessing.managers #分布式进程管理器
import random,time #随机数,时间
import Queue #队列
task_queue=Queue.Queue() #任务
result_queue=Queue.Queue() #结果
def return_task(): #返回任务队列
returntask_queuedef return_result(): #返回结果队列
returnresult_queueclass QueueManger(multiprocessing.managers.BaseManager):#继承,进程管理共享数据
pass
if __name__=="__main__":
multiprocessing.freeze_support()#开启分布式支持
QueueManger.register("get_task",callable=return_task)#注册函数给客户端调用
QueueManger.register("get_result", callable=return_result)
manger=QueueManger(address=("192.168.112.11",8848),authkey="123456") #创建一个管理器,设置地址与密码
manger.start() #开启
task,result=manger.get_task(),manger.get_result() #任务,结果
for i in range(10000):print "task add data",i
task.put(i)print "waitting for------"
for i in range(10000):
res=result.get(timeout=100)print "get data",res
manger.shutdown()#关闭服务器
客户端client:
#coding:utf-8
import multiprocessing #分布式进程
import multiprocessing.managers #分布式进程管理器
import random,time #随机数,时间
import Queue #队列
importthreadingimportgeventimportgevent.monkeyclass QueueManger(multiprocessing.managers.BaseManager):#继承,进程管理共享数据
pass
def gevetygo(num ,result): #协程处理一个数据
print num*num
result.put(num*num)def threadgo(datalist,result): #线程处理10个数据,开启10个协程
tasklist=[]for data indatalist:
tasklist.append(gevent.spawn(gevetygo, data,result))
gevent.joinall(tasklist)def processgo(ddatalist,result): #[[1,2,3],[4,5,6]] 进程驱动了10个线程
threadlist=[]for datalist inddatalist:
mythread=threading.Thread(target=threadgo,args=(datalist,result))
mythread.start()
threadlist.append(mythread)for mythread inthreadlist:
mythread.join()if __name__=="__main__":
QueueManger.register("get_task") #注册函数调用服务器
QueueManger.register("get_result")
manger=QueueManger(address=("192.168.112.11",8848),authkey="123456")
manger.connect()#链接服务器
task=manger.get_task()
result=manger.get_result() #任务,结果
#1000
#10个进程
#100个线程
#1000个协程
for i in range(10):
cubelist= [] #[[[1],[2]]]
for j in range(10):
arealist=[]for k in range(10):
linelist=[]for l in range(10):
data=task.get()
linelist.append(data)
arealist.append(linelist)
cubelist.append(arealist)
processlist=[]for myarealist incubelist:
process= multiprocessing.Process(target=processgo, args=(myarealist, result))
process.start()
processlist.append(process)for process inprocesslist:
process.join()
遇到的坑:一个月之前弄分布式的时候写ip地址怎么都开启不了,后来换了台电脑就支持了= =。
如果只是在自己电脑上弄的话,写127.0.0.1也可以运行,如果你也遇到ip地址怎么都开启不了的情况
DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐


所有评论(0)