百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

24-4-Python多线程-进程操作-案例

off999 2025-04-29 03:19 28 浏览 0 评论

4-1-介绍

4-1-1-Python的进程

虽然Python语言支持创建多线程应用程序,但是Python解释器使用了内部的全局解释器锁定(GIL),在任意指定的时刻只允许执行单个线程,并且限制了Python程序只能在一个处理器上运行。而现代CPU已经以多核为主,但Python的多线程程序无法使用。使用Python的多进程模块可以将工作分派给不受锁定限制的单个子进程。

4-1-2-创建进程的模块

  1. 在Python 3语言中,对多进程支持的是multiprocessing模块和subprocess模块。
  2. 使用模块multiprocessing可以创建并使用多进程,具体用法和模块threading的使用方法类似。
  3. 创建进程使用multiprocessing.Process对象来完成,和threading.Thread一样,可以使用它以进程方式运行函数,也可以通过继承它并重载run()方法创建进程。
  4. 模块multiprocessing同样具有模块threading中用于同步的Lock、RLock及用于通信的Event

4-2-subprocess模块

4-1-1-subprocess介绍

subprocess 模块是 Python 标准库中的一个强大工具,

用于创建新的进程、连接到它们的输入输出错误管道,并获取它们的返回码。

它允许你在 Python 脚本中执行外部命令,就像在终端中执行一样。

4-1-1-1-基本使用方法

1-subprocess.run()

这是 Python 3.5 及以后版本推荐的执行外部命令的方式。

它会等待命令执行完成,并返回一个 CompletedProcess 对象,该对象包含命令的执行结果信息。

在上述代码中,subprocess.run()接受一个列表作为参数,列表中的第一个元素是要执行的命令,后续元素是命令的参数。capture_output=True 表示捕获命令的标准输出和标准错误输出,text=True表示以文本模式处理输出。

2-subprocess.Popen()

Popen类提供了更底层的进程创建和控制功能,它允许你在命令执行过程中与其进行交互。

import subprocess

# 创建一个新的进程
process = subprocess.Popen(['ping', 'www.baidu.com'], stdout=subprocess.PIPE, text=True)

# 读取命令的输出
while True:
    output = process.stdout.readline()
    if output == '' and process.poll() is not None:
        break
    if output:
        print(output.strip())

# 等待进程结束并获取返回码
returncode = process.wait()
print(f"命令返回码: {returncode}")

在这个例子中,subprocess.Popen()创建了一个新的进程来执行 `ping` 命令,并通过 stdout=subprocess.PIPE 将命令的标准输出重定向到一个管道,以便在 Python 脚本中读取。

4-1-1-2-常用参数

args:要执行的命令,可以是字符串或列表。如果是列表,第一个元素是命令名,后续元素是命令的参数。

stdin、stdout、stderr:用于指定标准输入、标准输出和标准错误输出的处理方式。可以设置为 `subprocess.PIPE` 以捕获输出,或者设置为 `subprocess.DEVNULL` 以丢弃输出。

shell:如果设置为 `True`,则通过 shell 执行命令。默认值为 `False`。使用 `shell=True` 时要注意安全问题,因为可能会导致命令注入。

capture_output:如果设置为 `True`,则捕获命令的标准输出和标准错误输出。这是 Python 3.7 及以后版本的参数。

text:如果设置为 `True`,则以文本模式处理输入和输出,默认以字节模式处理。

4-1-2-异常处理

在使用 subprocess 模块时,可能会抛出一些异常,例如 FileNotFoundError表示找不到要执行的命令,CalledProcessError表示命令执行失败(返回码不为 0)。以下是一个异常处理的示例:

import subprocess

try:
    result = subprocess.run(['nonexistent_command'], check=True, capture_output=True, text=True)
except FileNotFoundError:
    print("命令未找到")
except subprocess.CalledProcessError as e:
    print(f"命令执行失败,返回码: {e.returncode}")
    print(e.stderr)

在上述代码中,check=True表示如果命令执行失败(返回码不为 0),则抛出 CalledProcessError 异常。

4-1-3-实际应用场景

自动化脚本:在 Python 脚本中执行系统命令,实现自动化部署、文件处理等任务。

调用外部程序:调用其他编程语言编写的程序,并与其进行交互。

系统监控:执行系统命令获取系统信息,如 CPU 使用率、内存使用情况等。

4-3-multiprocessing模块

multiprocessing是 Python 标准库中的一个模块,它允许你生成多个进程,以实现并行计算,克服了 Python 中全局解释器锁(GIL)对多线程并行计算的限制,从而在多核 CPU 上充分利用系统资源,提高程序的运行效率。以下为你详细介绍该模块:

4-3-1-基本使用

4-3-1-1-创建进程

