RabbitMQ 协议
概述
RabbitMQ 协议支持为 Ryze 测试框架提供了与 RabbitMQ 消息代理进行交互的能力。支持多种交换机类型(direct、topic、fanout、headers)、队列管理、路由规则等 AMQP 协议特性。
依赖引入
xml
<dependency>
<groupId>io.github.xiaomisum</groupId>
<artifactId>ryze-rabbit</artifactId>
<version>6.0.1</version>
</dependency>
RabbitMQ 配置元件
YAML 配置方式
yaml
# rabbitmq 默认配置,各配置项的优先级为:取样器 > 默认配置
testclass: rabbitmq_defaults # 配置元件类型
config:
host: 'localhost' # rabbitmq服务器地址
port: 5672 # rabbitmq服务器端口,可空,默认值:5672
username: guest # rabbitmq 用户名,可空,默认值:guest
password: guest # rabbitmq 密码,可空,默认值:guest
timeout: 5000 # 连接超时时间,毫秒,默认值:60000
queue: # 队列配置
name: ryze # 消息队列名称
durable: # 是否持久化队列,可空,默认值:false
exclusive: # 是否独占队列,可空,默认值:false
auto_delete: # 是否自动删除,可空,默认值:false
arguments: # 消息队列其他属性,keyword对象,可空
exchange: # exchange 配置,可空,简单模式(不经过交换机)时,此配置为空
name: ryze # exchange 名称
type: topic # 交换机类型,参考值:fanout、direct、topic,可空,默认值:topic
routing_key: # 路由key,当 type为 fanout 时可空
RabbitMQ 取样器
YAML 配置方式
yaml
title: 标准rabbitmq取样器
testclass: rabbitmqSampler # 取样器类型
config: # 取样器配置
host: 'localhost' # rabbitmq服务器地址
port: 5672 # rabbitmq服务器端口
username: guest # rabbitmq 用户名
password: guest # rabbitmq 密码
timeout: 5000 # 连接超时时间,毫秒
queue: # 队列配置
name: ryze # 消息队列名称
durable: # 是否持久化队列
exclusive: # 是否独占队列
auto_delete: # 是否自动删除
arguments: # 消息队列其他属性
exchange: # exchange 配置
name: ryze # exchange 名称
type: topic # 交换机类型:fanout、direct、topic
routing_key: # 路由key
message: # 发送的消息
name: test
age: 18
常见问题
- 连接失败:检查 RabbitMQ 服务是否正常运行
- 认证失败:确认用户名和密码是否正确
- 队列创建失败:检查用户是否有创建队列的权限
- 交换机不存在:确保交换机已创建或有创建权限
- 路由失败:检查路由键和绑定规则是否正确
Java API 示例
java
import static io.github.xiaomisum.ryze.protocol.rabbitmq.RabbitMQMagicBox.*;
public class RabbitMQApiExample {
public void testDirectExchange() {
// 直接交换机消息
Result result = rabbitmq("发送直接消息", builder -> builder
.host("localhost")
.port(5672)
.username("guest")
.password("guest")
.queue(queue -> queue
.name("user.notifications")
.durable(true)
.autoDelete(false)
)
.exchange(exchange -> exchange
.name("user.direct")
.type("direct")
.routingKey("user.notification")
)
.message(Map.of(
"userId", 12345,
"title", "系统通知",
"content", "您的账户已成功注册"
))
);
assertThat(result.isSuccess()).isTrue();
}
public void testTopicExchange() {
// 主题交换机消息
rabbitmq("发送主题消息", builder -> builder
.host("localhost")
.exchange(exchange -> exchange
.name("logs.topic")
.type("topic")
.routingKey("application.error.database")
)
.message("数据库连接失败: Connection timeout")
);
}
}
Groovy API 示例
groovy
import static io.github.xiaomisum.ryze.protocol.rabbitmq.RabbitMQMagicBox.*
// 简单队列消息
def sendQueueMessage() {
rabbitmq("发送队列消息") { builder ->
builder.host("localhost")
.port(5672)
.username("guest")
.password("guest")
.queue { queue ->
queue.name("task.queue")
.durable(true)
}
.message([
taskId : UUID.randomUUID().toString(),
type : "email",
recipient: "user@example.com",
subject : "欢迎使用我们的服务"
])
}
}
// 完整测试套件
suite("RabbitMQ消息测试") { builder ->
builder.configure { configure ->
configure.rabbitmq { rabbitmq ->
rabbitmq.host("localhost")
.port(5672)
.username("guest")
.password("guest")
.timeout(5000)
}
}
builder.children { children ->
// 1. 发送直接消息
children.rabbitmq("发送直接消息") { rabbitmq ->
rabbitmq.queue { queue ->
queue.name("direct.queue")
.durable(true)
}
.message("直接消息内容")
}
// 2. 发送交换机消息
children.rabbitmq("发送交换机消息") { rabbitmq ->
rabbitmq.exchange { exchange ->
exchange.name("user.fanout")
.type("fanout")
}
.message([
eventType: "user_updated",
userId : "12345",
changes : ["email", "profile"],
timestamp: new Date()
])
}
// 3. 发送主题消息
["info", "warning", "error"].each { level ->
children.rabbitmq("发送${level}日志") { rabbitmq ->
rabbitmq.exchange { exchange ->
exchange.name("logs.topic")
.type("topic")
.routingKey("application.${level}.service")
}
.message("日志内容: ${level} level message")
}
}
}
}