线程池的作用: 比如有100任务需要执行,那么我们不可能开100个线程来执行任务,这样会导致程序执行的很慢,但是我们可以造一个池子里面放着4个线程,每个线程执行一个任务,当其中一个线程执行完了任务,下一个任务就会进入该线程开始执行,以此类推。这样程序就会执行的很快
开多线程,线程数超过3个以上就使用线程池来处理
前几年是没有线程池,后面才推出了 concurrent 模块
concurrent 模块中的 futures 提供了两个方法:
- futures.ThreadPoolExecutor(num) -> 创建线程池
- futures.ProcessPoolExecutor(num) -> 创建进程池
通过 concurrent 模块中的 futures 创建的线程池和进程池的用法都是一样的,所提供的方法都是一样,在这里就不写进程池的用法了
调用线程/进程池的方法:
- submit(函数, '参数')
- map(函数, 可迭代对象)
注意: 在本章节中 ThreadPoolExecutor(5) 为 5 是因为可以看到效果,在正式开发中默认不填就可以了,因为源码已经帮你做了处理
1. .submit(函数, '参数') -> 天生异步执行
from concurrent import futures
def fun(i):
i += 1
print(i)
thread_pool = futures.ThreadPoolExecutor(5) # 创建num个线程来处理任务,如果不传默认是 os.cpu_count()(cpu数量) * 5
for i in range(20):
f = thread_pool.submit(fun, i) # 使用线程池 -> summit() 合并了创建线程的对象 和 调用 start() 方法
print('-----主线程-----')
2. .map(函数, 可迭代对象) -> 天生异步执行,可迭代对象的长度就是任务的数量,函数每一次都会接收可迭代对象里面的一个值进行处理
from concurrent import futures
def fun(i):
i += 1
print(i)
thread_pool = futures.ThreadPoolExecutor(5) # 创建num个线程来处理任务,如果不传默认是 os.cpu_count()(cpu数量) * 5
thread_pool.map(fun, range(20)) # 使用线程池
print('-----主线程-----')
3..shutdown() -> 等待所有子线程执行完再往下执行
from concurrent import futures
def fun(i):
i += 1
print(i)
thread_pool = futures.ThreadPoolExecutor(5) # 创建num个线程来处理任务,如果不传默认是 os.cpu_count()(cpu数量) * 5
for i in range(20):
thread_pool.submit(fun, i) # 使用线程池
thread_pool.shutdown() # 等待所有子线程结束后再往下执行
print('---- 主线程 ----')
4. .result() -> 获取线程返回值
- map方法是没有返回值的
import time
import random
from concurrent import futures
def fun(i):
i += 1
time.sleep(random.random())
return i
thread_pool = futures.ThreadPoolExecutor(5) # 创建num个线程来处理任务,如果不传默认是 os.cpu_count()(cpu数量) * 5
r_lis = []
for i in range(20):
res = thread_pool.submit(fun, i) # 使用线程池
r_lis.append(res)
# print(res.result()) # 注意:不要在循环中进行获取线程的返回值,因为 .result() 方法就是在等待着获取值,这样会将异步执行变成了同步执行,而是将得到的线程对象放进列表中,等待线程结束后再循环列表再使用 .result() 获取返回值
thread_pool.shutdown() # 等待所有子线程结束后再往下执行
for i in r_lis:
print(i.result()) # 获取线程返回值 -> 注意:如果没有 thread_pool.shutdown() ,只要 .result() 一获取到值就开始返回了,可以把 thread_pool.shutdown() 注释起来看一下
print('---- 主线程 ----')
5..add_done_callback(回调函数) -> 回调函数
- 回调函数其实就是接收线程的返回值后再做一次处理
- map 没有回调函数
- 使用了回调函数就没有了返回值
import time
import random
from concurrent import futures
# 线程函数
def fun(i):
i += 1
time.sleep(random.random())
return i
# 回调函数
def cb(args):
print(args.result() + 1) # 使用 .result() 方法接收线程的返回值
thread_pool = futures.ThreadPoolExecutor(5)
thread_pool.submit(fun, 1).add_done_callback(cb) # .add_done_callback() 使用回调函数
- 回调函数的使用场景
- 线程池中任何一个任务一旦处理完了,就立即告知主线程:我好了额,你可以处理我的结果了。主线程则调用一个函数去处理该结果,该函数即回调函数
- 我们可以把耗时间(阻塞)的任务放到线程池中,然后指定回调函数(主线程负责执行),这样主线程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果
- 回调函数一般会用于爬虫的比较多,因为爬虫需要等待页面返回结果,例如下面的例子
- 例如: 我们要对10个页面进行页面的请求,获取页面的代码进行分析,我们不可能等待一个页面请求完拿到页面代码进行处理后,再去请求第二个页面,这样会很浪费时间的,我们可以使用线程池和回调函数进行处理,把请求多个页面等待结果的任务放到线程池中执行,然后线程池里的线程就会同时开始请求不同的页面,当其中一个线程的请求返回了结果,就将该结果传递给回调函数进行分析处理
import requests
from concurrent import futures
# 把请求的任务放在线程池中处理,因为请求页面需要等待 -> 线程函数
def get_url(url):
ret = requests.get(url)
return {
'url': url,
'status_code': ret.status_code, # 获取请求的状态码
'content': ret.text # 获取请求成功后的页面代码
}
# 等待获取页面的请求结果,然后对请求结果进行处理 -> 回调函数
def cb(dic):
dic = dic.result()
print('网址:%s 状态码:%s 页面代码的长度:%s' % (dic['url'], dic['status_code'], len(dic['content'])))
# 主线程
url_l = [
'http://www.baidu.com',
'http://www.sogou.com',
'http://www.hao123.com',
'http://www.yangxiaoer.cc',
'http://www.python.org'
]
thread_pool = futures.ThreadPoolExecutor(5)
for i in url_l:
thread_pool.submit(get_url, i).add_done_callback(cb)
← 线程概念(重要) 线程间通信 - 队列 →