aboutsummaryrefslogtreecommitdiff
path: root/engine/src/networking/com/jme3/network/kernel
diff options
context:
space:
mode:
Diffstat (limited to 'engine/src/networking/com/jme3/network/kernel')
-rw-r--r--engine/src/networking/com/jme3/network/kernel/AbstractKernel.java122
-rw-r--r--engine/src/networking/com/jme3/network/kernel/Connector.java83
-rw-r--r--engine/src/networking/com/jme3/network/kernel/ConnectorException.java54
-rw-r--r--engine/src/networking/com/jme3/network/kernel/Endpoint.java88
-rw-r--r--engine/src/networking/com/jme3/network/kernel/EndpointEvent.java87
-rw-r--r--engine/src/networking/com/jme3/network/kernel/Envelope.java79
-rw-r--r--engine/src/networking/com/jme3/network/kernel/Kernel.java94
-rw-r--r--engine/src/networking/com/jme3/network/kernel/KernelException.java54
-rw-r--r--engine/src/networking/com/jme3/network/kernel/NamedThreadFactory.java83
-rw-r--r--engine/src/networking/com/jme3/network/kernel/package.html34
-rw-r--r--engine/src/networking/com/jme3/network/kernel/tcp/NioEndpoint.java181
-rw-r--r--engine/src/networking/com/jme3/network/kernel/tcp/SelectorKernel.java472
-rw-r--r--engine/src/networking/com/jme3/network/kernel/tcp/SocketConnector.java150
-rw-r--r--engine/src/networking/com/jme3/network/kernel/udp/UdpConnector.java146
-rw-r--r--engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java140
-rw-r--r--engine/src/networking/com/jme3/network/kernel/udp/UdpKernel.java294
16 files changed, 2161 insertions, 0 deletions
diff --git a/engine/src/networking/com/jme3/network/kernel/AbstractKernel.java b/engine/src/networking/com/jme3/network/kernel/AbstractKernel.java
new file mode 100644
index 0000000..9881bfa
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/AbstractKernel.java
@@ -0,0 +1,122 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Base implementation of the Kernel interface providing several
+ * useful default implementations of some methods. This implementation
+ * assumes that the kernel will be managing its own internal threads
+ * and queuing any results for the caller to retrieve on their own
+ * thread.
+ *
+ * @version $Revision: 8843 $
+ * @author Paul Speed
+ */
+public abstract class AbstractKernel implements Kernel
+{
+ static Logger log = Logger.getLogger(AbstractKernel.class.getName());
+
+ private AtomicLong nextId = new AtomicLong(1);
+
+ /**
+ * Contains the pending endpoint events waiting for the caller
+ * to retrieve them.
+ */
+ private ConcurrentLinkedQueue<EndpointEvent> endpointEvents = new ConcurrentLinkedQueue<EndpointEvent>();
+
+ /**
+ * Contains the pending envelopes waiting for the caller to
+ * retrieve them.
+ */
+ private LinkedBlockingQueue<Envelope> envelopes = new LinkedBlockingQueue<Envelope>();
+
+ protected AbstractKernel()
+ {
+ }
+
+ protected void reportError( Exception e )
+ {
+ // Should really be queued up so the outer thread can
+ // retrieve them. For now we'll just log it. FIXME
+ log.log( Level.SEVERE, "Unhanddled kernel error", e );
+ }
+
+ protected long nextEndpointId()
+ {
+ return nextId.getAndIncrement();
+ }
+
+ /**
+ * Returns true if there are waiting envelopes.
+ */
+ public boolean hasEnvelopes()
+ {
+ return !envelopes.isEmpty();
+ }
+
+ /**
+ * Removes one envelope from the received messages queue or
+ * blocks until one is available.
+ */
+ public Envelope read() throws InterruptedException
+ {
+ return envelopes.take();
+ }
+
+ /**
+ * Removes and returnsn one endpoint event from the event queue or
+ * null if there are no endpoint events.
+ */
+ public EndpointEvent nextEvent()
+ {
+ return endpointEvents.poll();
+ }
+
+ protected void addEvent( EndpointEvent e )
+ {
+ endpointEvents.add( e );
+ }
+
+ protected void addEnvelope( Envelope env )
+ {
+ if( !envelopes.offer( env ) ) {
+ throw new KernelException( "Critical error, could not enqueue envelope." );
+ }
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/Connector.java b/engine/src/networking/com/jme3/network/kernel/Connector.java
new file mode 100644
index 0000000..f86c17b
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/Connector.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel;
+
+import java.nio.ByteBuffer;
+
+/**
+ * A single channel remote connection allowing the sending
+ * and receiving of data. As opposed to the Kernel, this will
+ * only ever receive data from one Endpoint and so bypasses
+ * the envelope wrapping.
+ *
+ * @version $Revision: 7012 $
+ * @author Paul Speed
+ */
+public interface Connector
+{
+ /**
+ * Returns true if this connector is currently connected.
+ */
+ public boolean isConnected();
+
+ /**
+ * Closes the connection. Any subsequent attempts to read
+ * or write will fail with an exception.
+ */
+ public void close();
+
+ /**
+ * Returns true if there is currently data available for
+ * reading. Some connector implementations may not be able
+ * to answer this question accurately and will always return
+ * false.
+ */
+ public boolean available();
+
+ /**
+ * Reads a chunk of data from the connection, blocking if
+ * there is no data available. The buffer may only be valid
+ * until the next read() call is made. Callers should copy
+ * the data if they need it for longer than that.
+ *
+ * @return The data read or null if there is no more data
+ * because the connection is closed.
+ */
+ public ByteBuffer read();
+
+ /**
+ * Writes a chunk of data to the connection from data.position()
+ * to data.limit().
+ */
+ public void write( ByteBuffer data );
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/ConnectorException.java b/engine/src/networking/com/jme3/network/kernel/ConnectorException.java
new file mode 100644
index 0000000..31d02b0
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/ConnectorException.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel;
+
+
+/**
+ * Represents a client-side connection error, usually encapsulating
+ * an IOException as its cause.
+ *
+ * @version $Revision: 7009 $
+ * @author Paul Speed
+ */
+public class ConnectorException extends RuntimeException
+{
+ public ConnectorException( String message, Throwable cause )
+ {
+ super( message, cause );
+ }
+
+ public ConnectorException( String message )
+ {
+ super( message );
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/Endpoint.java b/engine/src/networking/com/jme3/network/kernel/Endpoint.java
new file mode 100644
index 0000000..d63fdae
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/Endpoint.java
@@ -0,0 +1,88 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel;
+
+import java.nio.ByteBuffer;
+
+/**
+ * An abstract endpoint in a Kernel that can be used for
+ * sending/receiving messages within the kernel space.
+ *
+ * @version $Revision: 7049 $
+ * @author Paul Speed
+ */
+public interface Endpoint
+{
+ /**
+ * Returns an ID that is unique for this endpoint within its
+ * Kernel instance.
+ */
+ public long getId();
+
+ /**
+ * Returns the transport specific remote address of this endpoint
+ * as a string. This may or may not be unique per endpoint depending
+ * on the type of transport.
+ */
+ public String getAddress();
+
+ /**
+ * Returns the kernel to which this endpoint belongs.
+ */
+ public Kernel getKernel();
+
+ /**
+ * Returns true if this endpoint is currently connected.
+ */
+ public boolean isConnected();
+
+ /**
+ * Sends data to the other end of the connection represented
+ * by this endpoint.
+ */
+ public void send( ByteBuffer data );
+
+ /**
+ * Closes this endpoint without flushing any of its
+ * currently enqueued outbound data.
+ */
+ public void close();
+
+ /**
+ * Closes this endpoint, optionally flushing any queued
+ * data before closing. As soon as this method is called,
+ * ne send() calls will fail with an exception... even while
+ * close() is still flushing the earlier queued messages.
+ */
+ public void close(boolean flushData);
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/EndpointEvent.java b/engine/src/networking/com/jme3/network/kernel/EndpointEvent.java
new file mode 100644
index 0000000..feb48a6
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/EndpointEvent.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel;
+
+
+/**
+ * Provides information about an added or
+ * removed connection.
+ *
+ * @version $Revision: 7009 $
+ * @author Paul Speed
+ */
+public class EndpointEvent
+{
+ public enum Type { ADD, REMOVE };
+
+ private Kernel source;
+ private Endpoint endpoint;
+ private Type type;
+
+ public EndpointEvent( Kernel source, Endpoint p, Type type )
+ {
+ this.source = source;
+ this.endpoint = p;
+ this.type = type;
+ }
+
+ public static EndpointEvent createAdd( Kernel source, Endpoint p )
+ {
+ return new EndpointEvent( source, p, Type.ADD );
+ }
+
+ public static EndpointEvent createRemove( Kernel source, Endpoint p )
+ {
+ return new EndpointEvent( source, p, Type.REMOVE );
+ }
+
+ public Kernel getSource()
+ {
+ return source;
+ }
+
+ public Endpoint getEndpoint()
+ {
+ return endpoint;
+ }
+
+ public Type getType()
+ {
+ return type;
+ }
+
+ public String toString()
+ {
+ return "EndpointEvent[" + type + ", " + endpoint + "]";
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/Envelope.java b/engine/src/networking/com/jme3/network/kernel/Envelope.java
new file mode 100644
index 0000000..3c0028b
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/Envelope.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel;
+
+/**
+ * Encapsulates a received piece of data. This is used by the Kernel
+ * to track incoming chunks of data.
+ *
+ * @version $Revision: 8843 $
+ * @author Paul Speed
+ */
+public class Envelope
+{
+ private Endpoint source;
+ private byte[] data;
+ private boolean reliable;
+
+ /**
+ * Creates an incoming envelope holding the data from the specified
+ * source. The 'reliable' flag further indicates on which mode of
+ * transport the data arrrived.
+ */
+ public Envelope( Endpoint source, byte[] data, boolean reliable )
+ {
+ this.source = source;
+ this.data = data;
+ this.reliable = reliable;
+ }
+
+ public Endpoint getSource()
+ {
+ return source;
+ }
+
+ public byte[] getData()
+ {
+ return data;
+ }
+
+ public boolean isReliable()
+ {
+ return reliable;
+ }
+
+ public String toString()
+ {
+ return "Envelope[" + source + ", " + (reliable?"reliable":"unreliable") + ", " + data.length + "]";
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/Kernel.java b/engine/src/networking/com/jme3/network/kernel/Kernel.java
new file mode 100644
index 0000000..c13b736
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/Kernel.java
@@ -0,0 +1,94 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel;
+
+import com.jme3.network.Filter;
+import java.nio.ByteBuffer;
+
+/**
+ * Defines the basic byte[] passing messaging
+ * kernel.
+ *
+ * @version $Revision: 8843 $
+ * @author Paul Speed
+ */
+public interface Kernel
+{
+ /**
+ * A marker envelope returned from read() that indicates that
+ * there are events pending. This allows a single thread to
+ * more easily process the envelopes and endpoint events.
+ */
+ public static final Envelope EVENTS_PENDING = new Envelope( null, new byte[0], false );
+
+ /**
+ * Initializes the kernel and starts any internal processing.
+ */
+ public void initialize();
+
+ /**
+ * Gracefully terminates the kernel and stops any internal
+ * daemon processing. This method will not return until all
+ * internal threads have been shut down.
+ */
+ public void terminate() throws InterruptedException;
+
+ /**
+ * Dispatches the data to all endpoints managed by the
+ * kernel that match the specified endpoint filter..
+ * If 'copy' is true then the implementation will copy the byte buffer
+ * before delivering it to endpoints. This allows the caller to reuse
+ * the data buffer. Though it is important that the buffer not be changed
+ * by another thread while this call is running.
+ * Only the bytes from data.position() to data.remaining() are sent.
+ */
+ public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
+ boolean copy );
+
+ /**
+ * Returns true if there are waiting envelopes.
+ */
+ public boolean hasEnvelopes();
+
+ /**
+ * Removes one envelope from the received messages queue or
+ * blocks until one is available.
+ */
+ public Envelope read() throws InterruptedException;
+
+ /**
+ * Removes and returnsn one endpoint event from the event queue or
+ * null if there are no endpoint events.
+ */
+ public EndpointEvent nextEvent();
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/KernelException.java b/engine/src/networking/com/jme3/network/kernel/KernelException.java
new file mode 100644
index 0000000..62a57e4
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/KernelException.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel;
+
+
+/**
+ * Represents a kernel-level error, usually encapsulating
+ * an IOException as its cause.
+ *
+ * @version $Revision: 7009 $
+ * @author Paul Speed
+ */
+public class KernelException extends RuntimeException
+{
+ public KernelException( String message, Throwable cause )
+ {
+ super( message, cause );
+ }
+
+ public KernelException( String message )
+ {
+ super( message );
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/NamedThreadFactory.java b/engine/src/networking/com/jme3/network/kernel/NamedThreadFactory.java
new file mode 100644
index 0000000..f56c23a
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/NamedThreadFactory.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+
+/**
+ * A simple factory that delegates to java.util.concurrent's
+ * default thread factory but adds a prefix to the beginning
+ * of the thread name.
+ *
+ * @version $Revision: 7314 $
+ * @author Paul Speed
+ */
+public class NamedThreadFactory implements ThreadFactory
+{
+ private String name;
+ private boolean daemon;
+ private ThreadFactory delegate;
+
+ public NamedThreadFactory( String name )
+ {
+ this( name, Executors.defaultThreadFactory() );
+ }
+
+ public NamedThreadFactory( String name, boolean daemon )
+ {
+ this( name, daemon, Executors.defaultThreadFactory() );
+ }
+
+ public NamedThreadFactory( String name, ThreadFactory delegate )
+ {
+ this( name, false, delegate );
+ }
+
+ public NamedThreadFactory( String name, boolean daemon, ThreadFactory delegate )
+ {
+ this.name = name;
+ this.daemon = daemon;
+ this.delegate = delegate;
+ }
+
+ public Thread newThread( Runnable r )
+ {
+ Thread result = delegate.newThread(r);
+ String s = result.getName();
+ result.setName( name + "[" + s + "]" );
+ result.setDaemon(daemon);
+ return result;
+ }
+}
+
diff --git a/engine/src/networking/com/jme3/network/kernel/package.html b/engine/src/networking/com/jme3/network/kernel/package.html
new file mode 100644
index 0000000..7029c95
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/package.html
@@ -0,0 +1,34 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+
+<head>
+<title></title>
+<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+</head>
+
+<body>
+The kernel package is the heart of the JME networking module
+and controls the routing and dispatch of message data over
+different transport implementations. Most users will never
+have to deal with these classes unless they are writing their own
+client and server implementations that diverge from the standard
+classes that are provided.
+
+<p>{@link com.jme3.network.kernel.Kernel} defines the core of a server-side message
+broker that abstracts away the specific transport and underlying
+threading model used. For example, it might use NIO selectors
+in a single threaded model or straight multithreaded socket
+model. Or it might implement SSL connections. Once created,
+{@link com.jme3.network.kernel.Kernel} users don't need to care about the details.</p>
+
+<p>{@link com.jme3.network.kernel.Endpoint} is a managed connection within a
+{@link com.jme3.network.kernel.Kernel} providing kernel to client connectivity.</p>
+
+<p>{@link com.jme3.network.kernel.Connector} defines the basic client-side message sender
+and these objects are typically used to connect to a {@link com.jme3.network.kernel.Kernel}
+though they can connect to any network port that supports the implementation's
+protocol. Implementations are provided for straight TCP and UDP communication
+and could be extended to support SSL or different threading models.</p>
+
+</body>
+</html>
diff --git a/engine/src/networking/com/jme3/network/kernel/tcp/NioEndpoint.java b/engine/src/networking/com/jme3/network/kernel/tcp/NioEndpoint.java
new file mode 100644
index 0000000..87b721f
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/tcp/NioEndpoint.java
@@ -0,0 +1,181 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel.tcp;
+
+import com.jme3.network.kernel.Endpoint;
+import com.jme3.network.kernel.Kernel;
+import com.jme3.network.kernel.KernelException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+/**
+ * Endpoint implementation that encapsulates the
+ * channel IO based connection information and keeps
+ * track of the outbound data queue for the channel.
+ *
+ * @version $Revision: 8944 $
+ * @author Paul Speed
+ */
+public class NioEndpoint implements Endpoint
+{
+ protected static final ByteBuffer CLOSE_MARKER = ByteBuffer.allocate(0);
+
+ private long id;
+ private SocketChannel socket;
+ private SelectorKernel kernel;
+ private ConcurrentLinkedQueue<ByteBuffer> outbound = new ConcurrentLinkedQueue<ByteBuffer>();
+ private boolean closing = false;
+
+ public NioEndpoint( SelectorKernel kernel, long id, SocketChannel socket )
+ {
+ this.id = id;
+ this.socket = socket;
+ this.kernel = kernel;
+ }
+
+ public Kernel getKernel()
+ {
+ return kernel;
+ }
+
+ public void close()
+ {
+ close(false);
+ }
+
+ public void close( boolean flushData )
+ {
+ if( flushData ) {
+ closing = true;
+
+ // Enqueue a close marker message to let the server
+ // know we should close
+ send( CLOSE_MARKER, false, true );
+
+ return;
+ }
+
+ try {
+ // Note: even though we may be disconnected from the socket.isConnected()
+ // standpoint, it's still safest to tell the kernel so that it can be sure
+ // to stop managing us gracefully.
+ kernel.closeEndpoint(this);
+ } catch( IOException e ) {
+ throw new KernelException( "Error closing endpoint for socket:" + socket, e );
+ }
+ }
+
+ public long getId()
+ {
+ return id;
+ }
+
+ public String getAddress()
+ {
+ return String.valueOf(socket.socket().getRemoteSocketAddress());
+ }
+
+ public boolean isConnected()
+ {
+ return socket.isConnected();
+ }
+
+ /**
+ * The wakeup option is used internally when the kernel is
+ * broadcasting out to a bunch of endpoints and doesn't want to
+ * necessarily wakeup right away.
+ */
+ protected void send( ByteBuffer data, boolean copy, boolean wakeup )
+ {
+ // We create a ByteBuffer per endpoint since we
+ // use it to track the data sent to each endpoint
+ // separately.
+ ByteBuffer buffer;
+ if( !copy ) {
+ buffer = data;
+ } else {
+ // Copy the buffer
+ buffer = ByteBuffer.allocate(data.remaining());
+ buffer.put(data);
+ buffer.flip();
+ }
+
+ // Queue it up
+ outbound.add(buffer);
+
+ if( wakeup )
+ kernel.wakeupSelector();
+ }
+
+ /**
+ * Called by the SelectorKernel to get the current top
+ * buffer for writing.
+ */
+ protected ByteBuffer peekPending()
+ {
+ return outbound.peek();
+ }
+
+ /**
+ * Called by the SelectorKernel when the top buffer
+ * has been exhausted.
+ */
+ protected ByteBuffer removePending()
+ {
+ return outbound.poll();
+ }
+
+ protected boolean hasPending()
+ {
+ return !outbound.isEmpty();
+ }
+
+ public void send( ByteBuffer data )
+ {
+ if( data == null ) {
+ throw new IllegalArgumentException( "Data cannot be null." );
+ }
+ if( closing ) {
+ throw new KernelException( "Endpoint has been closed:" + socket );
+ }
+ send( data, true, true );
+ }
+
+ public String toString()
+ {
+ return "NioEndpoint[" + id + ", " + socket + "]";
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/tcp/SelectorKernel.java b/engine/src/networking/com/jme3/network/kernel/tcp/SelectorKernel.java
new file mode 100644
index 0000000..5fa75f3
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/tcp/SelectorKernel.java
@@ -0,0 +1,472 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel.tcp;
+
+import com.jme3.network.Filter;
+import com.jme3.network.kernel.*;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+
+/**
+ * A Kernel implementation based on NIO selectors.
+ *
+ * @version $Revision: 8944 $
+ * @author Paul Speed
+ */
+public class SelectorKernel extends AbstractKernel
+{
+ static Logger log = Logger.getLogger(SelectorKernel.class.getName());
+
+ private InetSocketAddress address;
+ private SelectorThread thread;
+
+ private Map<Long,NioEndpoint> endpoints = new ConcurrentHashMap<Long,NioEndpoint>();
+
+ public SelectorKernel( InetAddress host, int port )
+ {
+ this( new InetSocketAddress(host, port) );
+ }
+
+ public SelectorKernel( int port ) throws IOException
+ {
+ this( new InetSocketAddress(port) );
+ }
+
+ public SelectorKernel( InetSocketAddress address )
+ {
+ this.address = address;
+ }
+
+ protected SelectorThread createSelectorThread()
+ {
+ return new SelectorThread();
+ }
+
+ public void initialize()
+ {
+ if( thread != null )
+ throw new IllegalStateException( "Kernel already initialized." );
+
+ thread = createSelectorThread();
+
+ try {
+ thread.connect();
+ thread.start();
+ } catch( IOException e ) {
+ throw new KernelException( "Error hosting:" + address, e );
+ }
+ }
+
+ public void terminate() throws InterruptedException
+ {
+ if( thread == null )
+ throw new IllegalStateException( "Kernel not initialized." );
+
+ try {
+ thread.close();
+ thread = null;
+ } catch( IOException e ) {
+ throw new KernelException( "Error closing host connection:" + address, e );
+ }
+ }
+
+ public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
+ boolean copy )
+ {
+ if( !reliable )
+ throw new UnsupportedOperationException( "Unreliable send not supported by this kernel." );
+
+ if( copy ) {
+ // Copy the data just once
+ byte[] temp = new byte[data.remaining()];
+ System.arraycopy(data.array(), data.position(), temp, 0, data.remaining());
+ data = ByteBuffer.wrap(temp);
+ }
+
+ // Hand it to all of the endpoints that match our routing
+ for( NioEndpoint p : endpoints.values() ) {
+ // Does it match the filter?
+ if( filter != null && !filter.apply(p) )
+ continue;
+
+ // Give it the data... but let each endpoint track their
+ // own completion over the shared array of bytes by
+ // duplicating it
+ p.send( data.duplicate(), false, false );
+ }
+
+ // Wake up the selector so it can reinitialize its
+ // state accordingly.
+ wakeupSelector();
+ }
+
+ protected NioEndpoint addEndpoint( SocketChannel c )
+ {
+ // Note: we purposely do NOT put the key in the endpoint.
+ // SelectionKeys are dangerous outside the selector thread
+ // and this is safer.
+ NioEndpoint p = new NioEndpoint( this, nextEndpointId(), c );
+
+ endpoints.put( p.getId(), p );
+
+ // Enqueue an endpoint event for the listeners
+ addEvent( EndpointEvent.createAdd( this, p ) );
+
+ return p;
+ }
+
+ protected void removeEndpoint( NioEndpoint p, SocketChannel c )
+ {
+ endpoints.remove( p.getId() );
+ log.log( Level.FINE, "Endpoints size:{0}", endpoints.size() );
+
+ // Enqueue an endpoint event for the listeners
+ addEvent( EndpointEvent.createRemove( this, p ) );
+
+ // If there are no pending messages then add one so that the
+ // kernel-user knows to wake up if it is only listening for
+ // envelopes.
+ if( !hasEnvelopes() ) {
+ // Note: this is not really a race condition. At worst, our
+ // event has already been handled by now and it does no harm
+ // to check again.
+ addEnvelope( EVENTS_PENDING );
+ }
+ }
+
+ /**
+ * Called by the endpoints when they need to be closed.
+ */
+ protected void closeEndpoint( NioEndpoint p ) throws IOException
+ {
+ //log.log( Level.INFO, "Closing endpoint:{0}.", p );
+
+ thread.cancel(p);
+ }
+
+ /**
+ * Used internally by the endpoints to wakeup the selector
+ * when they have data to send.
+ */
+ protected void wakeupSelector()
+ {
+ thread.wakeupSelector();
+ }
+
+ protected void newData( NioEndpoint p, SocketChannel c, ByteBuffer shared, int size )
+ {
+ // Note: if ever desirable, it would be possible to accumulate
+ // data per source channel and only 'finalize' it when
+ // asked for more envelopes then were ready. I just don't
+ // think it will be an issue in practice. The busier the
+ // server, the more the buffers will fill before we get to them.
+ // And if the server isn't busy, who cares if we chop things up
+ // smaller... the network is still likely to deliver things in
+ // bulk anyway.
+
+ // Must copy the shared data before we use it
+ byte[] dataCopy = new byte[size];
+ System.arraycopy(shared.array(), 0, dataCopy, 0, size);
+
+ Envelope env = new Envelope( p, dataCopy, true );
+ addEnvelope( env );
+ }
+
+ /**
+ * This class is purposely tucked neatly away because
+ * messing with the selector from other threads for any
+ * reason is very bad. This is the safest architecture.
+ */
+ protected class SelectorThread extends Thread
+ {
+ private ServerSocketChannel serverChannel;
+ private Selector selector;
+ private AtomicBoolean go = new AtomicBoolean(true);
+ private ByteBuffer working = ByteBuffer.allocate( 8192 );
+
+ /**
+ * Because we want to keep the keys to ourselves, we'll do
+ * the endpoint -> key mapping internally.
+ */
+ private Map<NioEndpoint,SelectionKey> endpointKeys = new ConcurrentHashMap<NioEndpoint,SelectionKey>();
+
+ public SelectorThread()
+ {
+ setName( "Selector@" + address );
+ setDaemon(true);
+ }
+
+ public void connect() throws IOException
+ {
+ // Create a new selector
+ this.selector = SelectorProvider.provider().openSelector();
+
+ // Create a new non-blocking server socket channel
+ this.serverChannel = ServerSocketChannel.open();
+ serverChannel.configureBlocking(false);
+
+ // Bind the server socket to the specified address and port
+ serverChannel.socket().bind(address);
+
+ // Register the server socket channel, indicating an interest in
+ // accepting new connections
+ serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+
+ log.log( Level.INFO, "Hosting TCP connection:{0}.", address );
+ }
+
+ public void close() throws IOException, InterruptedException
+ {
+ // Set the thread to stop
+ go.set(false);
+
+ // Make sure the channel is closed
+ serverChannel.close();
+
+ // Force the selector to stop blocking
+ wakeupSelector();
+
+ // And wait for it
+ join();
+ }
+
+ protected void wakeupSelector()
+ {
+ selector.wakeup();
+ }
+
+ protected void setupSelectorOptions()
+ {
+ // For now, selection keys will either be in OP_READ
+ // or OP_WRITE. So while we are writing a buffer, we
+ // will not be reading. This is way simpler and less
+ // error prone... it can always be changed when everything
+ // else works if we are looking to micro-optimize.
+
+ // Setup options based on the current state of
+ // the endpoints. This could potentially be more
+ // efficiently done as change requests... or simply
+ // keeping a thread-safe set of endpoints with pending
+ // writes. For most cases, it shouldn't matter.
+ for( Map.Entry<NioEndpoint,SelectionKey> e : endpointKeys.entrySet() ) {
+ if( e.getKey().hasPending() ) {
+ e.getValue().interestOps(SelectionKey.OP_WRITE);
+ }
+ }
+ }
+
+ protected void accept( SelectionKey key ) throws IOException
+ {
+ // Would only get accepts on a server channel
+ ServerSocketChannel serverChan = (ServerSocketChannel)key.channel();
+
+ // Setup the connection to be non-blocking
+ SocketChannel remoteChan = serverChan.accept();
+ remoteChan.configureBlocking(false);
+
+ // And disable Nagle's buffering algorithm... we want
+ // data to go when we put it there.
+ Socket sock = remoteChan.socket();
+ sock.setTcpNoDelay(true);
+
+ // Let the selector know we're interested in reading
+ // data from the channel
+ SelectionKey endKey = remoteChan.register( selector, SelectionKey.OP_READ );
+
+ // And now create a new endpoint
+ NioEndpoint p = addEndpoint( remoteChan );
+ endKey.attach(p);
+ endpointKeys.put(p, endKey);
+ }
+
+ protected void cancel( NioEndpoint p ) throws IOException
+ {
+ SelectionKey key = endpointKeys.remove(p);
+ if( key == null ) {
+ //log.log( Level.INFO, "Endpoint already closed:{0}.", p );
+ return; // already closed it
+ }
+ log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() );
+
+ log.log( Level.INFO, "Closing endpoint:{0}.", p );
+ SocketChannel c = (SocketChannel)key.channel();
+
+ // Note: key.cancel() is specifically thread safe. One of
+ // the few things one can do with a key from another
+ // thread.
+ key.cancel();
+ c.close();
+ removeEndpoint( p, c );
+ }
+
+ protected void cancel( SelectionKey key, SocketChannel c ) throws IOException
+ {
+ NioEndpoint p = (NioEndpoint)key.attachment();
+ log.log( Level.INFO, "Closing channel endpoint:{0}.", p );
+ Object o = endpointKeys.remove(p);
+
+ log.log( Level.FINE, "Endpoint keys size:{0}", endpointKeys.size() );
+
+ key.cancel();
+ c.close();
+ removeEndpoint( p, c );
+ }
+
+ protected void read( SelectionKey key ) throws IOException
+ {
+ NioEndpoint p = (NioEndpoint)key.attachment();
+ SocketChannel c = (SocketChannel)key.channel();
+ working.clear();
+
+ int size;
+ try {
+ size = c.read(working);
+ } catch( IOException e ) {
+ // The remove end forcibly closed the connection...
+ // close out our end and cancel the key
+ cancel( key, c );
+ return;
+ }
+
+ if( size == -1 ) {
+ // The remote end shut down cleanly...
+ // close out our end and cancel the key
+ cancel( key, c );
+ return;
+ }
+
+ newData( p, c, working, size );
+ }
+
+ protected void write( SelectionKey key ) throws IOException
+ {
+ NioEndpoint p = (NioEndpoint)key.attachment();
+ SocketChannel c = (SocketChannel)key.channel();
+
+ // We will send what we can and move on.
+ ByteBuffer current = p.peekPending();
+ if( current == NioEndpoint.CLOSE_MARKER ) {
+ // This connection wants to be closed now
+ closeEndpoint(p);
+
+ // Nothing more to do
+ return;
+ }
+
+ c.write( current );
+
+ // If we wrote all of that packet then we need to remove it
+ if( current.remaining() == 0 ) {
+ p.removePending();
+ }
+
+ // If we happened to empty the pending queue then let's read
+ // again.
+ if( !p.hasPending() ) {
+ key.interestOps( SelectionKey.OP_READ );
+ }
+ }
+
+ protected void select() throws IOException
+ {
+ selector.select();
+
+ for( Iterator<SelectionKey> i = selector.selectedKeys().iterator(); i.hasNext(); ) {
+ SelectionKey key = i.next();
+ i.remove();
+
+ if( !key.isValid() )
+ {
+ // When does this happen?
+ log.log( Level.INFO, "Key is not valid:{0}.", key );
+ continue;
+ }
+
+ try {
+ if( key.isAcceptable() )
+ accept(key);
+ else if( key.isWritable() )
+ write(key);
+ else if( key.isReadable() )
+ read(key);
+ } catch( IOException e ) {
+ if( !go.get() )
+ return; // error likely due to shutting down
+ reportError( e );
+
+ // And at this level, errors likely mean the key is now
+ // dead and it doesn't hurt to kick them anyway. If we
+ // find IOExceptions that are not fatal, this can be
+ // readdressed
+ cancel( key, (SocketChannel)key.channel() );
+ }
+ }
+ }
+
+ public void run()
+ {
+ log.log( Level.INFO, "Kernel started for connection:{0}.", address );
+
+ // An atomic is safest and costs almost nothing
+ while( go.get() ) {
+ // Setup any queued option changes
+ setupSelectorOptions();
+
+ // Check for available keys and process them
+ try {
+ select();
+ } catch( ClosedSelectorException e ) {
+ if( !go.get() )
+ return; // it's because we're shutting down
+ throw new KernelException( "Premature selector closing", e );
+ } catch( IOException e ) {
+ if( !go.get() )
+ return; // error likely due to shutting down
+ reportError( e );
+ }
+ }
+ }
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/tcp/SocketConnector.java b/engine/src/networking/com/jme3/network/kernel/tcp/SocketConnector.java
new file mode 100644
index 0000000..66d2040
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/tcp/SocketConnector.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel.tcp;
+
+import com.jme3.network.kernel.Connector;
+import com.jme3.network.kernel.ConnectorException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.InetAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * A straight forward socket-based connector implementation that
+ * does not use any separate threading. It relies completely on
+ * the buffering in the OS network layer.
+ *
+ * @version $Revision: 8843 $
+ * @author Paul Speed
+ */
+public class SocketConnector implements Connector
+{
+ private Socket sock;
+ private InputStream in;
+ private OutputStream out;
+ private SocketAddress remoteAddress;
+ private byte[] buffer = new byte[65535];
+ private AtomicBoolean connected = new AtomicBoolean(false);
+
+ public SocketConnector( InetAddress address, int port ) throws IOException
+ {
+ this.sock = new Socket(address, port);
+ remoteAddress = sock.getRemoteSocketAddress(); // for info purposes
+
+ // Disable Nagle's buffering so data goes out when we
+ // put it there.
+ sock.setTcpNoDelay(true);
+
+ in = sock.getInputStream();
+ out = sock.getOutputStream();
+
+ connected.set(true);
+ }
+
+ protected void checkClosed()
+ {
+ if( sock == null )
+ throw new ConnectorException( "Connection is closed:" + remoteAddress );
+ }
+
+ public boolean isConnected()
+ {
+ if( sock == null )
+ return false;
+ return sock.isConnected();
+ }
+
+ public void close()
+ {
+ checkClosed();
+ try {
+ Socket temp = sock;
+ sock = null;
+ connected.set(false);
+ temp.close();
+ } catch( IOException e ) {
+ throw new ConnectorException( "Error closing socket for:" + remoteAddress, e );
+ }
+ }
+
+ public boolean available()
+ {
+ checkClosed();
+ try {
+ return in.available() > 0;
+ } catch( IOException e ) {
+ throw new ConnectorException( "Error retrieving data availability for:" + remoteAddress, e );
+ }
+ }
+
+ public ByteBuffer read()
+ {
+ checkClosed();
+
+ try {
+ // Read what we can
+ int count = in.read(buffer);
+ if( count < 0 ) {
+ // Socket is closed
+ close();
+ return null;
+ }
+
+ // Wrap it in a ByteBuffer for the caller
+ return ByteBuffer.wrap( buffer, 0, count );
+ } catch( IOException e ) {
+ if( !connected.get() ) {
+ // Nothing to see here... just move along
+ return null;
+ }
+ throw new ConnectorException( "Error reading from connection to:" + remoteAddress, e );
+ }
+ }
+
+ public void write( ByteBuffer data )
+ {
+ checkClosed();
+
+ try {
+ out.write(data.array(), data.position(), data.remaining());
+ } catch( IOException e ) {
+ throw new ConnectorException( "Error writing to connection:" + remoteAddress, e );
+ }
+ }
+
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/udp/UdpConnector.java b/engine/src/networking/com/jme3/network/kernel/udp/UdpConnector.java
new file mode 100644
index 0000000..0193e1c
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/udp/UdpConnector.java
@@ -0,0 +1,146 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel.udp;
+
+import com.jme3.network.kernel.Connector;
+import com.jme3.network.kernel.ConnectorException;
+import java.io.IOException;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+
+/**
+ * A straight forward datagram socket-based UDP connector
+ * implementation.
+ *
+ * @version $Revision: 8843 $
+ * @author Paul Speed
+ */
+public class UdpConnector implements Connector
+{
+ private DatagramSocket sock = new DatagramSocket();
+ private SocketAddress remoteAddress;
+ private byte[] buffer = new byte[65535];
+ private AtomicBoolean connected = new AtomicBoolean(false);
+
+ /**
+ * In order to provide proper available() checking, we
+ * potentially queue one datagram.
+ */
+ private DatagramPacket pending;
+
+ /**
+ * Creates a new UDP connection that send datagrams to the
+ * specified address and port.
+ */
+ public UdpConnector( InetAddress remote, int remotePort ) throws IOException
+ {
+ InetSocketAddress localSocketAddress = new InetSocketAddress(0);
+ this.sock = new DatagramSocket( localSocketAddress );
+ remoteAddress = new InetSocketAddress( remote, remotePort );
+
+ // Setup to receive only from the remote address
+ sock.connect( remoteAddress );
+
+ connected.set(true);
+ }
+
+ protected void checkClosed()
+ {
+ if( sock == null )
+ throw new ConnectorException( "Connection is closed:" + remoteAddress );
+ }
+
+ public boolean isConnected()
+ {
+ if( sock == null )
+ return false;
+ return sock.isConnected();
+ }
+
+ public void close()
+ {
+ checkClosed();
+ DatagramSocket temp = sock;
+ sock = null;
+ connected.set(false);
+ temp.close();
+ }
+
+ /**
+ * This always returns false since the simple DatagramSocket usage
+ * cannot be run in a non-blocking way.
+ */
+ public boolean available()
+ {
+ // It would take a separate thread or an NIO Selector based implementation to get this
+ // to work. If a polling strategy is never employed by callers then it doesn't
+ // seem worth it to implement all of that just for this method.
+ checkClosed();
+ return false;
+ }
+
+ public ByteBuffer read()
+ {
+ checkClosed();
+
+ try {
+ DatagramPacket packet = new DatagramPacket( buffer, buffer.length );
+ sock.receive(packet);
+
+ // Wrap it in a ByteBuffer for the caller
+ return ByteBuffer.wrap( buffer, 0, packet.getLength() );
+ } catch( IOException e ) {
+ if( !connected.get() ) {
+ // Nothing to see here... just move along
+ return null;
+ }
+ throw new ConnectorException( "Error reading from connection to:" + remoteAddress, e );
+ }
+ }
+
+ public void write( ByteBuffer data )
+ {
+ checkClosed();
+
+ try {
+ DatagramPacket p = new DatagramPacket( data.array(), data.position(), data.remaining(),
+ remoteAddress );
+ sock.send(p);
+ } catch( IOException e ) {
+ throw new ConnectorException( "Error writing to connection:" + remoteAddress, e );
+ }
+ }
+}
+
diff --git a/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java b/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java
new file mode 100644
index 0000000..4a98387
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/udp/UdpEndpoint.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel.udp;
+
+import com.jme3.network.kernel.Endpoint;
+import com.jme3.network.kernel.Kernel;
+import com.jme3.network.kernel.KernelException;
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+
+
+/**
+ * Endpoint implementation that encapsulates the
+ * UDP connection information for return messaging,
+ * identification of envelope sources, etc.
+ *
+ * @version $Revision: 8843 $
+ * @author Paul Speed
+ */
+public class UdpEndpoint implements Endpoint
+{
+ private long id;
+ private SocketAddress address;
+ private DatagramSocket socket;
+ private UdpKernel kernel;
+ private boolean connected = true; // it's connectionless but we track logical state
+
+ public UdpEndpoint( UdpKernel kernel, long id, SocketAddress address, DatagramSocket socket )
+ {
+ this.id = id;
+ this.address = address;
+ this.socket = socket;
+ this.kernel = kernel;
+ }
+
+ public Kernel getKernel()
+ {
+ return kernel;
+ }
+
+ protected SocketAddress getRemoteAddress()
+ {
+ return address;
+ }
+
+ public void close()
+ {
+ close( false );
+ }
+
+ public void close( boolean flush )
+ {
+ // No real reason to flush UDP traffic yet... especially
+ // when considering that the outbound UDP isn't even
+ // queued.
+
+ try {
+ kernel.closeEndpoint(this);
+ connected = false;
+ } catch( IOException e ) {
+ throw new KernelException( "Error closing endpoint for socket:" + socket, e );
+ }
+ }
+
+ public long getId()
+ {
+ return id;
+ }
+
+ public String getAddress()
+ {
+ return String.valueOf(address);
+ }
+
+ public boolean isConnected()
+ {
+ // The socket is always unconnected anyway so we track our
+ // own logical state for the kernel's benefit.
+ return connected;
+ }
+
+ public void send( ByteBuffer data )
+ {
+ if( !isConnected() ) {
+ throw new KernelException( "Endpoint is not connected:" + this );
+ }
+
+
+ try {
+ DatagramPacket p = new DatagramPacket( data.array(), data.position(),
+ data.remaining(), address );
+
+ // Just queue it up for the kernel threads to write
+ // out
+ kernel.enqueueWrite( this, p );
+
+ //socket.send(p);
+ } catch( IOException e ) {
+ throw new KernelException( "Error sending datagram to:" + address, e );
+ }
+ }
+
+ public String toString()
+ {
+ return "UdpEndpoint[" + id + ", " + address + "]";
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/kernel/udp/UdpKernel.java b/engine/src/networking/com/jme3/network/kernel/udp/UdpKernel.java
new file mode 100644
index 0000000..c0a0f88
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/kernel/udp/UdpKernel.java
@@ -0,0 +1,294 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.kernel.udp;
+
+import com.jme3.network.Filter;
+import com.jme3.network.kernel.*;
+import java.io.IOException;
+import java.net.*;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A Kernel implementation using UDP packets.
+ *
+ * @version $Revision: 8944 $
+ * @author Paul Speed
+ */
+public class UdpKernel extends AbstractKernel
+{
+ static Logger log = Logger.getLogger(UdpKernel.class.getName());
+
+ private InetSocketAddress address;
+ private HostThread thread;
+
+ private ExecutorService writer;
+
+ // The nature of UDP means that even through a firewall,
+ // a user would have to have a unique address+port since UDP
+ // can't really be NAT'ed.
+ private Map<SocketAddress,UdpEndpoint> socketEndpoints = new ConcurrentHashMap<SocketAddress,UdpEndpoint>();
+
+ public UdpKernel( InetAddress host, int port )
+ {
+ this( new InetSocketAddress(host, port) );
+ }
+
+ public UdpKernel( int port ) throws IOException
+ {
+ this( new InetSocketAddress(port) );
+ }
+
+ public UdpKernel( InetSocketAddress address )
+ {
+ this.address = address;
+ }
+
+ protected HostThread createHostThread()
+ {
+ return new HostThread();
+ }
+
+ public void initialize()
+ {
+ if( thread != null )
+ throw new IllegalStateException( "Kernel already initialized." );
+
+ writer = Executors.newFixedThreadPool(2, new NamedThreadFactory(toString() + "-writer"));
+
+ thread = createHostThread();
+
+ try {
+ thread.connect();
+ thread.start();
+ } catch( IOException e ) {
+ throw new KernelException( "Error hosting:" + address, e );
+ }
+ }
+
+ public void terminate() throws InterruptedException
+ {
+ if( thread == null )
+ throw new IllegalStateException( "Kernel not initialized." );
+
+ try {
+ thread.close();
+ writer.shutdown();
+ thread = null;
+ } catch( IOException e ) {
+ throw new KernelException( "Error closing host connection:" + address, e );
+ }
+ }
+
+ /**
+ * Dispatches the data to all endpoints managed by the
+ * kernel. 'routing' is currently ignored.
+ */
+ public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
+ boolean copy )
+ {
+ if( reliable )
+ throw new UnsupportedOperationException( "Reliable send not supported by this kernel." );
+
+ if( copy ) {
+ // Copy the data just once
+ byte[] temp = new byte[data.remaining()];
+ System.arraycopy(data.array(), data.position(), temp, 0, data.remaining());
+ data = ByteBuffer.wrap(temp);
+ }
+
+ // Hand it to all of the endpoints that match our routing
+ for( UdpEndpoint p : socketEndpoints.values() ) {
+ // Does it match the filter?
+ if( filter != null && !filter.apply(p) )
+ continue;
+
+ // Send the data
+ p.send( data );
+ }
+ }
+
+ protected Endpoint getEndpoint( SocketAddress address, boolean create )
+ {
+ UdpEndpoint p = socketEndpoints.get(address);
+ if( p == null && create ) {
+ p = new UdpEndpoint( this, nextEndpointId(), address, thread.getSocket() );
+ socketEndpoints.put( address, p );
+
+ // Add an event for it.
+ addEvent( EndpointEvent.createAdd( this, p ) );
+ }
+ return p;
+ }
+
+ /**
+ * Called by the endpoints when they need to be closed.
+ */
+ protected void closeEndpoint( UdpEndpoint p ) throws IOException
+ {
+ // Just book-keeping to do here.
+ if( socketEndpoints.remove( p.getRemoteAddress() ) == null )
+ return;
+
+ log.log( Level.INFO, "Closing endpoint:{0}.", p );
+ log.log( Level.FINE, "Socket endpoints size:{0}", socketEndpoints.size() );
+
+ addEvent( EndpointEvent.createRemove( this, p ) );
+
+ // If there are no pending messages then add one so that the
+ // kernel-user knows to wake up if it is only listening for
+ // envelopes.
+ if( !hasEnvelopes() ) {
+ // Note: this is not really a race condition. At worst, our
+ // event has already been handled by now and it does no harm
+ // to check again.
+ addEnvelope( EVENTS_PENDING );
+ }
+ }
+
+ protected void newData( DatagramPacket packet )
+ {
+ // So the tricky part here is figuring out the endpoint and
+ // whether it's new or not. In these UDP schemes, firewalls have
+ // to be ported back to a specific machine so we will consider
+ // the address + port (ie: SocketAddress) the defacto unique
+ // ID.
+ Endpoint p = getEndpoint( packet.getSocketAddress(), true );
+
+ // We'll copy the data to trim it.
+ byte[] data = new byte[packet.getLength()];
+ System.arraycopy(packet.getData(), 0, data, 0, data.length);
+
+ Envelope env = new Envelope( p, data, false );
+ addEnvelope( env );
+ }
+
+ protected void enqueueWrite( Endpoint endpoint, DatagramPacket packet )
+ {
+ writer.execute( new MessageWriter(endpoint, packet) );
+ }
+
+ protected class MessageWriter implements Runnable
+ {
+ private Endpoint endpoint;
+ private DatagramPacket packet;
+
+ public MessageWriter( Endpoint endpoint, DatagramPacket packet )
+ {
+ this.endpoint = endpoint;
+ this.packet = packet;
+ }
+
+ public void run()
+ {
+ // Not guaranteed to always work but an extra datagram
+ // to a dead connection isn't so big of a deal.
+ if( !endpoint.isConnected() ) {
+ return;
+ }
+
+ try {
+ thread.getSocket().send(packet);
+ } catch( Exception e ) {
+ KernelException exc = new KernelException( "Error sending datagram to:" + address, e );
+ exc.fillInStackTrace();
+ reportError(exc);
+ }
+ }
+ }
+
+ protected class HostThread extends Thread
+ {
+ private DatagramSocket socket;
+ private AtomicBoolean go = new AtomicBoolean(true);
+
+ private byte[] buffer = new byte[65535]; // slightly bigger than needed.
+
+ public HostThread()
+ {
+ setName( "UDP Host@" + address );
+ setDaemon(true);
+ }
+
+ protected DatagramSocket getSocket()
+ {
+ return socket;
+ }
+
+ public void connect() throws IOException
+ {
+ socket = new DatagramSocket( address );
+ log.log( Level.INFO, "Hosting UDP connection:{0}.", address );
+ }
+
+ public void close() throws IOException, InterruptedException
+ {
+ // Set the thread to stop
+ go.set(false);
+
+ // Make sure the channel is closed
+ socket.close();
+
+ // And wait for it
+ join();
+ }
+
+ public void run()
+ {
+ log.log( Level.INFO, "Kernel started for connection:{0}.", address );
+
+ // An atomic is safest and costs almost nothing
+ while( go.get() ) {
+ try {
+ // Could reuse the packet but I don't see the
+ // point and it may lead to subtle bugs if not properly
+ // reset.
+ DatagramPacket packet = new DatagramPacket( buffer, buffer.length );
+ socket.receive(packet);
+
+ newData( packet );
+ } catch( IOException e ) {
+ if( !go.get() )
+ return;
+ reportError( e );
+ }
+ }
+ }
+ }
+}