百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

Python爬虫进阶教程(二):线程、协程

off999 2025-04-09 19:06 11 浏览 0 评论

简介

线程

线程也叫轻量级进程,它是一个基本的CPU执行单元,也是程序执行过程中的最小单元,由线程ID、程序计数器、寄存器集合和堆栈共同组成。线程的引入减小了程序并发执行时的开销,提高了操作系统的并发性能。 线程没有自己的系统资源,只拥有在运行时必不可少的资源。但线程可以与同属与同一进程的其他线程共享进程所拥有的其他资源。

多线程类似同时执行多个不同程序,python的标准库提供了两个模块thread和threading,thread是低级模块,threading是高级模块,对thread进行了封装,大多情况下,我们只使用threading模块即可。

threading模块

此模块在较低级别的thread模块之上构建的更高级别的线程接口,一般通过两种方式实现多线程,第一种方式是把一个函数传入并创建实例,然后调用start方法执行;第二种方式是直接从threading.Thread继承并创建线程类,然后重写init方法和run方法。

第一种方式代码示例:

import time
import random
import threading

def t_run(urls):
 """
 线程执行代码
 """
 # threading.current_thread()返回当前的Thread对象,对应于调用者控制的线程。
 # 如果调用者控制的线程不是通过threading模块创建的,则返回一个只有有限功能的虚假线程对象。
 print('Current %s is running...' % threading.current_thread().name)
 for url in urls:
 print(' threading %s -----> %s ' % (threading.current_thread().name, url))
 time.sleep(random.random())
 print('%s ended.' % threading.current_thread().name)

if __name__ == '__main__':
 # 创建两个线程实例
 t1 = threading.Thread(target=t_run, name='Thread_1', args=(['url1', 'url2'],))
 t2 = threading.Thread(target=t_run, name='Thread_2', args=(['url3', 'url4'],))
 # 启动线程
 t1.start()
 t2.start()
 # 等待线程结束
 t1.join()
 t2.join()
 print('%s ended.' % threading.current_thread().name)

运行结果如下:

Current Thread_1 is running...
 threading Thread_1 -----> url1 
Current Thread_2 is running...
 threading Thread_2 -----> url3 
 threading Thread_1 -----> url2 
 threading Thread_2 -----> url4 
Thread_2 ended.
Thread_1 ended.
MainThread ended.

第二种方式用threading.Thread继承创建线程类

import time
import random
import threading

class MyThread(threading.Thread):
 """
 定义线程类
 """

 def __init__(self, name, urls):
 """
 初始化,重写线程
 """
 threading.Thread.__init__(self, name=name)
 self.urls = urls

 def run(self):
 """
 执行函数
 """
 # 打印当前线程名
 print('Current %s is running...' % threading.current_thread().name)
 for url in self.urls:
 print('Thread %s ------> %s' % (threading.current_thread().name, url))
 time.sleep(random.random())
 print('%s ended.' % threading.current_thread().name)

if __name__ == '__main__':
 print('%s is running...' % threading.current_thread().name)
 t1 = MyThread(name='Thread_1', urls=['url1', 'url2'])
 t2 = MyThread(name='Thread_2', urls=['url3', 'url4'])
 t1.start()
 t2.start()
 t1.join()
 t2.join()
 print('%s ended.' % threading.current_thread().name)

结果如下:

MainThread is running...
Current Thread_1 is running...
Thread Thread_1 ------> url1
Current Thread_2 is running...
Thread Thread_2 ------> url3
Thread Thread_1 ------> url2
Thread Thread_2 ------> url4
Thread_1 ended.
Thread_2 ended.
MainThread ended.

线程同步

如果多个线程共同对某个数据进行修改,就有可能会造成不可预料的结果,为了防止这种情况发生,需要对线程进行同步,使用Lock和Rlock可以实现简单线程同步。

Lock 对象

