---初始化项目

This commit is contained in:
2025-09-19 16:14:08 +08:00
parent 902d3d7e3b
commit afee7c03ac
767 changed files with 75809 additions and 82 deletions

48
powerjob-remote/pom.xml Normal file
View File

@ -0,0 +1,48 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
<module>powerjob-remote-framework</module>
<module>powerjob-remote-impl-http</module>
<module>powerjob-remote-impl-akka</module>
<module>powerjob-remote-impl-mu</module>
</modules>
<artifactId>powerjob-remote</artifactId>
<name>powerjob-remote</name>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<junit.version>5.9.0</junit.version>
<logback.version>1.2.13</logback.version>
</properties>
<dependencies>
<!-- Junit tests -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!-- log for test stage -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>${logback.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<version>5.1.2</version>
<artifactId>powerjob-remote-framework</artifactId>
<name>powerjob-remote-framework</name>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<powerjob-common.version>5.1.2</powerjob-common.version>
<reflections.version>0.10.2</reflections.version>
</properties>
<dependencies>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-common</artifactId>
<version>${powerjob-common.version}</version>
</dependency>
<dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>${reflections.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,94 @@
package tech.powerjob.remote.framework;
import lombok.Data;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.actor.Handler;
import java.util.Optional;
/**
* 基准测试
*
* @author tjq
* @since 2023/1/1
*/
@Slf4j
@Actor(path = "benchmark")
public class BenchmarkActor {
@Handler(path = "standard")
public BenchmarkResponse standardRequest(BenchmarkRequest request) {
long startTs = System.currentTimeMillis();
log.info("[BenchmarkActor] [standardRequest] receive request: {}", request);
BenchmarkResponse response = new BenchmarkResponse()
.setSuccess(true)
.setContent(request.getContent())
.setProcessThread(Thread.currentThread().getName())
.setServerReceiveTs(System.currentTimeMillis());
if (request.getResponseSize() != null && request.getResponseSize() > 0) {
response.setExtra(RandomStringUtils.randomPrint(request.getResponseSize()));
}
executeSleep(request);
response.setServerCost(System.currentTimeMillis() - startTs);
return response;
}
@Handler(path = "emptyReturn")
public void emptyReturn(BenchmarkRequest request) {
log.info("[BenchmarkActor] [emptyReturn] receive request: {}", request);
executeSleep(request);
}
@Handler(path = "stringReturn")
public String stringReturn(BenchmarkRequest request) {
log.info("[BenchmarkActor] [stringReturn] receive request: {}", request);
executeSleep(request);
return RandomStringUtils.randomPrint(Optional.ofNullable(request.getResponseSize()).orElse(100));
}
private static void executeSleep(BenchmarkRequest request) {
if (request.getBlockingMills() != null && request.getBlockingMills() > 0) {
CommonUtils.easySleep(request.getBlockingMills());
}
}
@Data
@Accessors(chain = true)
public static class BenchmarkRequest implements PowerSerializable {
/**
* 请求内容
*/
private String content;
/**
* 期望的响应大小,可空
*/
private Integer responseSize;
/**
* 阻塞时间,模拟 IO 耗时
*/
private Integer blockingMills;
}
@Data
@Accessors(chain = true)
public static class BenchmarkResponse implements PowerSerializable {
private boolean success;
/**
* 原路返回原来的 content
*/
private String content;
private String processThread;
private long serverReceiveTs;
private long serverCost;
private String extra;
}
}

View File

@ -0,0 +1,21 @@
package tech.powerjob.remote.framework.actor;
import java.lang.annotation.*;
/**
* 行为处理器
*
* @author tjq
* @since 2022/12/31
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Actor {
/**
* root path
* @return root path
*/
String path();
}

View File

@ -0,0 +1,27 @@
package tech.powerjob.remote.framework.actor;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import java.util.List;
/**
* ActorInfo
*
* @author tjq
* @since 2022/12/31
*/
@Getter
@Setter
@Accessors(chain = true)
public class ActorInfo {
private Object actor;
private Actor anno;
private List<HandlerInfo> handlerInfos;
}

View File

@ -0,0 +1,27 @@
package tech.powerjob.remote.framework.actor;
import java.lang.annotation.*;
/**
* Handler
*
* @author tjq
* @since 2022/12/31
*/
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Handler {
/**
* handler path
* @return handler path
*/
String path();
/**
* 处理类型
* @return 阻塞 or 非阻塞
*/
ProcessType processType() default ProcessType.BLOCKING;
}

View File

@ -0,0 +1,34 @@
package tech.powerjob.remote.framework.actor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import tech.powerjob.remote.framework.base.HandlerLocation;
import java.io.Serializable;
import java.lang.reflect.Method;
/**
* HandlerInfo
*
* @author tjq
* @since 2022/12/31
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class HandlerInfo {
private HandlerLocation location;
/**
* handler 对应的方法
*/
private Method method;
/**
* Handler 注解携带的信息
*/
private Handler anno;
}

View File

@ -0,0 +1,20 @@
package tech.powerjob.remote.framework.actor;
/**
* 处理器类型
*
* @author tjq
* @since 2023/1/1
*/
public enum ProcessType {
/**
* 阻塞式
*/
BLOCKING,
/**
* 非阻塞式
*/
NO_BLOCKING
}

View File

@ -0,0 +1,27 @@
package tech.powerjob.remote.framework.actor;
import lombok.extern.slf4j.Slf4j;
/**
* 内置一个用来通用测试的 TestActor
*
* @author tjq
* @since 2022/12/31
*/
@Slf4j
@Actor(path = "test")
public class TestActor {
public static void simpleStaticMethod() {
}
public void simpleMethod() {
}
@Handler(path = "method1")
public String handlerMethod1() {
log.info("[TestActor] handlerMethod1");
return "1";
}
}

View File

@ -0,0 +1,41 @@
package tech.powerjob.remote.framework.base;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* 地址
*
* @author tjq
* @since 2022/12/31
*/
@Getter
@Setter
@Accessors(chain = true)
public class Address implements Serializable {
private String host;
private int port;
public String toFullAddress() {
return toFullAddress(host, port);
}
public static Address fromIpv4(String ipv4) {
String[] split = ipv4.split(":");
return new Address()
.setHost(split[0])
.setPort(Integer.parseInt(split[1]));
}
public static String toFullAddress(String host, int port) {
return String.format("%s:%d", host, port);
}
@Override
public String toString() {
return toFullAddress();
}
}

View File

@ -0,0 +1,33 @@
package tech.powerjob.remote.framework.base;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* handler location
*
* @author tjq
* @since 2022/12/31
*/
@Getter
@Setter
@ToString
@Accessors(chain = true)
public class HandlerLocation implements Serializable {
/**
* 根路径
*/
private String rootPath;
/**
* 方法路径
*/
private String methodPath;
public String toPath() {
return String.format("/%s/%s", rootPath, methodPath);
}
}

View File

@ -0,0 +1,18 @@
package tech.powerjob.remote.framework.base;
/**
* RemotingException
*
* @author tjq
* @since 2022/12/31
*/
public class RemotingException extends RuntimeException {
public RemotingException(String message) {
super(message);
}
public RemotingException(String message, Throwable throwable) {
super(message, throwable);
}
}

View File

@ -0,0 +1,12 @@
package tech.powerjob.remote.framework.base;
/**
* 服务器类型类型
*
* @author tjq
* @since 2022/12/31
*/
public enum ServerType {
SERVER,
WORKER
}

View File

@ -0,0 +1,32 @@
package tech.powerjob.remote.framework.base;
import lombok.Data;
import lombok.experimental.Accessors;
import java.io.Serializable;
/**
* URL
*
* @author tjq
* @since 2022/12/31
*/
@Data
@Accessors(chain = true)
public class URL implements Serializable {
/**
* 调用的集群类型(用于兼容 AKKA 等除了IP还需要指定 system 访问的情况)
*/
private ServerType serverType;
/**
* remote address
*/
private Address address;
/**
* location
*/
private HandlerLocation location;
}

View File

@ -0,0 +1,43 @@
package tech.powerjob.remote.framework.cs;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
/**
* client & server initializer
*
* @author MuBao
* @since 2022/12/31
*/
public interface CSInitializer {
/**
* 类型名称,比如 akka, netty4httpJson
* @return 名称
*/
String type();
/**
* initialize the framework
* @param config config
*/
void init(CSInitializerConfig config);
/**
* build a Transporter by based network framework
* @return Transporter
*/
Transporter buildTransporter();
/**
* bind Actor, publish handler's service
* @param actorInfos actor infos
*/
void bindHandlers(List<ActorInfo> actorInfos);
void close() throws IOException;
}

View File

@ -0,0 +1,32 @@
package tech.powerjob.remote.framework.cs;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType;
import java.io.Serializable;
/**
* CSInitializerConfig
*
* @author tjq
* @since 2022/12/31
*/
@Getter
@Setter
@Accessors(chain = true)
public class CSInitializerConfig implements Serializable {
/**
* 需要绑定的地址(本地)
*/
private Address bindAddress;
/**
* 外部地址(需要 NAT 等情况存在)
*/
private Address externalAddress;
private ServerType serverType;
}

View File

@ -0,0 +1,41 @@
package tech.powerjob.remote.framework.engine;
import lombok.Data;
import lombok.experimental.Accessors;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.ServerType;
import java.io.Serializable;
import java.util.List;
/**
* EngineConfig
*
* @author tjq
* @since 2022/12/31
*/
@Data
@Accessors(chain = true)
public class EngineConfig implements Serializable {
/**
* 服务类型
*/
private ServerType serverType;
/**
* 需要启动的引擎类型
*/
private String type;
/**
* 绑定的本地地址
*/
private Address bindAddress;
/**
* 外部地址(需要 NAT 等情况存在)
*/
private Address externalAddress;
/**
* actor实例交由使用侧自己实例化以便自行注入各种 bean
*/
private List<Object> actorList;
}

View File

@ -0,0 +1,18 @@
package tech.powerjob.remote.framework.engine;
import lombok.Getter;
import lombok.Setter;
import tech.powerjob.remote.framework.transporter.Transporter;
/**
* 引擎输出
*
* @author tjq
* @since 2022/12/31
*/
@Getter
@Setter
public class EngineOutput {
private Transporter transporter;
}

View File

@ -0,0 +1,16 @@
package tech.powerjob.remote.framework.engine;
import java.io.IOException;
/**
* RemoteEngine
*
* @author tjq
* @since 2022/12/31
*/
public interface RemoteEngine {
EngineOutput start(EngineConfig engineConfig);
void close() throws IOException;
}

View File

@ -0,0 +1,89 @@
package tech.powerjob.remote.framework.engine.impl;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.remote.framework.actor.Actor;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.Handler;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.base.HandlerLocation;
import java.lang.reflect.Method;
import java.util.List;
/**
* load all Actor
*
* @author tjq
* @since 2022/12/31
*/
@Slf4j
class ActorFactory {
static List<ActorInfo> load(List<Object> actorList) {
List<ActorInfo> actorInfos = Lists.newArrayList();
actorList.forEach(actor -> {
final Class<?> clz = actor.getClass();
try {
final Actor anno = clz.getAnnotation(Actor.class);
ActorInfo actorInfo = new ActorInfo().setActor(actor).setAnno(anno);
actorInfo.setHandlerInfos(loadHandlerInfos4Actor(actorInfo));
actorInfos.add(actorInfo);
} catch (Throwable t) {
log.error("[ActorFactory] process Actor[{}] failed!", clz);
ExceptionUtils.rethrow(t);
}
});
return actorInfos;
}
private static List<HandlerInfo> loadHandlerInfos4Actor(ActorInfo actorInfo) {
List<HandlerInfo> ret = Lists.newArrayList();
Actor anno = actorInfo.getAnno();
String rootPath = anno.path();
Object actor = actorInfo.getActor();
findHandlerMethod(rootPath, actor.getClass(), ret);
return ret;
}
private static void findHandlerMethod(String rootPath, Class<?> clz, List<HandlerInfo> result) {
Method[] declaredMethods = clz.getDeclaredMethods();
for (Method handlerMethod: declaredMethods) {
Handler handlerMethodAnnotation = handlerMethod.getAnnotation(Handler.class);
if (handlerMethodAnnotation == null) {
continue;
}
HandlerLocation handlerLocation = new HandlerLocation()
.setRootPath(suitPath(rootPath))
.setMethodPath(suitPath(handlerMethodAnnotation.path()));
HandlerInfo handlerInfo = new HandlerInfo()
.setAnno(handlerMethodAnnotation)
.setMethod(handlerMethod)
.setLocation(handlerLocation);
result.add(handlerInfo);
}
// 递归处理父类
final Class<?> superclass = clz.getSuperclass();
if (superclass != null) {
findHandlerMethod(rootPath, superclass, result);
}
}
static String suitPath(String path) {
if (path.startsWith("/")) {
return path.replaceFirst("/", "");
}
return path;
}
}

View File

