Files
2025-09-19 16:14:08 +08:00

20 KiB
Raw Permalink Blame History

PowerJob Mu 协议技术设计方案

1. 项目背景

1.1 业务场景

PowerJob 是一个分布式任务调度框架,采用 Server-Worker 架构。在实际部署中,经常遇到以下网络环境限制:

  • Worker 节点部署在内网环境,只能单向访问公网的 Server
  • Server 无法直接访问 Worker 的内网 IP 地址
  • 业务需要支持 Server 向 Worker 的主动通讯(任务下发、状态查询等)

1.2 现有协议局限性

  • HTTP 协议:基于请求-响应模式Server 向 Worker 推送需要 Worker 轮询,实时性差
  • AKKA 协议:功能强大但配置复杂,在 NAT 环境下需要复杂的网络配置

1.3 设计目标

  1. 支持 Worker 仅可出站访问的网络环境
  2. 实现 Server 与 Worker 的双向通讯
  3. 支持完整的节点间通讯矩阵Worker↔Server, Worker↔Worker, Server↔Server
  4. 保持高性能和低延迟
  5. 简化网络配置和部署复杂度

2. 总体设计

2.1 核心设计原则

连接复用 (Connection Reuse)

  • Worker 主动建立到 Server 的长连接
  • Server 通过注册机制维护 Worker 连接映射
  • Server 复用 Worker 建立的连接进行反向通讯

延迟连接 (Lazy Connection)

  • 节点启动时不立即建立所有连接
  • 首次通讯时才建立目标连接
  • 避免启动时的网络依赖和连接失败

统一架构 (Unified Architecture)

  • 所有节点都具备 Server 和 Client 双重能力
  • 统一的消息格式和处理流程
  • 支持任意节点间的直接通讯

2.2 架构图

                    PowerJob Mu Protocol Architecture
    
    ┌──────────────────────────────────────────────────────────────────┐
    │                        Application Layer                         │
    ├──────────────────────────────────────────────────────────────────┤
    │  Actor System (Handler Registration & Message Routing)          │
    ├──────────────────────────────────────────────────────────────────┤
    │                      Transport Layer                             │
    │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
    │  │MuTransporter│  │ChannelMgr   │  │ConnectionMgr│              │
    │  │             │  │             │  │             │              │
    │  │- tell()     │  │- Worker Reg │  │- Lazy Conn  │              │
    │  │- ask()      │  │- Ask/Resp   │  │- Conn Pool  │              │
    │  └─────────────┘  └─────────────┘  └─────────────┘              │
    ├──────────────────────────────────────────────────────────────────┤
    │                       Protocol Layer                             │
    │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
    │  │ MuMessage   │  │MuMessageCodec│  │Message Types│              │
    │  │             │  │             │  │             │              │
    │  │- Type       │  │- Encode     │  │- TELL       │              │
    │  │- RequestId  │  │- Decode     │  │- ASK        │              │
    │  │- Path       │  │- Length     │  │- RESPONSE   │              │
    │  │- Payload    │  │- JSON       │  │- HEARTBEAT  │              │
    │  └─────────────┘  └─────────────┘  └─────────────┘              │
    ├──────────────────────────────────────────────────────────────────┤
    │                      Network Layer                               │
    │                      Netty Framework                             │
    │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │
    │  │EventLoopGroup│  │ Bootstrap   │  │ChannelPipe  │              │
    │  │             │  │             │  │             │              │
    │  │- Boss       │  │- Server     │  │- Codec      │              │
    │  │- Worker     │  │- Client     │  │- Handler    │              │
    │  └─────────────┘  └─────────────┘  └─────────────┘              │
    └──────────────────────────────────────────────────────────────────┘

3. 详细设计

3.1 通讯流程设计

3.1.1 Worker 注册流程

sequenceDiagram
    participant W as Worker
    participant S as Server
    
    W->>S: 1. 建立TCP连接
    W->>S: 2. 发送HEARTBEAT消息 (包含Worker地址)
    S->>S: 3. 注册Worker连接到ChannelManager
    S->>S: 4. 建立 workerAddress -> Channel 映射
    
    Note over W,S: 连接建立完成,可以双向通讯
    
    W->>S: 5. 定期发送HEARTBEAT (保活)
    S->>W: 6. 可以通过已注册连接发送消息

3.1.2 延迟连接流程

sequenceDiagram
    participant W1 as Worker1
    participant W2 as Worker2
    
    W1->>W1: 1. 需要向Worker2发送消息
    W1->>W1: 2. 检查连接池,无现有连接
    W1->>W2: 3. 建立新的TCP连接
    W1->>W2: 4. 发送消息
    W1->>W1: 5. 连接加入连接池,供后续复用

