进程池的作用: 比如有100任务需要执行,那么我们不可能开100个进程来执行任务,这样会导致程序执行的很慢,但是我们可以造一个池子里面放着4个进程,每个进程执行一个任务,当其中一个进程执行完了任务,下一个任务就会进入该进程开始执行,以此类推。这样程序就会执行的很快

开多进程,进程数超过3个以上就使用进程池来处理

调用进程池的方法:
  • map(函数, 可迭代对象)
  • apply(函数, args=('参数一',))
  • apply_async(函数, args=('参数一',))

定要在 map/apply/apply_async 调用完之后调用  .close() 和 .join() ,且 .join() 方法必须在 .close() 或 .teminate() 之后执行不然就会报错

1. .map(函数,可迭代对象) -> 可迭代对象的长度就是任务的数量,函数每一次都会接收可迭代对象里面的一个值进行处理

import time
from multiprocessing import Pool

def fun(i):
    time.sleep(0.1)
    i += 1
    print(i)

if __name__ == '__main__':
    p = Pool(5)  # 创建num个进程来处理任务,一般都是cpu的核数 + 1
    p.map(fun, range(1000))  # 使用进程池,完成任务 和 Process() 一样
    p.close()  # 关闭进程池,不允许再向进程池添加任务
    p.join() # 等待子进程结束再往下执行,注意 .join 必须在 close 之后执行,不然就会报错
print('------ 主进程 ------')

# 对比直接开启1000个进程处理任务 和 使用进程池处理任务的时间

import time
from multiprocessing import Pool
from multiprocessing import Process


def fun(i):
    i += 1


if __name__ == '__main__':
    # 使用进程池处理1000个任务
    p = Pool(5)
    start = time.time()
    p.map(fun, range(1000))
    p.close()
    p.join()
    print('使用进程池处理1000个任务所需的时间:%s' % (time.time() - start))  # 使用进程池处理1000个任务所需的时间:0.14959955215454102

    # 直接开启1000个进程处理任务
    l = []
    start = time.time()
    for i in range(1000):
        p = Process(target=fun, args=(i,))
        p.start()
        l.append(p)

    [i.join() for i in l]
    print('直接开启1000个进程处理任务所需的时间:%s' % (time.time() - start))  # 直接开启1000个进程处理任务所需的时间:46.90270376205444

2..apply(函数,args=('参数一',)) -> 同步提交任务机制

import time
from multiprocessing import Pool
from multiprocessing import Process

def fun(i):
    time.sleep(0.1)
    i += 1
    print(i)

if __name__ == '__main__':
    p = Pool(5)  # 创建num个进程来处理任务,一般都是cpu的核数 + 1
    for i in range(20):
        p.apply(fun, args=(i,))  # 使用进程池
    p.close()  # 关闭进程池,不允许再向进程池添加任务
    p.join() # 等待子进程结束再往下执行,注意 .join 必须在 close 之后执行,不然就会报错
print('------ 主进程 ------')

3..apply_async(函数,args=('参数一',)) -> 异步提交任务机制

  • 当使用了 apply_async 的时候,进程池里面的进程和主进程进行了异步,意思就是主进程不等待子进程就直接结束了(而 Process, 主进程会等待子进程结束后再结束),但进程池的进程还是在运行着只是看不到而已,处理方式:在调用完 apply_async 后,执行 .close() 和 .join() ,且 .join() 方法必须在 .close() 或 .teminate() 之后执行不然就会报错

from multiprocessing import Pool
from multiprocessing import Process


def fun(i):
    i += 1
    print(i)


if __name__ == '__main__':
    p = Pool(5)  # 创建num个进程来处理任务,一般都是cpu的核数 + 1
    for i in range(1000):
        p.apply_async(fun, args=(i,))  # 使用进程池
    p.close()  # 关闭进程池,不允许再向进程池添加任务
    p.join()  # 等待子进程结束再往下执行,注意 .join 必须在 close 之后执行,不然就会报错
print('------ 主进程 ------')

4.使用了进程池后都可以的到进程的返回值

  • map进程直接返回一个列表

import time
from multiprocessing import Pool

def fun(i):
    time.sleep(0.1)
    i += 1
    return i

if __name__ == '__main__':
    p = Pool(5)
    res = p.map(fun, range(20))  # 得到进程的返回值,且返回值是一个列表
    p.close()
    p.join()
    print(res) # [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]

  • apply 无需使用 .get() 方法获取,因为进程直接将值返回