你可以通过 multiprocessing.Process 类来创建新的进程,下面是一个简单示例:

import multiprocessing

def worker(num):
    """进程要执行的任务"""
    print(f'Worker {num} started')
    print(f'Worker {num} finished')

if __name__ == '__main__':
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

在上述代码里,multiprocessing.Process 类接收两个重要参数,target 是进程要执行的函数,args 是传递给该函数的参数。start() 方法用于启动进程,join() 方法会让主进程等待子进程执行完毕。

4-3-1-2-进程间通信

multiprocessing模块提供了多种进程间通信(IPC)的方式,常见的有队列(`Queue`)和管道(`Pipe`)。

4-3-1-2-1-使用队列进行通信

import multiprocessing

def producer(q):
    for i in range(5):
        q.put(i)
        print(f'Produced {i}')

def consumer(q):
    while True:
        item = q.get()
        if item is None:
            break
        print(f'Consumed {item}')

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(q,))
    p2 = multiprocessing.Process(target=consumer, args=(q,))

    p1.start()
    p2.start()

    p1.join()
    q.put(None)
    p2.join()

在这个例子中,producer 函数将数据放入队列,consumer 函数从队列中取出数据进行处理。

4-3-1-2-2-使用管道进行通信

import multiprocessing

def sender(conn):
    messages = ['Hello', 'World', '!']
    for msg in messages:
        conn.send(msg)
    conn.close()

def receiver(conn):
    while True:
        try:
            msg = conn.recv()
            if not msg:
                break
            print(f'Received: {msg}')
        except EOFError:
            break