3.2 核心组件设计

3.2.1 MuCSInitializer

public class MuCSInitializer implements CSInitializer {
    
    // 根据节点类型初始化不同服务
    public void init(CSInitializerConfig config) {
        if (config.getServerType() == ServerType.SERVER) {
            initServer();  // 启动Netty服务端 + 连接管理器
        } else {
            initWorker();  // 启动Netty服务端 + 连接管理器
        }
    }
    
    private void initServer() {
        // 1. 创建ServerHandler
        // 2. 启动Netty服务端监听
        // 3. 初始化连接管理器
    }
    
    private void initWorker() {
        // 1. 创建WorkerHandler  
        // 2. 启动Netty服务端监听 (支持Worker间通讯)
        // 3. 初始化连接管理器
    }
}

3.2.2 MuTransporter

public class MuTransporter implements Transporter {
    
    public void tell(URL url, PowerSerializable request) {
        if (当前节点是Worker) {
            // 使用连接管理器建立到目标的连接
            connectionManager.getOrCreateConnection(url.getAddress())
                .thenAccept(channel -> channel.writeAndFlush(message));
        } else {
            // Server端需要区分目标类型
            if (目标是Worker) {
                // 使用已注册的Worker连接
                Channel channel = channelManager.getWorkerChannel(url.getAddress());
                channel.writeAndFlush(message);
            } else {
                // 使用连接管理器连接到目标Server
                connectionManager.getOrCreateConnection(url.getAddress())
                    .thenAccept(channel -> channel.writeAndFlush(message));
            }
        }
    }
    
    public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) {
        // 1. 生成唯一的requestId
        // 2. 注册Future到ChannelManager
        // 3. 按照tell的逻辑发送ASK消息
        // 4. 返回Future等待响应
    }
}

3.2.3 ChannelManager

public class ChannelManager {
    
    // Worker地址到连接的映射 (Server端用)
    private final ConcurrentMap<String, Channel> workerChannels;
    
    // 请求ID到Future的映射 (Ask模式用)
    private final ConcurrentMap<String, CompletableFuture<Object>> pendingRequests;
    
    // 请求ID到响应类型的映射 (类型转换用)
    private final ConcurrentMap<String, Class<?>> requestResponseTypes;
    
    public void registerWorkerChannel(Address workerAddress, Channel channel) {
        String key = workerAddress.getHost() + ":" + workerAddress.getPort();
        workerChannels.put(key, channel);
        // 监听连接关闭,自动清理映射
    }
    
    public void completePendingRequest(String requestId, Object response) {
        CompletableFuture<Object> future = pendingRequests.remove(requestId);
        Class<?> responseType = requestResponseTypes.remove(requestId);
        
        // 类型转换解决LinkedHashMap问题
        Object convertedResponse = convertResponse(response, responseType);
        future.complete(convertedResponse);
    }
}

3.2.4 MuConnectionManager

public class MuConnectionManager {
    
    // 目标地址到连接的映射
    private final ConcurrentMap<String, Channel> connections;
    
    // 正在建立的连接
    private final ConcurrentMap<String, CompletableFuture<Channel>> pendingConnections;
    
    public CompletableFuture<Channel> getOrCreateConnection(Address targetAddress) {
        String key = targetAddress.getHost() + ":" + targetAddress.getPort();
        
        // 1. 检查现有连接
        Channel existingChannel = connections.get(key);
        if (existingChannel != null && existingChannel.isActive()) {
            return CompletableFuture.completedFuture(existingChannel);
        }
        
        // 2. 检查正在建立的连接
        CompletableFuture<Channel> pendingConnection = pendingConnections.get(key);
        if (pendingConnection != null) {
            return pendingConnection;
        }
        
        // 3. 建立新连接
        return createNewConnection(targetAddress);
    }
}

3.3 消息协议设计

3.3.1 消息格式

public class MuMessage implements PowerSerializable {
    private MessageType messageType;  // 消息类型
    private String requestId;         // 请求ID (Ask模式)
    private String path;              // 处理器路径
    private Address senderAddress;    // 发送方地址 (注册用)
    private Object payload;           // 消息载荷
    private String errorMessage;      // 错误信息
}

public enum MessageType {
    TELL,        // 单向消息
    ASK,         // 请求消息
    RESPONSE,    // 响应消息
    ERROR,       // 错误响应
    HEARTBEAT    // 心跳消息
}

3.3.2 编解码器

