Python如何创建线程池?
在 Python 中,可以使用 concurrent.futures
或 multiprocessing
模块来创建线程池。以下是常用的方法:
1. 使用 concurrent.futures.ThreadPoolExecutor
这是 Python 标准库中最方便创建线程池的方法之一,它提供了易用的线程池接口,可以简单地提交任务,并等待它们完成。
from concurrent.futures import ThreadPoolExecutor # 定义一个任务函数 def task_function(x): return x * x # 使用线程池 def main(): # 创建一个线程池,指定线程数 with ThreadPoolExecutor(max_workers=4) as executor: # 提交任务到线程池,返回一个 Future 对象 future = executor.submit(task_function, 5) print(f"Result: {future.result()}") # 获取任务结果 # 多个任务提交方法1:使用 submit() futures = [executor.submit(task_function, i) for i in range(10)] # 输出结果 for future in futures: print(future.result()) # 多个任务提交方法2:使用 map() results = executor.map(task_function, range(10)) print(list(results)) # 输出所有结果 if __name__ == '__main__': main()
说明:
max_workers
:指定线程池中的最大线程数。.submit()
:向线程池中提交一个任务,返回一个Future
对象。.map()
:批量提交任务,返回结果的迭代器。- 使用
with
语法创建线程池,线程池会自动关闭,避免资源泄露。
2. 使用 multiprocessing.dummy.Pool
multiprocessing.dummy
是一个基于线程的实现,与 multiprocessing
的 API 一致。使用 Pool
可以很方便地创建线程池并执行并发任务。
from multiprocessing.dummy import Pool # 定义任务函数 def task_function(x): return x * x # 使用线程池 def main(): # 创建一个线程池,指定线程数 pool = Pool(4) # 使用 map 方法分发任务 results = pool.map(task_function, range(10)) print(results) # 使用 apply_async 方法提交单个任务 async_result = pool.apply_async(task_function, (5,)) print(async_result.get()) # 获取任务结果 # 关闭线程池 pool.close() pool.join() if __name__ == '__main__': main()
说明:
Pool(4)
:创建一个最多包含 4 个线程的线程池。.map()
:将函数应用于每个输入(可迭代),返回结果列表。.apply_async()
:提交异步任务。.close()
和.join()
:关闭线程池并等待所有线程完成。
3. 手动创建线程池(不推荐)
如果不使用库,也可以手动创建线程池。这种方式需要更多处理,比如线程的启动、任务队列管理、回调和终止线程。
import threading import queue class ThreadPool: def __init__(self, num_threads): self.tasks = queue.Queue() self.threads = [] # 创建线程 for _ in range(num_threads): thread = threading.Thread(target=self.worker) thread.daemon = True # 确保主线程退出时自动回收 thread.start() self.threads.append(thread) def worker(self): while True: func, args, kwargs = self.tasks.get() try: func(*args, **kwargs) finally: self.tasks.task_done() def submit(self, func, *args, **kwargs): self.tasks.put((func, args, kwargs)) def wait(self): self.tasks.join() # 使用线程池 def task_function(x): print(f"Task {x} completed in thread {threading.current_thread().name}") if __name__ == '__main__': pool = ThreadPool(num_threads=4) for i in range(10): pool.submit(task_function, i) pool.wait()
说明:
- 手动实现线程池需要管理任务队列和线程生命周期。
- 不推荐这种方法,因为标准库已经封装好这些功能。
比较与总结
方法 | 优点 | 缺点 |
---|---|---|
concurrent.futures.ThreadPoolExecutor | 最易用,支持 submit() 和 map() 函数 | 缺乏复杂任务队列管理能力 |
multiprocessing.dummy.Pool | 与 multiprocessing.Pool API 一致 | 较少高级功能 |
手动实现线程池 | 灵活,可完全控制线程和任务逻辑 | 代码复杂,容易出错 |
在大多数情况下,推荐使用 concurrent.futures.ThreadPoolExecutor
,它易用且功能全面,适合现代 Python 开发。
关注公众号:程序新视界,一个让你软实力、硬技术同步提升的平台
除非注明,否则均为程序新视界原创文章,转载必须以链接形式标明本文链接
本文链接:http://www.choupangxia.com/2025/08/02/python-create-theadpool/