Preface
This article takes a look at RocketMQ's NettyEncoder and NettyDecoder.
NettyEncoder
org/apache/rocketmq/remoting/netty/NettyEncoder.java
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    @Override
    public void encode(ChannelHandlerContext ctx, RemotingCommand remotingCommand, ByteBuf out)
        throws Exception {
        try {
            // length field, header length and header data
            ByteBuffer header = remotingCommand.encodeHeader();
            out.writeBytes(header);
            // the body is optional
            byte[] body = remotingCommand.getBody();
            if (body != null) {
                out.writeBytes(body);
            }
        } catch (Exception e) {
            log.error("encode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            if (remotingCommand != null) {
                log.error(remotingCommand.toString());
            }
            RemotingUtil.closeChannel(ctx.channel());
        }
    }
}
- NettyEncoder extends MessageToByteEncoder with RemotingCommand as its message type; encode writes the header first and then the body
RemotingCommand.encodeHeader
org/apache/rocketmq/remoting/protocol/RemotingCommand.java
    public ByteBuffer encodeHeader() {
        return encodeHeader(this.body != null ? this.body.length : 0);
    }

    public ByteBuffer encodeHeader(final int bodyLength) {
        // 1> header length size
        int length = 4;

        // 2> header data length
        byte[] headerData;
        headerData = this.headerEncode();

        length += headerData.length;

        // 3> body data length
        length += bodyLength;

        ByteBuffer result = ByteBuffer.allocate(4 + length - bodyLength);

        // length
        result.putInt(length);

        // header length
        result.put(markProtocolType(headerData.length, serializeTypeCurrentRPC));

        // header data
        result.put(headerData);

        result.flip();

        return result;
    }
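- encodeHeader writes the total length first (the 4-byte header length field plus the header data plus the body, not counting the 4-byte length field itself), then the header length (marked with the serialization type via markProtocolType), and finally the header data; the body is not written here but appended afterwards by NettyEncoder
- The overall frame layout is: Length | Header length | Header data | Body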
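To make the layout concrete, here is a minimal sketch that builds the same four-part frame with a plain ByteBuffer. FrameLayoutSketch and buildFrame are hypothetical names used only for illustration, and the sketch simplifies one thing: it writes the header length as a plain int, whereas encodeHeader passes it through markProtocolType together with the serialization type.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustrative only: FrameLayoutSketch and buildFrame are hypothetical names,
// not part of RocketMQ.
final class FrameLayoutSketch {

    // Builds: Length (4 bytes) | Header length (4 bytes) | Header data | Body.
    // The Length value counts everything that follows the first 4 bytes.
    static ByteBuffer buildFrame(byte[] headerData, byte[] body) {
        int bodyLength = body == null ? 0 : body.length;
        int totalLength = 4 + headerData.length + bodyLength;

        ByteBuffer frame = ByteBuffer.allocate(4 + totalLength);
        frame.putInt(totalLength);
        // Simplification (assumption): a plain int instead of markProtocolType(...).
        frame.putInt(headerData.length);
        frame.put(headerData);
        if (body != null) {
            frame.put(body);
        }
        frame.flip();
        return frame;
    }

    public static void main(String[] args) {
        byte[] header = "{\"code\":0}".getBytes(StandardCharsets.UTF_8);
        byte[] body = "hello".getBytes(StandardCharsets.UTF_8);
        ByteBuffer frame = buildFrame(header, body);
        System.out.println("frame bytes:   " + frame.remaining());
        System.out.println("length field:  " + frame.getInt(0));
        System.out.println("header length: " + frame.getInt(4));
    }
}
```

Because the Length value excludes its own 4 bytes, a receiver can read 4 bytes, learn exactly how many more bytes belong to the frame, and wait until they have all arrived.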
NettyDecoder
org/apache/rocketmq/remoting/netty/NettyDecoder.java
public class NettyDecoder extends LengthFieldBasedFrameDecoder {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    private static final int FRAME_MAX_LENGTH =
        Integer.parseInt(System.getProperty("com.rocketmq.remoting.frameMaxLength", "16777216"));

    public NettyDecoder() {
        super(FRAME_MAX_LENGTH, 0, 4, 0, 4);
    }

    @Override
    public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
        ByteBuf frame = null;
        try {
            frame = (ByteBuf) super.decode(ctx, in);
            if (null == frame) {
                return null;
            }

            ByteBuffer byteBuffer = frame.nioBuffer();

            return RemotingCommand.decode(byteBuffer);
        } catch (Exception e) {
            log.error("decode exception, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()), e);
            RemotingUtil.closeChannel(ctx.channel());
        } finally {
            if (null != frame) {
                frame.release();
            }
        }

        return null;
    }
}
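- NettyDecoder extends LengthFieldBasedFrameDecoder with maxFrameLength set to FRAME_MAX_LENGTH (16777216 by default, overridable through the com.rocketmq.remoting.frameMaxLength system property), lengthFieldOffset 0, lengthFieldLength 4, lengthAdjustment 0, and initialBytesToStrip 4, so the leading 4-byte length field is stripped from every frame
- Once a complete frame is available, it is converted to a ByteBuffer and passed to RemotingCommand.decode(byteBuffer)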
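The effect of those constructor arguments can be sketched without Netty. The class below is a simplified stand-in for the framing logic, not Netty's implementation: LengthFieldFramingSketch and splitFrames are hypothetical names, and the sketch assumes lengthFieldOffset 0, lengthFieldLength 4, lengthAdjustment 0, and initialBytesToStrip 4 as hard-coded behavior.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a simplified stand-in for length-field framing,
// not Netty's LengthFieldBasedFrameDecoder.
final class LengthFieldFramingSketch {

    static final int LENGTH_FIELD_LENGTH = 4;

    // Extracts every complete frame from `in` with the length field stripped.
    // Bytes of an incomplete trailing frame are left unread in `in`.
    static List<ByteBuffer> splitFrames(ByteBuffer in, int maxFrameLength) {
        List<ByteBuffer> frames = new ArrayList<>();
        while (in.remaining() >= LENGTH_FIELD_LENGTH) {
            // Peek at the length field without consuming it.
            int payloadLength = in.getInt(in.position());
            long wholeFrame = (long) LENGTH_FIELD_LENGTH + payloadLength;
            if (payloadLength < 0 || wholeFrame > maxFrameLength) {
                throw new IllegalArgumentException("bad frame length: " + payloadLength);
            }
            if (in.remaining() < wholeFrame) {
                break; // not enough bytes yet, wait for more data
            }
            // Skip the 4-byte length field (initialBytesToStrip = 4).
            in.position(in.position() + LENGTH_FIELD_LENGTH);
            ByteBuffer frame = in.slice();
            frame.limit(payloadLength);
            frames.add(frame);
            in.position(in.position() + payloadLength);
        }
        return frames;
    }
}
```

Each returned buffer starts at the header length field, which is why RemotingCommand.decode can begin with byteBuffer.getInt() and treat byteBuffer.limit() as the length of everything after the stripped length field.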
RemotingCommand.decode
org/apache/rocketmq/remoting/protocol/RemotingCommand.java
    public static RemotingCommand decode(final ByteBuffer byteBuffer) {
        int length = byteBuffer.limit();
        int oriHeaderLen = byteBuffer.getInt();
        int headerLength = getHeaderLength(oriHeaderLen);

        byte[] headerData = new byte[headerLength];
        byteBuffer.get(headerData);

        RemotingCommand cmd = headerDecode(headerData, getProtocolType(oriHeaderLen));

        int bodyLength = length - 4 - headerLength;
        byte[] bodyData = null;
        if (bodyLength > 0) {
            bodyData = new byte[bodyLength];
            byteBuffer.get(bodyData);
        }
        cmd.body = bodyData;

        return cmd;
    }

    public static int getHeaderLength(int length) {
        return length & 0xFFFFFF;
    }

    private static RemotingCommand headerDecode(byte[] headerData, SerializeType type) {
        switch (type) {
            case JSON:
                RemotingCommand resultJson = RemotingSerializable.decode(headerData, RemotingCommand.class);
                resultJson.setSerializeTypeCurrentRPC(type);
                return resultJson;
            case ROCKETMQ:
                RemotingCommand resultRMQ = RocketMQSerializable.rocketMQProtocolDecode(headerData);
                resultRMQ.setSerializeTypeCurrentRPC(type);
                return resultRMQ;
            default:
                break;
        }

        return null;
    }
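- decode first reads the header length, then reads the header data and decodes it into a RemotingCommand according to the serialization type (JSON or ROCKETMQ), and finally reads the remaining bytes as the body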
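getHeaderLength keeps only the low 24 bits of the 4-byte value, which leaves the high byte free to carry the serialization type. The bodies of markProtocolType and getProtocolType are not shown above, so the following is a sketch of how such packing could work under that assumption. HeaderLengthSketch, pack, and typeCode are hypothetical names, and the type code 1 in the usage example is an arbitrary value.

```java
import java.nio.ByteBuffer;

// Illustrative only: an assumed packing scheme with hypothetical names.
final class HeaderLengthSketch {

    // High byte: serialization type code. Low 3 bytes: header length (big-endian).
    static byte[] pack(int headerLength, byte typeCode) {
        byte[] result = new byte[4];
        result[0] = typeCode;
        result[1] = (byte) ((headerLength >> 16) & 0xFF);
        result[2] = (byte) ((headerLength >> 8) & 0xFF);
        result[3] = (byte) (headerLength & 0xFF);
        return result;
    }

    // Same mask as getHeaderLength in the code above.
    static int headerLength(int packed) {
        return packed & 0xFFFFFF;
    }

    // Assumed counterpart: recover the type code from the high byte.
    static int typeCode(int packed) {
        return (packed >> 24) & 0xFF;
    }

    public static void main(String[] args) {
        int packed = ByteBuffer.wrap(pack(300, (byte) 1)).getInt();
        System.out.println("header length: " + headerLength(packed));
        System.out.println("type code:     " + typeCode(packed));
    }
}
```

With this scheme a header can be at most 2^24 - 1 bytes long, which is consistent with the 0xFFFFFF mask.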
Summary
RocketMQ's Netty encoding and decoding both revolve around the RemotingCommand object: the encoder extends MessageToByteEncoder, and the decoder extends LengthFieldBasedFrameDecoder.