一个可重入锁处于“locked”或者“unlocked”状态中的一种。它创建时处于unlocked状态。它有两个基本方法,acquire()和release()。当状态是unlocked时,acquire()改变该状态为locked并立即返回。当状态被锁定时,acquire()阻塞,直到在另一个线程中对release()的调用将其改为unlocked,然后acquire()执行,release()方法只应在锁定状态下调用;它将状态更改为已解锁并立即返回。如果尝试释放已解锁的锁,将会引发RuntimeError。

Rlock 对象

一个可重入锁必须由获得它的线程释放。一旦线程获得了可重入锁,同一线程可以再次获取它而不阻塞;在所有的release操作完成后,别的线程才能申请Rlock对象,见下面例子:

import threading
# 创建Rlock实例
lock = threading.RLock()
# 定义变量
num = 0

class MyThread(threading.Thread):
 """
 定义线程类
 """

 def __init__(self, name):
 """
 重新定义name
 """
 threading.Thread.__init__(self, name=name)

 def run(self):
 """
 执行函数
 """
 # 全局变量num
 global num
 while True:
 # 加锁
 lock.acquire()
 print('%s locked, Number: %d' % (threading.current_thread().name, num))
 if num >= 4:
 # 解锁
 lock.release()
 print('%s released, Number: %d' % (threading.current_thread().name, num))
 break
 num += 1
 print('%s released, Number: %d' % (threading.current_thread().name, num))
 lock.release()

if __name__ == '__main__':
 thread1 = MyThread('Thread_1')
 thread2 = MyThread('Thread_2')
 thread3 = MyThread('Thread_3')
 thread1.start()
 thread2.start()
 thread3.start()

运行结果如下:

Thread_1 locked, Number: 0
Thread_1 released, Number: 1
Thread_1 locked, Number: 1
Thread_1 released, Number: 2
Thread_1 locked, Number: 2
Thread_1 released, Number: 3
Thread_1 locked, Number: 3
Thread_1 released, Number: 4
Thread_1 locked, Number: 4
Thread_1 released, Number: 4
Thread_2 locked, Number: 4
Thread_2 released, Number: 4
Thread_3 locked, Number: 4
Thread_3 released, Number: 4

可以看出Rlock锁只有线程1的num为4时,调用release方法,全部解锁后,线程2才可以调用,线程2开始时num就是4,所以也直接到if判断结束,调用release后,线程3开始执行。

全局解释器锁(GIL)

首先说的一点是GIL并不是Python的特性,它是Python解析器(CPython)引入的一个概念。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。

GIL全称Global Interpreter Lock,是一个互斥锁,它可以防止多个本地线程同时执行Python的某个值,毫无疑问全局锁的存在会对多线程的效率有不小影响。几乎等于Python是个单线程的程序。(这也是大家吐槽python多线程慢的槽点)

协程

协程又称微线程、纤程,就好比同时开启多个任务,但一次只顺序执行一个。等到所执行的任务遭遇阻塞,就切换到下一个任务继续执行,以期节省下阻塞所占用的时间。

协程与线程类似,每个协程表示一个执行单元,有自己的本地数据,与其它协程共享全局数据和其它资源。对CPU来说协程就是单线程,不必考虑切换开销。

那么python如何实现协程呢?Python对协程的支持是通过generator实现的。在generator中,我们不但可以通过for循环来迭代,还可以不断调用next()函数获取由yield语句返回的下一个值。但是Python的yield不但可以返回一个值,它还可以接收调用者发出的参数。见下面简单生产者消费者示例:

def consumer():
 r = ''
 while True:
 # 这个地方注意,到达这个yield后,就会抛出n的值,暂停等待next或send继续
 n = yield r
 if not n:
 return
 print('[CONSUMER] Consuming %s ...' % n)
 r = '200 OK'

def produce(c):
 c.send(None)
 n = 0
 while n < 5:
 n = n + 1
 print('[PRODUCER] Porducing %s ...' % n)
 r = c.send(n)
 print('[PRODUCER] Consumer return: %s...' % r)
 c.close()

c = consumer()
produce(c)

结果如下:

