Skip to content

RabbitMQ 协议

概述

RabbitMQ 协议支持为 Ryze 测试框架提供了与 RabbitMQ 消息代理进行交互的能力。支持多种交换机类型(direct、topic、fanout、headers)、队列管理、路由规则等 AMQP 协议特性。

📊 配置项参考表

连接配置

配置项类型默认值必需描述
urlString-RabbitMQ 服务器连接信息
timeoutInteger60000连接超时时间 (毫秒)

队列配置

配置项类型默认值必需描述
queue.nameString-✅*队列名称
queue.durableBooleanfalse是否持久化队列
queue.exclusiveBooleanfalse是否为占用队列
queue.auto_deleteBooleanfalse是否自动删除
queue.argumentsMap-队列其他属性

交换机配置

配置项类型默认值必需描述
exchange.nameString-交换机名称
exchange.typeStringtopic交换机类型 (direct/topic/fanout/headers)
exchange.routing_keyString-路由 Key

消息配置

配置项类型默认值必需描述
messageObject-发送的消息 (JSON/字符串)

配置优先级: 取样器配置 > RabbitMQ 默认配置

: *表示 queue 和 exchange 二选一

依赖引入

xml

<dependency>
    <groupId>io.github.xiaomisum</groupId>
    <artifactId>ryze-rabbit</artifactId>
    <version>${version}</version>
</dependency>

RabbitMQ 配置元件

YAML 配置方式

yaml
# rabbitmq 默认配置,各配置项的优先级为:取样器 > 默认配置
testclass: rabbitmq_defaults  # 配置元件类型
config:
  url: amqp://admin:MyPassword@192.168.1.100:5672/prod_vhost # RabbitMQ 服务器连接信息
  timeout: 5000 # 连接超时时间,毫秒,默认值:60000
  queue: # 队列配置
    name: ryze  # 消息队列名称
    durable:  # 是否持久化队列,可空,默认值:false
    exclusive:  # 是否独占队列,可空,默认值:false
    auto_delete:  # 是否自动删除,可空,默认值:false
    arguments:  # 消息队列其他属性,keyword对象,可空
  exchange: # exchange 配置,可空,简单模式(不经过交换机)时,此配置为空
    name: ryze # exchange 名称
    type: topic # 交换机类型,参考值:fanout、direct、topic,可空,默认值:topic
    routing_key:  # 路由key,当 type为 fanout 时可空

RabbitMQ 取样器

YAML 配置方式

yaml
title: 标准rabbitmq取样器
testclass: rabbitmqSampler  # 取样器类型
config: # 取样器配置
  url: amqp://admin:MyPassword@192.168.1.100:5672/prod_vhost # RabbitMQ 服务器连接信息
  timeout: 5000 # 连接超时时间,毫秒
  queue: # 队列配置
    name: ryze  # 消息队列名称
    durable:  # 是否持久化队列
    exclusive:  # 是否独占队列
    auto_delete:  # 是否自动删除
    arguments:  # 消息队列其他属性
  exchange: # exchange 配置
    name: ryze # exchange 名称
    type: topic # 交换机类型:fanout、direct、topic
    routing_key:  # 路由key
  message: # 发送的消息
    name: test
    age: 18

常见问题

  1. 连接失败:检查 RabbitMQ 服务是否正常运行
  2. 认证失败:确认用户名和密码是否正确
  3. 队列创建失败:检查用户是否有创建队列的权限
  4. 交换机不存在:确保交换机已创建或有创建权限
  5. 路由失败:检查路由键和绑定规则是否正确

Java API 示例

java
import static io.github.xiaomisum.ryze.protocol.rabbitmq.RabbitMQMagicBox.*;

public class RabbitMQApiExample {

    public void testDirectExchange() {
        // 直接交换机消息
        Result result = rabbitmq("发送直接消息", builder -> builder
                .url("amqp://guest:guest@localhost:5672/")
                .queue(queue -> queue
                        .name("user.notifications")
                        .durable(true)
                        .autoDelete(false)
                )
                .exchange(exchange -> exchange
                        .name("user.direct")
                        .type("direct")
                        .routingKey("user.notification")
                )
                .message(Map.of(
                        "userId", 12345,
                        "title", "系统通知",
                        "content", "您的账户已成功注册"
                ))
        );

        assertThat(result.isSuccess()).isTrue();
    }

    public void testTopicExchange() {
        // 主题交换机消息
        rabbitmq("发送主题消息", builder -> builder
                .url("amqp://guest:guest@localhost:5672/")
                .exchange(exchange -> exchange
                        .name("logs.topic")
                        .type("topic")
                        .routingKey("application.error.database")
                )
                .message("数据库连接失败: Connection timeout")
        );
    }
}

Groovy API 示例

groovy
import static io.github.xiaomisum.ryze.protocol.rabbitmq.RabbitMQMagicBox.*

// 简单队列消息
def sendQueueMessage() {
    rabbitmq("发送队列消息") { builder ->
        builder.url("amqp://guest:guest@localhost:5672/")
                .queue { queue ->
                    queue.name("task.queue")
                            .durable(true)
                }
                .message([
                        taskId   : UUID.randomUUID().toString(),
                        type     : "email",
                        recipient: "user@example.com",
                        subject  : "欢迎使用我们的服务"
                ])
    }
}

// 完整测试套件
suite("RabbitMQ消息测试") { builder ->
    builder.configure { configure ->
        configure.rabbitmq { rabbitmq ->
            rabbitmq.url("amqp://guest:guest@localhost:5672/")
                    .timeout(5000)
        }
    }

    builder.children { children ->
        // 1. 发送直接消息
        children.rabbitmq("发送直接消息") { rabbitmq ->
            rabbitmq.queue { queue ->
                queue.name("direct.queue")
                        .durable(true)
            }
                    .message("直接消息内容")
        }

        // 2. 发送交换机消息
        children.rabbitmq("发送交换机消息") { rabbitmq ->
            rabbitmq.exchange { exchange ->
                exchange.name("user.fanout")
                        .type("fanout")
            }
                    .message([
                            eventType: "user_updated",
                            userId   : "12345",
                            changes  : ["email", "profile"],
                            timestamp: new Date()
                    ])
        }

        // 3. 发送主题消息
        ["info", "warning", "error"].each { level ->
            children.rabbitmq("发送${level}日志") { rabbitmq ->
                rabbitmq.exchange { exchange ->
                    exchange.name("logs.topic")
                            .type("topic")
                            .routingKey("application.${level}.service")
                }
                        .message("日志内容: ${level} level message")
            }
        }
    }
}

相关文档


💡 提示: 更多详细示例请参考 example/rabbit-example 目录下的完整示例代码。

Released under the MIT License.