public class MuMessageCodec extends ByteToMessageCodec<MuMessage> {
    
    @Override
    protected void encode(ChannelHandlerContext ctx, MuMessage msg, ByteBuf out) {
        byte[] data = OBJECT_MAPPER.writeValueAsBytes(msg);
        out.writeInt(data.length);  // 长度前缀
        out.writeBytes(data);       // 消息内容
    }
    
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        if (in.readableBytes() < 4) return;  // 长度不足
        
        int length = in.readInt();
        if (in.readableBytes() < length) {
            in.resetReaderIndex();  // 重置读取位置
            return;
        }
        
        byte[] data = new byte[length];
        in.readBytes(data);
        MuMessage message = OBJECT_MAPPER.readValue(data, MuMessage.class);
        out.add(message);
    }
}

4. 关键技术方案

4.1 连接复用机制

问题描述

在单向网络环境中Server 无法主动连接到 Worker但需要向 Worker 发送消息。

解决方案

  1. Worker 主动注册Worker 启动时主动连接到 Server并发送包含自身地址的心跳消息
  2. 连接映射维护Server 维护 Worker 地址到连接通道的映射关系
  3. 反向通讯Server 需要向 Worker 发送消息时,从映射中查找对应的连接通道
// Worker端发送心跳注册
MuMessage heartbeat = new MuMessage(
    MessageType.HEARTBEAT, null, null,
    workerAddress,  // 关键携带Worker地址
    null, null
);
channel.writeAndFlush(heartbeat);

// Server端处理心跳并注册
public void handleHeartbeat(ChannelHandlerContext ctx, MuMessage msg) {
    if (msg.getSenderAddress() != null) {
        channelManager.registerWorkerChannel(msg.getSenderAddress(), ctx.channel());
    }
}

4.2 延迟连接机制

问题描述

节点启动时立即建立所有连接会导致:

  • 启动时间长
  • 网络故障影响启动
  • 不必要的资源占用

解决方案

  1. 按需连接:只在首次需要通讯时才建立连接
  2. 连接缓存:建立的连接保存在连接池中供后续复用
  3. 并发控制:避免同时建立到同一目标的多个连接
public CompletableFuture<Channel> getOrCreateConnection(Address targetAddress) {
    // 1. 检查缓存连接
    Channel cached = connectionCache.get(addressKey);
    if (cached != null && cached.isActive()) {
        return CompletableFuture.completedFuture(cached);
    }
    
    // 2. 检查正在建立的连接,避免重复建立
    CompletableFuture<Channel> pending = pendingConnections.get(addressKey);
    if (pending != null) {
        return pending;
    }
    
    // 3. 建立新连接
    return createNewConnection(targetAddress);
}

4.3 消息路由机制

问题描述

不同的通讯场景需要使用不同的连接方式,需要智能路由。

解决方案

基于调用方类型和目标类型的二维路由表:

public void routeMessage(URL url, MuMessage message) {
    if (currentNodeType == ServerType.WORKER) {
        // Worker作为发送方统一使用连接管理器
        connectionManager.getOrCreateConnection(url.getAddress())
            .thenAccept(channel -> channel.writeAndFlush(message));
    } else {
        // Server作为发送方根据目标类型选择策略
        if (url.getServerType() == ServerType.WORKER) {
            // 目标是Worker使用已注册的连接
            Channel workerChannel = channelManager.getWorkerChannel(url.getAddress());
            workerChannel.writeAndFlush(message);
        } else {
            // 目标是Server使用连接管理器
            connectionManager.getOrCreateConnection(url.getAddress())
                .thenAccept(channel -> channel.writeAndFlush(message));
        }
    }
}

4.4 类型转换机制

问题描述

Jackson 反序列化时可能将对象反序列化为 LinkedHashMap导致类型转换异常。

解决方案

  1. 类型映射Ask 请求时记录期望的响应类型
  2. 智能转换:响应时根据期望类型进行转换
// 发送Ask请求时记录类型
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) {
    String requestId = UUID.randomUUID().toString();
    channelManager.registerPendingRequest(requestId, future, clz);  // 记录期望类型
    // ... 发送消息
}

// 接收响应时转换类型
public void completePendingRequest(String requestId, Object response) {
    Class<?> expectedType = requestResponseTypes.remove(requestId);
    Object convertedResponse = JsonUtils.toJavaObject(response, expectedType);
    future.complete(convertedResponse);
}

5. 性能优化

5.1 连接池管理

  • 连接复用:相同目标的多次请求复用连接
  • 连接清理:自动检测和清理失效连接
  • 连接限制:控制最大连接数,避免资源耗尽

