package org.jetbrains.io.fastCgi; import com.intellij.util.Consumer; import gnu.trove.TIntObjectHashMap; import io.netty.buffer.ByteBuf; import io.netty.buffer.CompositeByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; import org.jetbrains.io.Decoder; import static org.jetbrains.io.fastCgi.FastCgiService.LOG; public class FastCgiDecoder extends Decoder { private enum State { HEADER, CONTENT } private State state = State.HEADER; private enum ProtocolStatus { REQUEST_COMPLETE, CANT_MPX_CONN, OVERLOADED, UNKNOWN_ROLE } public static final class RecordType { public static final int END_REQUEST = 3; public static final int STDOUT = 6; public static final int STDERR = 7; } private int type; private int id; private int contentLength; private int paddingLength; private final TIntObjectHashMap dataBuffers = new TIntObjectHashMap(); private final Consumer errorOutputConsumer; public FastCgiDecoder(Consumer errorOutputConsumer) { this.errorOutputConsumer = errorOutputConsumer; } @Override protected void messageReceived(ChannelHandlerContext context, ByteBuf input) throws Exception { while (true) { switch (state) { case HEADER: { if (paddingLength > 0) { if (input.readableBytes() >= paddingLength) { input.skipBytes(paddingLength); paddingLength = 0; } else { paddingLength -= input.readableBytes(); input.skipBytes(input.readableBytes()); input.release(); return; } } ByteBuf buffer = getBufferIfSufficient(input, FastCgiConstants.HEADER_LENGTH, context); if (buffer == null) { input.release(); return; } decodeHeader(buffer); state = State.CONTENT; } case CONTENT: { if (contentLength > 0) { ByteBuf buffer = getBufferIfSufficient(input, contentLength, context); if (buffer == null) { input.release(); return; } FastCgiResponse response = readContent(buffer); if (response != null) { context.fireChannelRead(response); } } state = State.HEADER; } } } } private void decodeHeader(ByteBuf buffer) { buffer.skipBytes(1); type = buffer.readUnsignedByte(); id = buffer.readUnsignedShort(); contentLength = buffer.readUnsignedShort(); paddingLength = buffer.readUnsignedByte(); buffer.skipBytes(1); } private FastCgiResponse readContent(ByteBuf buffer) { switch (type) { case RecordType.END_REQUEST: int appStatus = buffer.readInt(); int protocolStatus = buffer.readUnsignedByte(); buffer.skipBytes(3); if (appStatus != 0 || protocolStatus != ProtocolStatus.REQUEST_COMPLETE.ordinal()) { LOG.warn("Protocol status " + protocolStatus); dataBuffers.remove(id); return new FastCgiResponse(id, null); } else if (protocolStatus == ProtocolStatus.REQUEST_COMPLETE.ordinal()) { return new FastCgiResponse(id, dataBuffers.remove(id)); } break; case RecordType.STDOUT: ByteBuf data = dataBuffers.get(id); ByteBuf sliced = buffer.slice(buffer.readerIndex(), contentLength); if (data == null) { dataBuffers.put(id, sliced); } else if (data instanceof CompositeByteBuf) { ((CompositeByteBuf)data).addComponent(sliced); data.writerIndex(data.writerIndex() + sliced.readableBytes()); } else { dataBuffers.put(id, Unpooled.wrappedBuffer(data, sliced)); } sliced.retain(); buffer.skipBytes(contentLength); break; case RecordType.STDERR: try { errorOutputConsumer.consume(buffer.toString(buffer.readerIndex(), contentLength, CharsetUtil.UTF_8)); } catch (Throwable e) { LOG.error(e); } buffer.skipBytes(contentLength); break; default: LOG.error("Unknown type " + type); break; } return null; } }