if __name__ == '__main__':
    parent_conn, child_conn = multiprocessing.Pipe()
    p1 = multiprocessing.Process(target=sender, args=(child_conn,))
    p2 = multiprocessing.Process(target=receiver, args=(parent_conn,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

这里通过 multiprocessing.Pipe() 创建了一个管道,管道有两个连接对象,分别用于发送和接收数据。

4-3-1-3-进程池

当你需要处理大量任务时,手动创建和管理每个进程会很繁琐,此时可以使用 multiprocessing.Pool类来创建进程池。

import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(square, [1, 2, 3, 4, 5])
        print(results)

pool.map() 方法会将可迭代对象中的每个元素依次传递给指定的函数进行处理,并返回处理结果的列表。

4-3-2-注意事项

4-3-2-1-if __name__ == '__main__'

在 Windows 和某些其他操作系统上,使用 `multiprocessing` 模块时,必须将创建和启动进程的代码放在 `if __name__ == '__main__':` 语句块中,以避免递归创建进程。

4-3-2-2-资源管理

进程会占用系统资源,因此要合理控制进程的数量,避免资源耗尽。

使用 join() 方法确保子进程执行完毕,防止僵尸进程的产生。

5-案例

5-1-需求

多线程实现抢票功能

5-2-代码

5-2-1-database_utils.py

该文件主要负责数据库连接池的配置和数据库查询操作。

import logging
import mysql.connector
from mysql.connector import pooling

# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 数据库连接池配置
dbconfig = {
    "host": "localhost",
    "user": "your_username",
    "password": "your_password",
    "database": "your_database",
    "pool_name": "mypool",
    "pool_size": 5,
    "connect_timeout": 10
}
pool = pooling.MySQLConnectionPool(**dbconfig)


def execute_query(query, params=None):
    try:
        # 从连接池获取连接
        mydb = pool.get_connection()
        mycursor = mydb.cursor()
        if params:
            mycursor.execute(query, params)
        else:
            mycursor.execute(query)
        if query.strip().lower().startswith("select"):
            result = mycursor.fetchall()
        else:
            mydb.commit()
            result = None
        return result
    except mysql.connector.errors.InterfaceError as err:
        if err.errno == 2003:
            logging.error(f"数据库连接超时: {err}")
        else:
            logging.error(f"数据库接口错误: {err}")
        return None
    except mysql.connector.Error as err:
        logging.error(f"数据库操作出错: {err}")
        return None
    except Exception as e:
        logging.error(f"执行查询时出现未知错误: {e}")
        return None
    finally:
        if 'mydb' in locals() and mydb.is_connected():
            mycursor.close()
            mydb.close()
    

5-2-2-user_utils.py

from database_utils import execute_query

# 用户信息类
class User:
    def __init__(self, user_id, name, id_number):
        self.user_id = user_id
        self.name = name
        self.id_number = id_number

    def __str__(self):
        return f"用户ID: {self.user_id}, 姓名: {self.name}, 身份证号: {self.id_number}"


def get_users_from_db():
    try:
        query = "SELECT user_id, name, id_number FROM users"
        result = execute_query(query)
        users = []
        if result:
            for user_id, name, id_number in result:
                user = User(user_id, name, id_number)
                users.append(user)
        return users
    except Exception as e:
        logging.error(f"获取用户信息时出错: {e}")
        return []
    

5-2-3-train_utils.py

该文件负责获取车次票信息和更新车次票信息。

import logging
from database_utils import execute_query

# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


def get_trains_from_db():
    try:
        query = "SELECT train_number, seat_type, tickets FROM train_tickets"
        result = execute_query(query)
        trains = {}
        if result:
            for train_number, seat_type, tickets in result:
                if train_number not in trains:
                    trains[train_number] = {}
                trains[train_number][seat_type] = tickets
        return trains
    except Exception as e:
        logging.error(f"获取车次票信息时出错: {e}")
        return {}


def update_ticket_in_db(train_number, seat_type, new_tickets):
    try:
        query = "UPDATE train_tickets SET tickets = %s WHERE train_number = %s AND seat_type = %s"
        params = (new_tickets, train_number, seat_type)
        execute_query(query, params)
        logging.info(f"{train_number} 次列车的 {seat_type} 座位票数已更新为 {new_tickets}。")
    except Exception as e:
        logging.error(f"更新车次票信息时出错: {e}")
    

5-2-4-ticket_request_utils.py

该文件用于获取车票请求信息。

from database_utils import execute_query
from user_utils import get_users_from_db


def get_ticket_requests_from_db(user_id=None):
    try:
        if user_id is None:
            query = "SELECT user_id, train_number, seat_type FROM ticket_requests"
            params = None
        else:
            query = "SELECT user_id, train_number, seat_type FROM ticket_requests WHERE user_id = %s"
            params = (user_id,)
        result = execute_query(query, params)
        users = get_users_from_db()
        user_dict = {user.user_id: user for user in users}
        ticket_requests = []
        for user_id, train_number, seat_type in result:
            if user_id in user_dict:
                ticket_requests.append((user_dict[user_id], train_number, seat_type))
        return ticket_requests
    except Exception as e:
        logging.error(f"获取抢票请求时出错: {e}")
        return []
    

5-2-5-main.py

该文件是主程序入口,负责启动线程进行抢票操作。

import threading
import logging
from train_utils import get_trains_from_db, update_ticket_in_db
from user_utils import User
from ticket_request_utils import get_ticket_requests_from_db

# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# 创建一个线程锁,用于线程同步
lock = threading.Lock()

def book_ticket(user, train_number, seat_type):
    trains = get_trains_from_db()
    result = None
    if train_number in trains and seat_type in trains[train_number]:
        if trains[train_number][seat_type] > 0:
            try:
                # 模拟一些处理时间
                import time
                time.sleep(0.1)
                # 使用上下文管理器管理锁
                with lock:
                    # 再次检查票数,防止竞态条件
                    trains = get_trains_from_db()
                    if train_number in trains and seat_type in trains[train_number] and trains[train_number][seat_type] > 0:
                        new_tickets = trains[train_number][seat_type] - 1
                        trains[train_number][seat_type] = new_tickets
                        result = f"{user} 抢到了 {train_number} 次列车的 {seat_type} 座位,剩余 {seat_type} 票数: {new_tickets}"
                        logging.info(result)
                        update_ticket_in_db(train_number, seat_type, new_tickets)
                    else:
                        result = f"{user} 抢票失败,{train_number} 次列车的 {seat_type} 座位已售罄。"
                        logging.info(result)
            except Exception as e:
                result = f"抢票过程中出现异常: {e}"
                logging.error(result)
        else:
            result = f"{user} 抢票失败,{train_number} 次列车的 {seat_type} 座位已售罄。"
            logging.info(result)
    else:
        result = f"{user} 抢票失败,车次 {train_number} 或座位类型 {seat_type} 不存在。"
        logging.info(result)

    # 将抢票结果保存到数据库
    from database_utils import execute_query
    query = "INSERT INTO ticket_booking_results (user_id, train_number, seat_type, result) VALUES (%s, %s, %s, %s)"
    params = (user.user_id, train_number, seat_type, result)
    execute_query(query, params)
    logging.info("抢票结果已保存到数据库。")


if __name__ == "__main__":
    try:
        # 示例:获取用户 ID 为 1 的车票请求
        user_id = 1
        ticket_requests = get_ticket_requests_from_db(user_id)

        threads = []

        for user, train_number, seat_type in ticket_requests:
            try:
                # 创建线程
                t = threading.Thread(target=book_ticket, args=(user, train_number, seat_type))
                threads.append(t)
                # 启动线程
                t.start()
            except Exception as e:
                logging.error(f"创建或启动线程时出错: {e}")

        # 等待所有线程完成
        for t in threads:
            try:
                t.join()
            except Exception as e:
                logging.error(f"等待线程完成时出错: {e}")

        logging.info("抢票结束。")
    except Exception as e:
        logging.error(f"主程序执行出错: {e}")
    

相关推荐

爱纯净系统官方的网址(爱纯净官网是哪个)

备份步骤:  第一步:点击win10纯净版系统桌面左下角的【Windows】按钮,从打开的扩展面板中找到【设置】按钮点击进入。  第二步:打开win10纯净版64位系统中的设置界面中,点击【更新和安全...

最新电脑主机配置清单(2021电脑主机主流配置)

1.CPU全志a31s80元主板建议联想乐pad160元显示屏7寸分辨率建议在800*480以上约110元电源接口约0.2元wifi模块约20元蓝牙模块约20元硬盘建议金...

windows7系统界面(win7界面什么样)

关于这个问题,要将Win1系统调成Win7界面,您可以尝试以下几个步骤:1.下载并安装一个Win7主题:在网上搜索并下载一个Win7主题,例如“Windows7ThemeforWin10”,...

把文件隐藏了怎么显示出来(文件隐藏起来了怎么找)

需要显示出来因为有些文件被系统默认设置为隐藏状态,为了查看或编辑这些文件,需要将它们显示出来。如果你使用Windows操作系统,可以在文件资源管理器中点击“查看”选项卡,然后勾选“隐藏项目”复选框,隐...

mercury路由器wifi密码(mercury路由器wifi密码忘了怎么办)

水星路由器宽带密码查看的方法:一、首先登陆原来路由器管理界面,输入路由器账号密码登陆。二、进入路由器管理界面后,点击进入“备份和载入配置”,然后点击“备份配置文件”,然后将备份的配置文件存放在电脑桌...

microsoft word产品密钥(微软word产品密钥)

产品密钥是由一些字符组成的代码,用于激活对应产品。产品密钥是产品授权的证明,它是根据一定的算法(如椭圆算法)等产生的随机数。当用户输入密钥产品会根据其输入的密钥判断是否满足相应的算法,通过这样来判断,...

笔记本显卡推荐(笔记本显卡推荐性价比高)

1、微星R7850TF2GD5/OC:这款微星显卡是用GCN架构设计的图形核心,里面有1024个sp单元,执行、输出能力都非常的强悍,能完美地将DX11.1特效呈现出来。它还能够支持驳接各种大型...

hp系统重装win10(hp怎么重装系统win10)

答具体解决方法如下准备工作:  1、下载u启动u盘启动盘制作工具  2、下载win10系统镜像并存入u盘启动盘中  3、硬盘模式更改为ahci模式  安装win10系统操作步骤:  1、首先,我们提...

惠普笔记本电脑售后服务(惠普笔记本电脑售后服务维修点)

惠普笔记本电脑的售后服务包括全国联保、上门维修、24小时服务热线等多种服务。消费者可以通过官网或客户服务热线轻松预约维修服务,享受专业、高效的技术支持。同时,惠普还提供质保服务和延保服务,保障消费者的...

住房公积金管理中心官网(广州住房公积金管理中心官网)

按照《住房公积金管理条例》有关规定,住房公积金管理中心性质属于事业单位,隶属事业单位编制。使用手机查询住房公积金的方法。微信查询法1、打开微信,点击进入“我”的页面,在这个页面找到钱包;2、点击进入...

远程控制系统(路灯远程控制系统)

      汽车远程控制系统是一种通过移动终端或其他设备远程控制汽车的系统,可以实现远程锁车/解锁、远程启动车辆、远程启动空调、...

惠普1136打印机驱动(惠普1136打印机驱动用的是1130吗)

原因:1、可能是安装驱动步骤错了,下载驱动的步骤一定要按步骤检查安装。2、可能是驱动程序兼容性问题,或驱动程序损坏,建议使用驱动管理软件安装,这样简单,而且是最新的,兼容性、稳定性好。3、可能是打印机...

office2013标准版产品密钥(microsoft office2013产品密钥在哪里能找到)

win7/win8/win10系统下VisualStudio2013各个版本的密钥:VisualStudioUltimate2013KEY:BWG7X-J98B3-W34RT-33B3R-...

win7旗舰版电脑非常卡怎么办

针对Windows7旗舰版卡顿的情况,可以尝试以下几种方法来解决:1.升级硬件:如果你的电脑配置较低,可以考虑升级一下硬件,例如更换内存条、加装固态硬盘等。2.清理磁盘:清理电脑中不必要的文件、...

随身wifi怎么用的使用教程(随身wifi怎么用的使用教程图解)

1、将随身wifi插入电脑端,按照提示操作2、首次安装驱动后,随身wifi会随机生成10位默认密码,每台电脑是不相同的。为方便连接,也可手动设置为相同的密码。3、随身WiFi客户端支持3种手机连接方式...

取消回复欢迎 发表评论: