diff --git a/packages/try-nats/py/README.md b/packages/try-nats/py/README.md index 15cbf9e..b31b6fe 100644 --- a/packages/try-nats/py/README.md +++ b/packages/try-nats/py/README.md @@ -10,6 +10,10 @@ pip install nats-py ``` +### 说明: + +- ✅ 给出了`多个 worker`, `唯一性`消费的示例, 非常简单. + ### Nats 官方示例: - https://github.com/nats-io/nats.py/tree/main/examples diff --git a/packages/try-nats/py/Taskfile.yml b/packages/try-nats/py/Taskfile.yml index 36dc335..7b7b24b 100644 --- a/packages/try-nats/py/Taskfile.yml +++ b/packages/try-nats/py/Taskfile.yml @@ -34,3 +34,9 @@ tasks: cmds: - python run_sub.py dir: ./src/try_nats + + sub:m: + aliases: [ "multi:sub", "m:sub" ] + cmds: + - python run_multi_sub.py + dir: ./src/try_nats \ No newline at end of file diff --git a/packages/try-nats/py/src/try_nats/run_multi_sub.py b/packages/try-nats/py/src/try_nats/run_multi_sub.py new file mode 100644 index 0000000..7951d6b --- /dev/null +++ b/packages/try-nats/py/src/try_nats/run_multi_sub.py @@ -0,0 +1,30 @@ +import asyncio + +from nats.aio.client import Client as NATS + + +async def message_handler(msg): + subject = msg.subject + data = msg.data.decode() + print(f"Received a message on '{subject}': {data}") + + +async def main(): + nc = NATS() + + await nc.connect("nats://localhost:4222") + + # + # todo x: 基于 queue 方式订阅, worker 消息处理唯一性 + # + await nc.subscribe("updates", "workers", cb=message_handler) + + print("Listening for messages...") + + # Keep the program running to listen for messages + while True: + await asyncio.sleep(1) + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/packages/try-nats/py/src/try_nats/run_pub.py b/packages/try-nats/py/src/try_nats/run_pub.py index 904b529..39e1cae 100644 --- a/packages/try-nats/py/src/try_nats/run_pub.py +++ b/packages/try-nats/py/src/try_nats/run_pub.py @@ -8,6 +8,12 @@ async def main(): await nc.connect("nats://localhost:4222") + # + # todo x: 并发测试, 测试 run_multi_sub.py 基于 queue 方式订阅, 处理唯一性 + # + for i in range(10): + await nc.publish("updates", f"Message {i}".encode()) + await nc.publish("updates", b'Hello NATS!') print("Message published!")