在 Python 中,可以使用 concurrent.futures 或 multiprocessing 模块来创建线程池。以下是常用的方法:


1. 使用 concurrent.futures.ThreadPoolExecutor

这是 Python 标准库中最方便创建线程池的方法之一,它提供了易用的线程池接口,可以简单地提交任务,并等待它们完成。

from concurrent.futures import ThreadPoolExecutor

# 定义一个任务函数
def task_function(x):
    return x * x

# 使用线程池
def main():
    # 创建一个线程池,指定线程数
    with ThreadPoolExecutor(max_workers=4) as executor:
        # 提交任务到线程池,返回一个 Future 对象
        future = executor.submit(task_function, 5)
        print(f"Result: {future.result()}")  # 获取任务结果
        
        # 多个任务提交方法1:使用 submit()
        futures = [executor.submit(task_function, i) for i in range(10)]

        # 输出结果
        for future in futures:
            print(future.result())

        # 多个任务提交方法2:使用 map()
        results = executor.map(task_function, range(10))
        print(list(results))  # 输出所有结果

if __name__ == '__main__':
    main()

说明:

  • max_workers:指定线程池中的最大线程数。
  • .submit():向线程池中提交一个任务,返回一个 Future 对象。
  • .map():批量提交任务,返回结果的迭代器。
  • 使用 with 语法创建线程池,线程池会自动关闭,避免资源泄露。

2. 使用 multiprocessing.dummy.Pool

multiprocessing.dummy 是一个基于线程的实现,与 multiprocessing 的 API 一致。使用 Pool 可以很方便地创建线程池并执行并发任务。

from multiprocessing.dummy import Pool

# 定义任务函数
def task_function(x):
    return x * x

# 使用线程池
def main():
    # 创建一个线程池,指定线程数
    pool = Pool(4)

    # 使用 map 方法分发任务
    results = pool.map(task_function, range(10))
    print(results)

    # 使用 apply_async 方法提交单个任务
    async_result = pool.apply_async(task_function, (5,))
    print(async_result.get())  # 获取任务结果
    
    # 关闭线程池
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()

说明:

  • Pool(4):创建一个最多包含 4 个线程的线程池。
  • .map():将函数应用于每个输入(可迭代),返回结果列表。
  • .apply_async():提交异步任务。
  • .close() 和 .join():关闭线程池并等待所有线程完成。

3. 手动创建线程池(不推荐)

如果不使用库,也可以手动创建线程池。这种方式需要更多处理,比如线程的启动、任务队列管理、回调和终止线程。

import threading
import queue

class ThreadPool:
    def __init__(self, num_threads):
        self.tasks = queue.Queue()
        self.threads = []

        # 创建线程
        for _ in range(num_threads):
            thread = threading.Thread(target=self.worker)
            thread.daemon = True  # 确保主线程退出时自动回收
            thread.start()
            self.threads.append(thread)
    
    def worker(self):
        while True:
            func, args, kwargs = self.tasks.get()
            try:
                func(*args, **kwargs)
            finally:
                self.tasks.task_done()

    def submit(self, func, *args, **kwargs):
        self.tasks.put((func, args, kwargs))

    def wait(self):
        self.tasks.join()

# 使用线程池
def task_function(x):
    print(f"Task {x} completed in thread {threading.current_thread().name}")

if __name__ == '__main__':
    pool = ThreadPool(num_threads=4)
    for i in range(10):
        pool.submit(task_function, i)
    pool.wait()

说明:

  • 手动实现线程池需要管理任务队列和线程生命周期。
  • 不推荐这种方法,因为标准库已经封装好这些功能。

比较与总结

方法优点缺点
concurrent.futures.ThreadPoolExecutor最易用,支持 submit() 和 map() 函数缺乏复杂任务队列管理能力
multiprocessing.dummy.Pool与 multiprocessing.Pool API 一致较少高级功能
手动实现线程池灵活,可完全控制线程和任务逻辑代码复杂,容易出错

在大多数情况下,推荐使用 concurrent.futures.ThreadPoolExecutor,它易用且功能全面,适合现代 Python 开发。



Python如何创建线程池?插图

关注公众号:程序新视界,一个让你软实力、硬技术同步提升的平台

除非注明,否则均为程序新视界原创文章,转载必须以链接形式标明本文链接

本文链接:http://www.choupangxia.com/2025/08/02/python-create-theadpool/