@ -0,0 +1,119 @@
package tech.powerjob.remote.framework.engine.impl;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.reflections.Reflections;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.remote.framework.cs.CSInitializer;
import java.util.Optional;
import java.util.Set;
/**
* build CSInitializer
*
* @author tjq
* @since 2022/12/31
*/
@Slf4j
class CSInitializerFactory {
private static final String OFFICIAL_HTTP_CS_INITIALIZER = "tech.powerjob.remote.http.HttpVertxCSInitializer";
/**
* 未来底层框架摆脱 vertx 时可能会用这个 classnameor 开发者自己实现的 http 协议也可以用这个 classname总之预留战未来
*/
private static final String OFFICIAL_HTTP_CS_INITIALIZER2 = "tech.powerjob.remote.http.HttpCSInitializer";
private static final String OFFICIAL_AKKA_CS_INITIALIZER = "tech.powerjob.remote.akka.AkkaCSInitializer";
private static final String OFFICIAL_MU_CS_INITIALIZER = "tech.powerjob.remote.mu.MuCSInitializer";
private static final String EXTEND_CS_INITIALIZER_PATTERN = "tech.powerjob.remote.%s.CSInitializer";
static CSInitializer build(String targetType) {
CSInitializer officialCSInitializer = tryLoadCSInitializerByClassName(targetType);
if (officialCSInitializer != null) {
return officialCSInitializer;
}
log.info("[CSInitializerFactory] try load CSInitializerFactory by name failed, start to use Reflections!");
// JAVA SPI 机制太笨了,短期内继续保留 Reflections 官网下高版本兼容性
Reflections reflections = new Reflections(OmsConstant.PACKAGE);
Set<Class<? extends CSInitializer>> cSInitializerClzSet = reflections.getSubTypesOf(CSInitializer.class);
log.info("[CSInitializerFactory] scan subTypeOf CSInitializer: {}", cSInitializerClzSet);
for (Class<? extends CSInitializer> clz : cSInitializerClzSet) {
try {
CSInitializer csInitializer = clz.getDeclaredConstructor().newInstance();
String type = csInitializer.type();
log.info("[CSInitializerFactory] new instance for CSInitializer[{}] successfully, type={}, object: {}", clz, type, csInitializer);
if (targetType.equalsIgnoreCase(type)) {
return csInitializer;
}
} catch (Exception e) {
log.error("[CSInitializerFactory] new instance for CSInitializer[{}] failed, maybe you should provide a non-parameter constructor", clz);
ExceptionUtils.rethrow(e);
}
}
throw new PowerJobException(String.format("can't load CSInitializer[%s], ensure your package name start with 'tech.powerjob' and import the dependencies!", targetType));
}
/**
* 官方组件直接使用固定类名尝试加载,确保 reflections 不兼容情况下,至少能使用官方通讯协议
* @param targetType 协议类型
* @return CSInitializer
*/
private static CSInitializer tryLoadCSInitializerByClassName(String targetType) {
if (Protocol.HTTP.name().equalsIgnoreCase(targetType)) {
Optional<CSInitializer> httpCsIOpt = tryLoadCSInitializerByClzName(OFFICIAL_HTTP_CS_INITIALIZER);
if (httpCsIOpt.isPresent()) {
return httpCsIOpt.get();
}
Optional<CSInitializer> httpCsIOpt2 = tryLoadCSInitializerByClzName(OFFICIAL_HTTP_CS_INITIALIZER2);
if (httpCsIOpt2.isPresent()) {
return httpCsIOpt2.get();
}
}
if (Protocol.AKKA.name().equalsIgnoreCase(targetType)) {
Optional<CSInitializer> akkaCSIOpt = tryLoadCSInitializerByClzName(OFFICIAL_AKKA_CS_INITIALIZER);
if (akkaCSIOpt.isPresent()) {
return akkaCSIOpt.get();
}
}
if (Protocol.MU.name().equalsIgnoreCase(targetType)) {
Optional<CSInitializer> muCSIOpt = tryLoadCSInitializerByClzName(OFFICIAL_MU_CS_INITIALIZER);
if (muCSIOpt.isPresent()) {
return muCSIOpt.get();
}
}
// 尝试加载按规范命名的处理器,比如使用方自定义了 http2 协议,将其类名定为 tech.powerjob.remote.http2.CSInitializer 依然可确保在 Reflections 不可用的情况下完成加载
String clz = String.format(EXTEND_CS_INITIALIZER_PATTERN, targetType);
Optional<CSInitializer> extOpt = tryLoadCSInitializerByClzName(clz);
return extOpt.orElse(null);
}
private static Optional<CSInitializer> tryLoadCSInitializerByClzName(String clzName) {
try {
log.info("[CSInitializerFactory] try to load CSInitializer by classname: {}", clzName);
Class<?> clz = Class.forName(clzName);
CSInitializer o = (CSInitializer) clz.getDeclaredConstructor().newInstance();
log.info("[CSInitializerFactory] load CSInitializer[{}] successfully, obj: {}", clzName, o);
return Optional.of(o);
} catch (ClassNotFoundException ce) {
log.warn("[CSInitializerFactory] load CSInitializer by classname[{}] failed due to ClassNotFound: {}", clzName, ExceptionUtils.getMessage(ce));
} catch (Exception e) {
log.warn("[CSInitializerFactory] load CSInitializer by classname[{}] failed.", clzName, e);
}
return Optional.empty();
}
}

View File

@ -0,0 +1,79 @@
package tech.powerjob.remote.framework.engine.impl;
import com.google.common.base.Stopwatch;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.utils.SysUtils;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.TestActor;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.engine.EngineConfig;
import tech.powerjob.remote.framework.engine.EngineOutput;
import tech.powerjob.remote.framework.engine.RemoteEngine;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.io.IOException;
import java.util.List;
/**
* 初始化 PowerJob 整个网络层
*
* @author tjq
* @since 2022/12/31
*/
@Slf4j
public class PowerJobRemoteEngine implements RemoteEngine {
private CSInitializer csInitializer;
@Override
public EngineOutput start(EngineConfig engineConfig) {
reConfig(engineConfig);
final String engineType = engineConfig.getType();
EngineOutput engineOutput = new EngineOutput();
log.info("[PowerJobRemoteEngine] [{}] start remote engine with config: {}", engineType, engineConfig);
List<ActorInfo> actorInfos = ActorFactory.load(engineConfig.getActorList());
csInitializer = CSInitializerFactory.build(engineType);
String type = csInitializer.type();
Stopwatch sw = Stopwatch.createStarted();
log.info("[PowerJobRemoteEngine] [{}] try to startup CSInitializer[type={}]", engineType, type);
csInitializer.init(new CSInitializerConfig()
.setBindAddress(engineConfig.getBindAddress())
.setExternalAddress(engineConfig.getExternalAddress())
.setServerType(engineConfig.getServerType())
);
// 构建通讯器
Transporter transporter = csInitializer.buildTransporter();
engineOutput.setTransporter(transporter);
log.info("[PowerJobRemoteEngine] [{}] start to bind Handler", engineType);
actorInfos.forEach(actor -> actor.getHandlerInfos().forEach(handlerInfo -> log.info("[PowerJobRemoteEngine] [{}] PATH={}, handler={}", engineType, handlerInfo.getLocation().toPath(), handlerInfo.getMethod())));
// 绑定 handler
csInitializer.bindHandlers(actorInfos);
log.info("[PowerJobRemoteEngine] [{}] startup successfully, cost: {}", engineType, sw);
return engineOutput;
}
@Override
public void close() throws IOException {
csInitializer.close();
}
private static void reConfig(EngineConfig engineConfig) {
boolean testEnv = SysUtils.isTestEnv();
if (testEnv) {
log.info("[PowerJobRemoteEngine] add 'TestActor' due to current is test env");
engineConfig.getActorList().add(new TestActor());
}
}
}

View File

@ -0,0 +1,7 @@
/**
* PowerJob 网络框架层
*
* @author tjq
* @since 2022/12/31
*/
package tech.powerjob.remote.framework;

View File

@ -0,0 +1,16 @@
package tech.powerjob.remote.framework.transporter;
/**
* 通讯协议
*
* @author tjq
* @since 2022/12/31
*/
public interface Protocol {
/**
* 通讯协议名称
* @return 协议名称
*/
String name();
}

View File

@ -0,0 +1,40 @@
package tech.powerjob.remote.framework.transporter;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.URL;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
/**
* 通讯器,封装与远程服务端交互逻辑
*
* @author tjq
* @since 2022/12/31
*/
public interface Transporter {
/**
* Protocol
* @return return protocol
*/
Protocol getProtocol();
/**
*send message
* @param url url
* @param request request
*/
void tell(URL url, PowerSerializable request);
/**
* ask by request
* @param url url
* @param request request
* @param clz response type
* @return CompletionStage
* @throws RemotingException remote exception
*/
<T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException;
}

View File

@ -0,0 +1,35 @@
package tech.powerjob.remote.framework.utils;
import org.apache.commons.lang3.ArrayUtils;
import tech.powerjob.common.PowerSerializable;
import java.util.Optional;
/**
* RemoteUtils
*
* @author tjq
* @since 2023/1/1
*/
public class RemoteUtils {
public static Optional<Class<?>> findPowerSerialize(Class<?>[] parameterTypes) {
if (ArrayUtils.isEmpty(parameterTypes)) {
return Optional.empty();
}
for (Class<?> clz : parameterTypes) {
final Class<?>[] interfaces = clz.getInterfaces();
if (ArrayUtils.isEmpty(interfaces)) {
continue;
}
if (PowerSerializable.class.isAssignableFrom(clz)) {
return Optional.of(clz);
}
}
return Optional.empty();
}
}

View File

@ -0,0 +1,24 @@
package tech.powerjob.remote.framework.base;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
/**
* test address
*
* @author tjq
* @since 2023/1/20
*/
@Slf4j
class AddressTest {
@Test
void testAddress() {
String ip = "192.168.1.1:10085";
final Address address = Address.fromIpv4(ip);
log.info("[AddressTest] parse address: {}", address);
assert ip.equals(address.toFullAddress());
}
}

View File

@ -0,0 +1,28 @@
package tech.powerjob.remote.framework.engine;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.Test;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
import static org.junit.jupiter.api.Assertions.*;
/**
* RemoteEngineTest
*
* @author tjq
* @since 2022/12/31
*/
class RemoteEngineTest {
@Test
void start() {
RemoteEngine remoteEngine = new PowerJobRemoteEngine();
EngineConfig engineConfig = new EngineConfig();
engineConfig.setType("TEST");
engineConfig.setBindAddress(new Address().setHost("127.0.0.1").setPort(10086));
remoteEngine.start(engineConfig);
}
}

View File

@ -0,0 +1,30 @@
package tech.powerjob.remote.framework.engine.impl;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import tech.powerjob.remote.framework.actor.TestActor;
/**
* HandlerFactoryTest
*
* @author tjq
* @since 2022/12/31
*/
@Slf4j
class ActorFactoryTest {
@Test
void load() {
ActorFactory.load(Lists.newArrayList(new TestActor()));
}
@Test
void testSuitPath() {
final String testPath1 = ActorFactory.suitPath("/test");
final String testPath2 = ActorFactory.suitPath("test");
log.info("[ActorFactoryTest] testPath1: {}, testPath2: {}", testPath1, testPath2);
assert testPath1.equals(testPath2);
}
}

View File

@ -0,0 +1,29 @@
package tech.powerjob.remote.framework.engine.impl;
import com.google.common.collect.Sets;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import tech.powerjob.common.exception.PowerJobException;
import static org.junit.jupiter.api.Assertions.*;
/**
* CSInitializerFactoryTest
*
* @author tjq
* @since 2022/12/31
*/
class CSInitializerFactoryTest {
@Test
void testBuildNormal() {
CSInitializerFactory.build("TEST");
}
@Test
void testNotFind() {
Assertions.assertThrows(PowerJobException.class, () -> {
CSInitializerFactory.build("omicron");
});
}
}

View File

@ -0,0 +1,46 @@
package tech.powerjob.remote.framework.test;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.io.IOException;
import java.util.List;
/**
* TestCSInitializer
*
* @author tjq
* @since 2022/12/31
*/
@Slf4j
public class TestCSInitializer implements CSInitializer {
@Override
public String type() {
return "TEST";
}
@Override
public void init(CSInitializerConfig config) {
log.info("TestCSInitializer#init");
}
@Override
public Transporter buildTransporter() {
log.info("TestCSInitializer#buildTransporter");
return null;
}
@Override
public void bindHandlers(List<ActorInfo> actorInfos) {
log.info("TestCSInitializer#actorInfos");
}
@Override
public void close() throws IOException {
log.info("TestCSInitializer#close");
}
}

View File

