diff options
Diffstat (limited to 'xml/impl/src/org/jetbrains/io/fastCgi')
6 files changed, 681 insertions, 0 deletions
diff --git a/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiChannelHandler.java b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiChannelHandler.java new file mode 100644 index 000000000000..d24078221b30 --- /dev/null +++ b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiChannelHandler.java @@ -0,0 +1,108 @@ +package org.jetbrains.io.fastCgi; + +import com.intellij.openapi.util.text.StringUtil; +import com.intellij.openapi.util.text.StringUtilRt; +import com.intellij.util.containers.ConcurrentIntObjectMap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.*; +import org.jetbrains.io.Responses; +import org.jetbrains.io.SimpleChannelInboundHandlerAdapter; + +import static org.jetbrains.io.fastCgi.FastCgiService.LOG; + +@ChannelHandler.Sharable +public class FastCgiChannelHandler extends SimpleChannelInboundHandlerAdapter<FastCgiResponse> { + private final ConcurrentIntObjectMap<Channel> requestToChannel; + + public FastCgiChannelHandler(ConcurrentIntObjectMap<Channel> channel) { + requestToChannel = channel; + } + + @Override + protected void messageReceived(ChannelHandlerContext context, FastCgiResponse response) throws Exception { + ByteBuf buffer = response.getData(); + Channel channel = requestToChannel.remove(response.getId()); + if (channel == null || !channel.isActive()) { + if (buffer != null) { + buffer.release(); + } + return; + } + + if (buffer == null) { + Responses.sendStatus(HttpResponseStatus.BAD_GATEWAY, channel); + return; + } + + HttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, buffer); + try { + parseHeaders(httpResponse, buffer); + Responses.addServer(httpResponse); + if (!HttpHeaders.isContentLengthSet(httpResponse)) { + HttpHeaders.setContentLength(httpResponse, buffer.readableBytes()); + } + } + catch (Throwable e) { + buffer.release(); + Responses.sendStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR, channel); + LOG.error(e); + } + channel.writeAndFlush(httpResponse); + } + + private static void parseHeaders(HttpResponse response, ByteBuf buffer) { + StringBuilder builder = new StringBuilder(); + while (buffer.isReadable()) { + builder.setLength(0); + + String key = null; + boolean valueExpected = true; + while (true) { + int b = buffer.readByte(); + if (b < 0 || b == '\n') { + break; + } + + if (b != '\r') { + if (valueExpected && b == ':') { + valueExpected = false; + + key = builder.toString(); + builder.setLength(0); + skipWhitespace(buffer); + } + else { + builder.append((char)b); + } + } + } + + if (builder.length() == 0) { + // end of headers + return; + } + + // skip standard headers + if (StringUtil.isEmpty(key) || StringUtilRt.startsWithIgnoreCase(key, "http") || StringUtilRt.startsWithIgnoreCase(key, "X-Accel-")) { + continue; + } + + String value = builder.toString(); + if (key.equalsIgnoreCase("status")) { + response.setStatus(HttpResponseStatus.valueOf(Integer.parseInt(value.substring(0, value.indexOf(' '))))); + } + else if (!(key.startsWith("http") || key.startsWith("HTTP"))) { + response.headers().add(key, value); + } + } + } + + private static void skipWhitespace(ByteBuf buffer) { + while (buffer.isReadable() && buffer.getByte(buffer.readerIndex()) == ' ') { + buffer.skipBytes(1); + } + } +}
\ No newline at end of file diff --git a/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiConstants.java b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiConstants.java new file mode 100644 index 000000000000..23a85dead1f9 --- /dev/null +++ b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiConstants.java @@ -0,0 +1,5 @@ +package org.jetbrains.io.fastCgi; + +public final class FastCgiConstants { + public static final int HEADER_LENGTH = 8; +}
\ No newline at end of file diff --git a/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiDecoder.java b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiDecoder.java new file mode 100644 index 000000000000..1cd7adbef5d4 --- /dev/null +++ b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiDecoder.java @@ -0,0 +1,149 @@ +package org.jetbrains.io.fastCgi; + +import com.intellij.util.Consumer; +import gnu.trove.TIntObjectHashMap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.CharsetUtil; +import org.jetbrains.io.Decoder; + +import static org.jetbrains.io.fastCgi.FastCgiService.LOG; + +public class FastCgiDecoder extends Decoder { + private enum State { + HEADER, CONTENT + } + + private State state = State.HEADER; + + private enum ProtocolStatus { + REQUEST_COMPLETE, CANT_MPX_CONN, OVERLOADED, UNKNOWN_ROLE + } + + public static final class RecordType { + public static final int END_REQUEST = 3; + public static final int STDOUT = 6; + public static final int STDERR = 7; + } + + private int type; + private int id; + private int contentLength; + private int paddingLength; + + private final TIntObjectHashMap<ByteBuf> dataBuffers = new TIntObjectHashMap<ByteBuf>(); + + private final Consumer<String> errorOutputConsumer; + + public FastCgiDecoder(Consumer<String> errorOutputConsumer) { + this.errorOutputConsumer = errorOutputConsumer; + } + + @Override + protected void messageReceived(ChannelHandlerContext context, ByteBuf input) throws Exception { + while (true) { + switch (state) { + case HEADER: { + if (paddingLength > 0) { + if (input.readableBytes() >= paddingLength) { + input.skipBytes(paddingLength); + paddingLength = 0; + } + else { + paddingLength -= input.readableBytes(); + input.skipBytes(input.readableBytes()); + input.release(); + return; + } + } + + ByteBuf buffer = getBufferIfSufficient(input, FastCgiConstants.HEADER_LENGTH, context); + if (buffer == null) { + input.release(); + return; + } + + decodeHeader(buffer); + state = State.CONTENT; + } + + case CONTENT: { + if (contentLength > 0) { + ByteBuf buffer = getBufferIfSufficient(input, contentLength, context); + if (buffer == null) { + input.release(); + return; + } + + FastCgiResponse response = readContent(buffer); + if (response != null) { + context.fireChannelRead(response); + } + } + state = State.HEADER; + } + } + } + } + + private void decodeHeader(ByteBuf buffer) { + buffer.skipBytes(1); + type = buffer.readUnsignedByte(); + id = buffer.readUnsignedShort(); + contentLength = buffer.readUnsignedShort(); + paddingLength = buffer.readUnsignedByte(); + buffer.skipBytes(1); + } + + private FastCgiResponse readContent(ByteBuf buffer) { + switch (type) { + case RecordType.END_REQUEST: + int appStatus = buffer.readInt(); + int protocolStatus = buffer.readUnsignedByte(); + buffer.skipBytes(3); + if (appStatus != 0 || protocolStatus != ProtocolStatus.REQUEST_COMPLETE.ordinal()) { + LOG.warn("Protocol status " + protocolStatus); + dataBuffers.remove(id); + return new FastCgiResponse(id, null); + } + else if (protocolStatus == ProtocolStatus.REQUEST_COMPLETE.ordinal()) { + return new FastCgiResponse(id, dataBuffers.remove(id)); + } + break; + + case RecordType.STDOUT: + ByteBuf data = dataBuffers.get(id); + ByteBuf sliced = buffer.slice(buffer.readerIndex(), contentLength); + if (data == null) { + dataBuffers.put(id, sliced); + } + else if (data instanceof CompositeByteBuf) { + ((CompositeByteBuf)data).addComponent(sliced); + data.writerIndex(data.writerIndex() + sliced.readableBytes()); + } + else { + dataBuffers.put(id, Unpooled.wrappedBuffer(data, sliced)); + } + sliced.retain(); + buffer.skipBytes(contentLength); + break; + + case RecordType.STDERR: + try { + errorOutputConsumer.consume(buffer.toString(buffer.readerIndex(), contentLength, CharsetUtil.UTF_8)); + } + catch (Throwable e) { + LOG.error(e); + } + buffer.skipBytes(contentLength); + break; + + default: + LOG.error("Unknown type " + type); + break; + } + return null; + } +}
\ No newline at end of file diff --git a/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiRequest.java b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiRequest.java new file mode 100644 index 000000000000..e92d20eebffc --- /dev/null +++ b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiRequest.java @@ -0,0 +1,149 @@ +package org.jetbrains.io.fastCgi; + +import com.intellij.openapi.project.Project; +import com.intellij.openapi.vfs.VirtualFile; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.handler.codec.http.FullHttpRequest; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.util.CharsetUtil; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.builtInWebServer.PathInfo; +import org.jetbrains.builtInWebServer.WebServerPathToFileManager; +import org.jetbrains.io.Responses; + +import java.net.InetSocketAddress; +import java.util.Locale; +import java.util.Map; + +public class FastCgiRequest { + private static final int PARAMS = 4; + private static final int BEGIN_REQUEST = 1; + private static final int RESPONDER = 1; + private static final int FCGI_KEEP_CONNECTION = 1; + private static final int STDIN = 5; + private static final int VERSION = 1; + + private final ByteBuf buffer; + final int requestId; + + public FastCgiRequest(int requestId, @NotNull ByteBufAllocator allocator) { + this.requestId = requestId; + + buffer = allocator.buffer(); + writeHeader(buffer, BEGIN_REQUEST, FastCgiConstants.HEADER_LENGTH); + buffer.writeShort(RESPONDER); + buffer.writeByte(FCGI_KEEP_CONNECTION); + buffer.writeZero(5); + } + + public void writeFileHeaders(@NotNull VirtualFile file, @NotNull Project project, @NotNull CharSequence canonicalRequestPath) { + PathInfo root = WebServerPathToFileManager.getInstance(project).getRoot(file); + FastCgiService.LOG.assertTrue(root != null); + addHeader("DOCUMENT_ROOT", root.getRoot().getPath()); + addHeader("SCRIPT_FILENAME", file.getPath()); + addHeader("SCRIPT_NAME", canonicalRequestPath); + } + + public final void addHeader(@NotNull String key, @Nullable CharSequence value) { + if (value == null) { + return; + } + + int keyLength = key.length(); + int valLength = value.length(); + writeHeader(buffer, PARAMS, keyLength + valLength + (keyLength < 0x80 ? 1 : 4) + (valLength < 0x80 ? 1 : 4)); + + if (keyLength < 0x80) { + buffer.writeByte(keyLength); + } + else { + buffer.writeByte(0x80 | (keyLength >> 24)); + buffer.writeByte(keyLength >> 16); + buffer.writeByte(keyLength >> 8); + buffer.writeByte(keyLength); + } + + if (valLength < 0x80) { + buffer.writeByte(valLength); + } + else { + buffer.writeByte(0x80 | (valLength >> 24)); + buffer.writeByte(valLength >> 16); + buffer.writeByte(valLength >> 8); + buffer.writeByte(valLength); + } + + buffer.writeBytes(key.getBytes(CharsetUtil.US_ASCII)); + buffer.writeBytes(Unpooled.copiedBuffer(value, CharsetUtil.UTF_8)); + } + + public void writeHeaders(FullHttpRequest request, Channel clientChannel) { + addHeader("REQUEST_URI", request.uri()); + addHeader("REQUEST_METHOD", request.method().name()); + + InetSocketAddress remote = (InetSocketAddress)clientChannel.remoteAddress(); + addHeader("REMOTE_ADDR", remote.getAddress().getHostAddress()); + addHeader("REMOTE_PORT", Integer.toString(remote.getPort())); + + InetSocketAddress local = (InetSocketAddress)clientChannel.localAddress(); + addHeader("SERVER_SOFTWARE", Responses.getServerHeaderValue()); + addHeader("SERVER_NAME", Responses.getServerHeaderValue()); + + addHeader("SERVER_ADDR", local.getAddress().getHostAddress()); + addHeader("SERVER_PORT", Integer.toString(local.getPort())); + + addHeader("GATEWAY_INTERFACE", "CGI/1.1"); + addHeader("SERVER_PROTOCOL", request.protocolVersion().text()); + addHeader("CONTENT_TYPE", request.headers().get(HttpHeaders.Names.CONTENT_TYPE)); + + // PHP only, required if PHP was built with --enable-force-cgi-redirect + addHeader("REDIRECT_STATUS", "200"); + + String queryString = ""; + int queryIndex = request.uri().indexOf('?'); + if (queryIndex != -1) { + queryString = request.uri().substring(queryIndex + 1); + } + addHeader("QUERY_STRING", queryString); + + addHeader("CONTENT_LENGTH", String.valueOf(request.content().readableBytes())); + + for (Map.Entry<String, String> entry : request.headers()) { + addHeader("HTTP_" + entry.getKey().replace('-', '_').toUpperCase(Locale.ENGLISH), entry.getValue()); + } + } + + final void writeToServerChannel(ByteBuf content, Channel fastCgiChannel) { + writeHeader(buffer, PARAMS, 0); + fastCgiChannel.write(buffer); + + if (content.isReadable()) { + ByteBuf headerBuffer = fastCgiChannel.alloc().buffer(FastCgiConstants.HEADER_LENGTH, FastCgiConstants.HEADER_LENGTH); + writeHeader(headerBuffer, STDIN, content.readableBytes()); + fastCgiChannel.write(headerBuffer); + + fastCgiChannel.write(content); + + headerBuffer = fastCgiChannel.alloc().buffer(FastCgiConstants.HEADER_LENGTH, FastCgiConstants.HEADER_LENGTH); + writeHeader(headerBuffer, STDIN, 0); + fastCgiChannel.write(headerBuffer); + } + else { + content.release(); + } + + fastCgiChannel.flush(); + } + + private void writeHeader(ByteBuf buffer, int type, int length) { + buffer.writeByte(VERSION); + buffer.writeByte(type); + buffer.writeShort(requestId); + buffer.writeShort(length); + buffer.writeZero(2); + } +}
\ No newline at end of file diff --git a/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiResponse.java b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiResponse.java new file mode 100644 index 000000000000..e249f7152c7c --- /dev/null +++ b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiResponse.java @@ -0,0 +1,21 @@ +package org.jetbrains.io.fastCgi; + +import io.netty.buffer.ByteBuf; + +public class FastCgiResponse { + private final int id; + private final ByteBuf data; + + public FastCgiResponse(int id, ByteBuf data) { + this.id = id; + this.data = data; + } + + public ByteBuf getData() { + return data; + } + + public int getId() { + return id; + } +}
\ No newline at end of file diff --git a/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiService.java b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiService.java new file mode 100644 index 000000000000..54f13c6c8ea1 --- /dev/null +++ b/xml/impl/src/org/jetbrains/io/fastCgi/FastCgiService.java @@ -0,0 +1,249 @@ +package org.jetbrains.io.fastCgi; + +import com.intellij.concurrency.JobScheduler; +import com.intellij.execution.filters.TextConsoleBuilder; +import com.intellij.execution.filters.TextConsoleBuilderFactory; +import com.intellij.execution.process.OSProcessHandler; +import com.intellij.execution.process.ProcessAdapter; +import com.intellij.execution.process.ProcessEvent; +import com.intellij.execution.ui.ConsoleView; +import com.intellij.execution.ui.ConsoleViewContentType; +import com.intellij.openapi.Disposable; +import com.intellij.openapi.application.ApplicationManager; +import com.intellij.openapi.diagnostic.Logger; +import com.intellij.openapi.project.Project; +import com.intellij.openapi.util.AsyncResult; +import com.intellij.openapi.util.AsyncValueLoader; +import com.intellij.openapi.util.Key; +import com.intellij.openapi.wm.ToolWindow; +import com.intellij.openapi.wm.ToolWindowAnchor; +import com.intellij.openapi.wm.ToolWindowManager; +import com.intellij.ui.content.ContentFactory; +import com.intellij.util.Consumer; +import com.intellij.util.containers.ContainerUtil; +import com.intellij.util.containers.StripedLockIntObjectConcurrentHashMap; +import com.intellij.util.net.NetUtils; +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; +import io.netty.handler.codec.http.HttpResponseStatus; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.io.ChannelExceptionHandler; +import org.jetbrains.io.NettyUtil; +import org.jetbrains.io.Responses; + +import javax.swing.*; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +// todo send FCGI_ABORT_REQUEST if client channel disconnected +public abstract class FastCgiService implements Disposable { + static final Logger LOG = Logger.getInstance(FastCgiService.class); + + protected final Project project; + + private final AtomicInteger requestIdCounter = new AtomicInteger(); + private final StripedLockIntObjectConcurrentHashMap<Channel> requests = new StripedLockIntObjectConcurrentHashMap<Channel>(); + + private volatile Channel fastCgiChannel; + + protected final AsyncValueLoader<OSProcessHandler> processHandler = new AsyncValueLoader<OSProcessHandler>() { + @Override + protected boolean isCancelOnReject() { + return true; + } + + @Override + protected void load(@NotNull final AsyncResult<OSProcessHandler> result) throws IOException { + final int port = NetUtils.findAvailableSocketPort(); + final OSProcessHandler processHandler = createProcessHandler(project, port); + if (processHandler == null) { + result.setRejected(); + return; + } + + result.doWhenRejected(new Runnable() { + @Override + public void run() { + processHandler.destroyProcess(); + } + }); + + final MyProcessAdapter processListener = new MyProcessAdapter(); + processHandler.addProcessListener(processListener); + processHandler.startNotify(); + + if (result.isRejected()) { + return; + } + + JobScheduler.getScheduler().schedule(new Runnable() { + @Override + public void run() { + if (result.isRejected()) { + return; + } + + ApplicationManager.getApplication().executeOnPooledThread(new Runnable() { + @Override + public void run() { + if (!result.isRejected()) { + try { + connectToProcess(result, port, processHandler, processListener); + } + catch (Throwable e) { + result.setRejected(); + LOG.error(e); + } + } + } + }); + } + }, NettyUtil.MIN_START_TIME, TimeUnit.MILLISECONDS); + } + + @Override + protected void disposeResult(@NotNull OSProcessHandler processHandler) { + try { + Channel currentFastCgiChannel = fastCgiChannel; + if (currentFastCgiChannel != null) { + fastCgiChannel = null; + NettyUtil.closeAndReleaseFactory(currentFastCgiChannel); + } + processHandler.destroyProcess(); + } + finally { + requestIdCounter.set(0); + if (!requests.isEmpty()) { + List<Channel> waitingClients = ContainerUtil.toList(requests.elements()); + requests.clear(); + for (Channel channel : waitingClients) { + try { + if (channel.isActive()) { + Responses.sendStatus(HttpResponseStatus.BAD_GATEWAY, channel); + } + } + catch (Throwable e) { + NettyUtil.log(e, LOG); + } + } + } + } + } + }; + + private ConsoleView console; + + protected FastCgiService(Project project) { + this.project = project; + } + + protected abstract OSProcessHandler createProcessHandler(Project project, int port); + + private void connectToProcess(final AsyncResult<OSProcessHandler> asyncResult, final int port, final OSProcessHandler processHandler, final Consumer<String> errorOutputConsumer) { + Bootstrap bootstrap = NettyUtil.oioClientBootstrap(); + final FastCgiChannelHandler fastCgiChannelHandler = new FastCgiChannelHandler(requests); + bootstrap.handler(new ChannelInitializer() { + @Override + protected void initChannel(Channel channel) throws Exception { + channel.pipeline().addLast(new FastCgiDecoder(errorOutputConsumer), fastCgiChannelHandler, ChannelExceptionHandler.getInstance()); + } + }); + fastCgiChannel = NettyUtil.connectClient(bootstrap, new InetSocketAddress(NetUtils.getLoopbackAddress(), port), asyncResult); + if (fastCgiChannel != null) { + asyncResult.setDone(processHandler); + } + } + + public void send(final FastCgiRequest fastCgiRequest, final ByteBuf content) { + content.retain(); + + if (processHandler.has()) { + fastCgiRequest.writeToServerChannel(content, fastCgiChannel); + } + else { + processHandler.get().doWhenDone(new Runnable() { + @Override + public void run() { + fastCgiRequest.writeToServerChannel(content, fastCgiChannel); + } + }).doWhenRejected(new Runnable() { + @Override + public void run() { + content.release(); + Channel channel = requests.get(fastCgiRequest.requestId); + if (channel != null && channel.isActive()) { + Responses.sendStatus(HttpResponseStatus.BAD_GATEWAY, channel); + } + } + }); + } + } + + public int allocateRequestId(Channel channel) { + int requestId = requestIdCounter.getAndIncrement(); + if (requestId >= Short.MAX_VALUE) { + requestIdCounter.set(0); + requestId = requestIdCounter.getAndDecrement(); + } + requests.put(requestId, channel); + return requestId; + } + + @Override + public void dispose() { + processHandler.reset(); + } + + protected abstract void buildConsole(@NotNull TextConsoleBuilder consoleBuilder); + + @NotNull + protected abstract String getConsoleToolWindowId(); + + @NotNull + protected abstract Icon getConsoleToolWindowIcon(); + + private final class MyProcessAdapter extends ProcessAdapter implements Consumer<String> { + private void createConsole() { + TextConsoleBuilder consoleBuilder = TextConsoleBuilderFactory.getInstance().createBuilder(project); + buildConsole(consoleBuilder); + console = consoleBuilder.getConsole(); + + ApplicationManager.getApplication().invokeLater(new Runnable() { + @Override + public void run() { + ToolWindow toolWindow = ToolWindowManager.getInstance(project).registerToolWindow(getConsoleToolWindowId(), false, ToolWindowAnchor.BOTTOM, project, true); + toolWindow.setIcon(getConsoleToolWindowIcon()); + toolWindow.getContentManager().addContent(ContentFactory.SERVICE.getInstance().createContent(console.getComponent(), "", false)); + } + }, project.getDisposed()); + } + + @Override + public void onTextAvailable(ProcessEvent event, Key outputType) { + print(event.getText(), ConsoleViewContentType.getConsoleViewType(outputType)); + } + + private void print(String text, ConsoleViewContentType contentType) { + if (console == null) { + createConsole(); + } + console.print(text, contentType); + } + + @Override + public void processTerminated(ProcessEvent event) { + processHandler.reset(); + print(getConsoleToolWindowId() + " terminated\n", ConsoleViewContentType.SYSTEM_OUTPUT); + } + + @Override + public void consume(String message) { + print(message, ConsoleViewContentType.ERROR_OUTPUT); + } + } +}
\ No newline at end of file |