diff options
Diffstat (limited to 'engine/src/networking/com/jme3/network/kernel')
16 files changed, 2161 insertions, 0 deletions
diff --git a/engine/src/networking/com/jme3/network/kernel/AbstractKernel.java b/engine/src/networking/com/jme3/network/kernel/AbstractKernel.java new file mode 100644 index 0000000..9881bfa --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/AbstractKernel.java @@ -0,0 +1,122 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicLong; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Base implementation of the Kernel interface providing several + * useful default implementations of some methods. This implementation + * assumes that the kernel will be managing its own internal threads + * and queuing any results for the caller to retrieve on their own + * thread. + * + * @version $Revision: 8843 $ + * @author Paul Speed + */ +public abstract class AbstractKernel implements Kernel +{ + static Logger log = Logger.getLogger(AbstractKernel.class.getName()); + + private AtomicLong nextId = new AtomicLong(1); + + /** + * Contains the pending endpoint events waiting for the caller + * to retrieve them. + */ + private ConcurrentLinkedQueue<EndpointEvent> endpointEvents = new ConcurrentLinkedQueue<EndpointEvent>(); + + /** + * Contains the pending envelopes waiting for the caller to + * retrieve them. + */ + private LinkedBlockingQueue<Envelope> envelopes = new LinkedBlockingQueue<Envelope>(); + + protected AbstractKernel() + { + } + + protected void reportError( Exception e ) + { + // Should really be queued up so the outer thread can + // retrieve them. For now we'll just log it. FIXME + log.log( Level.SEVERE, "Unhanddled kernel error", e ); + } + + protected long nextEndpointId() + { + return nextId.getAndIncrement(); + } + + /** + * Returns true if there are waiting envelopes. + */ + public boolean hasEnvelopes() + { + return !envelopes.isEmpty(); + } + + /** + * Removes one envelope from the received messages queue or + * blocks until one is available. + */ + public Envelope read() throws InterruptedException + { + return envelopes.take(); + } + + /** + * Removes and returnsn one endpoint event from the event queue or + * null if there are no endpoint events. + */ + public EndpointEvent nextEvent() + { + return endpointEvents.poll(); + } + + protected void addEvent( EndpointEvent e ) + { + endpointEvents.add( e ); + } + + protected void addEnvelope( Envelope env ) + { + if( !envelopes.offer( env ) ) { + throw new KernelException( "Critical error, could not enqueue envelope." ); + } + } +} diff --git a/engine/src/networking/com/jme3/network/kernel/Connector.java b/engine/src/networking/com/jme3/network/kernel/Connector.java new file mode 100644 index 0000000..f86c17b --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/Connector.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel; + +import java.nio.ByteBuffer; + +/** + * A single channel remote connection allowing the sending + * and receiving of data. As opposed to the Kernel, this will + * only ever receive data from one Endpoint and so bypasses + * the envelope wrapping. + * + * @version $Revision: 7012 $ + * @author Paul Speed + */ +public interface Connector +{ + /** + * Returns true if this connector is currently connected. + */ + public boolean isConnected(); + + /** + * Closes the connection. Any subsequent attempts to read + * or write will fail with an exception. + */ + public void close(); + + /** + * Returns true if there is currently data available for + * reading. Some connector implementations may not be able + * to answer this question accurately and will always return + * false. + */ + public boolean available(); + + /** + * Reads a chunk of data from the connection, blocking if + * there is no data available. The buffer may only be valid + * until the next read() call is made. Callers should copy + * the data if they need it for longer than that. + * + * @return The data read or null if there is no more data + * because the connection is closed. + */ + public ByteBuffer read(); + + /** + * Writes a chunk of data to the connection from data.position() + * to data.limit(). + */ + public void write( ByteBuffer data ); +} diff --git a/engine/src/networking/com/jme3/network/kernel/ConnectorException.java b/engine/src/networking/com/jme3/network/kernel/ConnectorException.java new file mode 100644 index 0000000..31d02b0 --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/ConnectorException.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel; + + +/** + * Represents a client-side connection error, usually encapsulating + * an IOException as its cause. + * + * @version $Revision: 7009 $ + * @author Paul Speed + */ +public class ConnectorException extends RuntimeException +{ + public ConnectorException( String message, Throwable cause ) + { + super( message, cause ); + } + + public ConnectorException( String message ) + { + super( message ); + } +} diff --git a/engine/src/networking/com/jme3/network/kernel/Endpoint.java b/engine/src/networking/com/jme3/network/kernel/Endpoint.java new file mode 100644 index 0000000..d63fdae --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/Endpoint.java @@ -0,0 +1,88 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel; + +import java.nio.ByteBuffer; + +/** + * An abstract endpoint in a Kernel that can be used for + * sending/receiving messages within the kernel space. + * + * @version $Revision: 7049 $ + * @author Paul Speed + */ +public interface Endpoint +{ + /** + * Returns an ID that is unique for this endpoint within its + * Kernel instance. + */ + public long getId(); + + /** + * Returns the transport specific remote address of this endpoint + * as a string. This may or may not be unique per endpoint depending + * on the type of transport. + */ + public String getAddress(); + + /** + * Returns the kernel to which this endpoint belongs. + */ + public Kernel getKernel(); + + /** + * Returns true if this endpoint is currently connected. + */ + public boolean isConnected(); + + /** + * Sends data to the other end of the connection represented + * by this endpoint. + */ + public void send( ByteBuffer data ); + + /** + * Closes this endpoint without flushing any of its + * currently enqueued outbound data. + */ + public void close(); + + /** + * Closes this endpoint, optionally flushing any queued + * data before closing. As soon as this method is called, + * ne send() calls will fail with an exception... even while + * close() is still flushing the earlier queued messages. + */ + public void close(boolean flushData); +} diff --git a/engine/src/networking/com/jme3/network/kernel/EndpointEvent.java b/engine/src/networking/com/jme3/network/kernel/EndpointEvent.java new file mode 100644 index 0000000..feb48a6 --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/EndpointEvent.java @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel; + + +/** + * Provides information about an added or + * removed connection. + * + * @version $Revision: 7009 $ + * @author Paul Speed + */ +public class EndpointEvent +{ + public enum Type { ADD, REMOVE }; + + private Kernel source; + private Endpoint endpoint; + private Type type; + + public EndpointEvent( Kernel source, Endpoint p, Type type ) + { + this.source = source; + this.endpoint = p; + this.type = type; + } + + public static EndpointEvent createAdd( Kernel source, Endpoint p ) + { + return new EndpointEvent( source, p, Type.ADD ); + } + + public static EndpointEvent createRemove( Kernel source, Endpoint p ) + { + return new EndpointEvent( source, p, Type.REMOVE ); + } + + public Kernel getSource() + { + return source; + } + + public Endpoint getEndpoint() + { + return endpoint; + } + + public Type getType() + { + return type; + } + + public String toString() + { + return "EndpointEvent[" + type + ", " + endpoint + "]"; + } +} diff --git a/engine/src/networking/com/jme3/network/kernel/Envelope.java b/engine/src/networking/com/jme3/network/kernel/Envelope.java new file mode 100644 index 0000000..3c0028b --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/Envelope.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel; + +/** + * Encapsulates a received piece of data. This is used by the Kernel + * to track incoming chunks of data. + * + * @version $Revision: 8843 $ + * @author Paul Speed + */ +public class Envelope +{ + private Endpoint source; + private byte[] data; + private boolean reliable; + + /** + * Creates an incoming envelope holding the data from the specified + * source. The 'reliable' flag further indicates on which mode of + * transport the data arrrived. + */ + public Envelope( Endpoint source, byte[] data, boolean reliable ) + { + this.source = source; + this.data = data; + this.reliable = reliable; + } + + public Endpoint getSource() + { + return source; + } + + public byte[] getData() + { + return data; + } + + public boolean isReliable() + { + return reliable; + } + + public String toString() + { + return "Envelope[" + source + ", " + (reliable?"reliable":"unreliable") + ", " + data.length + "]"; + } +} diff --git a/engine/src/networking/com/jme3/network/kernel/Kernel.java b/engine/src/networking/com/jme3/network/kernel/Kernel.java new file mode 100644 index 0000000..c13b736 --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/Kernel.java @@ -0,0 +1,94 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel; + +import com.jme3.network.Filter; +import java.nio.ByteBuffer; + +/** + * Defines the basic byte[] passing messaging + * kernel. + * + * @version $Revision: 8843 $ + * @author Paul Speed + */ +public interface Kernel +{ + /** + * A marker envelope returned from read() that indicates that + * there are events pending. This allows a single thread to + * more easily process the envelopes and endpoint events. + */ + public static final Envelope EVENTS_PENDING = new Envelope( null, new byte[0], false ); + + /** + * Initializes the kernel and starts any internal processing. + */ + public void initialize(); + + /** + * Gracefully terminates the kernel and stops any internal + * daemon processing. This method will not return until all + * internal threads have been shut down. + */ + public void terminate() throws InterruptedException; + + /** + * Dispatches the data to all endpoints managed by the + * kernel that match the specified endpoint filter.. + * If 'copy' is true then the implementation will copy the byte buffer + * before delivering it to endpoints. This allows the caller to reuse + * the data buffer. Though it is important that the buffer not be changed + * by another thread while this call is running. + * Only the bytes from data.position() to data.remaining() are sent. + */ + public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, + boolean copy ); + + /** + * Returns true if there are waiting envelopes. + */ + public boolean hasEnvelopes(); + + /** + * Removes one envelope from the received messages queue or + * blocks until one is available. + */ + public Envelope read() throws InterruptedException; + + /** + * Removes and returnsn one endpoint event from the event queue or + * null if there are no endpoint events. + */ + public EndpointEvent nextEvent(); +} diff --git a/engine/src/networking/com/jme3/network/kernel/KernelException.java b/engine/src/networking/com/jme3/network/kernel/KernelException.java new file mode 100644 index 0000000..62a57e4 --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/KernelException.java @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel; + + +/** + * Represents a kernel-level error, usually encapsulating + * an IOException as its cause. + * + * @version $Revision: 7009 $ + * @author Paul Speed + */ +public class KernelException extends RuntimeException +{ + public KernelException( String message, Throwable cause ) + { + super( message, cause ); + } + + public KernelException( String message ) + { + super( message ); + } +} diff --git a/engine/src/networking/com/jme3/network/kernel/NamedThreadFactory.java b/engine/src/networking/com/jme3/network/kernel/NamedThreadFactory.java new file mode 100644 index 0000000..f56c23a --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/NamedThreadFactory.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel; + +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; + +/** + * A simple factory that delegates to java.util.concurrent's + * default thread factory but adds a prefix to the beginning + * of the thread name. + * + * @version $Revision: 7314 $ + * @author Paul Speed + */ +public class NamedThreadFactory implements ThreadFactory +{ + private String name; + private boolean daemon; + private ThreadFactory delegate; + + public NamedThreadFactory( String name ) + { + this( name, Executors.defaultThreadFactory() ); + } + + public NamedThreadFactory( String name, boolean daemon ) + { + this( name, daemon, Executors.defaultThreadFactory() ); + } + + public NamedThreadFactory( String name, ThreadFactory delegate ) + { + this( name, false, delegate ); + } + + public NamedThreadFactory( String name, boolean daemon, ThreadFactory delegate ) + { + this.name = name; + this.daemon = daemon; + this.delegate = delegate; + } + + public Thread newThread( Runnable r ) + { + Thread result = delegate.newThread(r); + String s = result.getName(); + result.setName( name + "[" + s + "]" ); + result.setDaemon(daemon); + return result; + } +} + diff --git a/engine/src/networking/com/jme3/network/kernel/package.html b/engine/src/networking/com/jme3/network/kernel/package.html new file mode 100644 index 0000000..7029c95 --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/package.html @@ -0,0 +1,34 @@ +<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> + +<head> +<title></title> +<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> +</head> + +<body> +The kernel package is the heart of the JME networking module +and controls the routing and dispatch of message data over +different transport implementations. Most users will never +have to deal with these classes unless they are writing their own +client and server implementations that diverge from the standard +classes that are provided. + +<p>{@link com.jme3.network.kernel.Kernel} defines the core of a server-side message +broker that abstracts away the specific transport and underlying +threading model used. For example, it might use NIO selectors +in a single threaded model or straight multithreaded socket +model. Or it might implement SSL connections. Once created, +{@link com.jme3.network.kernel.Kernel} users don't need to care about the details.</p> + +<p>{@link com.jme3.network.kernel.Endpoint} is a managed connection within a +{@link com.jme3.network.kernel.Kernel} providing kernel to client connectivity.</p> + +<p>{@link com.jme3.network.kernel.Connector} defines the basic client-side message sender +and these objects are typically used to connect to a {@link com.jme3.network.kernel.Kernel} +though they can connect to any network port that supports the implementation's +protocol. Implementations are provided for straight TCP and UDP communication +and could be extended to support SSL or different threading models.</p> + +</body> +</html> diff --git a/engine/src/networking/com/jme3/network/kernel/tcp/NioEndpoint.java b/engine/src/networking/com/jme3/network/kernel/tcp/NioEndpoint.java new file mode 100644 index 0000000..87b721f --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/tcp/NioEndpoint.java @@ -0,0 +1,181 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel.tcp; + +import com.jme3.network.kernel.Endpoint; +import com.jme3.network.kernel.Kernel; +import com.jme3.network.kernel.KernelException; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * Endpoint implementation that encapsulates the + * channel IO based connection information and keeps + * track of the outbound data queue for the channel. + * + * @version $Revision: 8944 $ + * @author Paul Speed + */ +public class NioEndpoint implements Endpoint +{ + protected static final ByteBuffer CLOSE_MARKER = ByteBuffer.allocate(0); + + private long id; + private SocketChannel socket; + private SelectorKernel kernel; + private ConcurrentLinkedQueue<ByteBuffer> outbound = new ConcurrentLinkedQueue<ByteBuffer>(); + private boolean closing = false; + + public NioEndpoint( SelectorKernel kernel, long id, SocketChannel socket ) + { + this.id = id; + this.socket = socket; + this.kernel = kernel; + } + + public Kernel getKernel() + { + return kernel; + } + + public void close() + { + close(false); + } + + public void close( boolean flushData ) + { + if( flushData ) { + closing = true; + + // Enqueue a close marker message to let the server + // know we should close + send( CLOSE_MARKER, false, true ); + + return; + } + + try { + // Note: even though we may be disconnected from the socket.isConnected() + // standpoint, it's still safest to tell the kernel so that it can be sure + // to stop managing us gracefully. + kernel.closeEndpoint(this); + } catch( IOException e ) { + throw new KernelException( "Error closing endpoint for socket:" + socket, e ); + } + } + + public long getId() + { + return id; + } + + public String getAddress() + { + return String.valueOf(socket.socket().getRemoteSocketAddress()); + } + + public boolean isConnected() + { + return socket.isConnected(); + } + + /** + * The wakeup option is used internally when the kernel is + * broadcasting out to a bunch of endpoints and doesn't want to + * necessarily wakeup right away. + */ + protected void send( ByteBuffer data, boolean copy, boolean wakeup ) + { + // We create a ByteBuffer per endpoint since we + // use it to track the data sent to each endpoint + // separately. + ByteBuffer buffer; + if( !copy ) { + buffer = data; + } else { + // Copy the buffer + buffer = ByteBuffer.allocate(data.remaining()); + buffer.put(data); + buffer.flip(); + } + + // Queue it up + outbound.add(buffer); + + if( wakeup ) + kernel.wakeupSelector(); + } + + /** + * Called by the SelectorKernel to get the current top + * buffer for writing. + */ + protected ByteBuffer peekPending() + { + return outbound.peek(); + } + + /** + * Called by the SelectorKernel when the top buffer + * has been exhausted. + */ + protected ByteBuffer removePending() + { + return outbound.poll(); + } + + protected boolean hasPending() + { + return !outbound.isEmpty(); + } + + public void send( ByteBuffer data ) + { + if( data == null ) { + throw new IllegalArgumentException( "Data cannot be null." ); + } + if( closing ) { + throw new KernelException( "Endpoint has been closed:" + socket ); + } + send( data, true, true ); + } + + public String toString() + { + return "NioEndpoint[" + id + ", " + socket + "]"; + } +} diff --git a/engine/src/networking/com/jme3/network/kernel/tcp/SelectorKernel.java b/engine/src/networking/com/jme3/network/kernel/tcp/SelectorKernel.java new file mode 100644 index 0000000..5fa75f3 --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/tcp/SelectorKernel.java @@ -0,0 +1,472 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel.tcp; + +import com.jme3.network.Filter; +import com.jme3.network.kernel.*; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.net.Socket; +import java.nio.ByteBuffer; +import java.nio.channels.*; +import java.nio.channels.spi.SelectorProvider; +import java.util.Iterator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + + +/** + * A Kernel implementation based on NIO selectors. + * + * @version $Revision: 8944 $ + * @author Paul Speed + */ +public class SelectorKernel extends AbstractKernel +{ + static Logger log = Logger.getLogger(SelectorKernel.class.getName()); + + private InetSocketAddress address; + private SelectorThread thread; + + private Map<Long,NioEndpoint> endpoints = new ConcurrentHashMap<Long,NioEndpoint>(); + + public SelectorKernel( InetAddress host, int port ) + { + this( new InetSocketAddress(host, port) ); + } + + public SelectorKernel( int port ) throws IOException + { + this( new InetSocketAddress(port) ); + } + + public SelectorKernel( InetSocketAddress address ) + { + this.address = address; + } + + protected SelectorThread createSelectorThread() + { + return new SelectorThread(); + } + + public void initialize() + { + if( thread != null ) + throw new IllegalStateException( "Kernel already initialized." ); + + thread = createSelectorThread(); + + try { + thread.connect(); + thread.start(); + } catch( IOException e ) { + throw new KernelException( "Error hosting:" + address, e ); + } + } + + public void terminate() throws InterruptedException + { + if( thread == null ) + throw new IllegalStateException( "Kernel not initialized." ); + + try { + thread.close(); + thread = null; + } catch( IOException e ) { + throw new KernelException( "Error closing host connection:" + address, e ); + } + } + + public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, + boolean copy ) + { + if( !reliable ) + throw new UnsupportedOperationException( "Unreliable send not supported by this kernel." ); + + if( copy ) { + // Copy the data just once + byte[] temp = new byte[data.remaining()]; + System.arraycopy(data.array(), data.position(), temp, 0, data.remaining()); + data = ByteBuffer.wrap(temp); + } + + // Hand it to all of the endpoints that match our routing + for( NioEndpoint p : endpoints.values() ) { + // Does it match the filter? + if( filter != null && !filter.apply(p) ) + continue; + + // Give it the data... but let each endpoint track their + // own completion over the shared array of bytes by + // duplicating it + p.send( data.duplicate(), false, false ); + } + + // Wake up the selector so it can reinitialize its + // state accordingly. + wakeupSelector(); + } + + protected NioEndpoint addEndpoint( SocketChannel c ) + { + // Note: we purposely do NOT put the key in the endpoint. + // SelectionKeys are dangerous outside the selector thread + // and this is safer. + NioEndpoint p = new NioEndpoint( this, nextEndpointId(), c ); + + endpoints.put( p.getId(), p ); + + // Enqueue an endpoint event for the listeners + addEvent( EndpointEvent.createAdd( this, p ) ); + + return p; + } + + protected void removeEndpoint( NioEndpoint p, SocketChannel c ) + { + endpoints.remove( p.getId() ); + log.log( Level.FINE, "Endpoints size:{0}", endpoints.size() ); + + // Enqueue an endpoint event for the listeners + addEvent( EndpointEvent.createRemove( this, p ) ); + + // If there are no pending messages then add one so that the + // kernel-user knows to wake up if it is only listening for + // envelopes. + if( !hasEnvelopes() ) { + // Note: this is not really a race condition. At worst, our + // event has already been handled by now and it does no harm + // to check again. + addEnvelope( EVENTS_PENDING ); + } + } + + /** + * Called by the endpoints when they need to be closed. + */ + protected void closeEndpoint( NioEndpoint p ) throws IOException + { + //log.log( Level.INFO, "Closing endpoint:{0}.", p ); + + thread.cancel(p); + } + + /** + * Used internally by the endpoints to wakeup the selector + * when they have data to send. + */ + protected void wakeupSelector() + { + thread.wakeupSelector(); + } + + protected void newData( NioEndpoint p, SocketChannel c, ByteBuffer shared, int size ) + { + // Note: if ever desirable, it would be possible to accumulate + // data per source channel and only 'finalize' it when + // asked for more envelopes then were ready. I just don't + // think it will be an issue in practice. The busier the + // server, the more the buffers will fill before we get to them. + // And if the server isn't busy, who cares if we chop things up + // smaller... the network is still likely to deliver things in + // bulk anyway. + + // Must copy the shared data before we use it + byte[] dataCopy = new byte[size]; + System.arraycopy(shared.array(), 0, dataCopy, 0, size); + + Envelope env = new Envelope( p, dataCopy, true ); + addEnvelope( env ); + } + + /** + * This class is purposely tucked neatly away because + * messing with the selector from other threads for any + * reason is very bad. This is the safest architecture. + */ + protected class SelectorThread extends Thread + { + private ServerSocketChannel serverChannel; + private Selector selector; + private AtomicBoolean go = new AtomicBoolean(true); + private ByteBuffer working = ByteBuffer.allocate( 8192 ); + + /** + * Because we want to keep the keys to ourselves, we'll do + * the endpoint -> key mapping internally. + */ + private Map<NioEndpoint,SelectionKey> endpointKeys = new ConcurrentHashMap<NioEndpoint,SelectionKey>(); + + public SelectorThread() + { + setName( "Selector@" + address ); + setDaemon(true); + } + + public void connect() throws IOException + { + // Create a new selector + this.selector = SelectorProvider.provider().openSelector(); + + // Create a new non-blocking server socket channel + this.serverChannel = ServerSocketChannel.open(); + serverChannel.configureBlocking(false); + + // Bind the server socket to the specified address and port + serverChannel.socket().bind(address); + + // Register the server socket channel, indicating an interest in + // accepting new connections + serverChannel.register(selector, SelectionKey.OP_ACCEPT); + + log.log( Level.INFO, "Hosting TCP connection:{0}.", address ); + } + + public void close() throws IOException, InterruptedException + { + // Set the thread to stop + go.set(false); + + // Make sure the channel is closed + serverChannel.close(); + + // Force the selector to stop blocking + wakeupSelector(); + + // And wait for it + join(); + } + + protected void wakeupSelector() + { + selector.wakeup(); + } + + protected void setupSelectorOptions() + { + // For now, selection keys will either be in OP_READ + // or OP_WRITE. So while we are writing a buffer, we + // will not be reading. This is way simpler and less + // error prone... it can always be changed when everything + // else works if we are looking to micro-optimize. + + // Setup options based on the current state of + // the endpoints. This could potentially be more + // efficiently done as change requests... or simply + // keeping a thread-safe set of endpoints with pending + // writes. For most cases, it shouldn't matter. + for( Map.Entry<NioEndpoint,SelectionKey> e : endpointKeys.entrySet() ) { + if( e.getKey().hasPending() ) { + e.getValue().interestOps(SelectionKey.OP_WRITE); + } + } + } + + protected void accept( SelectionKey key ) throws IOException + { + // Would only get accepts on a server channel + ServerSocketChannel serverChan = (ServerSocketChannel)key.channel(); + + // Setup the connection to be non-blocking + SocketChannel remoteChan = serverChan.accept(); + remoteChan.configureBlocking(false); + + // And disable Nagle's buffering algorithm... we want + // data to go when we put it there. + Socket sock = remoteChan.socket(); + sock.setTcpNoDelay(true); + + // Let the selector know we're interested in reading + // data from the channel + SelectionKey endKey = remoteChan.register( selector, SelectionKey.OP_READ ); + + // And now create a new endpoint + NioEndpoint p = addEndpoint( remoteChan ); + endKey.attach(p); + endpointKeys.put(p, endKey); + } + + protected void cancel( NioEndpoint p ) throws IOException + { + SelectionKey key = endpointKeys.remove(p); + if( key == null ) { + //log.log( Level.INFO, "Endpoint already closed:{0}.", p ); + return; // already closed it + } + log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() ); + + log.log( Level.INFO, "Closing endpoint:{0}.", p ); + SocketChannel c = (SocketChannel)key.channel(); + + // Note: key.cancel() is specifically thread safe. One of + // the few things one can do with a key from another + // thread. + key.cancel(); + c.close(); + removeEndpoint( p, c ); + } + + protected void cancel( SelectionKey key, SocketChannel c ) throws IOException + { + NioEndpoint p = (NioEndpoint)key.attachment(); + log.log( Level.INFO, "Closing channel endpoint:{0}.", p ); + Object o = endpointKeys.remove(p); + + log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() ); + + key.cancel(); + c.close(); + removeEndpoint( p, c ); + } + + protected void read( SelectionKey key ) throws IOException + { + NioEndpoint p = (NioEndpoint)key.attachment(); + SocketChannel c = (SocketChannel)key.channel(); + working.clear(); + + int size; + try { + size = c.read(working); + } catch( IOException e ) { + // The remove end forcibly closed the connection... + // close out our end and cancel the key + cancel( key, c ); + return; + } + + if( size == -1 ) { + // The remote end shut down cleanly... + // close out our end and cancel the key + cancel( key, c ); + return; + } + + newData( p, c, working, size ); + } + + protected void write( SelectionKey key ) throws IOException + { + NioEndpoint p = (NioEndpoint)key.attachment(); + SocketChannel c = (SocketChannel)key.channel(); + + // We will send what we can and move on. + ByteBuffer current = p.peekPending(); + if( current == NioEndpoint.CLOSE_MARKER ) { + // This connection wants to be closed now + closeEndpoint(p); + + // Nothing more to do + return; + } + + c.write( current ); + + // If we wrote all of that packet then we need to remove it + if( current.remaining() == 0 ) { + p.removePending(); + } + + // If we happened to empty the pending queue then let's read + // again. + if( !p.hasPending() ) { + key.interestOps( SelectionKey.OP_READ ); + } + } + + protected void select() throws IOException + { + selector.select(); + + for( Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext(); ) { + SelectionKey key = i.next(); + i.remove(); + + if( !key.isValid() ) + { + // When does this happen? + log.log( Level.INFO, "Key is not valid:{0}.", key ); + continue; + } + + try { + if( key.isAcceptable() ) + accept(key); + else if( key.isWritable() ) + write(key); + else if( key.isReadable() ) + read(key); + } catch( IOException e ) { + if( !go.get() ) + return; // error likely due to shutting down + reportError( e ); + + // And at this level, errors likely mean the key is now + // dead and it doesn't hurt to kick them anyway. If we + // find IOExceptions that are not fatal, this can be + // readdressed + cancel( key, (SocketChannel)key.channel() ); + } + } + } + + public void run() + { + log.log( Level.INFO, "Kernel started for connection:{0}.", address ); + + // An atomic is safest and costs almost nothing + while( go.get() ) { + // Setup any queued option changes + setupSelectorOptions(); + + // Check for available keys and process them + try { + select(); + } catch( ClosedSelectorException e ) { + if( !go.get() ) + return; // it's because we're shutting down + throw new KernelException( "Premature selector closing", e ); + } catch( IOException e ) { + if( !go.get() ) + return; // error likely due to shutting down + reportError( e ); + } + } + } + } +} diff --git a/engine/src/networking/com/jme3/network/kernel/tcp/SocketConnector.java b/engine/src/networking/com/jme3/network/kernel/tcp/SocketConnector.java new file mode 100644 index 0000000..66d2040 --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/tcp/SocketConnector.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel.tcp; + +import com.jme3.network.kernel.Connector; +import com.jme3.network.kernel.ConnectorException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.InetAddress; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * A straight forward socket-based connector implementation that + * does not use any separate threading. It relies completely on + * the buffering in the OS network layer. + * + * @version $Revision: 8843 $ + * @author Paul Speed + */ +public class SocketConnector implements Connector +{ + private Socket sock; + private InputStream in; + private OutputStream out; + private SocketAddress remoteAddress; + private byte[] buffer = new byte[65535]; + private AtomicBoolean connected = new AtomicBoolean(false); + + public SocketConnector( InetAddress address, int port ) throws IOException + { + this.sock = new Socket(address, port); + remoteAddress = sock.getRemoteSocketAddress(); // for info purposes + + // Disable Nagle's buffering so data goes out when we + // put it there. + sock.setTcpNoDelay(true); + + in = sock.getInputStream(); + out = sock.getOutputStream(); + + connected.set(true); + } + + protected void checkClosed() + { + if( sock == null ) + throw new ConnectorException( "Connection is closed:" + remoteAddress ); + } + + public boolean isConnected() + { + if( sock == null ) + return false; + return sock.isConnected(); + } + + public void close() + { + checkClosed(); + try { + Socket temp = sock; + sock = null; + connected.set(false); + temp.close(); + } catch( IOException e ) { + throw new ConnectorException( "Error closing socket for:" + remoteAddress, e ); + } + } + + public boolean available() + { + checkClosed(); + try { + return in.available() > 0; + } catch( IOException e ) { + throw new ConnectorException( "Error retrieving data availability for:" + remoteAddress, e ); + } + } + + public ByteBuffer read() + { + checkClosed(); + + try { + // Read what we can + int count = in.read(buffer); + if( count < 0 ) { + // Socket is closed + close(); + return null; + } + + // Wrap it in a ByteBuffer for the caller + return ByteBuffer.wrap( buffer, 0, count ); + } catch( IOException e ) { + if( !connected.get() ) { + // Nothing to see here... just move along + return null; + } + throw new ConnectorException( "Error reading from connection to:" + remoteAddress, e ); + } + } + + public void write( ByteBuffer data ) + { + checkClosed(); + + try { + out.write(data.array(), data.position(), data.remaining()); + } catch( IOException e ) { + throw new ConnectorException( "Error writing to connection:" + remoteAddress, e ); + } + } + +} diff --git a/engine/src/networking/com/jme3/network/kernel/udp/UdpConnector.java b/engine/src/networking/com/jme3/network/kernel/udp/UdpConnector.java new file mode 100644 index 0000000..0193e1c --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/udp/UdpConnector.java @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel.udp; + +import com.jme3.network.kernel.Connector; +import com.jme3.network.kernel.ConnectorException; +import java.io.IOException; +import java.net.*; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicBoolean; + + +/** + * A straight forward datagram socket-based UDP connector + * implementation. + * + * @version $Revision: 8843 $ + * @author Paul Speed + */ +public class UdpConnector implements Connector +{ + private DatagramSocket sock = new DatagramSocket(); + private SocketAddress remoteAddress; + private byte[] buffer = new byte[65535]; + private AtomicBoolean connected = new AtomicBoolean(false); + + /** + * In order to provide proper available() checking, we + * potentially queue one datagram. + */ + private DatagramPacket pending; + + /** + * Creates a new UDP connection that send datagrams to the + * specified address and port. + */ + public UdpConnector( InetAddress remote, int remotePort ) throws IOException + { + InetSocketAddress localSocketAddress = new InetSocketAddress(0); + this.sock = new DatagramSocket( localSocketAddress ); + remoteAddress = new InetSocketAddress( remote, remotePort ); + + // Setup to receive only from the remote address + sock.connect( remoteAddress ); + + connected.set(true); + } + + protected void checkClosed() + { + if( sock == null ) + throw new ConnectorException( "Connection is closed:" + remoteAddress ); + } + + public boolean isConnected() + { + if( sock == null ) + return false; + return sock.isConnected(); + } + + public void close() + { + checkClosed(); + DatagramSocket temp = sock; + sock = null; + connected.set(false); + temp.close(); + } + + /** + * This always returns false since the simple DatagramSocket usage + * cannot be run in a non-blocking way. + */ + public boolean available() + { + // It would take a separate thread or an NIO Selector based implementation to get this + // to work. If a polling strategy is never employed by callers then it doesn't + // seem worth it to implement all of that just for this method. + checkClosed(); + return false; + } + + public ByteBuffer read() + { + checkClosed(); + + try { + DatagramPacket packet = new DatagramPacket( buffer, buffer.length ); + sock.receive(packet); + + // Wrap it in a ByteBuffer for the caller + return ByteBuffer.wrap( buffer, 0, packet.getLength() ); + } catch( IOException e ) { + if( !connected.get() ) { + // Nothing to see here... just move along + return null; + } + throw new ConnectorException( "Error reading from connection to:" + remoteAddress, e ); + } + } + + public void write( ByteBuffer data ) + { + checkClosed(); + + try { + DatagramPacket p = new DatagramPacket( data.array(), data.position(), data.remaining(), + remoteAddress ); + sock.send(p); + } catch( IOException e ) { + throw new ConnectorException( "Error writing to connection:" + remoteAddress, e ); + } + } +} + diff --git a/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java b/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java new file mode 100644 index 0000000..4a98387 --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java @@ -0,0 +1,140 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel.udp; + +import com.jme3.network.kernel.Endpoint; +import com.jme3.network.kernel.Kernel; +import com.jme3.network.kernel.KernelException; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; +import java.net.SocketAddress; +import java.nio.ByteBuffer; + + +/** + * Endpoint implementation that encapsulates the + * UDP connection information for return messaging, + * identification of envelope sources, etc. + * + * @version $Revision: 8843 $ + * @author Paul Speed + */ +public class UdpEndpoint implements Endpoint +{ + private long id; + private SocketAddress address; + private DatagramSocket socket; + private UdpKernel kernel; + private boolean connected = true; // it's connectionless but we track logical state + + public UdpEndpoint( UdpKernel kernel, long id, SocketAddress address, DatagramSocket socket ) + { + this.id = id; + this.address = address; + this.socket = socket; + this.kernel = kernel; + } + + public Kernel getKernel() + { + return kernel; + } + + protected SocketAddress getRemoteAddress() + { + return address; + } + + public void close() + { + close( false ); + } + + public void close( boolean flush ) + { + // No real reason to flush UDP traffic yet... especially + // when considering that the outbound UDP isn't even + // queued. + + try { + kernel.closeEndpoint(this); + connected = false; + } catch( IOException e ) { + throw new KernelException( "Error closing endpoint for socket:" + socket, e ); + } + } + + public long getId() + { + return id; + } + + public String getAddress() + { + return String.valueOf(address); + } + + public boolean isConnected() + { + // The socket is always unconnected anyway so we track our + // own logical state for the kernel's benefit. + return connected; + } + + public void send( ByteBuffer data ) + { + if( !isConnected() ) { + throw new KernelException( "Endpoint is not connected:" + this ); + } + + + try { + DatagramPacket p = new DatagramPacket( data.array(), data.position(), + data.remaining(), address ); + + // Just queue it up for the kernel threads to write + // out + kernel.enqueueWrite( this, p ); + + //socket.send(p); + } catch( IOException e ) { + throw new KernelException( "Error sending datagram to:" + address, e ); + } + } + + public String toString() + { + return "UdpEndpoint[" + id + ", " + address + "]"; + } +} diff --git a/engine/src/networking/com/jme3/network/kernel/udp/UdpKernel.java b/engine/src/networking/com/jme3/network/kernel/udp/UdpKernel.java new file mode 100644 index 0000000..c0a0f88 --- /dev/null +++ b/engine/src/networking/com/jme3/network/kernel/udp/UdpKernel.java @@ -0,0 +1,294 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.kernel.udp; + +import com.jme3.network.Filter; +import com.jme3.network.kernel.*; +import java.io.IOException; +import java.net.*; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A Kernel implementation using UDP packets. + * + * @version $Revision: 8944 $ + * @author Paul Speed + */ +public class UdpKernel extends AbstractKernel +{ + static Logger log = Logger.getLogger(UdpKernel.class.getName()); + + private InetSocketAddress address; + private HostThread thread; + + private ExecutorService writer; + + // The nature of UDP means that even through a firewall, + // a user would have to have a unique address+port since UDP + // can't really be NAT'ed. + private Map<SocketAddress,UdpEndpoint> socketEndpoints = new ConcurrentHashMap<SocketAddress,UdpEndpoint>(); + + public UdpKernel( InetAddress host, int port ) + { + this( new InetSocketAddress(host, port) ); + } + + public UdpKernel( int port ) throws IOException + { + this( new InetSocketAddress(port) ); + } + + public UdpKernel( InetSocketAddress address ) + { + this.address = address; + } + + protected HostThread createHostThread() + { + return new HostThread(); + } + + public void initialize() + { + if( thread != null ) + throw new IllegalStateException( "Kernel already initialized." ); + + writer = Executors.newFixedThreadPool(2, new NamedThreadFactory(toString() + "-writer")); + + thread = createHostThread(); + + try { + thread.connect(); + thread.start(); + } catch( IOException e ) { + throw new KernelException( "Error hosting:" + address, e ); + } + } + + public void terminate() throws InterruptedException + { + if( thread == null ) + throw new IllegalStateException( "Kernel not initialized." ); + + try { + thread.close(); + writer.shutdown(); + thread = null; + } catch( IOException e ) { + throw new KernelException( "Error closing host connection:" + address, e ); + } + } + + /** + * Dispatches the data to all endpoints managed by the + * kernel. 'routing' is currently ignored. + */ + public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, + boolean copy ) + { + if( reliable ) + throw new UnsupportedOperationException( "Reliable send not supported by this kernel." ); + + if( copy ) { + // Copy the data just once + byte[] temp = new byte[data.remaining()]; + System.arraycopy(data.array(), data.position(), temp, 0, data.remaining()); + data = ByteBuffer.wrap(temp); + } + + // Hand it to all of the endpoints that match our routing + for( UdpEndpoint p : socketEndpoints.values() ) { + // Does it match the filter? + if( filter != null && !filter.apply(p) ) + continue; + + // Send the data + p.send( data ); + } + } + + protected Endpoint getEndpoint( SocketAddress address, boolean create ) + { + UdpEndpoint p = socketEndpoints.get(address); + if( p == null && create ) { + p = new UdpEndpoint( this, nextEndpointId(), address, thread.getSocket() ); + socketEndpoints.put( address, p ); + + // Add an event for it. + addEvent( EndpointEvent.createAdd( this, p ) ); + } + return p; + } + + /** + * Called by the endpoints when they need to be closed. + */ + protected void closeEndpoint( UdpEndpoint p ) throws IOException + { + // Just book-keeping to do here. + if( socketEndpoints.remove( p.getRemoteAddress() ) == null ) + return; + + log.log( Level.INFO, "Closing endpoint:{0}.", p ); + log.log( Level.FINE, "Socket endpoints size:{0}", socketEndpoints.size() ); + + addEvent( EndpointEvent.createRemove( this, p ) ); + + // If there are no pending messages then add one so that the + // kernel-user knows to wake up if it is only listening for + // envelopes. + if( !hasEnvelopes() ) { + // Note: this is not really a race condition. At worst, our + // event has already been handled by now and it does no harm + // to check again. + addEnvelope( EVENTS_PENDING ); + } + } + + protected void newData( DatagramPacket packet ) + { + // So the tricky part here is figuring out the endpoint and + // whether it's new or not. In these UDP schemes, firewalls have + // to be ported back to a specific machine so we will consider + // the address + port (ie: SocketAddress) the defacto unique + // ID. + Endpoint p = getEndpoint( packet.getSocketAddress(), true ); + + // We'll copy the data to trim it. + byte[] data = new byte[packet.getLength()]; + System.arraycopy(packet.getData(), 0, data, 0, data.length); + + Envelope env = new Envelope( p, data, false ); + addEnvelope( env ); + } + + protected void enqueueWrite( Endpoint endpoint, DatagramPacket packet ) + { + writer.execute( new MessageWriter(endpoint, packet) ); + } + + protected class MessageWriter implements Runnable + { + private Endpoint endpoint; + private DatagramPacket packet; + + public MessageWriter( Endpoint endpoint, DatagramPacket packet ) + { + this.endpoint = endpoint; + this.packet = packet; + } + + public void run() + { + // Not guaranteed to always work but an extra datagram + // to a dead connection isn't so big of a deal. + if( !endpoint.isConnected() ) { + return; + } + + try { + thread.getSocket().send(packet); + } catch( Exception e ) { + KernelException exc = new KernelException( "Error sending datagram to:" + address, e ); + exc.fillInStackTrace(); + reportError(exc); + } + } + } + + protected class HostThread extends Thread + { + private DatagramSocket socket; + private AtomicBoolean go = new AtomicBoolean(true); + + private byte[] buffer = new byte[65535]; // slightly bigger than needed. + + public HostThread() + { + setName( "UDP Host@" + address ); + setDaemon(true); + } + + protected DatagramSocket getSocket() + { + return socket; + } + + public void connect() throws IOException + { + socket = new DatagramSocket( address ); + log.log( Level.INFO, "Hosting UDP connection:{0}.", address ); + } + + public void close() throws IOException, InterruptedException + { + // Set the thread to stop + go.set(false); + + // Make sure the channel is closed + socket.close(); + + // And wait for it + join(); + } + + public void run() + { + log.log( Level.INFO, "Kernel started for connection:{0}.", address ); + + // An atomic is safest and costs almost nothing + while( go.get() ) { + try { + // Could reuse the packet but I don't see the + // point and it may lead to subtle bugs if not properly + // reset. + DatagramPacket packet = new DatagramPacket( buffer, buffer.length ); + socket.receive(packet); + + newData( packet ); + } catch( IOException e ) { + if( !go.get() ) + return; + reportError( e ); + } + } + } + } +} |