Process、Thread、Coroutine in Python

进程Process、线程Thread、协程Coroutine

进程: 对应处理器, 一个处理器运行一个进程, 多个进程可以并行Parallelism, 也就是说多个任务同时执行
线程:一个进程里可以运行多个线程, 多个线程可以并发concurrent, 也就是交替执行
协程: 协程是在单线程里, 多个子程序可以交替执行实现并发, 因为是单线程, 所以可以实现资源共享

协程

看示例代码:

import asyncio
import threading
import time


async def foo():
    print('foo start')
    print("foo thread is {}".format(threading.currentThread()))
    await asyncio.sleep(1)
    print('foo finish')


async def bar():
    print('bar start')
    print("bar thread is {}".format(threading.currentThread()))
    await asyncio.sleep(1)
    print('bar finish')


print("main thread is {}".format(threading.currentThread()))
start = time.time()
loop = asyncio.get_event_loop()
tasks = [foo(), bar()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print("运行时间是: {}".format(end-start))

运行结果是:

main thread is <_MainThread(MainThread, started 140735607858048)>
foo start
foo thread is <_MainThread(MainThread, started 140735607858048)>
bar start
bar thread is <_MainThread(MainThread, started 140735607858048)>
foo finish
bar finish
运行时间是: 1.005871295928955

我们可以看到foo和bar两个函数交替执行, 两个子线程和主线程是同一个线程, 执行时间是1秒, 如果我们顺序执行两个函数, 那么时间也会是1秒

协程相比多线程效率更高, 因为没有切换线程所需要的开销,而且很重要的是可以共享资源(下面我们会解释共享资源)

但是我发现一个问题, 这里我们用的是await asyncio.sleep(1)来模拟阻塞(这里也可以使用aiohttp, 应用在io请求的场景下, 比如web请求, 比如爬虫等等)
但是实际开发中, 阻塞代码可能千差万别, 我们实际上需要await我们自定义的一个函数, 这个函数可能包含了复杂的逻辑, 可能有http请求, 可能有数据库读写, asyncio应该提供这样的接口, 但是我不想拆分的那么细, 拆分也面临很多实际困难, 那么应该怎么办呢?

多线程

上面的问题暂且不表, 我们先说说多线程, 实现多线程, 我们可以使用concurrent.futures.ThreadPoolExecutor 示例代码:

import threading
import time
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(2)


def foo(array):
    print("foo start")
    print("foo thread {}".format(threading.currentThread()))
    time.sleep(1)
    array.append("foo")
    print("foo end time is {}".format(time.time()))


def bar(array):
    print("bar start")
    print("bar thread {}".format(threading.currentThread()))
    time.sleep(1)
    array.append("bar")
    print("bar end time is {}".format(time.time()))


start = time.time()
print("start time is {}".format(time.time()))
print("main thread {}".format(threading.currentThread()))
my_list = []
print("init list is {}".format(my_list))
executor = ThreadPoolExecutor(max_workers=2)
executor.submit(foo, my_list)
executor.submit(bar, my_list)
print("finally list is {}".format(my_list))
end = time.time()
print("运行时间是: {}".format(end-start))

运行结果是:

start time is 1615809735.847754
main thread <_MainThread(MainThread, started 140735607858048)>
init list is []
foo start
foo thread <Thread(ThreadPoolExecutor-1_0, started daemon 123145423056896)>
bar start
finally list is []
bar thread <Thread(ThreadPoolExecutor-1_1, started daemon 123145428312064)>
运行时间是: 0.0009667873382568359
foo end time is 1615809736.848752
bar end time is 1615809736.8536499

我们可以看到, 主线程和执行a、b的线程是3个不同的线程, 主线程不会等a、b执行完而顺序执行, 同样的, 执行时间也在1秒左右
(我们我们配置线程是是1, ThreadPoolExecutor(1), 那么a、b会在同一个线程里顺序执行, 总时间会变成2, 这里就不在赘述了)
我们还可以看到, 我们定义的my_list没有变化, 看来也没有共享数据

那么问题来了, 虽然我们可以用此方法实现并发, 提高运行效率, 但是我们想实现在a、b和主线程在一个线程里, 且能共享数据, 应该怎么办呢?

Asyncio和ThreadPoolExecutor

为了解决上面的问题, 我们Asyncio和ThreadPoolExecutor结合起来, 看示例代码:

import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(2)

async def foo(array):
    print('foo start')
    print("foo thread is {}".format(threading.currentThread()))
    foo_loop = asyncio.get_event_loop()
    await foo_loop.run_in_executor(_executor, my_func, array, "foo")
    print('foo finish')


async def bar(array):
    print('bar start')
    print("bar thread is {}".format(threading.currentThread()))
    bar_loop = asyncio.get_event_loop()
    await bar_loop.run_in_executor(_executor, my_func, array, "bar")
    print('bar finish')


def my_func(array, item):
    print("my_func thread is {}".format(threading.currentThread()))
    array.append(item)
    time.sleep(1)


print("main thread is {}".format(threading.currentThread()))
start = time.time()
my_list = []
print("init list is {}".format(my_list))
loop = asyncio.get_event_loop()
tasks = [foo(my_list), bar(my_list)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print("finally list is {}".format(my_list))
print("运行时间是: {}".format(end-start))

运行结果是:

main thread is <_MainThread(MainThread, started 140735607858048)>
init list is []
bar start
bar thread is <_MainThread(MainThread, started 140735607858048)>
my_func thread is <Thread(ThreadPoolExecutor-0_0, started daemon 123145474777088)>
foo start
foo thread is <_MainThread(MainThread, started 140735607858048)>
my_func thread is <Thread(ThreadPoolExecutor-0_1, started daemon 123145480032256)>
bar finish
foo finish
finally list is ['bar', 'foo']
运行时间是: 1.0060360431671143

我们可以看到, 主函数和foo、bar函数同属于一个线程, my_func函数的线程不一样, 但是, 我们发现是可以共享数据的, 这里我也不知道怎么解释
只能解释为执行my_func的线程还是和主线程同属一个线程
这样我们解决了上面的问题

我们对上面的代码做一下优化:

import asyncio
import threading
import time
from concurrent.futures import ThreadPoolExecutor


async def foo(array, item):
    print('foo start')
    print("foo thread is {}".format(threading.currentThread()))
    bar_loop = asyncio.get_event_loop()
    _executor = ThreadPoolExecutor(1)
    await bar_loop.run_in_executor(_executor, my_func, array, item)
    print('foo finish')


def my_func(array, item):
    print("my_func thread is {}".format(threading.currentThread()))
    array.append(item)
    time.sleep(1)


print("main thread is {}".format(threading.currentThread()))
start = time.time()
my_list = []
print("init list is {}".format(my_list))
loop = asyncio.get_event_loop()
tasks = [foo(my_list, i) for i in range(10)]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
end = time.time()
print("finally list is {}".format(my_list))
print("运行时间是: {}".format(end-start))

优化了两方面:

  1. tasks使用了对同一个函数循环添加到list, 更贴近实际场景
  2. _executor定义在函数内, max workers数量可以定义为1, 也可以不传参数采用默认值, 实际上只需要一个就可以了 运行结果是:
    main thread is <_MainThread(MainThread, started 140735693562752)>
    init list is []
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-0_0, started daemon 123145324679168)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-1_0, started daemon 123145329934336)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-2_0, started daemon 123145335189504)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-3_0, started daemon 123145340444672)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-4_0, started daemon 123145345699840)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-5_0, started daemon 123145350955008)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-6_0, started daemon 123145356210176)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-7_0, started daemon 123145361465344)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-8_0, started daemon 123145366720512)>
    foo start
    foo thread is <_MainThread(MainThread, started 140735693562752)>
    my_func thread is <Thread(ThreadPoolExecutor-9_0, started daemon 123145371975680)>
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    foo finish
    finally list is [2, 6, 0, 7, 8, 3, 9, 4, 1, 5]
    运行时间是: 1.0114827156066895
    

