之前学习了线程、进程的概念,了解了在操作系统中进程是资源分配的最小单位,线程是CPU调度的最小单位。按道理来说我们已经算是把cpu的利用率提高很多了。但是我们知道无论是创建多进程还是创建多线程来解决问题,都要消耗一定的时间来创建进程、创建线程、以及管理他们之间的切换, 为了对效率的追求不断提高,基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发。这样就可以节省创建线进程所消耗的时间。
协程: 基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发。这样就可以节省创建线进程所消耗的时间。
协程可以开很多很多,没有上限,切换之间的消耗可以忽略不计
并发的本质: 函数之间切换执行 + 保存状态(即: 一个函数执行上一次的执行位置,以便下次再从该位置开始执行)
1. 使用生成器实现函数之间的切换执行 -> 遇到 IO操作(阻塞)还是会等待 IO操作(阻塞)完后再往下执行,没有实现真正的并发,只是实现了函数之间的切换执行
def fun1():
print(1)
yield
print(3)
yield
def fun2():
g = fun1()
next(g)
print(2)
next(g)
print(4)
fun2()
2. 使用生成器中的 .send() 实现函数之间的切换执行 -> 生产者消费者模型(吃包子例子) -> 遇到 IO操作(阻塞)还是会等待 IO操作(阻塞)完后再往下执行,没有实现真正的并发,只是实现了函数之间的切换执行
def consumer():
while True:
n = yield
print('吃了%s个包子' % n)
def producer():
g = consumer()
next(g) # 第一次使用生成器的时候一定用next获取下一个值,不懂的可以看回生成器中 .send() 方法的注意事项
for i in range(10):
print('生产了%s个包子' % (i + 1))
g.send(i + 1)
producer()
3. greenlet 的安装
pip3 install greenlet -i https://pypi.douban.com/simple # 使用豆瓣的镜像
4. 使用第三方模块 greenlet 实现函数之间的切换执行-> 遇到 IO操作(阻塞)还是会等待 IO操作(阻塞)完后再往下执行,没有实现真正的并发,只是实现了函数之间的切换执行
- 当函数的执行过程中遇见了 .switch() 就会进行函数的切换执行
import time
from greenlet import greenlet
def eat1():
print('food1')
g2.switch() # 切换执行 eat2 函数
time.sleep(5)
print('food3')
g2.switch() # 切换执行 eat2 函数
def eat2():
print('food2')
g1.switch() # 切换执行 eat1 函数
time.sleep(3)
print('food4')
g1 = greenlet(eat1)
g2 = greenlet(eat2)
g1.switch() # 执行 eat1 方法 -> 一定要使用 .switch() 执行函数不然会没有效果
5. 在没有IO操作的情况下进行函数之间的切换执行反而会降低效率
# 串行执行(同步执行)
import time
def consumer(res):
"""任务1:接收数据,处理数据"""
pass
def producer():
"""任务2:生产数据"""
res = []
for i in range(10000000):
res.append(i)
return res
start = time.time()
# 串行执行(同步执行)
res = producer()
consumer(res) # 写成consumer(producer())会降低执行效率
stop = time.time()
print(stop - start) # 1.5536692142486572
# 基于yield并发执行
import time
def consumer():
"""任务1:接收数据,处理数据"""
while True:
x = yield
def producer():
"""任务2:生产数据"""
g = consumer()
next(g)
for i in range(10000000):
g.send(i)
start = time.time()
# 基于yield保存状态,实现两个任务直接来回切换,即并发的效果
# PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
producer()
stop = time.time()
print(stop - start) # 2.0272178649902344
6. gevent 的安装
pip3 install gevent -i https://pypi.douban.com/simple # 使用豆瓣的镜像
7.使用第三方模块 gevent 实现真正的意义上的并发执行,即:协程
- gevent模块是基于greenlet模块 进行封装的
- 遇见 gevent 的自带的 IO 操作(阻塞)就会自动的切换到另一个函数去执行,从而实现并发的效果
- 注意: gevent的操作是完全和程序进行异步,虽然它执行了但是会看不见效果的,解决办法: 使用 g1.join() 或者.joinall([g1, g2]) 等待gevent执行完毕
- gevent.spawn(函数名, 函数接收的参数1, 函数接收的参数2, 函数接收的参数3)
import gevent
def fun1(d1, d2):
print(d1)
gevent.sleep(2) # gevent 自带的 IO 操作,延时执行下方代码 -> 如果使用 time.sleep(2) 它就无法识别这个IO操作(因为不是 gevent 自带的),导致不会自动切换另一个函数去执行,而是等待2秒后继续往下执行
print(d2)
def fun2(d1, d2):
print(d1)
gevent.sleep(2) # gevent 自带的 IO 操作,延时执行下方代码 -> 如果使用 time.sleep(2) 它就无法识别这个IO操作(因为不是 gevent 自带的),导致不会自动切换另一个函数去执行,而是等待2秒后继续往下执行
print(d2)
g1 = gevent.spawn(fun1, 1, 3) # 执行 fun1 函数,当 gevent 遇见它自带的 IO操作(阻塞)就会自动切换另一个函数去执行,从而实现并发
g2 = gevent.spawn(fun2, 2, 4) # 执行 fun2 函数,当 gevent 遇见它自带的 IO操作(阻塞)就会自动切换另一个函数去执行,从而实现并发
# g1.join()
# g2.join()
# 等价于
gevent.joinall([g1, g2]) # 等待所有 gevent 执行完后再往下执行
- 解决 gevent 模块可以识别其他IO阻塞
- 上例gevent.sleep(2)模拟的是gevent可以识别的io阻塞, 而time.sleep(2)或其他的阻塞,gevent是不能直接识别的需要用
- 使用 from gevent import monkey;monkey.patch_all()解决 gevent 模块可以识别其他IO阻塞 -> 把所有IO阻塞操作进行打包放进 gevent 里面
- 如果使用了gevent 模块,就将 from gevent import monkey;monkey.patch_all() 放在程序的起始位置
# 写法一
# from gevent import monkey;monkey.patch_all()
# 写法二
from gevent import monkey
monkey.patch_all()
import gevent
import time
def fun1(d1, d2):
print(d1)
time.sleep(2) # IO阻塞
print(d2)
def fun2(d1, d2):
print(d1)
time.sleep(2) # IO阻塞
print(d2)
g1 = gevent.spawn(fun1, 1, 3) # 执行 fun1 函数,遇到IO阻塞就会自动切换另一个函数去执行,从而实现并发
g2 = gevent.spawn(fun2, 2, 4) # 执行 fun2 函数,遇到IO阻塞就会自动切换另一个函数去执行,从而实现并发
# g1.join()
# g2.join()
# 等价于
gevent.joinall([g1, g2]) # 等待所有 gevent 执行完后再往下执行
8. 使用协程实现TCP协议的多人聊天
# server.py
from gevent import monkey
monkey.patch_all()
import gevent
import socket
def talk(conn):
while True:
ret = conn.recv(1024).decode('utf-8')
print(ret)
conn.send(ret.upper().encode('utf-8'))
sk = socket.socket()
sk.bind(('127.0.0.1', 8080))
sk.listen()
while True:
conn, addr = sk.accept()
gevent.spawn(talk, conn)
# client.py
import socket
from threading import Thread
import time
def my_client():
sk = socket.socket()
sk.connect(('127.0.0.1', 8080))
while True:
sk.send(b'hello')
msg = sk.recv(1024).decode('utf-8')
print(msg)
time.sleep(1)
sk.close()
for i in range(500):
t = Thread(target=my_client)
t.start()
9. 爬取网页信息
# 使用协程爬取网页信息
from gevent import monkey
monkey.patch_all()
import gevent
import requests
import time
def get_url(url):
res = requests.get(url)
print(url, res.status_code, len(res.text))
url_lis = [
'http://www.sohu.com',
'http://www.baidu.com',
'http://www.qq.com',
'http://www.python.org',
'http://www.cnblogs.com',
'http://www.mi.com',
'http://www.apache.org',
'https://www.taobao.com',
'http://www.360.com',
'http://www.7daysinn.cn/'
]
stat_time = time.time()
g_lis = []
for url in url_lis:
g = gevent.spawn(get_url, url)
g_lis.append(g)
gevent.joinall(g_lis)
print(time.time() - stat_time) # 执行时间: 1.5333659648895264
# 不使用协程爬取网页信息
import requests
import time
def get_url(url):
res = requests.get(url)
print(url, res.status_code, len(res.text))
url_lis = [
'http://www.sohu.com',
'http://www.baidu.com',
'http://www.qq.com',
'http://www.python.org',
'http://www.cnblogs.com',
'http://www.mi.com',
'http://www.apache.org',
'https://www.taobao.com',
'http://www.360.com',
'http://www.7daysinn.cn/'
]
stat_time = time.time()
for url in url_lis:
get_url(url)
print(time.time() - stat_time) # 执行时间: 4.400969982147217