@ -0,0 +1,35 @@
package tech.powerjob.remote.framework.utils;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import tech.powerjob.common.model.AlarmConfig;
import tech.powerjob.common.request.ServerScheduleJobReq;
import java.util.Optional;
/**
* RemoteUtilsTest
*
* @author tjq
* @since 2023/1/1
*/
@Slf4j
class RemoteUtilsTest {
@Test
void findPowerSerialize() {
Class<?>[] contains = {AlarmConfig.class, ServerScheduleJobReq.class};
Class<?>[] notContains = {AlarmConfig.class};
final Optional<Class<?>> notContainsResult = RemoteUtils.findPowerSerialize(notContains);
log.info("[RemoteUtilsTest] notContainsResult: {}", notContainsResult);
final Optional<Class<?>> containsResult = RemoteUtils.findPowerSerialize(contains);
log.info("[RemoteUtilsTest] containsResult: {}", containsResult);
assert !notContainsResult.isPresent();
assert containsResult.isPresent();
}
}

View File

@ -0,0 +1,46 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-akka</artifactId>
<name>powerjob-remote-impl-akka</name>
<version>5.1.2</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<powerjob-remote-framework.version>5.1.2</powerjob-remote-framework.version>
<akka.version>2.6.13</akka.version>
</properties>
<dependencies>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-remote-framework</artifactId>
<version>${powerjob-remote-framework.version}</version>
</dependency>
<!-- akka remote -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.13</artifactId>
<version>${akka.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,110 @@
package tech.powerjob.remote.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.DeadLetter;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.common.utils.SysUtils;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.io.IOException;
import java.util.List;
import java.util.Map;
/**
* AkkaCSInitializer
*
* @author tjq
* @since 2022/12/31
*/
@Slf4j
public class AkkaCSInitializer implements CSInitializer {
private ActorSystem actorSystem;
private CSInitializerConfig config;
@Override
public String type() {
return tech.powerjob.common.enums.Protocol.AKKA.name();
}
@Override
public void init(CSInitializerConfig config) {
this.config = config;
Address bindAddress = config.getBindAddress();
log.info("[PowerJob-AKKA] bindAddress: {}", bindAddress);
// 初始化 ActorSystemmacOS上 new ServerSocket 检测端口占用的方法并不生效可能是AKKA是Scala写的缘故没办法...只能靠异常重试了)
Map<String, Object> overrideConfig = Maps.newHashMap();
Address externalAddress = config.getExternalAddress();
if (externalAddress == null || StringUtils.equalsIgnoreCase(externalAddress.toFullAddress(), bindAddress.toFullAddress())) {
overrideConfig.put("akka.remote.artery.canonical.hostname", bindAddress.getHost());
overrideConfig.put("akka.remote.artery.canonical.port", bindAddress.getPort());
log.info("[PowerJob-AKKA] not exist externalIp, overrideConfig: {}", overrideConfig);
} else {
overrideConfig.put("akka.remote.artery.canonical.hostname", externalAddress.getHost());
overrideConfig.put("akka.remote.artery.canonical.port", externalAddress.getPort());
overrideConfig.put("akka.remote.artery.bind.hostname", "0.0.0.0");
overrideConfig.put("akka.remote.artery.bind.port", bindAddress.getPort());
log.info("[PowerJob-AKKA] exist externalAddress[{}], final overrideConfig: {}", externalAddress, overrideConfig);
}
Config akkaBasicConfig = ConfigFactory.load(AkkaConstant.AKKA_CONFIG);
Config akkaFinalConfig = ConfigFactory.parseMap(overrideConfig).withFallback(akkaBasicConfig);
log.info("[PowerJob-AKKA] try to start AKKA System.");
// 启动时绑定当前的 actorSystemName
String actorSystemName = AkkaConstant.fetchActorSystemName(config.getServerType());
this.actorSystem = ActorSystem.create(actorSystemName, akkaFinalConfig);
// 处理系统中产生的异常情况
ActorRef troubleshootingActor = actorSystem.actorOf(Props.create(AkkaTroubleshootingActor.class), "troubleshooting");
actorSystem.eventStream().subscribe(troubleshootingActor, DeadLetter.class);
log.info("[PowerJob-AKKA] initialize actorSystem[{}] successfully!", actorSystem.name());
}
@Override
public Transporter buildTransporter() {
return new AkkaTransporter(actorSystem);
}
@Override
public void bindHandlers(List<ActorInfo> actorInfos) {
int cores = SysUtils.availableProcessors();
actorInfos.forEach(actorInfo -> {
String rootPath = actorInfo.getAnno().path();
AkkaMappingService.ActorConfig actorConfig = AkkaMappingService.parseActorName(rootPath);
log.info("[PowerJob-AKKA] start to process actor[path={},config={}]", rootPath, JsonUtils.toJSONString(actorConfig));
actorSystem.actorOf(AkkaProxyActor.props(actorInfo)
.withDispatcher("akka.".concat(actorConfig.getDispatcherName()))
.withRouter(new RoundRobinPool(cores)), actorConfig.getActorName());
});
}
@Override
public void close() throws IOException {
actorSystem.terminate();
}
}

View File

@ -0,0 +1,29 @@
package tech.powerjob.remote.akka;
import tech.powerjob.remote.framework.base.ServerType;
/**
* AkkaConstant
*
* @author tjq
* @since 2022/12/31
*/
public class AkkaConstant {
public static final String AKKA_CONFIG = "powerjob.akka.conf";
public static final String WORKER_ACTOR_SYSTEM_NAME = "oms";
public static final String SERVER_ACTOR_SYSTEM_NAME = "oms-server";
/**
* 获取 actorSystem 名称
* @param serverType 当前服务器类型powerjob-server 为 serverpowerjob-worker 为 worker
* @return actorSystemName
*/
public static String fetchActorSystemName(ServerType serverType) {
return serverType == ServerType.SERVER ? SERVER_ACTOR_SYSTEM_NAME : WORKER_ACTOR_SYSTEM_NAME;
}
}

View File

@ -0,0 +1,61 @@
package tech.powerjob.remote.akka;
import com.google.common.collect.Maps;
import lombok.Getter;
import lombok.Setter;
import lombok.experimental.Accessors;
import tech.powerjob.common.RemoteConstant;
import java.util.Map;
/**
* 构建 Actor Mapping
*
* @author tjq
* @since 2023/1/7
*/
public class AkkaMappingService {
/**
* Actor's RootPath -> Akka Actor Name
*/
private static final Map<String, ActorConfig> RP_2_ACTOR_CFG = Maps.newHashMap();
static {
addMappingRule(RemoteConstant.S4W_PATH, "server_actor", "w-r-c-d");
addMappingRule(RemoteConstant.S4S_PATH, "friend_actor", "friend-request-actor-dispatcher");
addMappingRule(RemoteConstant.WTT_PATH, "task_tracker", "task-tracker-dispatcher");
addMappingRule(RemoteConstant.WPT_PATH, "processor_tracker", "processor-tracker-dispatcher");
}
private static final String DEFAULT_DISPATCH_NAME = "common-dispatcher";
/**
* 根据 actor 的 rootPath 获取 Akka Actor Name不存在改写则使用当前路径
* @param actorRootPath actorRootPath
* @return actorName
*/
public static ActorConfig parseActorName(String actorRootPath) {
return RP_2_ACTOR_CFG.getOrDefault(actorRootPath,
new ActorConfig()
.setActorName(actorRootPath)
.setDispatcherName(DEFAULT_DISPATCH_NAME)
);
}
@Getter
@Setter
@Accessors(chain = true)
public static class ActorConfig {
private String actorName;
private String dispatcherName;
}
private static void addMappingRule(String newActorPath, String oldActorName, String dispatchName) {
ActorConfig actorConfig = new ActorConfig()
.setActorName(oldActorName)
.setDispatcherName(dispatchName == null ? DEFAULT_DISPATCH_NAME : dispatchName);
RP_2_ACTOR_CFG.put(newActorPath, actorConfig);
}
}

View File

@ -0,0 +1,16 @@
package tech.powerjob.remote.akka;
import tech.powerjob.remote.framework.transporter.Protocol;
/**
* AkkaProtocol
*
* @author tjq
* @since 2022/12/31
*/
public class AkkaProtocol implements Protocol {
@Override
public String name() {
return tech.powerjob.common.enums.Protocol.AKKA.name();
}
}

View File

@ -0,0 +1,70 @@
package tech.powerjob.remote.akka;
import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.japi.pf.ReceiveBuilder;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.base.HandlerLocation;
import tech.powerjob.remote.framework.utils.RemoteUtils;
import java.lang.reflect.Method;
import java.util.Optional;
/**
* 代理用的 actor
*
* @author tjq
* @since 2023/1/6
*/
@Slf4j
public class AkkaProxyActor extends AbstractActor {
private final Receive receive;
private final ActorInfo actorInfo;
public static Props props(ActorInfo actorInfo) {
return Props.create(AkkaProxyActor.class, () -> new AkkaProxyActor(actorInfo));
}
public AkkaProxyActor(ActorInfo actorInfo) {
this.actorInfo = actorInfo;
final ReceiveBuilder receiveBuilder = receiveBuilder();
actorInfo.getHandlerInfos().forEach(handlerInfo -> {
final HandlerLocation location = handlerInfo.getLocation();
final Method handlerMethod = handlerInfo.getMethod();
final Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(handlerMethod.getParameterTypes());
if (!powerSerializeClz.isPresent()) {
throw new PowerJobException("build proxy for handler failed due to handler args is not PowerSerialize: " + location);
}
final Class<?> bindClz = powerSerializeClz.get();
receiveBuilder.match(bindClz, req -> onReceiveProcessorReportTaskStatusReq(req, handlerInfo));
});
this.receive = receiveBuilder.build();
}
@Override
public Receive createReceive() {
return receive;
}
private <T> void onReceiveProcessorReportTaskStatusReq(T req, HandlerInfo handlerInfo) {
try {
final Object ret = handlerInfo.getMethod().invoke(actorInfo.getActor(), req);
if (ret == null) {
return;
}
if (ret instanceof Optional) {
if (!((Optional<?>) ret).isPresent()) {
return;
}
}
getSender().tell(ret, getSelf());
} catch (Exception e) {
log.error("[PowerJob-AKKA] process failed!", e);
}
}
}

View File

@ -0,0 +1,69 @@
package tech.powerjob.remote.akka;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.Patterns;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.common.RemoteConstant;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.remote.framework.base.HandlerLocation;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.transporter.Protocol;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
/**
* AkkaTransporter
*
* @author tjq
* @since 2022/12/31
*/
public class AkkaTransporter implements Transporter {
private final ActorSystem actorSystem;
/**
* akka://<actor system>@<hostname>:<port>/<actor path>
*/
private static final String AKKA_NODE_PATH = "akka://%s@%s/user/%s";
public AkkaTransporter(ActorSystem actorSystem) {
this.actorSystem = actorSystem;
}
@Override
public Protocol getProtocol() {
return new AkkaProtocol();
}
@Override
public void tell(URL url, PowerSerializable request) {
ActorSelection actorSelection = fetchActorSelection(url);
actorSelection.tell(request, null);
}
@Override
@SuppressWarnings("unchecked")
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
ActorSelection actorSelection = fetchActorSelection(url);
return (CompletionStage<T>) Patterns.ask(actorSelection, request, Duration.ofMillis(RemoteConstant.DEFAULT_TIMEOUT_MS));
}
private ActorSelection fetchActorSelection(URL url) {
HandlerLocation location = url.getLocation();
String targetActorSystemName = AkkaConstant.fetchActorSystemName(url.getServerType());
String targetActorName = AkkaMappingService.parseActorName(location.getRootPath()).getActorName();
CommonUtils.requireNonNull(targetActorName, "can't find actor by URL: " + location);
String address = url.getAddress().toFullAddress();
return actorSystem.actorSelection(String.format(AKKA_NODE_PATH, targetActorSystemName, address, targetActorName));
}
}

View File

@ -0,0 +1,25 @@
package tech.powerjob.remote.akka;
import akka.actor.AbstractActor;
import akka.actor.DeadLetter;
import lombok.extern.slf4j.Slf4j;
/**
* TroubleshootingActor
*
* @author tjq
* @since 2022/12/31
*/
@Slf4j
public class AkkaTroubleshootingActor extends AbstractActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DeadLetter.class, this::onReceiveDeadLetter)
.build();
}
public void onReceiveDeadLetter(DeadLetter dl) {
log.warn("[PowerJob-AKKA] receive DeadLetter: {}", dl);
}
}

View File

@ -0,0 +1,35 @@
package tech.powerjob.remote.akka;
import akka.serialization.JSerializer;
import tech.powerjob.common.serialize.SerializerUtils;
/**
* Using custom serializers for akka-remote
* https://doc.akka.io/docs/akka/current/serialization.html
*
* @author tjq
* @since 2021/3/21
*/
public class PowerAkkaSerializer extends JSerializer {
@Override
public Object fromBinaryJava(byte[] bytes, Class<?> manifest) {
return SerializerUtils.deSerialized(bytes);
}
@Override
public int identifier() {
return 277777;
}
@Override
public byte[] toBinary(Object o) {
return SerializerUtils.serialize(o);
}
@Override
public boolean includeManifest() {
return false;
}
}

View File