[PRODUCER] Porducing 1 ...
[CONSUMER] Consuming 1 ...
[PRODUCER] Consumer return: 200 OK...
[PRODUCER] Porducing 2 ...
[CONSUMER] Consuming 2 ...
[PRODUCER] Consumer return: 200 OK...
[PRODUCER] Porducing 3 ...
[CONSUMER] Consuming 3 ...
[PRODUCER] Consumer return: 200 OK...
[PRODUCER] Porducing 4 ...
[CONSUMER] Consuming 4 ...
[PRODUCER] Consumer return: 200 OK...
[PRODUCER] Porducing 5 ...
[CONSUMER] Consuming 5 ...
[PRODUCER] Consumer return: 200 OK...

注意到consumer函数是一个generator,把一个consumer传入produce后:

首先调用c.send(None)启动生成器;

然后,一旦生产了东西,通过c.send(n)切换到consumer执行;

consumer通过yield拿到消息,处理,又通过yield把结果传回;

produce拿到consumer处理的结果,继续生产下一条消息;

produce决定不生产了,通过c.close()关闭consumer,整个过程结束。

整个流程无锁,由一个线程执行,produce和consumer协作完成任务,所以称为“协程”,而非线程的抢占式多任务。

最后套用Donald Knuth的一句话总结协程的特点:

“子程序就是协程的一种特例。”

asyncio

asyncio是Python 3.4版本引入的标准库,直接内置了对异步IO的支持。用asyncio实现Hello world代码如下:

import asyncio
# 把一个generator标记为协程类型
@asyncio.coroutine
def hello():
 """
 定义一个生成器
 """
 print('Hello world!')
 # 通过yield from调用另一个标记为协程的生成器
 r = yield from asyncio.sleep(2)
 print('Hello again!')
# 生成实例
loop = asyncio.get_event_loop()
# 将标记为协程的生成器执行
loop.run_until_complete(hello())
loop.close()

执行结果:

Hello world!
# 此处等待了2秒
Hello again!

从结果可以看出hello()会首先打印出Hello world!,然后,yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。

我们用Task封装两个coroutine试试:

import asyncio
import threading
@asyncio.coroutine
def hello():
 print('Hello world! %s' % threading.current_thread())
 r = yield from asyncio.sleep(5)
 print('Hello again! %s' % threading.current_thread())
loop = asyncio.get_event_loop()
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

输出结果如下:

