博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python运维开发之第十一天(RabbitMQ,redis)
阅读量:4334 次
发布时间:2019-06-07

本文共 6634 字,大约阅读时间需要 22 分钟。

一、RabbitMQ

python的Queue与RabbitMQ之间的理解:

python的进程或线程Queue只能python自己用。RabbitMQ队列多个应用之间共享队列,互相通信。

1、简单的实现生产者与消费者

  生产者

  (1)建立socket连接;(2)声明一个管道;(3)声明队列(queue);(4)通过管道发消息;(5)routing_key(queue名字);(6)body(内容)

  消费者

  (1)建立连接;(2)声明管道;(3)声明队列;(4)消费者声明队列(防止生产者后启动,消费者报错);(5)消费消息;(6)callback如果收到消息就调用函数处理消息 queue队列名字;

#!/usr/bin/env python# -*- coding: utf-8 -*-# @Author  : Willpower-chen# @blog: http://www.cnblogs.com/willpower-chen/import pika#建立socket连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))#声明一个管道channel = connection.channel()#声明一个队列channel.queue_declare(queue='hello')#通过管道发消息,routing_key 队列queue名字 ,body发送内容channel.basic_publish(exchange='',                      routing_key='hello',                      body='Hello World! 1 2')print("[x] send 'Hello World! 1 2 '")connection.close()
producer
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Author  : Willpower-chen# @blog: http://www.cnblogs.com/willpower-chen/import pika,time#建立连接connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))#声明一个管道channel = connection.channel()#声明队列,防止生产者(发送端)没开启,消费者端报错channel.queue_declare(queue='hello')#ch管道的内存对象地址,如果收到消息就调用函数callback,处理消息def callbak(ch,method,properties,body):    print("[x] Received %r " % body)    # time.sleep(30)#消费消息channel.basic_consume(callbak,                      queue='hello',                      no_ack=True #消息有没处理,都不给生产者发确认消息                      )print('[*] Waitting for messages TO exit press ctrl+c')channel.start_consuming() #开始
consumer

 2、消费者对生产者,可以1对多,而且默认是轮询机制

no_ack=True如果注释掉的话,消费者端不给服务器端确认收到消息,服务器端就不会把要发的消息从队列里清除

如下图注释了no_ack,加了一个时间,

     

开启三个消费者,一个生产者,生产者只send一次数据,挨个停止consumer,会发现同一条消息会被重新发给下一个consumer,直到producer收到consumer的确认收到的消息

 

3、队列查询

清除队列消息

 

4、消息持久化

(1)durable只是队列持久化

channel.queue_declare(queue='hello',durable=True)

生产者和消费者都需要添加durable=True

(2)要实现消息持久化,还需要

5、消息(1对多)实现权重功能

消费者端添加在消费消息之前

channel.basic_qos(prefetch_count=1)

 

6、广播消息fanout(纯广播)订阅发布

#!/usr/bin/env python# -*- coding: utf-8 -*-# @Author  : Willpower-chen# @blog: http://www.cnblogs.com/willpower-chen/import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='logs',                         type='fanout')#message = ' '.join(sys.argv[1:]) or "info: Hello World!"message = "info: Hello World!2"channel.basic_publish(exchange='logs',                      routing_key='',                      body=message)print(" [x] Sent %r" % message)connection.close()
fanout_producer
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Author  : Willpower-chen# @blog: http://www.cnblogs.com/willpower-chen/import pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='logs',                         type='fanout')result = channel.queue_declare(exclusive=True)queue_name = result.method.queueprint("random queuename",queue_name)channel.queue_bind(exchange='logs',                   queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r" % body)channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()
fanout_consumer

7、direct广播模式(有选择性的发送接收消息)