@ -0,0 +1,8 @@
/**
* 由于 AKKA 后续转向收费运营模式PowerJob 计划移除 akka 支持,因此不再维护该 module。
* 如果存在任何使用上的问题,请切换到其他通讯协议(建议使用 HTTP
*
* @author PowerJob发言人
* @since 2022/12/31
*/
package tech.powerjob.remote.akka;

View File

@ -0,0 +1,133 @@
akka {
loggers = ["akka.event.slf4j.Slf4jLogger"]
loglevel = "WARNING"
actor {
# cluster is better(recommend by official document), but I prefer remote
provider = remote
allow-java-serialization = off
serializers {
power-serializer = "tech.powerjob.remote.akka.PowerAkkaSerializer"
}
serialization-bindings {
"tech.powerjob.common.PowerSerializable" = power-serializer
}
}
remote {
artery {
transport = tcp # See Selecting a transport below
# over write by code
canonical.hostname = "127.0.0.1"
canonical.port = 25520
}
}
# dispatcher
task-tracker-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 64
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
processor-tracker-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 64
}
throughput = 10
}
worker-common-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 8
}
throughput = 10
}
##################### server config #####################
# worker-request-core-dispatcher
w-r-c-d {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 128
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
friend-request-actor-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 128
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 5
}
##################### default config #####################
common-dispatcher {
# Dispatcher is the name of the event-based dispatcher
type = Dispatcher
# What kind of ExecutionService to use
executor = "fork-join-executor"
# Configuration for the fork join pool
fork-join-executor {
# Min number of threads to cap factor-based parallelism number to
parallelism-min = 2
# Parallelism (threads) ... ceil(available processors * factor)
parallelism-factor = 4.0
# Max number of threads to cap factor-based parallelism number to
parallelism-max = 64
}
# Throughput defines the maximum number of messages to be
# processed per actor before the thread jumps to the next actor.
# Set to 1 for as fair as possible.
throughput = 10
}
}

View File

@ -0,0 +1,50 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-http</artifactId>
<name>powerjob-remote-impl-http</name>
<version>5.1.2</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<vertx.version>4.3.7</vertx.version>
<powerjob-remote-framework.version>5.1.2</powerjob-remote-framework.version>
</properties>
<dependencies>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-remote-framework</artifactId>
<version>${powerjob-remote-framework.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.vertx/vertx-core -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-web</artifactId>
<version>${vertx.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,17 @@
package tech.powerjob.remote.http;
import tech.powerjob.remote.framework.transporter.Protocol;
/**
* HttpProtocol
*
* @author tjq
* @since 2022/12/31
*/
public class HttpProtocol implements Protocol {
@Override
public String name() {
return tech.powerjob.common.enums.Protocol.HTTP.name();
}
}

View File

@ -0,0 +1,166 @@
package tech.powerjob.remote.http;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationFeature;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpServer;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.RequestBody;
import io.vertx.ext.web.Route;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import tech.powerjob.common.exception.PowerJobException;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.actor.ProcessType;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.remote.framework.utils.RemoteUtils;
import tech.powerjob.remote.http.vertx.VertxInitializer;
import tech.powerjob.remote.http.vertx.VertxTransporter;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* HttpCSInitializer
* 在纠结了1晚上后最终决定选用 vertx 作为 http 底层,而不是直接使用 netty理由如下
* - netty 实现容易,但性能调优方面需要时间成本和实践经验,而 vertx 作为 netty 的"嫡系"框架,对 netty 的封装理论上炉火纯青,性能不成问题
* - vertx 唯一的缺点是其作为相对上层的框架,可能存在较为严重的包冲突问题,尤其是对于那些本身跑在 vertx-framework 上的用户
* - 不过该问题可以通过更换协议解决,预计后续提供一个基于 netty 和自定义协议的实现
*
* 20240316 note注意类名被强依赖后续若有改动需要同步更改
*
* @author tjq
* @since 2022/12/31
*/
@Slf4j
public class HttpVertxCSInitializer implements CSInitializer {
private Vertx vertx;
private HttpServer httpServer;
private HttpClient httpClient;
private CSInitializerConfig config;
@Override
public String type() {
return tech.powerjob.common.enums.Protocol.HTTP.name();
}
@Override
public void init(CSInitializerConfig config) {
this.config = config;
// 【Vertx 版本升级时必须注意】临时解决 vertx 自带的 jackson 序列化无法支持字段升级问题(默认特性居然是不支持增删字段的序列化方式,外国框架也是一坨...
try {
io.vertx.core.json.jackson.DatabindCodec.mapper()
.configure(com.fasterxml.jackson.databind.MapperFeature.PROPAGATE_TRANSIENT_MARKER, true)
.configure(JsonParser.Feature.ALLOW_SINGLE_QUOTES, true)
.configure(JsonParser.Feature.IGNORE_UNDEFINED, true)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
.setSerializationInclusion(JsonInclude.Include.NON_NULL);
} catch (Throwable t) {
log.warn("[HttpVertxCSInitializer] hack jackson failed!", t);
}
vertx = VertxInitializer.buildVertx();
httpServer = VertxInitializer.buildHttpServer(vertx);
httpClient = VertxInitializer.buildHttpClient(vertx);
}
@Override
public Transporter buildTransporter() {
return new VertxTransporter(httpClient);
}
@Override
@SneakyThrows
public void bindHandlers(List<ActorInfo> actorInfos) {
Router router = Router.router(vertx);
// 处理请求响应
router.route().handler(BodyHandler.create());
actorInfos.forEach(actorInfo -> {
Optional.ofNullable(actorInfo.getHandlerInfos()).orElse(Collections.emptyList()).forEach(handlerInfo -> {
String handlerHttpPath = handlerInfo.getLocation().toPath();
ProcessType processType = handlerInfo.getAnno().processType();
Handler<RoutingContext> routingContextHandler = buildRequestHandler(actorInfo, handlerInfo);
Route route = router.post(handlerHttpPath);
if (processType == ProcessType.BLOCKING) {
route.blockingHandler(routingContextHandler, false);
} else {
route.handler(routingContextHandler);
}
});
});
// 启动 vertx http server
final int port = config.getBindAddress().getPort();
final String host = config.getBindAddress().getHost();
httpServer.requestHandler(router)
.exceptionHandler(e -> log.error("[PowerJob] unknown exception in Actor communication!", e))
.listen(port, host)
.toCompletionStage()
.toCompletableFuture()
.get(1, TimeUnit.MINUTES);
log.info("[PowerJobRemoteEngine] startup vertx HttpServer successfully!");
}
private Handler<RoutingContext> buildRequestHandler(ActorInfo actorInfo, HandlerInfo handlerInfo) {
Method method = handlerInfo.getMethod();
Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes());
// 内部框架,严格模式,绑定失败直接报错
if (ArrayUtils.isNotEmpty(method.getParameterTypes())) {
if (!powerSerializeClz.isPresent()) {
throw new PowerJobException("can't find any 'PowerSerialize' object in handler args: " + handlerInfo.getLocation());
}
}
return ctx -> {
final RequestBody body = ctx.body();
final Object convertResult = body.asPojo(powerSerializeClz.get());
try {
Object response = method.invoke(actorInfo.getActor(), convertResult);
if (response != null) {
if (response instanceof String) {
ctx.end((String) response);
} else {
ctx.json(JsonObject.mapFrom(response));
}
return;
}
ctx.end();
} catch (Throwable t) {
// 注意这里是框架实际运行时,日志输出用标准 PowerJob 格式
log.error("[PowerJob] invoke Handler[{}] failed!", handlerInfo.getLocation(), t);
ctx.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(), t);
}
};
}
@Override
public void close() throws IOException {
httpClient.close();
httpServer.close();
vertx.close();
}
}

View File

@ -0,0 +1,88 @@
package tech.powerjob.remote.http.vertx;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientOptions;
import io.vertx.core.http.HttpServer;
import io.vertx.core.http.HttpServerOptions;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import tech.powerjob.common.OmsConstant;
import tech.powerjob.common.PowerJobDKey;
import tech.powerjob.common.utils.SysUtils;
/**
* VertxInitializer
* PowerJob 只是将 vertx 作为 toolkit 使用
*
* @author tjq
* @since 2023/1/1
*/
@Slf4j
public class VertxInitializer {
/**
* 默认开启长连接,且 75S 超时
*/
private static final int DEFAULT_KEEP_ALIVE_TIMEOUT = 75;
private static final int CONNECTION_TIMEOUT_MS = 3000;
private static final int SERVER_IDLE_TIMEOUT_S = 300;
public static Vertx buildVertx() {
final int cpuCores = SysUtils.availableProcessors();
VertxOptions options = new VertxOptions()
.setWorkerPoolSize(Math.max(16, 2 * cpuCores))
.setInternalBlockingPoolSize(Math.max(32, 4 * cpuCores));
log.info("[PowerJob-Vertx] use vertx options: {}", options);
return Vertx.vertx(options);
}
public static HttpServer buildHttpServer(Vertx vertx) {
HttpServerOptions httpServerOptions = new HttpServerOptions()
.setIdleTimeout(SERVER_IDLE_TIMEOUT_S);
tryEnableCompression(httpServerOptions);
log.info("[PowerJob-Vertx] use HttpServerOptions: {}", httpServerOptions.toJson());
return vertx.createHttpServer(httpServerOptions);
}
private static void tryEnableCompression(HttpServerOptions httpServerOptions) {
// 非核心组件,不直接依赖类(无 import加载报错可忽略
try {
httpServerOptions
.addCompressor(io.netty.handler.codec.compression.StandardCompressionOptions.gzip())
.setCompressionSupported(true);
log.warn("[PowerJob-Vertx] enable server side compression successfully!");
} catch (Throwable t) {
log.warn("[PowerJob-Vertx] enable server side compression failed. The error is not fatal, but performance may be degraded", t);
}
}
public static HttpClient buildHttpClient(Vertx vertx) {
HttpClientOptions httpClientOptions = new HttpClientOptions()
.setMetricsName(OmsConstant.PACKAGE)
.setConnectTimeout(CONNECTION_TIMEOUT_MS)
.setMaxPoolSize(Math.max(8, SysUtils.availableProcessors()) * 2);
// 长连接
String keepaliveTimeout = System.getProperty(PowerJobDKey.TRANSPORTER_KEEP_ALIVE_TIMEOUT, String.valueOf(DEFAULT_KEEP_ALIVE_TIMEOUT));
int keepaliveTimeoutInt = Integer.parseInt(keepaliveTimeout);
if (keepaliveTimeoutInt > 0) {
httpClientOptions.setKeepAlive(true).setKeepAliveTimeout(keepaliveTimeoutInt);
} else {
httpClientOptions.setKeepAlive(false);
}
// 压缩判定
String enableCompressing = System.getProperty(PowerJobDKey.TRANSPORTER_USE_COMPRESSING);
if (StringUtils.isNotEmpty(enableCompressing)) {
httpClientOptions.setTryUseCompression(StringUtils.equalsIgnoreCase(enableCompressing, Boolean.TRUE.toString()));
}
log.info("[PowerJob-Vertx] use HttpClientOptions: {}", httpClientOptions.toJson());
return vertx.createHttpClient(httpClientOptions);
}
}

View File

@ -0,0 +1,100 @@
package tech.powerjob.remote.http.vertx;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.vertx.core.Future;
import io.vertx.core.http.HttpClient;
import io.vertx.core.http.HttpClientRequest;
import io.vertx.core.http.HttpClientResponse;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.RequestOptions;
import io.vertx.core.json.JsonObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.transporter.Protocol;
import tech.powerjob.remote.framework.transporter.Transporter;
import tech.powerjob.remote.http.HttpProtocol;
import java.util.concurrent.CompletionStage;
/**
* VertxTransporter
*
* @author tjq
* @since 2023/1/1
*/
@Slf4j
public class VertxTransporter implements Transporter {
private final HttpClient httpClient;
private static final Protocol PROTOCOL = new HttpProtocol();
public VertxTransporter(HttpClient httpClient) {
this.httpClient = httpClient;
}
@Override
public Protocol getProtocol() {
return PROTOCOL;
}
@Override
public void tell(URL url, PowerSerializable request) {
post(url, request, null);
}
@Override
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
return post(url, request, clz);
}
@SuppressWarnings("unchecked")
private <T> CompletionStage<T> post(URL url, PowerSerializable request, Class<T> clz) {
final String host = url.getAddress().getHost();
final int port = url.getAddress().getPort();
final String path = url.getLocation().toPath();
RequestOptions requestOptions = new RequestOptions()
.setMethod(HttpMethod.POST)
.setHost(host)
.setPort(port)
.setURI(path);
// 获取远程服务器的HTTP连接
Future<HttpClientRequest> httpClientRequestFuture = httpClient.request(requestOptions);
// 转换 -> 发送请求获取响应
Future<HttpClientResponse> responseFuture = httpClientRequestFuture.compose(httpClientRequest ->
httpClientRequest
.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON)
.send(JsonObject.mapFrom(request).toBuffer())
);
return responseFuture.compose(httpClientResponse -> {
// throw exception
final int statusCode = httpClientResponse.statusCode();
if (statusCode != HttpResponseStatus.OK.code()) {
// CompletableFuture.get() 时会传递抛出该异常
throw new RemotingException(String.format("request [host:%s,port:%s,url:%s] failed, status: %d, msg: %s",
host, port, path, statusCode, httpClientResponse.statusMessage()
));
}
return httpClientResponse.body().compose(x -> {
if (clz == null) {
return Future.succeededFuture(null);
}
if (clz.equals(String.class)) {
return Future.succeededFuture((T) x.toString());
}
return Future.succeededFuture(x.toJsonObject().mapTo(clz));
});
})
.onFailure(t -> log.warn("[VertxTransporter] post to url[{}] failed,msg: {}", url, ExceptionUtils.getMessage(t)))
.toCompletionStage();
}
}

