Python的多线程
目录
基本概念
并行与并发
进程与线程
进程与线程的特点
Python多线程编程
Threading模块
创建线程
方式一:直接继承Thread,改写对应的run方法
方式二:直接调用Thread
多线程问题
多线程加锁
多线程编程死锁
解决死锁的办法
多线程多进程对于cpu bound任务与I/O bound任务的性能对比
总结
一直想对Python的多线程和多进程做一个简单的总结,毕竟开发到后面始终都是会用到并发和并行的设计知识的,现简单总结如下。
基本概念
并行与并发
- 并行:同时处理多个任务的能力,指的是任务数小于等于cpu核数,任务真的是一起执行的。
- 并发:交替处理多个任务的能力。指的是任务数多于cpu核数,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行(实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起执行而已)
并行 并发
进程与线程
进程与线程的特点
|
多线程 |
多进程 |
优点 |
|
|
缺点 |
|
|
Python多线程编程
Threading模块
Python3 通过两个标准库 _thread 和 threading 提供对线程的支持。
_thread 提供了低级别的、原始的线程以及一个简单的锁,它相比于 threading 模块的功能还是比较有限的。
threading 模块除了包含 _thread 模块中的所有方法外,还提供的其他方法:
threading.currentThread(): 返回当前的线程变量。
threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。
除了使用方法外,线程模块同样提供了Thread类来处理线程,Thread类提供了以下方法:
run(): 用以表示线程活动的方法。
start():启动线程活动。
join([time]): 等待至线程中止。这阻塞调用线程直至线程的join() 方法被调用中止-正常退出或者抛出未处理的异常-或者是可选的超时发生。
isAlive(): 返回线程是否活动的。
getName(): 返回线程名。
setName(): 设置线程名。
创建线程
方式一:直接继承Thread,改写对应的run方法
- #!/usr/bin/python3
-
-
- import threading
- import time
-
- exitFlag = 0
-
- class myThread (threading.Thread):
- def __init__(self, threadID, name, counter):
- threading.Thread.__init__(self)
- self.threadID = threadID
- self.name = name
- self.counter = counter
- def run(self):
- print ("开始线程:" + self.name)
- print_time(self.name, self.counter, 5)
- print ("退出线程:" + self.name)
-
-
-
- def print_time(threadName, delay, counter):
- while counter:
- if exitFlag:
- threadName.exit()
- time.sleep(delay)
- print ("%s: %s" % (threadName, time.ctime(time.time())))
- counter -= 1
-
-
- # 创建新线程
- thread1 = myThread(1, "Thread-1", 1)
- thread2 = myThread(2, "Thread-2", 2)
-
- # 开启新线程
- thread1.start()
- thread2.start()
- thread1.join()
- thread2.join()
-
- print ("退出主线程")
方式二:直接调用Thread
- # 多任务可以由多进程完成,也可以由一个进程内的多线程完成。
- # 我们前面提到了进程是由若干线程组成的,一个进程至少有一个线程。
- # Python使用threading模块对线程进行操作
- import time, threading
-
- # 新线程执行的代码
- def loop():
- print('thread %s is running...' % threading.current_thread().name)
- n = 0
- while n < 5:
- n += 1
- print('thread %s >>> %s ' % (threading.current_thread().name, n))
- time.sleep(1)
- print('thread %s ended.' % threading.current_thread().name)
-
-
- if __name__ == "__main__":
- print('thread %s is running...' % threading.current_thread().name)
- t = threading.Thread(target=loop, name='LoopThread')
- t.start()
- # 如果不写join,似乎对子线程的运行没有什么影响,只是主线程会提前结束。
- # 主线程的结束不会导致子线程的结束
- t.join()
-
- print('thread %s ended.' % threading.current_thread().name)
多线程问题
多线程加锁
当多线程中多个线程修改共享资源时,需要加锁,否则会出现线程不安全问题。
# 多线程和多进程最大的不同在于,多进程中,同一个变量,各自有一份拷贝存在于每个进程中,互不影响,
# 而多线程中,所有变量都由所有线程共享,所以,任何一个变量都可以被任何一个线程修改,因此,线程之间共享数据最大的危险在于多个线程同时改一个变量,把内容给改乱了。
# 上述所说的即为线程不安全问题
- import time, threading
-
- balance = 0
- lock = threading.Lock()
-
- def change_it(n):
- # 先存后取,结果应该为0
- global balance
- balance += n
- balance -= n
-
-
- def run_thread(n):
- # 在跑线程的时候不加锁,就和之前写的Beringei操作数据库代码一样,由于不是原子性操作
- # 两个线程并行交叉运行多条语句,所以会出现线程不安全,需要枷锁
- for i in range(100000):
- # lock.acquire()
- # try:
- change_it(n)
- # finally:
- # lock.release()
-
- if __name__ == "__main__":
- t1 = threading.Thread(target=run_thread, args=(5,))
- t2 = threading.Thread(target=run_thread, args=(8,))
- t1.start()
- t2.start()
- t1.join()
- t2.join()
- print(balance)
-
# 锁的好处就是确保了某段关键代码只能由一个线程从头到尾完整地执行,坏处当然也很多,首先是阻止了多线程并发执行,包含锁的某段代码
# 实际上只能以单线程模式执行,效率就大大地下降了。其次,由于可以存在多个锁,不同的线程持有不同的锁,并试图获取对方持有的锁时,
# 可能会造成死锁,导致多个线程全部挂起,既不能执行,也无法结束,只能靠操作系统强制终止
以上代码段的结果由于没有加锁,不一定是0, 如下
所以需要对run_thread这个函数进行小小的改动
- def run_thread(n):
- # 在跑线程的时候不加锁,就和之前写的Beringei操作数据库代码一样,由于不是原子性操作
- # 两个线程并行交叉运行多条语句,所以会出现线程不安全,需要枷锁
- for i in range(100000):
- lock.acquire()
- try:
- change_it(n)
- finally:
- lock.release()
多线程编程死锁
如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁,因为系统判断这部分资源正在使用,所以这两个线程如果不人为终止,将一直等待下去。死锁示意图如下:
死锁示例代码与死锁结果:
- import time
- import threading
-
-
- class Mythread(threading.Thread):
- def __init__(self):
- threading.Thread.__init__(self)
-
- def run(self):
- self.foo()
- self.bar()
-
- def foo(self):
- locka.acquire()
- print('I am %s got locka---%s' % (self.name, time.ctime()))
- lockb.acquire()
- print('I am %s got lockb---%s' % (self.name, time.ctime()))
- lockb.release()
- locka.release()
-
-
- def bar(self):
- lockb.acquire()
- print('I am %s got lockb---%s' % (self.name, time.ctime()))
- locka.acquire()
- print('I am %s got locka---%s' % (self.name, time.ctime()))
- locka.release()
- lockb.release()
-
-
- if __name__ == '__main__':
- locka = threading.Lock()
- lockb = threading.Lock()
-
- for i in range(2):
- t = Mythread()
- t.start()
解决死锁的办法
将lock换为递归锁,即Rlock()
- import time
- import threading
-
- class Mythread(threading.Thread):
- def __init__(self):
- threading.Thread.__init__(self)
-
- def run(self):
- self.foo()
- self.bar()
-
- def foo(self):
- lock.acquire()
- print('I am %s got locka---%s' % (self.name, time.ctime()))
- lock.acquire()
- print('I am %s got lockb---%s' % (self.name, time.ctime()))
- lock.release()
- lock.release()
-
- def bar(self):
- lock.acquire()
- print('I am %s got lockb---%s' % (self.name, time.ctime()))
- lock.acquire()
- print('I am %s got locka---%s' % (self.name, time.ctime()))
- lock.release()
- lock.release()
-
- if __name__ == '__main__':
- lock = threading.RLock()
- for i in range(2):
- t = Mythread()
- t.start()
多线程多进程对于cpu bound任务与I/O bound任务的性能对比
- import requests
- import time
- from threading import Thread
- from multiprocessing import Process
-
-
- # 定义cpu密集型的计算函数
- def count(x, y):
- c = 0
- while c < 50000:
- c += 1
- x += x
- y += y
-
-
- # 定义io密集型的文件读写函数
- def write():
- f = open("test.txt", "w")
- for x in range(5000000):
- f.write("testwrite\n")
- f.close()
-
- def read():
- f = open("test.txt", "r")
- lines = f.readlines()
- f.close()
-
- # 定义网络请求函数
- _head = {
- 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/48.0.2564.116 Safari/537.36'}
- url = "http://www.tieba.com"
-
- def http_request():
- try:
- webPage = requests.get(url, headers=_head)
- html = webPage.text
- return {"context": html}
- except Exception as e:
- return {"error": e}
-
-
- # 测试单进程单线程IO密集型、cpu密集型、网络请求密集型操作所需要的时间
- def line():
- # CPU密集操作
- t = time.time()
- for x in range(10):
- count(1, 1)
- print("Line cpu", time.time() - t)
- # IO密集操作
- t = time.time()
- for x in range(10):
- write()
- read()
- print("Line IO", time.time() - t)
- # 网络请求密集型操作
- t = time.time()
- for x in range(10):
- http_request()
- print("Line Http Request", time.time() - t)
-
- def multi_thread_cpu():
- counts = []
- t = time.time()
- for x in range(10):
- thread = Thread(target=count, args=(1, 1))
- counts.append(thread)
- thread.start()
- e = counts.__len__()
- while True:
- for th in counts:
- if not th.is_alive():
- e -= 1
- if e <= 0:
- break
- print(time.time() - t)
-
- def io():
- write()
- read()
-
-
- def multi_thread_io():
- t = time.time()
- ios = []
- t = time.time()
- for x in range(10):
- thread = Thread(target=count, args=(1, 1))
- ios.append(thread)
- thread.start()
-
- e = ios.__len__()
- while True:
- for th in ios:
- if not th.is_alive():
- e -= 1
- if e <= 0:
- break
- print(time.time() - t)
-
- def multi_thread_http():
- t = time.time()
- ios = []
- t = time.time()
- for x in range(10):
- thread = Thread(target=http_request)
- ios.append(thread)
- thread.start()
- e = ios.__len__()
- while True:
- for th in ios:
- if not th.is_alive():
- e -= 1
- if e <= 0:
- break
- print("Thread Http Request", time.time() - t)
-
-
- def mp_cpu():
- counts = []
- t = time.time()
- for x in range(10):
- process = Process(target=count, args=(1, 1))
- counts.append(process)
- process.start()
- e = counts.__len__()
- while True:
- for th in counts:
- if not th.is_alive():
- e -= 1
- if e <= 0:
- break
- print("Multiprocess cpu", time.time() - t)
-
-
- def mp_io():
- t = time.time()
- ios = []
- t = time.time()
- for x in range(10):
- process = Process(target=io)
- ios.append(process)
- process.start()
-
- e = ios.__len__()
- while True:
- for th in ios:
- if not th.is_alive():
- e -= 1
- if e <= 0:
- break
- print("Multiprocess IO", time.time() - t)
-
-
-
- def mp_http():
- t = time.time()
- httprs = []
- ios = []
- t = time.time()
- for x in range(10):
- process = Process(target=http_request)
- ios.append(process)
- process.start()
-
- e = httprs.__len__()
- while True:
- for th in httprs:
- if not th.is_alive():
- e -= 1
- if e <= 0:
- break
- print("Multiprocess Http Request", time.time() - t)
-
-
- if __name__ == '__main__':
- mp_cpu()
- mp_io()
- mp_http()
|
CPU密集型/s |
IO密集型/s |
HTTP密集型/s |
Line(单进程单线程) |
94.91825 |
22.462 |
7.3296 |
多线程 |
101.17 |
24.8605 |
0.5053333 |
多进程 |
53.89 |
12.784 |
0.5045 |
总结
多线程在IO密集型的操作下似乎也没有很大的优势(也许IO操作的任务再繁重一些就能体现出优势),在CPU密集型的操作下明显地比单线程线性执行性能更差,但是对于网络请求这种忙等阻塞线程的操作,多线程的优势便非常显著了
多进程无论是在CPU密集型还是IO密集型以及网络请求密集型(经常发生线程阻塞的操作)中,都能体现出性能的优势。不过在类似网络请求密集型的操作上,与多线程相差无几,但却更占用CPU等资源,所以对于这种情况下,我们可以选择多线程来执行
1、对于CPU密集型的程序,顺序执行比并发执行效率更高,这是因为并发时GIL带来了切换线程额外的开销。
(循环处理、计数等)
2. 对于IO密集型的程序,Python的多线程并发执行能够提高效率的,因为,Python线程在等待IO时,会释放GIL;
(文件处理、网络爬虫)
3、多核多线程比单核多线程更差,单核下的多线程,每次释放GIL,唤醒的那个线程都能获取到GIL锁,所以能够无缝执行,但多核下,CPU0释放GIL后,其他CPU上的线程都会进行竞争,但GIL可能会马上又被CPU0拿到,导致其他几个CPU上被唤醒后的线程会醒着等待到切换时间后又进入待调度状态,这样会造成线程颠簸(thrashing),导致效率更低。