- 进程Process、线程Thread、协程Coroutine
- 协程
- 多线程
- Asyncio和ThreadPoolExecutor
- 多进程
- multi processing and asyncio
- multi processing and concurrent
进程Process、线程Thread、协程Coroutine
进程: 对应处理器, 一个处理器运行一个进程, 多个进程可以并行Parallelism, 也就是说多个任务同时执行
线程:一个进程里可以运行多个线程, 多个线程可以并发concurrent, 也就是交替执行
协程: 协程是在单线程里, 多个子程序可以交替执行实现并发, 因为是单线程, 所以可以实现资源共享
协程
看示例代码:
import asyncio
import threading
import time
async def foo():
print('foo start')
print("foo thread is {}".format(threading.currentThread()))
await asyncio.sleep(1)
print('foo finish')
async def bar():
print('bar start')
print("bar thread is {}".format(threading.currentThread()))
await asyncio.sleep(1)
print('bar finish')
print("main thread is {}".format(threading.currentThread()))
start = time.time()
loop = asyncio.get_event_loop()
tasks = [foo(), bar()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print("运行时间是: {}".format(end-start))
运行结果是:
main thread is <_MainThread(MainThread, started 140735607858048)>
foo start
foo thread is <_MainThread(MainThread, started 140735607858048)>
bar start
bar thread is <_MainThread(MainThread, started 140735607858048)>
foo finish
bar finish
运行时间是: 1.005871295928955
我们可以看到foo和bar两个函数交替执行, 两个子线程和主线程是同一个线程, 执行时间是1秒, 如果我们顺序执行两个函数, 那么时间也会是1秒
协程相比多线程效率更高, 因为没有切换线程所需要的开销,而且很重要的是可以共享资源(下面我们会解释共享资源)
但是我发现一个问题, 这里我们用的是await asyncio.sleep(1)
来模拟阻塞(这里也可以使用aiohttp, 应用在io请求的场景下, 比如web请求, 比如爬虫等等)
但是实际开发中, 阻塞代码可能千差万别, 我们实际上需要await我们自定义的一个函数, 这个函数可能包含了复杂的逻辑, 可能有http请求, 可能有数据库读写, asyncio应该提供这样的接口, 但是我不想拆分的那么细, 拆分也面临很多实际困难, 那么应该怎么办呢?
多线程
上面的问题暂且不表, 我们先说说多线程, 实现多线程, 我们可以使用concurrent.futures.ThreadPoolExecutor 示例代码:
import threading
import time
from concurrent.futures import ThreadPoolExecutor
_executor = ThreadPoolExecutor(2)
def foo(array):
print("foo start")
print("foo thread {}".format(threading.currentThread()))
time.sleep(1)
array.append("foo")
print("foo end time is {}".format(time.time()))
def bar(array):
print("bar start")
print("bar thread {}".format(threading.currentThread()))
time.sleep(1)
array.append("bar")
print("bar end time is {}".format(time.time()))
start = time.time()
print("start time is {}".format(time.time()))
print("main thread {}".format(threading.currentThread()))
my_list = []
print("init list is {}".format(my_list))
executor = ThreadPoolExecutor(max_workers=2)
executor.submit(foo, my_list)
executor.submit(bar, my_list)
print("finally list is {}".format(my_list))
end = time.time()
print("运行时间是: {}".format(end-start))
运行结果是:
start time is 1615809735.847754
main thread <_MainThread(MainThread, started 140735607858048)>
init list is []
foo start
foo thread <Thread(ThreadPoolExecutor-1_0, started daemon 123145423056896)>
bar start
finally list is []
bar thread <Thread(ThreadPoolExecutor-1_1, started daemon 123145428312064)>
运行时间是: 0.0009667873382568359
foo end time is 1615809736.848752
bar end time is 1615809736.8536499
我们可以看到, 主线程和执行a、b的线程是3个不同的线程, 主线程不会等a、b执行完而顺序执行, 同样的, 执行时间也在1秒左右
(我们我们配置线程是是1, ThreadPoolExecutor(1), 那么a、b会在同一个线程里顺序执行, 总时间会变成2, 这里就不在赘述了)
我们还可以看到, 我们定义的my_list没有变化, 看来也没有共享数据
那么问题来了, 虽然我们可以用此方法实现并发, 提高运行效率, 但是我们想实现在a、b和主线程在一个线程里, 且能共享数据, 应该怎么办呢?
Asyncio和ThreadPoolExecutor
为了解决上面的问题, 我们Asyncio和ThreadPoolExecutor结合起来, 看示例代码:
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
_executor = ThreadPoolExecutor(2)
async def foo(array):
print('foo start')
print("foo thread is {}".format(threading.currentThread()))
foo_loop = asyncio.get_event_loop()
await foo_loop.run_in_executor(_executor, my_func, array, "foo")
print('foo finish')
async def bar(array):
print('bar start')
print("bar thread is {}".format(threading.currentThread()))
bar_loop = asyncio.get_event_loop()
await bar_loop.run_in_executor(_executor, my_func, array, "bar")
print('bar finish')
def my_func(array, item):
print("my_func thread is {}".format(threading.currentThread()))
array.append(item)
time.sleep(1)
print("main thread is {}".format(threading.currentThread()))
start = time.time()
my_list = []
print("init list is {}".format(my_list))
loop = asyncio.get_event_loop()
tasks = [foo(my_list), bar(my_list)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print("finally list is {}".format(my_list))
print("运行时间是: {}".format(end-start))
运行结果是:
main thread is <_MainThread(MainThread, started 140735607858048)>
init list is []
bar start
bar thread is <_MainThread(MainThread, started 140735607858048)>
my_func thread is <Thread(ThreadPoolExecutor-0_0, started daemon 123145474777088)>
foo start
foo thread is <_MainThread(MainThread, started 140735607858048)>
my_func thread is <Thread(ThreadPoolExecutor-0_1, started daemon 123145480032256)>
bar finish
foo finish
finally list is ['bar', 'foo']
运行时间是: 1.0060360431671143
我们可以看到, 主函数和foo、bar函数同属于一个线程, my_func函数的线程不一样, 但是, 我们发现是可以共享数据的, 这里我也不知道怎么解释
只能解释为执行my_func的线程还是和主线程同属一个线程
这样我们解决了上面的问题
我们对上面的代码做一下优化:
import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor
async def foo(array, item):
print('foo start')
print("foo thread is {}".format(threading.currentThread()))
bar_loop = asyncio.get_event_loop()
_executor = ThreadPoolExecutor(1)
await bar_loop.run_in_executor(_executor, my_func, array, item)
print('foo finish')
def my_func(array, item):
print("my_func thread is {}".format(threading.currentThread()))
array.append(item)
time.sleep(1)
print("main thread is {}".format(threading.currentThread()))
start = time.time()
my_list = []
print("init list is {}".format(my_list))
loop = asyncio.get_event_loop()
tasks = [foo(my_list, i) for i in range(10)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print("finally list is {}".format(my_list))
print("运行时间是: {}".format(end-start))
优化了两方面:
- tasks使用了对同一个函数循环添加到list, 更贴近实际场景
- _executor定义在函数内, max workers数量可以定义为1, 也可以不传参数采用默认值, 实际上只需要一个就可以了
运行结果是:
main thread is <_MainThread(MainThread, started 140735693562752)> init list is [] foo start foo thread is <_MainThread(MainThread, started 140735693562752)> my_func thread is <Thread(ThreadPoolExecutor-0_0, started daemon 123145324679168)> foo start foo thread is <_MainThread(MainThread, started 140735693562752)> my_func thread is <Thread(ThreadPoolExecutor-1_0, started daemon 123145329934336)> foo start foo thread is <_MainThread(MainThread, started 140735693562752)> my_func thread is <Thread(ThreadPoolExecutor-2_0, started daemon 123145335189504)> foo start foo thread is <_MainThread(MainThread, started 140735693562752)> my_func thread is <Thread(ThreadPoolExecutor-3_0, started daemon 123145340444672)> foo start foo thread is <_MainThread(MainThread, started 140735693562752)> my_func thread is <Thread(ThreadPoolExecutor-4_0, started daemon 123145345699840)> foo start foo thread is <_MainThread(MainThread, started 140735693562752)> my_func thread is <Thread(ThreadPoolExecutor-5_0, started daemon 123145350955008)> foo start foo thread is <_MainThread(MainThread, started 140735693562752)> my_func thread is <Thread(ThreadPoolExecutor-6_0, started daemon 123145356210176)> foo start foo thread is <_MainThread(MainThread, started 140735693562752)> my_func thread is <Thread(ThreadPoolExecutor-7_0, started daemon 123145361465344)> foo start foo thread is <_MainThread(MainThread, started 140735693562752)> my_func thread is <Thread(ThreadPoolExecutor-8_0, started daemon 123145366720512)> foo start foo thread is <_MainThread(MainThread, started 140735693562752)> my_func thread is <Thread(ThreadPoolExecutor-9_0, started daemon 123145371975680)> foo finish foo finish foo finish foo finish foo finish foo finish foo finish foo finish foo finish foo finish finally list is [2, 6, 0, 7, 8, 3, 9, 4, 1, 5] 运行时间是: 1.0114827156066895
多进程
对于IO型程序, 为了减少IO等待带来的效率地下, 我们可以使用多线程和协程
但是对于计算型程序, 没有IO等待的问题, 为了提高效率, 就可以利用多进程, 将多核处理器充分利用起来
对于多进程的实现, 我们可以使concurrent.futures.ProcessPoolExecutor
import concurrent.futures
import math
import time
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n, m=None):
time.sleep(1)
print("{}-{}".format(n ,m))
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
# 这里有三种写法, 可以灵活使用
# 第一种
# futures = {executor.submit(is_prime, i): i for i in PRIMES}
# for f in concurrent.futures.as_completed(futures):
# i = futures[f]
# data = f.result()
# print("{} {}".format(i, data))
# 第二种, 注: 如果is_prime有两个参数, submit则增加一个参数
# for i in PRIMES:
# executor.submit(is_prime, i, 2)
# 第三种
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
t1 = time.time()
main()
print("耗时{}".format(time.time()-t1))
multi processing and asyncio
我们让多进程和asyncio结合一下, 看看是什么情况:
import asyncio
import threading
import time
from concurrent.futures import ProcessPoolExecutor
async def foo(array):
print('foo start')
print("foo thread is {}".format(threading.currentThread()))
foo_loop = asyncio.get_event_loop()
_executor = ProcessPoolExecutor()
tasks = [foo_loop.run_in_executor(_executor, my_func, array, item) for item in range(10)]
completed, pending = await asyncio.wait(tasks)
results = [t.result() for t in completed]
print('results: {!r}'.format(results))
print('foo finish')
return results
def my_func(array, item):
print("my_func thread is {}".format(threading.currentThread()))
array.append(item)
time.sleep(1)
return item
print("main thread is {}".format(threading.currentThread()))
start = time.time()
my_list = []
print("init my list is {}".format(my_list))
loop = asyncio.get_event_loop()
result = loop.run_until_complete(foo(my_list))
loop.close()
end = time.time()
print("finally my list is {}".format(my_list))
print("result list is {}".format(result))
print("运行时间是: {}".format(end-start))
运行结果是:
main thread is <_MainThread(MainThread, started 140736046461824)>
init my list is []
foo start
foo thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
results: [4, 9, 5, 1, 6, 2, 7, 3, 8, 0]
foo finish
finally my list is []
result list is [4, 9, 5, 1, 6, 2, 7, 3, 8, 0]
运行时间是: 3.021937847137451
我们看到, 运行时间变长了, 因为sleep相当于IO等待, 适用于多线程. 在这里使用多进程, 因为我的电脑CPU数量是4, 有10个任务, 相当于4个CPU要同时运行3次, 所以总时间在3秒
同时我们看到list init和fanally没有变化, 也印证了多进程没有办法共享数据
但是我们欣喜的看到: results: [4, 9, 5, 1, 6, 2, 7, 3, 8, 0]
, 我们通过completed, pending = await asyncio.wait(tasks)
可以将多个进程里执行的任务的结果组织起来, 这样建立了联系, 在某些场景里是很有用的.
multi processing and concurrent
上面已经说到了多进程和协程, 那么如何让多进程与协程结合呢?