View File

@ -0,0 +1,77 @@
package tech.powerjob.remote.http;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import tech.powerjob.common.enums.Protocol;
import tech.powerjob.common.utils.CommonUtils;
import tech.powerjob.remote.framework.BenchmarkActor;
import tech.powerjob.remote.framework.base.Address;
import tech.powerjob.remote.framework.base.HandlerLocation;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.engine.EngineConfig;
import tech.powerjob.remote.framework.engine.EngineOutput;
import tech.powerjob.remote.framework.engine.RemoteEngine;
import tech.powerjob.remote.framework.engine.impl.PowerJobRemoteEngine;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* HttpVertxCSInitializerTest
*
* @author tjq
* @since 2023/1/2
*/
@Slf4j
class HttpVertxCSInitializerTest {
@Test
void testHttpVertxCSInitializerTest() throws Exception {
final Address address = new Address().setPort(7890).setHost("127.0.0.1");
EngineConfig engineConfig = new EngineConfig()
.setType(Protocol.HTTP.name())
.setBindAddress(address)
.setActorList(Lists.newArrayList(new BenchmarkActor()));
RemoteEngine engine = new PowerJobRemoteEngine();
EngineOutput engineOutput = engine.start(engineConfig);
log.info("[HttpVertxCSInitializerTest] engine start up successfully!");
Transporter transporter = engineOutput.getTransporter();
BenchmarkActor.BenchmarkRequest request = new BenchmarkActor.BenchmarkRequest()
.setContent("request from test")
.setBlockingMills(100)
.setResponseSize(10240);
log.info("[HttpVertxCSInitializerTest] test empty request!");
URL emptyURL = new URL()
.setAddress(address)
.setLocation(new HandlerLocation().setMethodPath("emptyReturn").setRootPath("benchmark"));
transporter.tell(emptyURL, request);
log.info("[HttpVertxCSInitializerTest] test string request!");
URL stringURL = new URL()
.setAddress(address)
.setLocation(new HandlerLocation().setMethodPath("stringReturn").setRootPath("benchmark"));
final String strResponse = transporter.ask(stringURL, request, String.class).toCompletableFuture().get();
log.info("[HttpVertxCSInitializerTest] strResponse: {}", strResponse);
log.info("[HttpVertxCSInitializerTest] test normal request!");
URL url = new URL()
.setAddress(address)
.setLocation(new HandlerLocation().setMethodPath("standard").setRootPath("benchmark"));
final CompletionStage<BenchmarkActor.BenchmarkResponse> benchmarkResponseCompletionStage = transporter.ask(url, request, BenchmarkActor.BenchmarkResponse.class);
final BenchmarkActor.BenchmarkResponse response = benchmarkResponseCompletionStage.toCompletableFuture().get(10, TimeUnit.SECONDS);
log.info("[HttpVertxCSInitializerTest] response: {}", response);
CommonUtils.easySleep(10000);
}
}

View File

@ -0,0 +1,563 @@
# PowerJob Mu 协议技术设计方案
## 1. 项目背景
### 1.1 业务场景
PowerJob 是一个分布式任务调度框架,采用 Server-Worker 架构。在实际部署中,经常遇到以下网络环境限制:
- Worker 节点部署在内网环境,只能单向访问公网的 Server
- Server 无法直接访问 Worker 的内网 IP 地址
- 业务需要支持 Server 向 Worker 的主动通讯(任务下发、状态查询等)
### 1.2 现有协议局限性
- **HTTP 协议**:基于请求-响应模式Server 向 Worker 推送需要 Worker 轮询,实时性差
- **AKKA 协议**:功能强大但配置复杂,在 NAT 环境下需要复杂的网络配置
### 1.3 设计目标
1. 支持 Worker 仅可出站访问的网络环境
2. 实现 Server 与 Worker 的双向通讯
3. 支持完整的节点间通讯矩阵Worker↔Server, Worker↔Worker, Server↔Server
4. 保持高性能和低延迟
5. 简化网络配置和部署复杂度
## 2. 总体设计
### 2.1 核心设计原则
#### 连接复用 (Connection Reuse)
- Worker 主动建立到 Server 的长连接
- Server 通过注册机制维护 Worker 连接映射
- Server 复用 Worker 建立的连接进行反向通讯
#### 延迟连接 (Lazy Connection)
- 节点启动时不立即建立所有连接
- 首次通讯时才建立目标连接
- 避免启动时的网络依赖和连接失败
#### 统一架构 (Unified Architecture)
- 所有节点都具备 Server 和 Client 双重能力
- 统一的消息格式和处理流程
- 支持任意节点间的直接通讯
### 2.2 架构图
```
PowerJob Mu Protocol Architecture
┌──────────────────────────────────────────────────────────────────┐
│ Application Layer │
├──────────────────────────────────────────────────────────────────┤
│ Actor System (Handler Registration & Message Routing) │
├──────────────────────────────────────────────────────────────────┤
│ Transport Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │MuTransporter│ │ChannelMgr │ │ConnectionMgr│ │
│ │ │ │ │ │ │ │
│ │- tell() │ │- Worker Reg │ │- Lazy Conn │ │
│ │- ask() │ │- Ask/Resp │ │- Conn Pool │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├──────────────────────────────────────────────────────────────────┤
│ Protocol Layer │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ MuMessage │ │MuMessageCodec│ │Message Types│ │
│ │ │ │ │ │ │ │
│ │- Type │ │- Encode │ │- TELL │ │
│ │- RequestId │ │- Decode │ │- ASK │ │
│ │- Path │ │- Length │ │- RESPONSE │ │
│ │- Payload │ │- JSON │ │- HEARTBEAT │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
├──────────────────────────────────────────────────────────────────┤
│ Network Layer │
│ Netty Framework │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │EventLoopGroup│ │ Bootstrap │ │ChannelPipe │ │
│ │ │ │ │ │ │ │
│ │- Boss │ │- Server │ │- Codec │ │
│ │- Worker │ │- Client │ │- Handler │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└──────────────────────────────────────────────────────────────────┘
```
## 3. 详细设计
### 3.1 通讯流程设计
#### 3.1.1 Worker 注册流程
```mermaid
sequenceDiagram
participant W as Worker
participant S as Server
W->>S: 1. 建立TCP连接
W->>S: 2. 发送HEARTBEAT消息 (包含Worker地址)
S->>S: 3. 注册Worker连接到ChannelManager
S->>S: 4. 建立 workerAddress -> Channel 映射
Note over W,S: 连接建立完成,可以双向通讯
W->>S: 5. 定期发送HEARTBEAT (保活)
S->>W: 6. 可以通过已注册连接发送消息
```
#### 3.1.2 延迟连接流程
```mermaid
sequenceDiagram
participant W1 as Worker1
participant W2 as Worker2
W1->>W1: 1. 需要向Worker2发送消息
W1->>W1: 2. 检查连接池,无现有连接
W1->>W2: 3. 建立新的TCP连接
W1->>W2: 4. 发送消息
W1->>W1: 5. 连接加入连接池,供后续复用
```
### 3.2 核心组件设计
#### 3.2.1 MuCSInitializer
```java
public class MuCSInitializer implements CSInitializer {
// 根据节点类型初始化不同服务
public void init(CSInitializerConfig config) {
if (config.getServerType() == ServerType.SERVER) {
initServer(); // 启动Netty服务端 + 连接管理器
} else {
initWorker(); // 启动Netty服务端 + 连接管理器
}
}
private void initServer() {
// 1. 创建ServerHandler
// 2. 启动Netty服务端监听
// 3. 初始化连接管理器
}
private void initWorker() {
// 1. 创建WorkerHandler
// 2. 启动Netty服务端监听 (支持Worker间通讯)
// 3. 初始化连接管理器
}
}
```
#### 3.2.2 MuTransporter
```java
public class MuTransporter implements Transporter {
public void tell(URL url, PowerSerializable request) {
if (当前节点是Worker) {
// 使用连接管理器建立到目标的连接
connectionManager.getOrCreateConnection(url.getAddress())
.thenAccept(channel -> channel.writeAndFlush(message));
} else {
// Server端需要区分目标类型
if (目标是Worker) {
// 使用已注册的Worker连接
Channel channel = channelManager.getWorkerChannel(url.getAddress());
channel.writeAndFlush(message);
} else {
// 使用连接管理器连接到目标Server
connectionManager.getOrCreateConnection(url.getAddress())
.thenAccept(channel -> channel.writeAndFlush(message));
}
}
}
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) {
// 1. 生成唯一的requestId
// 2. 注册Future到ChannelManager
// 3. 按照tell的逻辑发送ASK消息
// 4. 返回Future等待响应
}
}
```
#### 3.2.3 ChannelManager
```java
public class ChannelManager {
// Worker地址到连接的映射 (Server端用)
private final ConcurrentMap<String, Channel> workerChannels;
// 请求ID到Future的映射 (Ask模式用)
private final ConcurrentMap<String, CompletableFuture<Object>> pendingRequests;
// 请求ID到响应类型的映射 (类型转换用)
private final ConcurrentMap<String, Class<?>> requestResponseTypes;
public void registerWorkerChannel(Address workerAddress, Channel channel) {
String key = workerAddress.getHost() + ":" + workerAddress.getPort();
workerChannels.put(key, channel);
// 监听连接关闭,自动清理映射
}
public void completePendingRequest(String requestId, Object response) {
CompletableFuture<Object> future = pendingRequests.remove(requestId);
Class<?> responseType = requestResponseTypes.remove(requestId);
// 类型转换解决LinkedHashMap问题
Object convertedResponse = convertResponse(response, responseType);
future.complete(convertedResponse);
}
}
```
#### 3.2.4 MuConnectionManager
```java
public class MuConnectionManager {
// 目标地址到连接的映射
private final ConcurrentMap<String, Channel> connections;
// 正在建立的连接
private final ConcurrentMap<String, CompletableFuture<Channel>> pendingConnections;
public CompletableFuture<Channel> getOrCreateConnection(Address targetAddress) {
String key = targetAddress.getHost() + ":" + targetAddress.getPort();
// 1. 检查现有连接
Channel existingChannel = connections.get(key);
if (existingChannel != null && existingChannel.isActive()) {
return CompletableFuture.completedFuture(existingChannel);
}
// 2. 检查正在建立的连接
CompletableFuture<Channel> pendingConnection = pendingConnections.get(key);
if (pendingConnection != null) {
return pendingConnection;
}
// 3. 建立新连接
return createNewConnection(targetAddress);
}
}
```
### 3.3 消息协议设计
#### 3.3.1 消息格式
```java
public class MuMessage implements PowerSerializable {
private MessageType messageType; // 消息类型
private String requestId; // 请求ID (Ask模式)
private String path; // 处理器路径
private Address senderAddress; // 发送方地址 (注册用)
private Object payload; // 消息载荷
private String errorMessage; // 错误信息
}
public enum MessageType {
TELL, // 单向消息
ASK, // 请求消息
RESPONSE, // 响应消息
ERROR, // 错误响应
HEARTBEAT // 心跳消息
}
```
#### 3.3.2 编解码器
```java
public class MuMessageCodec extends ByteToMessageCodec<MuMessage> {
@Override
protected void encode(ChannelHandlerContext ctx, MuMessage msg, ByteBuf out) {
byte[] data = OBJECT_MAPPER.writeValueAsBytes(msg);
out.writeInt(data.length); // 长度前缀
out.writeBytes(data); // 消息内容
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) return; // 长度不足
int length = in.readInt();
if (in.readableBytes() < length) {
in.resetReaderIndex(); // 重置读取位置
return;
}
byte[] data = new byte[length];
in.readBytes(data);
MuMessage message = OBJECT_MAPPER.readValue(data, MuMessage.class);
out.add(message);
}
}
```
## 4. 关键技术方案
### 4.1 连接复用机制
#### 问题描述
在单向网络环境中Server 无法主动连接到 Worker但需要向 Worker 发送消息。
#### 解决方案
1. **Worker 主动注册**Worker 启动时主动连接到 Server并发送包含自身地址的心跳消息
2. **连接映射维护**Server 维护 Worker 地址到连接通道的映射关系
3. **反向通讯**Server 需要向 Worker 发送消息时,从映射中查找对应的连接通道
```java
// Worker端发送心跳注册
MuMessage heartbeat = new MuMessage(
MessageType.HEARTBEAT, null, null,
workerAddress, // 关键携带Worker地址
null, null
);
channel.writeAndFlush(heartbeat);
// Server端处理心跳并注册
public void handleHeartbeat(ChannelHandlerContext ctx, MuMessage msg) {
if (msg.getSenderAddress() != null) {
channelManager.registerWorkerChannel(msg.getSenderAddress(), ctx.channel());
}
}
```
### 4.2 延迟连接机制
#### 问题描述
节点启动时立即建立所有连接会导致:
- 启动时间长
- 网络故障影响启动
- 不必要的资源占用
#### 解决方案
1. **按需连接**:只在首次需要通讯时才建立连接
2. **连接缓存**:建立的连接保存在连接池中供后续复用
3. **并发控制**:避免同时建立到同一目标的多个连接
```java
public CompletableFuture<Channel> getOrCreateConnection(Address targetAddress) {
// 1. 检查缓存连接
Channel cached = connectionCache.get(addressKey);
if (cached != null && cached.isActive()) {
return CompletableFuture.completedFuture(cached);
}
// 2. 检查正在建立的连接,避免重复建立
CompletableFuture<Channel> pending = pendingConnections.get(addressKey);
if (pending != null) {
return pending;
}
// 3. 建立新连接
return createNewConnection(targetAddress);
}
```
### 4.3 消息路由机制
#### 问题描述
不同的通讯场景需要使用不同的连接方式,需要智能路由。
#### 解决方案
基于调用方类型和目标类型的二维路由表:
```java
public void routeMessage(URL url, MuMessage message) {
if (currentNodeType == ServerType.WORKER) {
// Worker作为发送方统一使用连接管理器
connectionManager.getOrCreateConnection(url.getAddress())
.thenAccept(channel -> channel.writeAndFlush(message));
} else {
// Server作为发送方根据目标类型选择策略
if (url.getServerType() == ServerType.WORKER) {
// 目标是Worker使用已注册的连接
Channel workerChannel = channelManager.getWorkerChannel(url.getAddress());
workerChannel.writeAndFlush(message);
} else {
// 目标是Server使用连接管理器
connectionManager.getOrCreateConnection(url.getAddress())
.thenAccept(channel -> channel.writeAndFlush(message));
}
}
}
```
### 4.4 类型转换机制
#### 问题描述
Jackson 反序列化时可能将对象反序列化为 LinkedHashMap导致类型转换异常。
#### 解决方案
1. **类型映射**Ask 请求时记录期望的响应类型
2. **智能转换**:响应时根据期望类型进行转换
```java
// 发送Ask请求时记录类型
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) {
String requestId = UUID.randomUUID().toString();
channelManager.registerPendingRequest(requestId, future, clz); // 记录期望类型
// ... 发送消息
}
// 接收响应时转换类型
public void completePendingRequest(String requestId, Object response) {
Class<?> expectedType = requestResponseTypes.remove(requestId);
Object convertedResponse = JsonUtils.toJavaObject(response, expectedType);
future.complete(convertedResponse);
}
```
## 5. 性能优化
### 5.1 连接池管理
- **连接复用**:相同目标的多次请求复用连接
- **连接清理**:自动检测和清理失效连接
- **连接限制**:控制最大连接数,避免资源耗尽
### 5.2 异步处理
- **非阻塞IO**基于Netty的异步IO模型
- **事件驱动**:消息处理采用事件驱动方式
- **线程池优化**合理配置EventLoopGroup大小
### 5.3 内存管理
- **零拷贝**利用Netty的零拷贝特性
- **对象池**复用消息对象减少GC压力
- **缓冲区管理**:合理设置接收和发送缓冲区大小
## 6. 容错设计
### 6.1 连接故障处理
```java
// 连接断开监听
channel.closeFuture().addListener(future -> {
// 清理连接映射
connectionCache.remove(addressKey);
workerChannels.remove(workerKey);
// 失败所有待处理的请求
failPendingRequests(channel);
});
// 自动重连
public CompletableFuture<Channel> reconnect(Address address) {
return CompletableFuture
.runAsync(() -> Thread.sleep(retryInterval))
.thenCompose(v -> createConnection(address));
}
```
### 6.2 消息超时处理
```java
// Ask请求超时 (JDK8兼容方式)
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.schedule(() -> {
if (!future.isDone()) {
channelManager.removePendingRequest(requestId);
future.completeExceptionally(new TimeoutException("Request timeout"));
}
}, timeout, TimeUnit.SECONDS);
```
### 6.3 异常传播
- **网络异常**:连接失败、超时等网络层异常
- **协议异常**:消息格式错误、编解码异常
- **业务异常**:处理器执行异常
## 7. 监控与运维
### 7.1 关键指标
- **连接数统计**:活跃连接数、总连接数
- **消息统计**:发送/接收消息数、消息大小
- **性能指标**:响应时间、吞吐量
- **错误统计**:连接失败、消息失败、超时次数
### 7.2 日志记录
```java
// 连接事件
log.info("[MuConnectionManager] Connected to {}", targetAddress);
log.warn("[MuConnectionManager] Connection failed to {}", targetAddress);
// 消息事件
log.debug("[MuTransporter] Sent {} message to {}", messageType, url);
log.error("[MuHandler] Failed to process message", exception);
```
### 7.3 健康检查
- **连接健康**:定期检查连接状态
- **心跳监控**监控Worker心跳状态
- **性能监控**:监控关键性能指标
## 8. 测试方案
### 8.1 单元测试
- **组件测试**:各核心组件的独立测试
- **协议测试**:消息编解码正确性测试
- **异常测试**:各种异常场景的处理测试
### 8.2 集成测试
- **通讯测试**:各种通讯场景的端到端测试
- **故障测试**:网络故障、节点故障的恢复测试
- **性能测试**:高并发、大数据量的性能测试
### 8.3 场景测试
```java
// Worker到Server通讯测试
@Test
public void testWorkerToServerCommunication() {
// 1. 启动Server
// 2. 启动Worker并连接到Server
// 3. Worker发送消息到Server
// 4. 验证Server收到消息
}
// 网络故障恢复测试
@Test
public void testNetworkFailureRecovery() {
// 1. 建立正常连接
// 2. 模拟网络中断
// 3. 恢复网络连接
// 4. 验证通讯自动恢复
}
```
## 9. 部署指南
### 9.1 网络配置
```yaml
# Server配置
server:
host: 0.0.0.0 # 监听所有网卡
port: 7700 # 监听端口
external_host: 公网IP # 外部访问地址
# Worker配置
worker:
host: 0.0.0.0 # 本地监听地址
port: 27777 # 本地监听端口
server_host: 公网IP # Server地址
server_port: 7700 # Server端口
```
### 9.2 防火墙配置
```bash
# Server端开放监听端口
iptables -A INPUT -p tcp --dport 7700 -j ACCEPT
# Worker端确保可以访问Server端口
# 通常不需要额外配置,确保出站不受限即可
```
### 9.3 高可用部署
- **Server集群**多个Server实例负载均衡
- **Worker多连接**Worker配置多个Server地址
- **故障转移**:自动检测和切换故障节点
## 10. 总结
PowerJob Mu 协议通过创新的连接复用和延迟连接机制,成功解决了单向网络环境下的双向通讯问题。主要技术特点包括:
1. **网络适应性**:专为受限网络环境设计,无需复杂的网络配置
2. **高性能**基于Netty的异步IO支持高并发和低延迟
3. **可靠性**:完善的故障检测和恢复机制
4. **扩展性**:统一的架构设计,支持多种通讯场景
5. **易部署**:简化的配置和部署流程
该协议特别适合云原生、容器化部署以及混合云环境为PowerJob在复杂网络环境下提供了可靠的通讯基础。
---
*PowerJob Mu Protocol Technical Design*
*Version: 1.0*
*Date: January 2025*

