python基于asyncio实现 Redis 的异步操作哈希数据写入 / 读取、发布订阅消息中间件


发布者 ourjs  发布时间 1768976694754
关键字 Python  Redis 

基于asyncio实现 Redis 的异步操作 —— 包括哈希数据写入 / 读取、发布订阅(Pub/Sub),同时演示了串行 / 并行两种消息发布方式,最终让订阅者持续监听频道消息,阻止进程退出。

启动 Redis 容器

启动 Redis 容器、

docker run -d --name redis -p 6379:6379 redis

安装 Python 依赖,为后续异步操作 Redis 做环境准备;

pip install asyncio
pip install redis

pip install asyncio:安装 Python 异步 I/O 框架(注:Python 3.7 + 已内置asyncio,此命令实际是冗余的,仅为显式声明依赖); pip install redis:安装 Redis 的 Python 客户端库(最新版redis库已内置异步支持,即aioredis); 核心目的:安装操作 Redis 和实现异步编程所需的依赖包。

Redis操作详解

Redis 哈希操作:

hset:向user:001哈希中写入键值对(mapping参数接收字典); hgetall:读取哈希中所有键值对,默认返回bytes类型(如需字符串,可在创建 Redis 客户端时加decode_responses=True);

串行发布:

await publisher(...) 会等待前一个发布任务完成后,再执行下一个,总耗时是各任务耗时之和; 并行发布(原代码有 Bug): asyncio.create_task(publisher(*args)):创建异步任务(立即启动,不阻塞); *[task1, task2, task3]:解包列表,将多个任务作为独立参数传入gather; asyncio.gather(...) 前漏加await,导致并行任务被等待,不加程序会直接跳过,不等待这些任务可能执行结束;

订阅者任务:

asyncio.create_task(subscriber(...)):将订阅者封装为后台任务,立即启动监听; await subscriber_task:等待订阅者任务结束(但subscriber是无限循环,因此进程会一直运行,持续监听频道消息)。

完整示例

import asyncio
import datetime
from redis import asyncio as aioredis

async def publisher(redis, channel, message, timer = 0.1):
    if timer > 0:
        await asyncio.sleep(timer)
    print(f"Current date time: {datetime.datetime.now()}")
    await redis.publish(channel, message)
    print(f"Message sent: {message}")

async def subscriber(redis, channel):
   pubsub = redis.pubsub()
   await pubsub.subscribe(channel)
   async for message in pubsub.listen():
    #    if message['type'] == 'message':
           print(f"Message received: {message}")

async def main():
    redis = aioredis.Redis(host='localhost', port=6379)

    # 写入用户001
    await redis.hset("user:001", mapping={ "id": "001", "name": "typescript" })
    user = await redis.hgetall("user:001")
    print(user)

    channel="test channel"
    subscriber_task = asyncio.create_task(subscriber(redis, channel))

    # 调用方法一: 串行执行
    await publisher(redis, channel, "Hello world", 2)   #先等待再发送
    await publisher(redis, channel, "Hello redis", 0)   #立即发送
    await publisher(redis, channel, "Hello python")

    # 调用方法二: 并行执行
    publish_tasks = [
        (redis, channel, "gather: Hello world", 10),
        (redis, channel, "gather: Hello redis", 0),
        (redis, channel, "gather: Hello python")
    ]
    # 并行执行,不区分先后顺序, 用*把列表拆成独立参数
    asyncio.gather(*[ asyncio.create_task(publisher(*args)) for args in publish_tasks ])

    # 写入用户002
    await redis.hset("user:002", mapping={ "id": "002", "name": "python" })
    user = await redis.hgetall("user:002")
    print(user)

    # 阻止进程退出,一直监听
    await subscriber_task

asyncio.run(main())