import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='direct_logs',                         type='direct')severity = sys.argv[1] if len(sys.argv) > 1 else 'info'message = ' '.join(sys.argv[2:]) or 'Hello World!'channel.basic_publish(exchange='direct_logs',                      routing_key=severity,                      body=message)print(" [x] Sent %r:%r" % (severity, message))connection.close()
direct_producer
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Author  : Willpower-chen# @blog: http://www.cnblogs.com/willpower-chen/import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='direct_logs',                         type='direct')result = channel.queue_declare(exclusive=True)queue_name = result.method.queueseverities = sys.argv[1:]if not severities:    sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])    sys.exit(1)for severity in severities:    channel.queue_bind(exchange='direct_logs',                       queue=queue_name,                       routing_key=severity)print(severities)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()
direct_consumer

8、更细致的消息判断 type = topic

#!/usr/bin/env python# -*- coding: utf-8 -*-# @Author  : Willpower-chen# @blog: http://www.cnblogs.com/willpower-chen/import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='topic_logs',                         type='topic')routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'message = ' '.join(sys.argv[2:]) or 'Hello World!'channel.basic_publish(exchange='topic_logs',                      routing_key=routing_key,                      body=message)print(" [x] Sent %r:%r" % (routing_key, message))connection.close()
topic_producer
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Author  : Willpower-chen# @blog: http://www.cnblogs.com/willpower-chen/import pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(    host='localhost'))channel = connection.channel()channel.exchange_declare(exchange='topic_logs',                         type='topic')result = channel.queue_declare(exclusive=True)queue_name = result.method.queuebinding_keys = sys.argv[1:]if not binding_keys:    sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])    sys.exit(1)for binding_key in binding_keys:    channel.queue_bind(exchange='topic_logs',                       queue=queue_name,                       routing_key=binding_key)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()
topic_consumer

 

 

 

 

 

 
 
 

转载于:https://www.cnblogs.com/willpower-chen/p/5977633.html

你可能感兴趣的文章
阶段3 2.Spring_03.Spring的 IOC 和 DI_6 spring中bean的细节之三种创建Bean对象的方式
查看>>
阶段3 2.Spring_04.Spring的常用注解_3 用于创建的Component注解
查看>>
阶段3 2.Spring_04.Spring的常用注解_2 常用IOC注解按照作用分类
查看>>
阶段3 2.Spring_09.JdbcTemplate的基本使用_5 JdbcTemplate在spring的ioc中使用
查看>>
阶段3 3.SpringMVC·_07.SSM整合案例_02.ssm整合之搭建环境
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第1节零基础快速入门SpringBoot2.0_3、快速创建SpringBoot应用之手工创建web应用...
查看>>
阶段3 3.SpringMVC·_07.SSM整合案例_04.ssm整合之编写SpringMVC框架
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第1节零基础快速入门SpringBoot2.0_5、SpringBoot2.x的依赖默认Maven版本...
查看>>
阶段3 3.SpringMVC·_07.SSM整合案例_08.ssm整合之Spring整合MyBatis框架
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第2节 SpringBoot接口Http协议开发实战_9、SpringBoot基础HTTP其他提交方法请求实战...
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第2节 SpringBoot接口Http协议开发实战_12、SpringBoot2.x文件上传实战...
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第4节 Springboot2.0单元测试进阶实战和自定义异常处理_19、SpringBoot个性化启动banner设置debug日志...
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第4节 Springboot2.0单元测试进阶实战和自定义异常处理_20、SpringBoot2.x配置全局异常实战...
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第5节 SpringBoot部署war项目到tomcat9和启动原理讲解_23、SpringBoot2.x启动原理概述...
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第4节 Springboot2.0单元测试进阶实战和自定义异常处理_21、SpringBoot2.x配置全局异常返回自定义页面...
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第8节 数据库操作之整合Mybaties和事务讲解_32..SpringBoot2.x持久化数据方式介绍...
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第8节 数据库操作之整合Mybaties和事务讲解_34、SpringBoot整合Mybatis实操和打印SQL语句...
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第8节 数据库操作之整合Mybaties和事务讲解_35、事务介绍和常见的隔离级别,传播行为...
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第9节 SpringBoot2.x整合Redis实战_40、Redis工具类封装讲解和实战...
查看>>
小D课堂 - 零基础入门SpringBoot2.X到实战_第9节 SpringBoot2.x整合Redis实战_37、分布式缓存Redis介绍...
查看>>