View File

@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>powerjob-remote</artifactId>
<groupId>tech.powerjob</groupId>
<version>5.1.2</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>powerjob-remote-impl-mu</artifactId>
<name>powerjob-remote-impl-mu</name>
<version>5.1.2</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<netty.version>4.1.104.Final</netty.version>
<powerjob-remote-framework.version>5.1.2</powerjob-remote-framework.version>
</properties>
<dependencies>
<dependency>
<groupId>tech.powerjob</groupId>
<artifactId>powerjob-remote-framework</artifactId>
<version>${powerjob-remote-framework.version}</version>
</dependency>
<!-- Netty for NIO communication -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
</project>

View File

@ -0,0 +1,122 @@
package tech.powerjob.remote.mu;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.remote.framework.base.Address;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Channel manager for maintaining worker address to channel mapping
* Supports both tell and ask modes for reverse communication
*
* @author claude
* @since 2025/1/1
*/
@Slf4j
public class ChannelManager {
private final ConcurrentMap<String, Channel> workerChannels = new ConcurrentHashMap<>();
private final ConcurrentMap<String, CompletableFuture<Object>> pendingRequests = new ConcurrentHashMap<>();
private final ConcurrentMap<String, Class<?>> requestResponseTypes = new ConcurrentHashMap<>();
/**
* Register a worker channel
* @param workerAddress worker address
* @param channel Netty channel
*/
public void registerWorkerChannel(Address workerAddress, Channel channel) {
String key = workerAddress.getHost() + ":" + workerAddress.getPort();
workerChannels.put(key, channel);
log.info("[ChannelManager] Registered worker channel: {}", key);
// Remove channel when it becomes inactive
channel.closeFuture().addListener(future -> {
workerChannels.remove(key);
log.info("[ChannelManager] Removed inactive worker channel: {}", key);
});
}
/**
* Get channel for worker
* @param workerAddress worker address
* @return Channel or null if not found
*/
public Channel getWorkerChannel(Address workerAddress) {
String key = workerAddress.getHost() + ":" + workerAddress.getPort();
return workerChannels.get(key);
}
/**
* Store pending request for ask mode
* @param requestId request ID
* @param future future to complete when response received
* @param responseType expected response type
*/
public void registerPendingRequest(String requestId, CompletableFuture<Object> future, Class<?> responseType) {
pendingRequests.put(requestId, future);
requestResponseTypes.put(requestId, responseType);
}
/**
* Complete pending request with response
* @param requestId request ID
* @param response response object
*/
public void completePendingRequest(String requestId, Object response) {
CompletableFuture<Object> future = pendingRequests.remove(requestId);
Class<?> responseType = requestResponseTypes.remove(requestId);
if (future != null) {
Object convertedResponse = convertResponse(response, responseType);
future.complete(convertedResponse);
} else {
log.warn("[ChannelManager] No pending request found for ID: {}", requestId);
}
}
/**
* Complete pending request with exception
* @param requestId request ID
* @param exception exception
*/
public void completePendingRequestExceptionally(String requestId, Throwable exception) {
CompletableFuture<Object> future = pendingRequests.remove(requestId);
requestResponseTypes.remove(requestId); // Clean up response type mapping
if (future != null) {
future.completeExceptionally(exception);
} else {
log.warn("[ChannelManager] No pending request found for ID: {}", requestId);
}
}
/**
* Remove pending request (timeout cleanup)
* @param requestId request ID
*/
public void removePendingRequest(String requestId) {
pendingRequests.remove(requestId);
requestResponseTypes.remove(requestId);
}
/**
* Convert response to expected type
* @param response raw response object
* @param responseType expected response type
* @return converted response
*/
private Object convertResponse(Object response, Class<?> responseType) {
if (response == null || responseType == null) {
return response;
}
if (responseType.isInstance(response)) {
return response;
}
return JsonUtils.toJavaObject(response, responseType);
}
}

View File

