目标需求
1, 日志上传 kafka
2, 超时时间增加到 30s
3, 查看 clientList 的时候增加数量字段
4, response 返回值里面增加 group
5, sekiro 切换 group
超时时间
invoke_timeout 调用时带上该参数即可,否则默认 5 秒超时。
其他有用参数
https://sekiro.virjar.com/sekiro-doc/01_user_manual/6.specialParam.html
**__invoke_trace_id **
可以带上该参数用于追踪更详细的调用日志,出问题时候用于排查问题
**__NOT_COMPRESS_FOR_SEKIRO_SEKIRO **
此处调用不压缩
HTTP 请求类
1 2
| /** * http业务请求,channelType为: * {@link ChannelType#INVOKER_HTTP} */ public class ChannelTypeInvokerHttp
|
clientList 增加数量
1 2 3 4 5 6 7 8 9
| public static <T> CommonRes<T> success(T t) { CommonRes<T> ret = new CommonRes<>(); ret.status = statusOK; ret.message = null; ret.data = t; ret.size = ((List) t).size(); return ret; }</T ></T ></T >
|
response 返回值增加 group
1 2 3 4
| // http调用通道,对于python等其他异构语言,可以通过标准的http协议调用只狼服务,请注意http需要保持keepAlive,减少tcp连接通道建立开销 INVOKER_HTTP(1) { private void writeSekiroPacket(Channel channel, SekiroPacket sekiroPacket, InvokeRecord invokeRecord) {
|
1 2 3 4 5 6 7 8 9 10
| /** * 增加group * * @param data * @param clientId * @return */ public static com.virjar.sekiro.business.api.core.SekiroFastJson.FastJson quickJsonDecode(byte[] data, String clientId, String groupId) { ByteBuf byteBuf = Unpooled.wrappedBuffer(data); int status = byteBuf.readInt(); String errorMessage = readString(byteBuf); String dataJson = readString(byteBuf); String finalJson = "{\"clientId\":" + JSONObject.toJSONString(clientId) + ",\"message\":" + JSONObject.toJSONString(errorMessage) + ",\"status\":" + status + ",\"data\":" + dataJson + ",\"group\":" + groupId + "}"; return new com.virjar.sekiro.business.api.core.SekiroFastJson.FastJson(status, errorMessage, dataJson, finalJson); }
|
sekiro 切换 group 黑名单功能
考虑到服务端不好做处理,在不影响服务端的正常情况下,考虑由客户端实现
api:
http://localhost:5620/business-demo/invoke?group=XXX&action=switchGroup&groupName=wait
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @Action("switchGroup") public class SwitchGroupHandler implements RequestHandler { @RequiresApi(api = Build.VERSION_CODES.O) @Override public void handleRequest(SekiroRequest sekiroRequest, SekiroResponse sekiroResponse) { String groupName = sekiroRequest.getString("groupName"); new Thread(new Runnable() { public void run() { //sleep设置的是时长 try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } switchGroup(groupName); } }).start(); sekiroResponse.success(groupName + "do switch group, " + "wait 1 seconds"); } @RequiresApi(api = Build.VERSION_CODES.O) private void switchGroup(String groupName) { MainActivity.rpcServer.destroy(0);//注销该链接 MainActivity.rpcServer = new SekiroClient(groupName, Utils.clientId, Utils.host, Utils.port).setupSekiroRequestInitializer(new SekiroRequestInitializer() { @Override public void onSekiroRequest(SekiroRequest sekiroRequest, HandlerRegistry handlerRegistry) { handlerRegistry.registerSekiroHandler(new ClientTimeHandler()); handlerRegistry.registerSekiroHandler(new TbReqHandler()); handlerRegistry.registerSekiroHandler(new SwitchGroupHandler()); } }); MainActivity.rpcServer.start(); } }
|
测试 ok~
日志写入 kafka
位置:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
protected void write2kafka(InvokeRecord invokeRecord, String json200, String status) { Producer<String, String> producer = null; try { Properties props = new Properties(); props.put("bootstrap.servers", "XXXXXX:XXXX"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<>(props); producer.send(new ProducerRecord<>("crawler_risk_log", "clientId", invokeRecord.getClientId())); producer.send(new ProducerRecord<>("crawler_risk_log", "group", invokeRecord.getSekiroGroup())); producer.send(new ProducerRecord<>("crawler_risk_log", "action", invokeRecord.getAction())); producer.send(new ProducerRecord<>("crawler_risk_log", "status", status)); producer.send(new ProducerRecord<>("crawler_risk_log", "data", json200)); invokeRecord.getLogger().info("write data to kafka success"); } catch (Exception e) { invokeRecord.getLogger().error("write data to kafka error"); } finally { if (producer != null) { producer.close(); } } }
|