5.2 异步处理

  • 非阻塞IO基于Netty的异步IO模型
  • 事件驱动:消息处理采用事件驱动方式
  • 线程池优化合理配置EventLoopGroup大小

5.3 内存管理

  • 零拷贝利用Netty的零拷贝特性
  • 对象池复用消息对象减少GC压力
  • 缓冲区管理:合理设置接收和发送缓冲区大小

6. 容错设计

6.1 连接故障处理

// 连接断开监听
channel.closeFuture().addListener(future -> {
    // 清理连接映射
    connectionCache.remove(addressKey);
    workerChannels.remove(workerKey);
    
    // 失败所有待处理的请求
    failPendingRequests(channel);
});

// 自动重连
public CompletableFuture<Channel> reconnect(Address address) {
    return CompletableFuture
        .runAsync(() -> Thread.sleep(retryInterval))
        .thenCompose(v -> createConnection(address));
}

6.2 消息超时处理

// Ask请求超时 (JDK8兼容方式)
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.schedule(() -> {
    if (!future.isDone()) {
        channelManager.removePendingRequest(requestId);
        future.completeExceptionally(new TimeoutException("Request timeout"));
    }
}, timeout, TimeUnit.SECONDS);

6.3 异常传播

  • 网络异常:连接失败、超时等网络层异常
  • 协议异常:消息格式错误、编解码异常
  • 业务异常:处理器执行异常

7. 监控与运维

7.1 关键指标

  • 连接数统计:活跃连接数、总连接数
  • 消息统计:发送/接收消息数、消息大小
  • 性能指标:响应时间、吞吐量
  • 错误统计:连接失败、消息失败、超时次数

7.2 日志记录

// 连接事件
log.info("[MuConnectionManager] Connected to {}", targetAddress);
log.warn("[MuConnectionManager] Connection failed to {}", targetAddress);

// 消息事件  
log.debug("[MuTransporter] Sent {} message to {}", messageType, url);
log.error("[MuHandler] Failed to process message", exception);

7.3 健康检查

  • 连接健康:定期检查连接状态
  • 心跳监控监控Worker心跳状态
  • 性能监控:监控关键性能指标

8. 测试方案

8.1 单元测试

  • 组件测试:各核心组件的独立测试
  • 协议测试:消息编解码正确性测试
  • 异常测试:各种异常场景的处理测试

8.2 集成测试

  • 通讯测试:各种通讯场景的端到端测试
  • 故障测试:网络故障、节点故障的恢复测试
  • 性能测试:高并发、大数据量的性能测试

8.3 场景测试

// Worker到Server通讯测试
@Test
public void testWorkerToServerCommunication() {
    // 1. 启动Server
    // 2. 启动Worker并连接到Server
    // 3. Worker发送消息到Server
    // 4. 验证Server收到消息
}

// 网络故障恢复测试
@Test  
public void testNetworkFailureRecovery() {
    // 1. 建立正常连接
    // 2. 模拟网络中断
    // 3. 恢复网络连接
    // 4. 验证通讯自动恢复
}

9. 部署指南

9.1 网络配置

# Server配置
server:
  host: 0.0.0.0        # 监听所有网卡
  port: 7700           # 监听端口
  external_host: 公网IP # 外部访问地址

# Worker配置  
worker:
  host: 0.0.0.0        # 本地监听地址
  port: 27777          # 本地监听端口
  server_host: 公网IP   # Server地址
  server_port: 7700    # Server端口

9.2 防火墙配置

# Server端开放监听端口
iptables -A INPUT -p tcp --dport 7700 -j ACCEPT

# Worker端确保可以访问Server端口
# 通常不需要额外配置,确保出站不受限即可

9.3 高可用部署

  • Server集群多个Server实例负载均衡
  • Worker多连接Worker配置多个Server地址
  • 故障转移:自动检测和切换故障节点

10. 总结

PowerJob Mu 协议通过创新的连接复用和延迟连接机制,成功解决了单向网络环境下的双向通讯问题。主要技术特点包括:

  1. 网络适应性:专为受限网络环境设计,无需复杂的网络配置
  2. 高性能基于Netty的异步IO支持高并发和低延迟
  3. 可靠性:完善的故障检测和恢复机制
  4. 扩展性:统一的架构设计,支持多种通讯场景
  5. 易部署:简化的配置和部署流程

该协议特别适合云原生、容器化部署以及混合云环境为PowerJob在复杂网络环境下提供了可靠的通讯基础。


PowerJob Mu Protocol Technical Design
Version: 1.0
Date: January 2025