@ -0,0 +1,179 @@
package tech.powerjob.remote.mu;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.cs.CSInitializer;
import tech.powerjob.remote.framework.cs.CSInitializerConfig;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* Mu CSInitializer implementation using Netty
* Supports bidirectional communication with worker-only outbound connectivity
*
* @author claude
* @since 2025/1/1
*/
@Slf4j
public class MuCSInitializer implements CSInitializer {
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
private Channel serverChannel;
private CSInitializerConfig config;
private final ChannelManager channelManager = new ChannelManager();
private MuServerHandler serverHandler;
private MuWorkerHandler workerHandler;
private MuConnectionManager connectionManager;
@Override
public String type() {
return "MU";
}
@Override
public void init(CSInitializerConfig config) {
this.config = config;
bossGroup = new NioEventLoopGroup(1);
workerGroup = new NioEventLoopGroup();
if (config.getServerType() == ServerType.SERVER) {
initServer();
} else {
initWorker();
}
}
private void initServer() {
try {
serverHandler = new MuServerHandler(channelManager);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS))
.addLast(new MuMessageCodec())
.addLast(serverHandler);
}
});
ChannelFuture future = bootstrap.bind(
config.getBindAddress().getHost(),
config.getBindAddress().getPort()
).sync();
serverChannel = future.channel();
log.info("[MuCSInitializer] Server started on {}:{}",
config.getBindAddress().getHost(),
config.getBindAddress().getPort());
// 初始化连接管理器用于Server连接到其他Server
connectionManager = new MuConnectionManager(workerGroup, channelManager, serverHandler, config.getBindAddress());
log.info("[MuCSInitializer] Server initialized with client capabilities for server-to-server communication");
} catch (Exception e) {
log.error("[MuCSInitializer] Failed to start server", e);
throw new RuntimeException("Failed to start Mu server", e);
}
}
private void initWorker() {
try {
// Worker需要同时具备服务端和客户端能力
// 服务端接受其他Worker的连接
// 客户端连接到Server或其他Worker
// 初始化handler
workerHandler = new MuWorkerHandler(channelManager);
// 启动服务端接受其他Worker的连接
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS))
.addLast(new MuMessageCodec())
.addLast(workerHandler);
}
});
ChannelFuture serverFuture = serverBootstrap.bind(
config.getBindAddress().getHost(),
config.getBindAddress().getPort()
).sync();
serverChannel = serverFuture.channel();
log.info("[MuCSInitializer] Worker server started on {}:{}",
config.getBindAddress().getHost(),
config.getBindAddress().getPort());
// 初始化连接管理器,用于连接到其他节点
connectionManager = new MuConnectionManager(workerGroup, channelManager, workerHandler, config.getBindAddress());
log.info("[MuCSInitializer] Worker initialized with server and client capabilities");
} catch (Exception e) {
log.error("[MuCSInitializer] Failed to initialize worker", e);
throw new RuntimeException("Failed to initialize Mu worker", e);
}
}
@Override
public Transporter buildTransporter() {
return new MuTransporter(channelManager, config.getServerType(), connectionManager);
}
@Override
public void bindHandlers(List<ActorInfo> actorInfos) {
if (config.getServerType() == ServerType.SERVER && serverHandler != null) {
serverHandler.bindHandlers(actorInfos);
} else if (config.getServerType() == ServerType.WORKER && workerHandler != null) {
workerHandler.bindHandlers(actorInfos);
}
}
@Override
public void close() throws IOException {
try {
if (serverChannel != null) {
serverChannel.close().sync();
}
if (connectionManager != null) {
connectionManager.closeAllConnections();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("[MuCSInitializer] Interrupted while closing channels", e);
} finally {
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
}
log.info("[MuCSInitializer] Mu CSInitializer closed");
}
}

View File

@ -0,0 +1,136 @@
package tech.powerjob.remote.mu;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.remote.framework.base.Address;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
/**
* Connection manager for worker-side lazy connection to server
*
* @author claude
* @since 2025/1/1
*/
@Slf4j
public class MuConnectionManager {
private final EventLoopGroup workerGroup;
private final ChannelManager channelManager;
private final Object messageHandler; // Can be MuWorkerHandler or MuServerHandler
private final Address localAddress;
private final ConcurrentMap<String, Channel> serverConnections = new ConcurrentHashMap<>();
private final ConcurrentMap<String, CompletableFuture<Channel>> pendingConnections = new ConcurrentHashMap<>();
public MuConnectionManager(EventLoopGroup workerGroup, ChannelManager channelManager,
Object messageHandler, Address localAddress) {
this.workerGroup = workerGroup;
this.channelManager = channelManager;
this.messageHandler = messageHandler;
this.localAddress = localAddress;
}
/**
* Get or create connection to target address
* @param targetAddress target address
* @return CompletableFuture of channel
*/
public CompletableFuture<Channel> getOrCreateConnection(Address targetAddress) {
String key = targetAddress.getHost() + ":" + targetAddress.getPort();
// Check if we already have an active connection
Channel existingChannel = serverConnections.get(key);
if (existingChannel != null && existingChannel.isActive()) {
return CompletableFuture.completedFuture(existingChannel);
}
// Check if there's already a pending connection
CompletableFuture<Channel> pendingConnection = pendingConnections.get(key);
if (pendingConnection != null) {
return pendingConnection;
}
// Create new connection
CompletableFuture<Channel> connectionFuture = new CompletableFuture<>();
pendingConnections.put(key, connectionFuture);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new IdleStateHandler(0, 30, 0, TimeUnit.SECONDS))
.addLast(new MuMessageCodec())
.addLast((io.netty.channel.ChannelHandler) messageHandler);
// Only add heartbeat handler for Worker connections
if (messageHandler instanceof MuWorkerHandler) {
ch.pipeline().addLast(new MuWorkerHeartbeatHandler(localAddress));
}
}
});
ChannelFuture future = bootstrap.connect(targetAddress.getHost(), targetAddress.getPort());
future.addListener(f -> {
pendingConnections.remove(key);
if (f.isSuccess()) {
Channel channel = future.channel();
serverConnections.put(key, channel);
// Remove connection when it becomes inactive
channel.closeFuture().addListener(closeFuture -> {
serverConnections.remove(key);
log.info("[MuConnectionManager] Removed inactive server connection: {}", key);
});
connectionFuture.complete(channel);
log.info("[MuConnectionManager] Connected to server: {}", key);
} else {
connectionFuture.completeExceptionally(f.cause());
log.error("[MuConnectionManager] Failed to connect to server: {}", key, f.cause());
}
});
} catch (Exception e) {
pendingConnections.remove(key);
connectionFuture.completeExceptionally(e);
log.error("[MuConnectionManager] Error creating connection to server: {}", key, e);
}
return connectionFuture;
}
/**
* Close all connections
*/
public void closeAllConnections() {
for (Channel channel : serverConnections.values()) {
if (channel.isActive()) {
channel.close();
}
}
serverConnections.clear();
// Complete all pending connections with exception
for (CompletableFuture<Channel> future : pendingConnections.values()) {
future.completeExceptionally(new RuntimeException("Connection manager is closing"));
}
pendingConnections.clear();
}
}

View File

@ -0,0 +1,37 @@
package tech.powerjob.remote.mu;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.remote.framework.base.Address;
/**
* Mu protocol message format
*
* @author claude
* @since 2025/1/1
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class MuMessage implements PowerSerializable {
/**
* Message types
*/
public enum MessageType {
TELL, // Fire-and-forget
ASK, // Request-response
RESPONSE, // Response to ASK
HEARTBEAT, // Worker heartbeat/registration
ERROR // Error response
}
private MessageType messageType;
private String requestId; // Unique ID for ask/response correlation
private String path; // Handler path
private Address senderAddress; // Sender address for registration
private Object payload; // Actual message payload
private String errorMessage; // Error message for ERROR type
}

View File

@ -0,0 +1,75 @@
package tech.powerjob.remote.mu;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* Mu message codec for encoding/decoding messages over Netty
*
* @author claude
* @since 2025/1/1
*/
@Slf4j
public class MuMessageCodec extends ByteToMessageCodec<MuMessage> {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private static final int MAX_MESSAGE_SIZE = 64 * 1024 * 1024; // 64MB
@Override
protected void encode(ChannelHandlerContext ctx, MuMessage msg, ByteBuf out) throws Exception {
try {
byte[] data = OBJECT_MAPPER.writeValueAsBytes(msg);
if (data.length > MAX_MESSAGE_SIZE) {
throw new IllegalArgumentException("Message too large: " + data.length + " bytes");
}
// Write message length followed by message data
out.writeInt(data.length);
out.writeBytes(data);
} catch (Exception e) {
log.error("[MuMessageCodec] Failed to encode message", e);
throw e;
}
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
try {
// Need at least 4 bytes for length
if (in.readableBytes() < 4) {
return;
}
// Mark reader index to reset if not enough data
in.markReaderIndex();
// Read message length
int length = in.readInt();
if (length <= 0 || length > MAX_MESSAGE_SIZE) {
throw new IllegalArgumentException("Invalid message length: " + length);
}
// Check if we have enough bytes for the full message
if (in.readableBytes() < length) {
in.resetReaderIndex();
return;
}
// Read and decode message
byte[] data = new byte[length];
in.readBytes(data);
MuMessage message = OBJECT_MAPPER.readValue(data, MuMessage.class);
out.add(message);
} catch (Exception e) {
log.error("[MuMessageCodec] Failed to decode message", e);
throw e;
}
}
}

View File

@ -0,0 +1,18 @@
package tech.powerjob.remote.mu;
import tech.powerjob.remote.framework.transporter.Protocol;
/**
* Mu Protocol implementation using Netty for bidirectional communication
* with support for worker-only outbound connectivity
*
* @author claude
* @since 2025/1/1
*/
public class MuProtocol implements Protocol {
@Override
public String name() {
return tech.powerjob.common.enums.Protocol.MU.name();
}
}

View File