Hello world! <_mainthread(mainthread, started 22668>
Hello world! <_mainthread(mainthread, started 22668>
# 此处暂停5秒
Hello again! <_mainthread(mainthread, started 22668>
Hello again! <_mainthread(mainthread, started 22668>

由打印的当前线程名称可以看出,两个coroutine是由同一个线程并发执行的。

如果把asyncio.sleep()换成真正的IO操作,则多个coroutine就可以由一个线程并发执行。

我们用asyncio的异步网络连接来获取sina、sohu和163的网站首页:

import asyncio
@asyncio.coroutine
def wget(host):
 print('wget %s ...' % host)
 connect = asyncio.open_connection(host, 80)
 reader, writer = yield from connect
 header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
 writer.write(header.encode('utf-8'))
 yield from writer.drain()
 while True:
 line = yield from reader.readline()
 if line == b'\r\n':
 break
 print('%s header > %s ' % (host, line.decode('utf-8').rstrip()))
 writer.close()
loop = asyncio.get_event_loop()
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.baidu.com', 'www.163.com']]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

输出结果如下:

wget www.163.com ...
wget www.sina.com.cn ...
wget www.baidu.com ...
www.163.com header > HTTP/1.0 302 Moved Temporarily 
www.163.com header > Server: Cdn Cache Server V2.0 
www.163.com header > Date: Sat, 06 Jan 2018 14:14:58 GMT 
www.163.com header > Content-Length: 0 
www.163.com header > Location: http://www.163.com/special/0077jt/error_isp.html 
www.163.com header > Connection: close 
www.sina.com.cn header > HTTP/1.1 200 OK 
www.sina.com.cn header > Server: nginx 
www.sina.com.cn header > Date: Sat, 06 Jan 2018 14:12:15 GMT 
www.sina.com.cn header > Content-Type: text/html 
www.sina.com.cn header > Content-Length: 605048 
www.sina.com.cn header > Connection: close 
www.sina.com.cn header > Last-Modified: Sat, 06 Jan 2018 14:09:06 GMT 
www.sina.com.cn header > Vary: Accept-Encoding 
www.sina.com.cn header > Expires: Sat, 06 Jan 2018 14:13:12 GMT 
www.sina.com.cn header > Cache-Control: max-age=60 
www.sina.com.cn header > X-Powered-By: shci_v1.03 
www.sina.com.cn header > Age: 3 
www.sina.com.cn header > Via: http/1.1 cnc.beixian.ha2ts4.205 (ApacheTrafficServer/6.2.1 [cMsSf ]), http/1.1 gwbn.beijing.ha2ts4.23 (ApacheTrafficServer/6.2.1 [cHs f ]) 
www.sina.com.cn header > X-Via-Edge: 15152479356296c6422730904eedb7d91845d 
www.sina.com.cn header > X-Cache: HIT.23 
www.sina.com.cn header > X-Via-CDN: f=edge,s=gwbn.beijing.ha2ts4.21.nb.sinaedge.com,c=115.34.100.108;f=Edge,s=gwbn.beijing.ha2ts4.23,c=219.238.4.21 
www.baidu.com header > HTTP/1.1 200 OK 
www.baidu.com header > Date: Sat, 06 Jan 2018 14:12:15 GMT 
www.baidu.com header > Content-Type: text/html 
www.baidu.com header > Content-Length: 14613 
www.baidu.com header > Last-Modified: Fri, 29 Dec 2017 03:29:00 GMT 
www.baidu.com header > Connection: Close 
www.baidu.com header > Vary: Accept-Encoding 
www.baidu.com header > Set-Cookie: BAIDUID=BEA2CAC2706F8386AAC50DEEC6287BD9:FG=1; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com 
www.baidu.com header > Set-Cookie: BIDUPSID=BEA2CAC2706F8386AAC50DEEC6287BD9; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com 
www.baidu.com header > Set-Cookie: PSTM=1515247935; expires=Thu, 31-Dec-37 23:55:55 GMT; max-age=2147483647; path=/; domain=.baidu.com 
www.baidu.com header > P3P: CP=" OTI DSP COR IVA OUR IND COM " 
www.baidu.com header > Server: BWS/1.1 
www.baidu.com header > X-UA-Compatible: IE=Edge,chrome=1 
www.baidu.com header > Pragma: no-cache 
www.baidu.com header > Cache-control: no-cache 
www.baidu.com header > Accept-Ranges: bytes 

async/await

asyncio提供的@asyncio.coroutine可以把一个generator标记为coroutine类型,然后在coroutine内部用yield from调用另一个coroutine实现异步操作。

为了简化并更好地标识异步IO,从Python 3.5开始引入了新的语法async和await,可以让coroutine的代码更简洁易读。

请注意,async和await是针对coroutine的新语法,要使用新的语法,只需要做两步简单的替换:

  • 把@asyncio.coroutine替换为async;
  • 把yield from替换为await。

那么6部分的hello代码就可以这样替换了

# 把一个generator标记为协程类型
@asyncio.coroutine
def hello():
"""
定义一个生成器
"""
print('Hello world!')
# 通过yield from调用另一个标记为协程的生成器
r = yield from asyncio.sleep(2)
print('Hello again!')

替换为

async def hello():
"""
定义一个生成器
"""
print('Hello world!')
# 通过await调用另一个标记为协程的生成器
r = await asyncio.sleep(2)
print('Hello again!')

其它位置的代码均不变化。

相关推荐

Python 数据分析——利用Pandas进行分组统计

话说天下大势,分久必合,合久必分。数据分析也是如此,我们经常要对数据进行分组与聚合,以对不同组的数据进行深入解读。本章将介绍如何利用Pandas中的GroupBy操作函数来完成数据的分组、聚合以及统计...

python数据分析:介绍pandas库的数据类型Series和DataFrame

安装pandaspipinstallpandas-ihttps://mirrors.aliyun.com/pypi/simple/使用pandas直接导入即可importpandasas...

使用DataFrame计算两列的总和和最大值_[python]

【如果对您有用,请关注并转发,谢谢~~】最近在处理气象类相关数据的空间计算,在做综合性计算的时候,DataFrame针对每列的统计求和、最大值等较为方便,对某行的两列或多列数据进行求和与最大值等的简便...

8-Python内置函数

Python提供了丰富的内置函数,这些函数可以直接使用而无需导入任何模块。以下是一些常用的内置函数及其示例:1-print()1-1-说明输出指定的信息到控制台。1-2-例子2-len()2-1-说...

Python中函数式编程函数: reduce()函数

Python中的reduce()函数是一个强大的工具,它通过连续地将指定的函数应用于序列(如列表)来对序列(如列表)执行累积操作。它是functools模块的一部分,这意味着您需要在使用它之...

万万没想到,除了香农计划,Python3.11竟还有这么多性能提升

众所周知,Python3.11版本带来了较大的性能提升,但是,它具体在哪些方面上得到了优化呢?除了著名的“香农计划”外,它还包含哪些与性能相关的优化呢?本文将带你一探究竟!作者:BeshrKay...

最全python3.11版12类75个内置函数大全

获取全部内置函数:importbuiltins#导入模块yc=[]#异常属性nc=[]#不可调用fn=[]#内置函数defll(ty=builtins):...

软件测试笔试题

测试工程师岗位,3-5年,10-14k1.我司有一款产品,类似TeamViewer,向日葵,mstsc,QQ远程控制产品,一个PC客户端产品,请设想一下测试要点。并写出2.写出常用的SQL语句8条,l...

备战各大互联网巨头公司招聘会,最全Python面试大全,共300题

前言众所周知,越是顶尖的互联网公司在面试这一part的要求就越高,需要你有很好的技术功底、项目经验、一份漂亮的简历,当然还有避免不了的笔试过关。对于Python的工程师来说,全面掌握好有关Python...

经典 SQL 数据库笔试题及答案整理

马上又是金三银四啦,有蛮多小伙伴在跳槽找工作,但对于年限稍短的软件测试工程师,难免会需要进行笔试,而在笔试中,基本都会碰到一道关于数据库的大题,今天这篇文章呢,就收录了下最近学员反馈上来的一些数据库笔...

用Python开发日常小软件,让生活与工作更高效!附实例代码

引言:Python如何让生活更轻松?在数字化时代,编程早已不是程序员的专属技能。Python凭借其简洁易学的特点,成为普通人提升效率、解决日常问题的得力工具。无论是自动化重复任务、处理数据,还是开发个...

太牛了!102个Python实战项目被我扒到了!建议收藏!

挖到宝了!整整102个Python实战项目合集,从基础语法到高阶应用全覆盖,附完整源码+数据集,手把手带你从代码小白变身实战大神!这波羊毛不薅真的亏到哭!超全项目库,学练一站式搞定这份资...

Python中的并发编程

1.Python对并发编程的支持多线程:threading,利用CPU和IO可以同时执行的原理,让CPU不会干巴巴等待IO完成。多进程:multiprocessing,利用多核CPU...

Python 也有内存泄漏?

1.背景前段时间接手了一个边缘视觉识别的项目,大功能已经开发的差不多了,主要是需要是优化一些性能问题。其中比较突出的内存泄漏的问题,而且不止一处,有些比较有代表性,可以总结一下。为了更好地可视化内存...

python爬虫之多线程threading、多进程、协程aiohttp批量下载图片

一、单线程常规下载常规单线程执行脚本爬取壁纸图片,只爬取一页的图片。importdatetimeimportreimportrequestsfrombs4importBeautifu...

取消回复欢迎 发表评论: