Sekiro RPC Service Modifications

Requirements

1. Upload logs to Kafka
2. Increase the timeout to 30 seconds
3. Add a count field when viewing clientList
4. Add group to the response body
5. Let a Sekiro client switch group

Timeout

Pass the invoke_timeout parameter with the call; without it, the call times out after the default 5 seconds.

Other useful parameters

https://sekiro.virjar.com/sekiro-doc/01_user_manual/6.specialParam.html
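**__invoke_trace_id**
Pass this parameter to get a more detailed invocation log for the call, which helps when troubleshooting.

**__NOT_COMPRESS_FOR_SEKIRO_SEKIRO**
The call is not compressed when this parameter is present.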
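
As a rough illustration, and assuming these parameters travel as ordinary query parameters like group and action do, the sketch below builds an invoke URL that carries invoke_timeout and __invoke_trace_id. It only assembles the string and sends nothing. The class name InvokeUrlBuilder is hypothetical, and the values used in the usage note (group demo, action clientTime, trace id trace-001, and 30000, which assumes the timeout is expressed in milliseconds) are placeholders that should be checked against the Sekiro documentation linked above.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: builds an invoke URL from a base URL and query parameters. */
final class InvokeUrlBuilder {

    /** Appends the parameters, URL-encoded, in the iteration order of the map. */
    static String build(String baseUrl, Map<String, String> params) {
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> e : params.entrySet()) {
            query.add(encode(e.getKey()) + "=" + encode(e.getValue()));
        }
        return baseUrl + "?" + query;
    }

    private static String encode(String s) {
        // URLEncoder leaves letters, digits, '.', '-', '*' and '_' untouched.
        return URLEncoder.encode(s, StandardCharsets.UTF_8);
    }
}
```

For example, passing a LinkedHashMap with group=demo, action=clientTime, invoke_timeout=30000 and __invoke_trace_id=trace-001 together with the base URL http://localhost:5620/business-demo/invoke yields a URL whose query string lists those four parameters in that order.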

HTTP request class

    /**
     * HTTP business request; the channelType is:
     * {@link ChannelType#INVOKER_HTTP}
     */
    public class ChannelTypeInvokerHttp

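Adding a count to clientList

    public static <T> CommonRes<T> success(T t) {
        CommonRes<T> ret = new CommonRes<>();
        ret.status = statusOK;
        ret.message = null;
        ret.data = t;
        // Only list payloads (such as clientList) have a count;
        // an unconditional cast would throw ClassCastException for other payloads.
        if (t instanceof List) {
            ret.size = ((List<?>) t).size();
        }
        return ret;
    }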
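
A self-contained sketch of the same idea, for trying the size field outside the Sekiro code base. The class CommonResSketch, the constant STATUS_OK = 0, and the choice of a nullable Integer for size are assumptions made for illustration; the real CommonRes may declare these differently.

```java
import java.util.Collection;

/** Hypothetical stand-in for CommonRes, reduced to the fields used here. */
final class CommonResSketch<T> {
    static final int STATUS_OK = 0; // assumed value, not taken from Sekiro

    int status;
    String message;
    T data;
    Integer size; // stays null when the payload is not a collection

    static <T> CommonResSketch<T> success(T t) {
        CommonResSketch<T> ret = new CommonResSketch<>();
        ret.status = STATUS_OK;
        ret.message = null;
        ret.data = t;
        // Report a count only for collection payloads such as a client list.
        if (t instanceof Collection) {
            ret.size = ((Collection<?>) t).size();
        }
        return ret;
    }
}
```

Calling success with a three-element list sets size to 3, while a plain string payload leaves size unset.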

Adding group to the response

    // HTTP invocation channel. Other languages such as Python can call the Sekiro
    // service over standard HTTP. Note that HTTP connections should use keep-alive
    // to reduce the overhead of setting up TCP connections.
    INVOKER_HTTP(1) {
        private void writeSekiroPacket(Channel channel, SekiroPacket sekiroPacket, InvokeRecord invokeRecord) {

    /**
     * Decodes a response packet and adds the group to the resulting JSON.
     *
     * @param data     raw response bytes
     * @param clientId id of the client that handled the call
     * @param groupId  group the client belongs to
     * @return the decoded result, including the assembled JSON
     */
    public static com.virjar.sekiro.business.api.core.SekiroFastJson.FastJson quickJsonDecode(
            byte[] data, String clientId, String groupId) {
        ByteBuf byteBuf = Unpooled.wrappedBuffer(data);
        int status = byteBuf.readInt();
        String errorMessage = readString(byteBuf);
        String dataJson = readString(byteBuf);
        String finalJson = "{\"clientId\":" + JSONObject.toJSONString(clientId)
                + ",\"message\":" + JSONObject.toJSONString(errorMessage)
                + ",\"status\":" + status
                + ",\"data\":" + dataJson
                // groupId is a string, so serialize it; appending it raw yields invalid JSON.
                + ",\"group\":" + JSONObject.toJSONString(groupId) + "}";
        return new com.virjar.sekiro.business.api.core.SekiroFastJson.FastJson(
                status, errorMessage, dataJson, finalJson);
    }
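
Since group is a string, it has to appear in the response as a quoted JSON value, just like clientId and message. The sketch below shows the assembled shape without depending on fastjson: the class ResponseJsonSketch and its quote helper are hypothetical stand-ins (quote plays the role of JSONObject.toJSONString for strings), and dataJson is assumed to already be valid JSON.

```java
/** Hypothetical, dependency-free illustration of the response JSON layout. */
final class ResponseJsonSketch {

    /** Minimal JSON string quoting; null becomes the JSON literal null. */
    static String quote(String s) {
        if (s == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    /** Builds the response in the same field order as quickJsonDecode. */
    static String buildFinalJson(String clientId, String group, int status,
                                 String message, String dataJson) {
        return "{\"clientId\":" + quote(clientId)
                + ",\"message\":" + quote(message)
                + ",\"status\":" + status
                + ",\"data\":" + dataJson   // embedded as-is, assumed to be valid JSON
                + ",\"group\":" + quote(group) + "}";
    }
}
```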

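Switching the Sekiro group (blacklist feature)

This is awkward to handle on the server side, so to avoid affecting normal server operation it is implemented on the client.
API: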
http://localhost:5620/business-demo/invoke?group=XXX&action=switchGroup&groupName=wait

    @Action("switchGroup")
    public class SwitchGroupHandler implements RequestHandler {

        @RequiresApi(api = Build.VERSION_CODES.O)
        @Override
        public void handleRequest(SekiroRequest sekiroRequest, SekiroResponse sekiroResponse) {
            String groupName = sekiroRequest.getString("groupName");
            // Run the switch on a background thread after a 1 second delay;
            // the response below is sent immediately.
            new Thread(new Runnable() {
                public void run() {
                    // Thread.sleep takes a duration in milliseconds.
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    switchGroup(groupName);
                }
            }).start();
            sekiroResponse.success(groupName + " do switch group, wait 1 second");
        }

        @RequiresApi(api = Build.VERSION_CODES.O)
        private void switchGroup(String groupName) {
            MainActivity.rpcServer.destroy(0); // close the current connection
            // Reconnect under the new group and register the same handlers again.
            MainActivity.rpcServer = new SekiroClient(groupName, Utils.clientId, Utils.host, Utils.port)
                    .setupSekiroRequestInitializer(new SekiroRequestInitializer() {
                        @Override
                        public void onSekiroRequest(SekiroRequest sekiroRequest, HandlerRegistry handlerRegistry) {
                            handlerRegistry.registerSekiroHandler(new ClientTimeHandler());
                            handlerRegistry.registerSekiroHandler(new TbReqHandler());
                            handlerRegistry.registerSekiroHandler(new SwitchGroupHandler());
                        }
                    });
            MainActivity.rpcServer.start();
        }
    }

Tested, works OK.
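
The handler follows a "reply first, switch shortly after" pattern. A minimal plain-Java sketch of that pattern, independent of the Sekiro and Android APIs, might look like the following. DelayedGroupSwitcher is a hypothetical name, and the switch itself is passed in as a callback, where the real code would destroy and recreate the SekiroClient.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical helper: answers immediately and runs the group switch after a delay. */
final class DelayedGroupSwitcher {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "group-switch");
                t.setDaemon(true); // do not keep the process alive just for this task
                return t;
            });
    private final Consumer<String> switchAction;
    private final long delayMillis;

    DelayedGroupSwitcher(Consumer<String> switchAction, long delayMillis) {
        this.switchAction = switchAction;
        this.delayMillis = delayMillis;
    }

    /** Schedules the switch and returns the message to send back right away. */
    String requestSwitch(String groupName) {
        scheduler.schedule(() -> switchAction.accept(groupName), delayMillis, TimeUnit.MILLISECONDS);
        return groupName + " do switch group, wait " + delayMillis + " ms";
    }
}
```

Compared with starting a new thread per request, a single scheduler thread also serializes switches that arrive close together, which may or may not be what the client needs.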

Writing logs to Kafka

Location:
[image: image.png]

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    /**
     * Sends the invocation log to Kafka.
     *
     * @param invokeRecord record of the invocation (clientId, group, action, logger)
     * @param json200      response data to log
     * @param status       invocation status
     */
    protected void write2kafka(InvokeRecord invokeRecord, String json200, String status) {
        Producer<String, String> producer = null;
        try {
            Properties props = new Properties();
            props.put("bootstrap.servers", "XXXXXX:XXXX");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            producer = new KafkaProducer<>(props);
            // Each field is sent as its own record, keyed by the field name.
            producer.send(new ProducerRecord<>("crawler_risk_log", "clientId", invokeRecord.getClientId()));
            producer.send(new ProducerRecord<>("crawler_risk_log", "group", invokeRecord.getSekiroGroup()));
            producer.send(new ProducerRecord<>("crawler_risk_log", "action", invokeRecord.getAction()));
            producer.send(new ProducerRecord<>("crawler_risk_log", "status", status));
            producer.send(new ProducerRecord<>("crawler_risk_log", "data", json200));
            invokeRecord.getLogger().info("write data to kafka success");
        } catch (Exception e) {
            // Include the cause so failures can be diagnosed.
            invokeRecord.getLogger().error("write data to kafka error: " + e.getMessage());
        } finally {
            // close() waits for previously sent records to complete.
            if (producer != null) {
                producer.close();
            }
        }
    }
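
The method above sends five separate records per invocation, so a consumer has to piece one log entry back together from several messages. One possible alternative, sketched here rather than taken from the original change, is to combine the fields into a single JSON value and send one record. RiskLogPayload and its helpers are hypothetical names, and every field, including data, is emitted as a JSON string for simplicity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: packs the log fields of one invocation into one JSON object. */
final class RiskLogPayload {

    /** Collects the five fields in a fixed order. */
    static Map<String, String> fields(String clientId, String group, String action,
                                      String status, String data) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("clientId", clientId);
        m.put("group", group);
        m.put("action", action);
        m.put("status", status);
        m.put("data", data);
        return m;
    }

    /** Serializes the map as a flat JSON object of string values. */
    static String toJson(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
        }
        return sb.append('}').toString();
    }

    /** Minimal JSON string quoting; null becomes the JSON literal null. */
    static String quote(String s) {
        if (s == null) {
            return "null";
        }
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }
}
```

The resulting string could then be sent once, for example as new ProducerRecord<>("crawler_risk_log", clientId, payload); using clientId as the record key is an assumption. Reusing one long-lived producer instead of creating and closing one per call would also avoid repeated connection setup.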

Original post: http://blog.uzilol.cn/2022/03/14/yuque/Sekiro%20RPC%20%E6%9C%8D%E5%8A%A1%E6%94%B9%E9%80%A0/
Author: ive_e (leoli)
Published: March 14, 2022