@ -0,0 +1,185 @@
package tech.powerjob.remote.mu;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.utils.RemoteUtils;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* Server-side message handler for Mu protocol
* Handles incoming messages from workers and manages channel registration
*
* @author claude
* @since 2025/1/1
*/
@Slf4j
@ChannelHandler.Sharable
public class MuServerHandler extends SimpleChannelInboundHandler<MuMessage> {
private final ChannelManager channelManager;
private final Map<String, ActorInfo> handlerMap = new ConcurrentHashMap<>();
public MuServerHandler(ChannelManager channelManager) {
this.channelManager = channelManager;
}
public void bindHandlers(List<ActorInfo> actorInfos) {
for (ActorInfo actorInfo : actorInfos) {
if (actorInfo.getHandlerInfos() != null) {
for (HandlerInfo handlerInfo : actorInfo.getHandlerInfos()) {
String path = handlerInfo.getLocation().toPath();
handlerMap.put(path, actorInfo);
log.info("[MuServerHandler] Bound handler: {}", path);
}
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MuMessage msg) throws Exception {
try {
switch (msg.getMessageType()) {
case HEARTBEAT:
handleHeartbeat(ctx, msg);
break;
case TELL:
handleTell(ctx, msg);
break;
case ASK:
handleAsk(ctx, msg);
break;
case RESPONSE:
handleResponse(ctx, msg);
break;
case ERROR:
handleError(ctx, msg);
break;
default:
log.warn("[MuServerHandler] Unknown message type: {}", msg.getMessageType());
}
} catch (Exception e) {
log.error("[MuServerHandler] Error processing message", e);
if (msg.getMessageType() == MuMessage.MessageType.ASK) {
// Send error response for ASK messages
MuMessage errorResponse = new MuMessage(
MuMessage.MessageType.ERROR,
msg.getRequestId(),
null,
null,
null,
"Internal server error: " + e.getMessage()
);
ctx.writeAndFlush(errorResponse);
}
}
}
private void handleHeartbeat(ChannelHandlerContext ctx, MuMessage msg) {
if (msg.getSenderAddress() != null) {
channelManager.registerWorkerChannel(msg.getSenderAddress(), ctx.channel());
log.debug("[MuServerHandler] Registered worker: {}", msg.getSenderAddress());
}
}
private void handleTell(ChannelHandlerContext ctx, MuMessage msg) {
invokeHandler(msg, false, ctx);
}
private void handleAsk(ChannelHandlerContext ctx, MuMessage msg) {
Object response = invokeHandler(msg, true, ctx);
MuMessage.MessageType responseType = response != null ?
MuMessage.MessageType.RESPONSE : MuMessage.MessageType.ERROR;
String errorMessage = response == null ? "Handler returned null" : null;
MuMessage responseMsg = new MuMessage(
responseType,
msg.getRequestId(),
null,
null,
response,
errorMessage
);
ctx.writeAndFlush(responseMsg);
}
private void handleResponse(ChannelHandlerContext ctx, MuMessage msg) {
channelManager.completePendingRequest(msg.getRequestId(), msg.getPayload());
}
private void handleError(ChannelHandlerContext ctx, MuMessage msg) {
Exception exception = new RuntimeException(msg.getErrorMessage());
channelManager.completePendingRequestExceptionally(msg.getRequestId(), exception);
}
private Object invokeHandler(MuMessage msg, boolean needResponse, ChannelHandlerContext ctx) {
try {
String path = msg.getPath();
ActorInfo actorInfo = handlerMap.get(path);
if (actorInfo == null) {
log.warn("[MuServerHandler] No handler found for path: {}", path);
return null;
}
HandlerInfo handlerInfo = actorInfo.getHandlerInfos().stream()
.filter(h -> h.getLocation().toPath().equals(path))
.findFirst()
.orElse(null);
if (handlerInfo == null) {
log.warn("[MuServerHandler] Handler info not found for path: {}", path);
return null;
}
Method method = handlerInfo.getMethod();
Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes());
if (!powerSerializeClz.isPresent()) {
log.error("[MuServerHandler] No PowerSerializable parameter found for handler: {}", path);
return null;
}
Object convertedPayload = convertPayload(msg.getPayload(), powerSerializeClz.get());
Object response = method.invoke(actorInfo.getActor(), convertedPayload);
if (needResponse) {
return response;
}
return null;
} catch (Exception e) {
log.error("[MuServerHandler] Failed to invoke handler for path: {}", msg.getPath(), e);
return null;
}
}
private Object convertPayload(Object payload, Class<?> targetClass) {
if (payload == null) {
return null;
}
if (targetClass.isInstance(payload)) {
return payload;
}
return JsonUtils.toJavaObject(payload, targetClass);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("[MuServerHandler] Channel exception", cause);
ctx.close();
}
}

View File

@ -0,0 +1,195 @@
package tech.powerjob.remote.mu;
import io.netty.channel.Channel;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.PowerSerializable;
import tech.powerjob.remote.framework.base.RemotingException;
import tech.powerjob.remote.framework.base.ServerType;
import tech.powerjob.remote.framework.base.URL;
import tech.powerjob.remote.framework.transporter.Protocol;
import tech.powerjob.remote.framework.transporter.Transporter;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* Mu protocol transporter implementation
* Handles both client-side (worker) and reverse (server-to-worker) communication
*
* @author claude
* @since 2025/1/1
*/
@Slf4j
public class MuTransporter implements Transporter {
private static final Protocol PROTOCOL = new MuProtocol();
private static final long ASK_TIMEOUT_SECONDS = 30;
private final ChannelManager channelManager;
private final ServerType serverType;
private final MuConnectionManager connectionManager; // For worker-side lazy connection
public MuTransporter(ChannelManager channelManager, ServerType serverType, MuConnectionManager connectionManager) {
this.channelManager = channelManager;
this.serverType = serverType;
this.connectionManager = connectionManager;
}
@Override
public Protocol getProtocol() {
return PROTOCOL;
}
@Override
public void tell(URL url, PowerSerializable request) {
try {
MuMessage message = new MuMessage(
MuMessage.MessageType.TELL,
null,
url.getLocation().toPath(),
null,
request,
null
);
if (serverType == ServerType.WORKER) {
// Worker to server/worker: use connection manager for lazy connection
connectionManager.getOrCreateConnection(url.getAddress())
.thenAccept(channel -> {
if (channel.isActive()) {
channel.writeAndFlush(message);
log.debug("[MuTransporter] Sent TELL message to {}", url);
} else {
log.error("[MuTransporter] Channel is not active for {}", url);
}
})
.exceptionally(throwable -> {
log.error("[MuTransporter] Failed to get connection for TELL to {}", url, throwable);
return null;
});
} else {
// Server side: distinguish between worker and server targets
if (url.getServerType() == ServerType.WORKER) {
// Server to worker: use stored channel from worker registration
Channel channel = channelManager.getWorkerChannel(url.getAddress());
if (channel != null && channel.isActive()) {
channel.writeAndFlush(message);
log.debug("[MuTransporter] Sent TELL message to worker {}", url);
} else {
log.error("[MuTransporter] No active channel available for worker {}", url);
throw new RemotingException("No active channel available for " + url);
}
} else {
// Server to server: use connection manager for direct connection
connectionManager.getOrCreateConnection(url.getAddress())
.thenAccept(channel -> {
if (channel.isActive()) {
channel.writeAndFlush(message);
log.debug("[MuTransporter] Sent TELL message to server {}", url);
} else {
log.error("[MuTransporter] Channel is not active for server {}", url);
}
})
.exceptionally(throwable -> {
log.error("[MuTransporter] Failed to get connection for TELL to server {}", url, throwable);
return null;
});
}
}
} catch (Exception e) {
log.error("[MuTransporter] Failed to send TELL message to {}", url, e);
throw new RemotingException("Failed to send TELL message", e);
}
}
@Override
public <T> CompletionStage<T> ask(URL url, PowerSerializable request, Class<T> clz) throws RemotingException {
try {
String requestId = UUID.randomUUID().toString();
CompletableFuture<T> future = new CompletableFuture<>();
// Register the future for response handling
channelManager.registerPendingRequest(requestId, (CompletableFuture<Object>) future, clz);
// Set timeout for the request (JDK8 compatible)
future.whenComplete((result, throwable) -> {
if (throwable != null) {
channelManager.removePendingRequest(requestId);
}
});
// Schedule timeout manually for JDK8 compatibility
java.util.concurrent.Executors.newSingleThreadScheduledExecutor().schedule(() -> {
if (!future.isDone()) {
channelManager.removePendingRequest(requestId);
future.completeExceptionally(new java.util.concurrent.TimeoutException("Request timeout after " + ASK_TIMEOUT_SECONDS + " seconds"));
}
}, ASK_TIMEOUT_SECONDS, TimeUnit.SECONDS);
MuMessage message = new MuMessage(
MuMessage.MessageType.ASK,
requestId,
url.getLocation().toPath(),
null,
request,
null
);
if (serverType == ServerType.WORKER) {
// Worker to server/worker: use connection manager for lazy connection
connectionManager.getOrCreateConnection(url.getAddress())
.thenAccept(channel -> {
if (channel.isActive()) {
channel.writeAndFlush(message);
log.debug("[MuTransporter] Sent ASK message to {} with requestId {}", url, requestId);
} else {
channelManager.removePendingRequest(requestId);
future.completeExceptionally(new RemotingException("Channel is not active for " + url));
}
})
.exceptionally(throwable -> {
channelManager.removePendingRequest(requestId);
future.completeExceptionally(new RemotingException("Failed to get connection for ASK to " + url, throwable));
return null;
});
} else {
// Server side: distinguish between worker and server targets
if (url.getServerType() == ServerType.WORKER) {
// Server to worker: use stored channel from worker registration
Channel channel = channelManager.getWorkerChannel(url.getAddress());
if (channel != null && channel.isActive()) {
channel.writeAndFlush(message);
log.debug("[MuTransporter] Sent ASK message to worker {} with requestId {}", url, requestId);
} else {
channelManager.removePendingRequest(requestId);
future.completeExceptionally(new RemotingException("No active channel available for " + url));
}
} else {
// Server to server: use connection manager for direct connection
connectionManager.getOrCreateConnection(url.getAddress())
.thenAccept(channel -> {
if (channel.isActive()) {
channel.writeAndFlush(message);
log.debug("[MuTransporter] Sent ASK message to server {} with requestId {}", url, requestId);
} else {
channelManager.removePendingRequest(requestId);
future.completeExceptionally(new RemotingException("Channel is not active for server " + url));
}
})
.exceptionally(throwable -> {
channelManager.removePendingRequest(requestId);
future.completeExceptionally(new RemotingException("Failed to get connection for ASK to server " + url, throwable));
return null;
});
}
}
return future;
} catch (Exception e) {
log.error("[MuTransporter] Failed to send ASK message to {}", url, e);
throw new RemotingException("Failed to send ASK message", e);
}
}
}

View File

@ -0,0 +1,190 @@
package tech.powerjob.remote.mu;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.common.serialize.JsonUtils;
import tech.powerjob.remote.framework.actor.ActorInfo;
import tech.powerjob.remote.framework.actor.HandlerInfo;
import tech.powerjob.remote.framework.utils.RemoteUtils;
import java.lang.reflect.Method;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
/**
* Worker-side message handler for Mu protocol
* Handles incoming messages from server and processes local handlers
*
* @author claude
* @since 2025/1/1
*/
@Slf4j
@ChannelHandler.Sharable
public class MuWorkerHandler extends SimpleChannelInboundHandler<MuMessage> {
private final Map<String, ActorInfo> handlerMap = new ConcurrentHashMap<>();
private final ChannelManager channelManager;
public MuWorkerHandler(ChannelManager channelManager) {
this.channelManager = channelManager;
}
public void bindHandlers(List<ActorInfo> actorInfos) {
for (ActorInfo actorInfo : actorInfos) {
if (actorInfo.getHandlerInfos() != null) {
for (HandlerInfo handlerInfo : actorInfo.getHandlerInfos()) {
String path = handlerInfo.getLocation().toPath();
handlerMap.put(path, actorInfo);
log.info("[MuWorkerHandler] Bound handler: {}", path);
}
}
}
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, MuMessage msg) throws Exception {
try {
switch (msg.getMessageType()) {
case TELL:
handleTell(ctx, msg);
break;
case ASK:
handleAsk(ctx, msg);
break;
case RESPONSE:
handleResponse(ctx, msg);
break;
case ERROR:
handleError(ctx, msg);
break;
case HEARTBEAT:
handleHeartbeat(ctx, msg);
break;
default:
log.warn("[MuWorkerHandler] Unknown message type: {}", msg.getMessageType());
}
} catch (Exception e) {
log.error("[MuWorkerHandler] Error processing message", e);
if (msg.getMessageType() == MuMessage.MessageType.ASK) {
// Send error response for ASK messages
MuMessage errorResponse = new MuMessage(
MuMessage.MessageType.ERROR,
msg.getRequestId(),
null,
null,
null,
"Internal worker error: " + e.getMessage()
);
ctx.writeAndFlush(errorResponse);
}
}
}
private void handleTell(ChannelHandlerContext ctx, MuMessage msg) {
invokeHandler(msg, false, ctx);
}
private void handleAsk(ChannelHandlerContext ctx, MuMessage msg) {
Object response = invokeHandler(msg, true, ctx);
MuMessage.MessageType responseType = response != null ?
MuMessage.MessageType.RESPONSE : MuMessage.MessageType.ERROR;
String errorMessage = response == null ? "Handler returned null" : null;
MuMessage responseMsg = new MuMessage(
responseType,
msg.getRequestId(),
null,
null,
response,
errorMessage
);
ctx.writeAndFlush(responseMsg);
}
private void handleResponse(ChannelHandlerContext ctx, MuMessage msg) {
channelManager.completePendingRequest(msg.getRequestId(), msg.getPayload());
}
private void handleError(ChannelHandlerContext ctx, MuMessage msg) {
Exception exception = new RuntimeException(msg.getErrorMessage());
channelManager.completePendingRequestExceptionally(msg.getRequestId(), exception);
}
private void handleHeartbeat(ChannelHandlerContext ctx, MuMessage msg) {
// Worker接收到心跳消息时通常不需要特殊处理
// 但记录一下调试信息,表明收到了心跳
if (msg.getSenderAddress() != null) {
log.debug("[MuWorkerHandler] Received heartbeat from: {}", msg.getSenderAddress());
} else {
log.debug("[MuWorkerHandler] Received heartbeat");
}
}
private Object invokeHandler(MuMessage msg, boolean needResponse, ChannelHandlerContext ctx) {
try {
String path = msg.getPath();
ActorInfo actorInfo = handlerMap.get(path);
if (actorInfo == null) {
log.warn("[MuWorkerHandler] No handler found for path: {}", path);
return null;
}
HandlerInfo handlerInfo = actorInfo.getHandlerInfos().stream()
.filter(h -> h.getLocation().toPath().equals(path))
.findFirst()
.orElse(null);
if (handlerInfo == null) {
log.warn("[MuWorkerHandler] Handler info not found for path: {}", path);
return null;
}
Method method = handlerInfo.getMethod();
Optional<Class<?>> powerSerializeClz = RemoteUtils.findPowerSerialize(method.getParameterTypes());
if (!powerSerializeClz.isPresent()) {
log.error("[MuWorkerHandler] No PowerSerializable parameter found for handler: {}", path);
return null;
}
Object convertedPayload = convertPayload(msg.getPayload(), powerSerializeClz.get());
Object response = method.invoke(actorInfo.getActor(), convertedPayload);
if (needResponse) {
return response;
}
return null;
} catch (Exception e) {
log.error("[MuWorkerHandler] Failed to invoke handler for path: {}", msg.getPath(), e);
return null;
}
}
@SneakyThrows
private Object convertPayload(Object payload, Class<?> targetClass) {
if (payload == null) {
return null;
}
if (targetClass.isInstance(payload)) {
return payload;
}
return JsonUtils.toJavaObject(payload, targetClass);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("[MuWorkerHandler] Channel exception", cause);
ctx.close();
}
}

View File

@ -0,0 +1,61 @@
package tech.powerjob.remote.mu;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import lombok.extern.slf4j.Slf4j;
import tech.powerjob.remote.framework.base.Address;
/**
* Worker heartbeat handler for maintaining connection and registration
*
* @author claude
* @since 2025/1/1
*/
@Slf4j
public class MuWorkerHeartbeatHandler extends ChannelInboundHandlerAdapter {
private final Address workerAddress;
public MuWorkerHeartbeatHandler(Address workerAddress) {
this.workerAddress = workerAddress;
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
sendHeartbeat(ctx);
log.info("[MuWorkerHeartbeatHandler] Worker connected and sent initial heartbeat");
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
sendHeartbeat(ctx);
log.debug("[MuWorkerHeartbeatHandler] Sent heartbeat");
}
}
super.userEventTriggered(ctx, evt);
}
private void sendHeartbeat(ChannelHandlerContext ctx) {
MuMessage heartbeat = new MuMessage(
MuMessage.MessageType.HEARTBEAT,
null,
null,
workerAddress,
null,
null
);
ctx.writeAndFlush(heartbeat);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.error("[MuWorkerHeartbeatHandler] Exception in heartbeat handler", cause);
ctx.close();
}
}