多进程

对于IO型程序, 为了减少IO等待带来的效率地下, 我们可以使用多线程和协程
但是对于计算型程序, 没有IO等待的问题, 为了提高效率, 就可以利用多进程, 将多核处理器充分利用起来
对于多进程的实现, 我们可以使concurrent.futures.ProcessPoolExecutor

import concurrent.futures
import math
import time

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n, m=None):
    time.sleep(1)
    print("{}-{}".format(n ,m))

    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # 这里有三种写法, 可以灵活使用

        # 第一种
        # futures = {executor.submit(is_prime, i): i for i in PRIMES}
        # for f in concurrent.futures.as_completed(futures):
        #     i = futures[f]
        #     data = f.result()
        #     print("{} {}".format(i, data))

        # 第二种, 注: 如果is_prime有两个参数, submit则增加一个参数
        # for i in PRIMES:
        #     executor.submit(is_prime, i, 2)

        # 第三种
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    t1 = time.time()
    main()
    print("耗时{}".format(time.time()-t1))

multi processing and asyncio

我们让多进程和asyncio结合一下, 看看是什么情况:

import asyncio
import threading
import time
from concurrent.futures import ProcessPoolExecutor


async def foo(array):
    print('foo start')
    print("foo thread is {}".format(threading.currentThread()))
    foo_loop = asyncio.get_event_loop()
    _executor = ProcessPoolExecutor()
    tasks = [foo_loop.run_in_executor(_executor, my_func, array, item) for item in range(10)]
    completed, pending = await asyncio.wait(tasks)
    results = [t.result() for t in completed]
    print('results: {!r}'.format(results))
    print('foo finish')
    return results


def my_func(array, item):
    print("my_func thread is {}".format(threading.currentThread()))
    array.append(item)
    time.sleep(1)
    return item


print("main thread is {}".format(threading.currentThread()))
start = time.time()
my_list = []
print("init my list is {}".format(my_list))
loop = asyncio.get_event_loop()
result = loop.run_until_complete(foo(my_list))
loop.close()
end = time.time()
print("finally my list is {}".format(my_list))
print("result list is {}".format(result))
print("运行时间是: {}".format(end-start))

运行结果是:

main thread is <_MainThread(MainThread, started 140736046461824)>
init my list is []
foo start
foo thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
my_func thread is <_MainThread(MainThread, started 140736046461824)>
results: [4, 9, 5, 1, 6, 2, 7, 3, 8, 0]
foo finish
finally my list is []
result list is [4, 9, 5, 1, 6, 2, 7, 3, 8, 0]
运行时间是: 3.021937847137451

我们看到, 运行时间变长了, 因为sleep相当于IO等待, 适用于多线程. 在这里使用多进程, 因为我的电脑CPU数量是4, 有10个任务, 相当于4个CPU要同时运行3次, 所以总时间在3秒
同时我们看到list init和fanally没有变化, 也印证了多进程没有办法共享数据
但是我们欣喜的看到: results: [4, 9, 5, 1, 6, 2, 7, 3, 8, 0], 我们通过completed, pending = await asyncio.wait(tasks)可以将多个进程里执行的任务的结果组织起来, 这样建立了联系, 在某些场景里是很有用的.

multi processing and concurrent

上面已经说到了多进程和协程, 那么如何让多进程与协程结合呢?

实现方式参照: https://pypi.org/project/aiomultiprocess/