进程池的作用: 比如有100任务需要执行,那么我们不可能开100个进程来执行任务,这样会导致程序执行的很慢,但是我们可以造一个池子里面放着4个进程,每个进程执行一个任务,当其中一个进程执行完了任务,下一个任务就会进入该进程开始执行,以此类推。这样程序就会执行的很快
开多进程,进程数超过3个以上就使用进程池来处理
调用进程池的方法:
- map(函数, 可迭代对象)
- apply(函数, args=('参数一',))
- apply_async(函数, args=('参数一',))
一定要在 map/apply/apply_async 调用完之后调用 .close() 和 .join() ,且 .join() 方法必须在 .close() 或 .teminate() 之后执行不然就会报错
1. .map(函数,可迭代对象) -> 可迭代对象的长度就是任务的数量,函数每一次都会接收可迭代对象里面的一个值进行处理
import time
from multiprocessing import Pool
def fun(i):
time.sleep(0.1)
i += 1
print(i)
if __name__ == '__main__':
p = Pool(5) # 创建num个进程来处理任务,一般都是cpu的核数 + 1
p.map(fun, range(1000)) # 使用进程池,完成任务 和 Process() 一样
p.close() # 关闭进程池,不允许再向进程池添加任务
p.join() # 等待子进程结束再往下执行,注意 .join 必须在 close 之后执行,不然就会报错
print('------ 主进程 ------')
# 对比直接开启1000个进程处理任务 和 使用进程池处理任务的时间
import time
from multiprocessing import Pool
from multiprocessing import Process
def fun(i):
i += 1
if __name__ == '__main__':
# 使用进程池处理1000个任务
p = Pool(5)
start = time.time()
p.map(fun, range(1000))
p.close()
p.join()
print('使用进程池处理1000个任务所需的时间:%s' % (time.time() - start)) # 使用进程池处理1000个任务所需的时间:0.14959955215454102
# 直接开启1000个进程处理任务
l = []
start = time.time()
for i in range(1000):
p = Process(target=fun, args=(i,))
p.start()
l.append(p)
[i.join() for i in l]
print('直接开启1000个进程处理任务所需的时间:%s' % (time.time() - start)) # 直接开启1000个进程处理任务所需的时间:46.90270376205444
2..apply(函数,args=('参数一',)) -> 同步提交任务机制
import time
from multiprocessing import Pool
from multiprocessing import Process
def fun(i):
time.sleep(0.1)
i += 1
print(i)
if __name__ == '__main__':
p = Pool(5) # 创建num个进程来处理任务,一般都是cpu的核数 + 1
for i in range(20):
p.apply(fun, args=(i,)) # 使用进程池
p.close() # 关闭进程池,不允许再向进程池添加任务
p.join() # 等待子进程结束再往下执行,注意 .join 必须在 close 之后执行,不然就会报错
print('------ 主进程 ------')
3..apply_async(函数,args=('参数一',)) -> 异步提交任务机制
- 当使用了 apply_async 的时候,进程池里面的进程和主进程也进行了异步,意思就是主进程不等待子进程就直接结束了(而 Process, 主进程会等待子进程结束后再结束),但进程池的进程还是在运行着只是看不到而已,处理方式:在调用完 apply_async 后,执行 .close() 和 .join() ,且 .join() 方法必须在 .close() 或 .teminate() 之后执行不然就会报错
from multiprocessing import Pool
from multiprocessing import Process
def fun(i):
i += 1
print(i)
if __name__ == '__main__':
p = Pool(5) # 创建num个进程来处理任务,一般都是cpu的核数 + 1
for i in range(1000):
p.apply_async(fun, args=(i,)) # 使用进程池
p.close() # 关闭进程池,不允许再向进程池添加任务
p.join() # 等待子进程结束再往下执行,注意 .join 必须在 close 之后执行,不然就会报错
print('------ 主进程 ------')
4.使用了进程池后都可以的到进程的返回值
- map进程直接返回一个列表
import time
from multiprocessing import Pool
def fun(i):
time.sleep(0.1)
i += 1
return i
if __name__ == '__main__':
p = Pool(5)
res = p.map(fun, range(20)) # 得到进程的返回值,且返回值是一个列表
p.close()
p.join()
print(res) # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
- apply 无需使用 .get() 方法获取,因为进程直接将值返回
from multiprocessing import Pool
from multiprocessing import Process
def fun(i):
i += 1
return i
if __name__ == '__main__':
p = Pool(5)
res_l = []
for i in range(1000):
res = p.apply(fun, args=(i,)) # 得到进程返回值
res_l.append(res)
p.close()
p.join()
[print(i) for i in res_l] # 无需使用 .get() 方法获取,因为进程直接将值返回
- apply_async 需要使用 .get() 方法获取,因为进程返回的是一个对象
- 不要在循环中进行获取进程的返回值,因为 .get() 方法就是在等待着获取值,这样会将异步执行变成了同步执行,而是将得到的进程对象放进列表中,等待进程结束后再循环列表再使用 .get() 获取返回值
from multiprocessing import Pool
from multiprocessing import Process
def fun(i):
i += 1
return i
if __name__ == '__main__':
p = Pool(5)
res_l = []
for i in range(1000):
res = p.apply_async(fun, args=(i,)) # 得到进程对象
res_l.append(res)
# print(res.get()) # 注意:不要在循环中进行获取进程的返回值,因为 .get() 方法就是在等待着获取值,这样会将异步执行变成了同步执行,而是将得到的进程对象放进列表中,等待进程结束后再循环列表再使用 .get() 获取返回值
p.close()
p.join()
[print(i.get()) for i in res_l] # 使用 .get() 方法获取进程对象中的进程返回值
5.回调函数
- 只有 apply 和 apply_async 里面才有回调函数
- 回调函数其实就是接收进程的返回值后再做一次处理
- 回调函数是在主进程中完成的
- 回调函数不能传参,只能接收多进程中函数的返回值
from multiprocessing import Pool
# 进程函数
def fun(i):
return i * '*'
# 回调函数
def cb(arg): # 接收进程函数的返回值
print(arg) # 处理进程函数的返回值
if __name__ == '__main__':
p = Pool(5)
for i in range(10):
p.apply_async(fun, args=(i,), callback=cb) # callback = 回调函数
p.close()
p.join()
- 如果使用了回调函数得到的返回值也还是和之前一样,是获取进程函数的返回值,而不是回调函数的返回值
from multiprocessing import Pool
# 进程函数
def fun(i):
return i * '*' # 进程函数的返回值
# 回调函数
def cb(arg):
arg += '1'
return arg # 回调函数的返回值
if __name__ == '__main__':
p = Pool(5)
r_l = []
for i in range(10):
res = p.apply_async(fun, args=(i,), callback=cb)
r_l.append(res)
[print(i.get()) for i in r_l] # 打印出来的还是进程函数的返回值,而不是回调函数的
p.close()
p.join()
- 回调函数的使用场景
- 进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
- 我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果
- 回调函数一般会用于爬虫的比较多,因为爬虫需要等待页面返回结果,例如下面的例子
- 例如: 我们要对10个页面进行页面的请求,获取页面的代码进行分析,我们不可能等待一个页面请求完拿到页面代码进行处理后,再去请求第二个页面,这样会很浪费时间的,我们可以使用进程池和回调函数进行处理,把请求多个页面等待结果的任务放到进程池中执行,然后进程池里的进程就会同时开始请求不同的页面,当其中一个进程的请求返回了结果,就将该结果传递给回调函数进行分析处理
# 上述的例子:对多个页面进行页面的请求, 获取页面的代码进行分析
import requests
from multiprocessing import Pool
# 把请求的任务放在进程池中处理,因为请求页面需要等待
def get_url(url):
ret = requests.get(url)
return {
'url': url,
'status_code': ret.status_code, # 获取请求的状态码
'content': ret.text # 获取请求成功后的页面代码
}
# 等待获取页面的请求结果,然后对请求结果进行处理
def cb(dic):
print('网址:%s 状态码:%s 页面代码的长度:%s' % (dic['url'], dic['status_code'], len(dic['content'])))
if __name__ == '__main__':
url_l = [
'http://www.baidu.com',
'http://www.sogou.com',
'http://www.hao123.com',
'http://www.yangxiaoer.cc',
'http://www.python.org'
]
p = Pool(4)
for i in url_l:
p.apply_async(get_url, args=(i,), callback=cb)
p.close()
p.join()