from multiprocessing import Pool
from multiprocessing import Process


def fun(i):
    i += 1
    return i


if __name__ == '__main__':
    p = Pool(5)

    res_l = []
    for i in range(1000):
        res = p.apply(fun, args=(i,))  # 得到进程返回值
        res_l.append(res)

    p.close()  
    p.join()

    [print(i) for i in res_l]  # 无需使用 .get() 方法获取,因为进程直接将值返回

  • apply_async 需要使用 .get() 方法获取,因为进程返回的是一个对象

    • 不要在循环中进行获取进程的返回值,因为 .get() 方法就是在等待着获取值,这样会将异步执行变成了同步执行,而是将得到的进程对象放进列表中,等待进程结束后再循环列表再使用 .get() 获取返回值

from multiprocessing import Pool
from multiprocessing import Process


def fun(i):
    i += 1
    return i


if __name__ == '__main__':
    p = Pool(5)

    res_l = []
    for i in range(1000):
        res = p.apply_async(fun, args=(i,))  # 得到进程对象
        res_l.append(res)
 # print(res.get())  # 注意:不要在循环中进行获取进程的返回值,因为 .get() 方法就是在等待着获取值,这样会将异步执行变成了同步执行,而是将得到的进程对象放进列表中,等待进程结束后再循环列表再使用 .get() 获取返回值

    p.close()
    p.join()

    [print(i.get()) for i in res_l]  # 使用 .get() 方法获取进程对象中的进程返回值

5.回调函数

  • 只有 apply 和 apply_async 里面才有回调函数
  • 回调函数其实就是接收进程的返回值后再做一次处理
  • 回调函数是在主进程中完成的
  • 回调函数不能传参,只能接收多进程中函数的返回值

from multiprocessing import Pool

# 进程函数
def fun(i):
    return i * '*'


# 回调函数
def cb(arg):  # 接收进程函数的返回值
    print(arg)  # 处理进程函数的返回值


if __name__ == '__main__':
    p = Pool(5)

    for i in range(10):
        p.apply_async(fun, args=(i,), callback=cb)  # callback = 回调函数

    p.close()
    p.join()

  • 如果使用了回调函数得到的返回值也还是和之前一样,是获取进程函数的返回值,而不是回调函数的返回值

from multiprocessing import Pool

# 进程函数
def fun(i):
    return i * '*'  # 进程函数的返回值


# 回调函数
def cb(arg):
    arg += '1'
    return arg  # 回调函数的返回值


if __name__ == '__main__':
    p = Pool(5)

    r_l = []
    for i in range(10):
        res = p.apply_async(fun, args=(i,), callback=cb)
        r_l.append(res)

    [print(i.get()) for i in r_l]  # 打印出来的还是进程函数的返回值,而不是回调函数的

    p.close()
    p.join()

  • 回调函数的使用场景

    • 进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数

    • 我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果

    • 回调函数一般会用于爬虫的比较多,因为爬虫需要等待页面返回结果,例如下面的例子

    • 例如: 我们要对10个页面进行页面的请求,获取页面的代码进行分析,我们不可能等待一个页面请求完拿到页面代码进行处理后,再去请求第二个页面,这样会很浪费时间的,我们可以使用进程池和回调函数进行处理,把请求多个页面等待结果的任务放到进程池中执行,然后进程池里的进程就会同时开始请求不同的页面,当其中一个进程的请求返回了结果,就将该结果传递给回调函数进行分析处理

# 上述的例子:对多个页面进行页面的请求, 获取页面的代码进行分析

import requests
from multiprocessing import Pool


# 把请求的任务放在进程池中处理,因为请求页面需要等待
def get_url(url):
    ret = requests.get(url)
    return {
        'url': url,
        'status_code': ret.status_code,  # 获取请求的状态码
        'content': ret.text  # 获取请求成功后的页面代码
    }


# 等待获取页面的请求结果,然后对请求结果进行处理
def cb(dic):
    print('网址:%s 状态码:%s 页面代码的长度:%s' % (dic['url'], dic['status_code'], len(dic['content'])))


if __name__ == '__main__':
    url_l = [
        'http://www.baidu.com',
        'http://www.sogou.com',
        'http://www.hao123.com',
        'http://www.yangxiaoer.cc',
        'http://www.python.org'
    ]
    p = Pool(4)
    for i in url_l:
        p.apply_async(get_url, args=(i,), callback=cb)
    p.close()
    p.join()