python并发并行性能测试

Posted by Shi Hai's Blog on December 23, 2021

python支持并发/并行的手段主要是asyncio、threading以及multiprocessing。
我的测试环境是4核虚拟机。

asyncio

import asyncio
import time


async def count(number):
    result = sum(i * i for i in range(number))
    return result


async def cacluate_count(numbers):
    tasks = [asyncio.create_task(count(number)) for number in numbers]
    result = await asyncio.gather(*tasks)


def main():
    begin_time = time.perf_counter()
    numbers = [1000000 + x for x in range(20)]
    asyncio.run(cacluate_count(numbers))
    end_time = time.perf_counter()
    print('Total cost time is: ', end_time -  begin_time)


if __name__ == '__main__':
    main()

结果输出是:Total cost time is: 5.781235985457897。(执行五次取得的中位数)

threading

import  threading
import time


def count(number):
    result = sum(i * i for i in range(number))
    return result


def cacluate_count(numbers):
    threads = [threading.Thread(target=count, args=(number,)) for number in numbers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


def main():
    begin_time = time.perf_counter()
    numbers = [1000000 + x for x in range(20)]
    cacluate_count(numbers)
    end_time = time.perf_counter()
    print('Total cost time is: ', end_time -  begin_time)


if __name__ == '__main__':
    main()

结果输出是:Total cost time is: 5.884195368736982。(执行五次取得的中位数)

multiprocessing

import multiprocessing
import time


def count(number):
    result = sum(i * i for i in range(number))
    return result


def cacluate_count(numbers):
    processes = [multiprocessing.Process(target=count, args=(number,)) for number in numbers]
    for process in processes:
        process.start()
    for process in processes:
        process.join()


def main():
    begin_time = time.perf_counter()
    numbers = [1000000 + x for x in range(20)]
    cacluate_count(numbers)
    end_time = time.perf_counter()
    print('Total cost time is: ', end_time -  begin_time)


if __name__ == '__main__':
    main()

结果输出是:Total cost time is: 1.502054424956441

threading+subinterpreters

_xxsubinterpreters模块是python编译器未对外正式发布的模块,如果想要用此模块则需要重新编译python编译器(用此命令行重新编译./configure -with-experimental-isolated-subinterpreters && make -j)。

import threading
import time
import _xxsubinterpreters as subinterpreters


def count(number):
    interp = subinterpreters.create()
    subinterpreters.run_string(interp,
        f"sum(i * i for i in range({number}))")
    return


def cacluate_count(numbers):
    threads = [threading.Thread(target=count, args=(number,)) for number in numbers]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


def main():
    begin_time = time.perf_counter()
    numbers = [1000000 + x for x in range(20)]
    cacluate_count(numbers)
    end_time = time.perf_counter()
    print('Total cost time is: ', end_time -  begin_time)


if __name__ == '__main__':
    main()

结果输出是:Total cost time is: 5.727610696107149。这里看出此运行方式实际性能和普通多线程或者协程效率几乎一样。GIL这个问题还要继续努力干掉。

用pypy执行

用pypy执行本文中提到的多线程代码,输出为Total cost time is: 0.17054572142660618,执行本文中的多进程代码,输出为Total cost time is: 0.14108090475201607

总结

从上面测试结果看,在我的4核cpu虚拟机上,进程的效率大概是线程和协程的4倍,协程的效率略快于线程,主要原因是线程上下文切换和GIL锁获取的操作上。
pypy执行过程效率高的恐怖,对这类JIT框架暂时不太了解,后续在补充。