aboutsummaryrefslogtreecommitdiff
path: root/engine/src/networking/com/jme3/network/base
diff options
context:
space:
mode:
Diffstat (limited to 'engine/src/networking/com/jme3/network/base')
-rw-r--r--engine/src/networking/com/jme3/network/base/ConnectorAdapter.java218
-rw-r--r--engine/src/networking/com/jme3/network/base/ConnectorFactory.java48
-rw-r--r--engine/src/networking/com/jme3/network/base/DefaultClient.java428
-rw-r--r--engine/src/networking/com/jme3/network/base/DefaultServer.java591
-rw-r--r--engine/src/networking/com/jme3/network/base/KernelAdapter.java296
-rw-r--r--engine/src/networking/com/jme3/network/base/KernelFactory.java51
-rw-r--r--engine/src/networking/com/jme3/network/base/MessageListenerRegistry.java120
-rw-r--r--engine/src/networking/com/jme3/network/base/MessageProtocol.java191
-rw-r--r--engine/src/networking/com/jme3/network/base/NioKernelFactory.java53
-rw-r--r--engine/src/networking/com/jme3/network/base/TcpConnectorFactory.java60
-rw-r--r--engine/src/networking/com/jme3/network/base/package.html12
11 files changed, 2068 insertions, 0 deletions
diff --git a/engine/src/networking/com/jme3/network/base/ConnectorAdapter.java b/engine/src/networking/com/jme3/network/base/ConnectorAdapter.java
new file mode 100644
index 0000000..3827786
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/base/ConnectorAdapter.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.base;
+
+import com.jme3.network.ErrorListener;
+import com.jme3.network.Message;
+import com.jme3.network.MessageListener;
+import com.jme3.network.kernel.Connector;
+import com.jme3.network.kernel.ConnectorException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Wraps a single Connector and forwards new messages
+ * to the supplied message dispatcher. This is used
+ * by DefaultClient to manage its connector objects.
+ * This is only responsible for message reading and provides
+ * no support for buffering writes.
+ *
+ * <p>This adapter assumes a simple protocol where two
+ * bytes define a (short) object size with the object data
+ * to follow. Note: this limits the size of serialized
+ * objects to 32676 bytes... even though, for example,
+ * datagram packets can hold twice that. :P</p>
+ *
+ * @version $Revision: 8944 $
+ * @author Paul Speed
+ */
+public class ConnectorAdapter extends Thread
+{
+ private static final int OUTBOUND_BACKLOG = 16000;
+
+ private Connector connector;
+ private MessageListener<Object> dispatcher;
+ private ErrorListener<Object> errorHandler;
+ private AtomicBoolean go = new AtomicBoolean(true);
+
+ private BlockingQueue<ByteBuffer> outbound;
+
+ // Writes messages out on a background thread
+ private WriterThread writer;
+
+ // Marks the messages as reliable or not if they came
+ // through this connector.
+ private boolean reliable;
+
+ public ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher,
+ ErrorListener<Object> errorHandler, boolean reliable )
+ {
+ super( String.valueOf(connector) );
+ this.connector = connector;
+ this.dispatcher = dispatcher;
+ this.errorHandler = errorHandler;
+ this.reliable = reliable;
+ setDaemon(true);
+
+ // The backlog makes sure that the outbound channel blocks once
+ // a certain backlog level is reached. It is set high so that it
+ // is only reached in the worst cases... which are usually things like
+ // raw throughput tests. Technically, a saturated TCP channel could
+ // back up quite a bit if the buffers are full and the socket has
+ // stalled but 16,000 messages is still a big backlog.
+ outbound = new ArrayBlockingQueue<ByteBuffer>(OUTBOUND_BACKLOG);
+
+ // Note: this technically adds a potential deadlock case
+ // with the above code where there wasn't one before. For example,
+ // if a TCP outbound queue fills to capacity and a client sends
+ // in such a way that they block TCP message handling then if the HostedConnection
+ // on the server is similarly blocked then the TCP network buffers may
+ // all get full and no outbound messages move and we forever block
+ // on the queue.
+ // However, in practice this can't really happen... or at least it's
+ // the sign of other really bad things.
+ // First, currently the server-side outbound queues are all unbounded and
+ // so won't ever block the handling of messages if the outbound channel is full.
+ // Second, there would have to be a huge amount of data backlog for this
+ // to ever occur anyway.
+ // Third, it's a sign of a really poor architecture if 16,000 messages
+ // can go out in a way that blocks reads.
+
+ writer = new WriterThread();
+ writer.start();
+ }
+
+ public void close()
+ {
+ go.set(false);
+
+ // Kill the writer service
+ writer.shutdown();
+
+ if( connector.isConnected() )
+ {
+ // Kill the connector
+ connector.close();
+ }
+ }
+
+ protected void dispatch( Message m )
+ {
+ dispatcher.messageReceived( null, m );
+ }
+
+ public void write( ByteBuffer data )
+ {
+ try {
+ outbound.put( data );
+ } catch( InterruptedException e ) {
+ throw new RuntimeException( "Interrupted while waiting for queue to drain", e );
+ }
+ }
+
+ protected void handleError( Exception e )
+ {
+ if( !go.get() )
+ return;
+
+ errorHandler.handleError( this, e );
+ }
+
+ public void run()
+ {
+ MessageProtocol protocol = new MessageProtocol();
+
+ try {
+ while( go.get() ) {
+ ByteBuffer buffer = connector.read();
+ if( buffer == null ) {
+ if( go.get() ) {
+ throw new ConnectorException( "Connector closed." );
+ } else {
+ // Just dump out because a null buffer is expected
+ // from a closed/closing connector
+ break;
+ }
+ }
+
+ protocol.addBuffer( buffer );
+
+ Message m = null;
+ while( (m = protocol.getMessage()) != null ) {
+ m.setReliable( reliable );
+ dispatch( m );
+ }
+ }
+ } catch( Exception e ) {
+ handleError( e );
+ }
+ }
+
+ protected class WriterThread extends Thread
+ {
+ public WriterThread()
+ {
+ super( String.valueOf(connector) + "-writer" );
+ }
+
+ public void shutdown()
+ {
+ interrupt();
+ }
+
+ private void write( ByteBuffer data )
+ {
+ try {
+ connector.write(data);
+ } catch( Exception e ) {
+ handleError( e );
+ }
+ }
+
+ public void run()
+ {
+ while( go.get() ) {
+ try {
+ ByteBuffer data = outbound.take();
+ write(data);
+ } catch( InterruptedException e ) {
+ if( !go.get() )
+ return;
+ throw new RuntimeException( "Interrupted waiting for data", e );
+ }
+ }
+ }
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/base/ConnectorFactory.java b/engine/src/networking/com/jme3/network/base/ConnectorFactory.java
new file mode 100644
index 0000000..d9bb700
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/base/ConnectorFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.base;
+
+import com.jme3.network.kernel.Connector;
+import java.io.IOException;
+
+
+/**
+ * Creates Connectors for a specific host.
+ *
+ * @version $Revision: 8938 $
+ * @author Paul Speed
+ */
+public interface ConnectorFactory
+{
+ public Connector createConnector( int channel, int port ) throws IOException;
+}
diff --git a/engine/src/networking/com/jme3/network/base/DefaultClient.java b/engine/src/networking/com/jme3/network/base/DefaultClient.java
new file mode 100644
index 0000000..e03c3e4
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/base/DefaultClient.java
@@ -0,0 +1,428 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.base;
+
+import com.jme3.network.ClientStateListener.DisconnectInfo;
+import com.jme3.network.*;
+import com.jme3.network.kernel.Connector;
+import com.jme3.network.message.ChannelInfoMessage;
+import com.jme3.network.message.ClientRegistrationMessage;
+import com.jme3.network.message.DisconnectMessage;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A default implementation of the Client interface that delegates
+ * its network connectivity to a kernel.Connector.
+ *
+ * @version $Revision: 8938 $
+ * @author Paul Speed
+ */
+public class DefaultClient implements Client
+{
+ static Logger log = Logger.getLogger(DefaultClient.class.getName());
+
+ // First two channels are reserved for reliable and
+ // unreliable. Note: channels are endpoint specific so these
+ // constants and the handling need not have anything to do with
+ // the same constants in DefaultServer... which is why they are
+ // separate.
+ private static final int CH_RELIABLE = 0;
+ private static final int CH_UNRELIABLE = 1;
+ private static final int CH_FIRST = 2;
+
+ private ThreadLocal<ByteBuffer> dataBuffer = new ThreadLocal<ByteBuffer>();
+
+ private int id = -1;
+ private boolean isRunning = false;
+ private CountDownLatch connecting = new CountDownLatch(1);
+ private String gameName;
+ private int version;
+ private MessageListenerRegistry<Client> messageListeners = new MessageListenerRegistry<Client>();
+ private List<ClientStateListener> stateListeners = new CopyOnWriteArrayList<ClientStateListener>();
+ private List<ErrorListener<? super Client>> errorListeners = new CopyOnWriteArrayList<ErrorListener<? super Client>>();
+ private Redispatch dispatcher = new Redispatch();
+ private List<ConnectorAdapter> channels = new ArrayList<ConnectorAdapter>();
+
+ private ConnectorFactory connectorFactory;
+
+ public DefaultClient( String gameName, int version )
+ {
+ this.gameName = gameName;
+ this.version = version;
+ }
+
+ public DefaultClient( String gameName, int version, Connector reliable, Connector fast,
+ ConnectorFactory connectorFactory )
+ {
+ this( gameName, version );
+ setPrimaryConnectors( reliable, fast, connectorFactory );
+ }
+
+ protected void setPrimaryConnectors( Connector reliable, Connector fast, ConnectorFactory connectorFactory )
+ {
+ if( reliable == null )
+ throw new IllegalArgumentException( "The reliable connector cannot be null." );
+ if( isRunning )
+ throw new IllegalStateException( "Client is already started." );
+ if( !channels.isEmpty() )
+ throw new IllegalStateException( "Channels already exist." );
+
+ this.connectorFactory = connectorFactory;
+ channels.add(new ConnectorAdapter(reliable, dispatcher, dispatcher, true));
+ if( fast != null ) {
+ channels.add(new ConnectorAdapter(fast, dispatcher, dispatcher, false));
+ } else {
+ // Add the null adapter to keep the indexes right
+ channels.add(null);
+ }
+ }
+
+ protected void checkRunning()
+ {
+ if( !isRunning )
+ throw new IllegalStateException( "Client is not started." );
+ }
+
+ public void start()
+ {
+ if( isRunning )
+ throw new IllegalStateException( "Client is already started." );
+
+ // Start up the threads and stuff for the
+ // connectors that we have
+ for( ConnectorAdapter ca : channels ) {
+ if( ca == null )
+ continue;
+ ca.start();
+ }
+
+ // Send our connection message with a generated ID until
+ // we get one back from the server. We'll hash time in
+ // millis and time in nanos.
+ // This is used to match the TCP and UDP endpoints up on the
+ // other end since they may take different routes to get there.
+ // Behind NAT, many game clients may be coming over the same
+ // IP address from the server's perspective and they may have
+ // their UDP ports mapped all over the place.
+ //
+ // Since currentTimeMillis() is absolute time and nano time
+ // is roughtly related to system start time, adding these two
+ // together should be plenty unique for our purposes. It wouldn't
+ // hurt to reconcile with IP on the server side, though.
+ long tempId = System.currentTimeMillis() + System.nanoTime();
+
+ // Set it true here so we can send some messages.
+ isRunning = true;
+
+ ClientRegistrationMessage reg;
+ reg = new ClientRegistrationMessage();
+ reg.setId(tempId);
+ reg.setGameName(getGameName());
+ reg.setVersion(getVersion());
+ reg.setReliable(true);
+ send(CH_RELIABLE, reg, false);
+
+ // Send registration messages to any other configured
+ // connectors
+ reg = new ClientRegistrationMessage();
+ reg.setId(tempId);
+ reg.setReliable(false);
+ for( int ch = CH_UNRELIABLE; ch < channels.size(); ch++ ) {
+ if( channels.get(ch) == null )
+ continue;
+ send(ch, reg, false);
+ }
+ }
+
+ protected void waitForConnected()
+ {
+ if( isConnected() )
+ return;
+
+ try {
+ connecting.await();
+ } catch( InterruptedException e ) {
+ throw new RuntimeException( "Interrupted waiting for connect", e );
+ }
+ }
+
+ public boolean isConnected()
+ {
+ return id != -1 && isRunning;
+ }
+
+ public int getId()
+ {
+ return id;
+ }
+
+ public String getGameName()
+ {
+ return gameName;
+ }
+
+ public int getVersion()
+ {
+ return version;
+ }
+
+ public void send( Message message )
+ {
+ if( message.isReliable() || channels.get(CH_UNRELIABLE) == null ) {
+ send(CH_RELIABLE, message, true);
+ } else {
+ send(CH_UNRELIABLE, message, true);
+ }
+ }
+
+ public void send( int channel, Message message )
+ {
+ if( channel < 0 || channel + CH_FIRST >= channels.size() )
+ throw new IllegalArgumentException( "Channel is undefined:" + channel );
+ send( channel + CH_FIRST, message, true );
+ }
+
+ protected void send( int channel, Message message, boolean waitForConnected )
+ {
+ checkRunning();
+
+ if( waitForConnected ) {
+ // Make sure we aren't still connecting
+ waitForConnected();
+ }
+
+ ByteBuffer buffer = dataBuffer.get();
+ if( buffer == null ) {
+ buffer = ByteBuffer.allocate( 65536 + 2 );
+ dataBuffer.set(buffer);
+ }
+ buffer.clear();
+
+ // Convert the message to bytes
+ buffer = MessageProtocol.messageToBuffer(message, buffer);
+
+ // Since we share the buffer between invocations, we will need to
+ // copy this message's part out of it. This is because we actually
+ // do the send on a background thread.
+ byte[] temp = new byte[buffer.remaining()];
+ System.arraycopy(buffer.array(), buffer.position(), temp, 0, buffer.remaining());
+ buffer = ByteBuffer.wrap(temp);
+
+ channels.get(channel).write(buffer);
+ }
+
+ public void close()
+ {
+ checkRunning();
+
+ closeConnections( null );
+ }
+
+ protected void closeConnections( DisconnectInfo info )
+ {
+ if( !isRunning )
+ return;
+
+ // Send a close message
+
+ // Tell the thread it's ok to die
+ for( ConnectorAdapter ca : channels ) {
+ if( ca == null )
+ continue;
+ ca.close();
+ }
+
+ // Wait for the threads?
+
+ // Just in case we never fully connected
+ connecting.countDown();
+
+ fireDisconnected(info);
+
+ isRunning = false;
+ }
+
+ public void addClientStateListener( ClientStateListener listener )
+ {
+ stateListeners.add( listener );
+ }
+
+ public void removeClientStateListener( ClientStateListener listener )
+ {
+ stateListeners.remove( listener );
+ }
+
+ public void addMessageListener( MessageListener<? super Client> listener )
+ {
+ messageListeners.addMessageListener( listener );
+ }
+
+ public void addMessageListener( MessageListener<? super Client> listener, Class... classes )
+ {
+ messageListeners.addMessageListener( listener, classes );
+ }
+
+ public void removeMessageListener( MessageListener<? super Client> listener )
+ {
+ messageListeners.removeMessageListener( listener );
+ }
+
+ public void removeMessageListener( MessageListener<? super Client> listener, Class... classes )
+ {
+ messageListeners.removeMessageListener( listener, classes );
+ }
+
+ public void addErrorListener( ErrorListener<? super Client> listener )
+ {
+ errorListeners.add( listener );
+ }
+
+ public void removeErrorListener( ErrorListener<? super Client> listener )
+ {
+ errorListeners.remove( listener );
+ }
+
+ protected void fireConnected()
+ {
+ for( ClientStateListener l : stateListeners ) {
+ l.clientConnected( this );
+ }
+ }
+
+ protected void fireDisconnected( DisconnectInfo info )
+ {
+ for( ClientStateListener l : stateListeners ) {
+ l.clientDisconnected( this, info );
+ }
+ }
+
+ /**
+ * Either calls the ErrorListener or closes the connection
+ * if there are no listeners.
+ */
+ protected void handleError( Throwable t )
+ {
+ // If there are no listeners then close the connection with
+ // a reason
+ if( errorListeners.isEmpty() ) {
+ log.log( Level.SEVERE, "Termining connection due to unhandled error", t );
+ DisconnectInfo info = new DisconnectInfo();
+ info.reason = "Connection Error";
+ info.error = t;
+ closeConnections(info);
+ return;
+ }
+
+ for( ErrorListener l : errorListeners ) {
+ l.handleError( this, t );
+ }
+ }
+
+ protected void configureChannels( long tempId, int[] ports ) {
+
+ try {
+ for( int i = 0; i < ports.length; i++ ) {
+ Connector c = connectorFactory.createConnector( i, ports[i] );
+ ConnectorAdapter ca = new ConnectorAdapter(c, dispatcher, dispatcher, true);
+ int ch = channels.size();
+ channels.add( ca );
+
+ // Need to send the connection its hook-up registration
+ // and start it.
+ ca.start();
+ ClientRegistrationMessage reg;
+ reg = new ClientRegistrationMessage();
+ reg.setId(tempId);
+ reg.setReliable(true);
+ send( ch, reg, false );
+ }
+ } catch( IOException e ) {
+ throw new RuntimeException( "Error configuring channels", e );
+ }
+ }
+
+ protected void dispatch( Message m )
+ {
+ // Pull off the connection management messages we're
+ // interested in and then pass on the rest.
+ if( m instanceof ClientRegistrationMessage ) {
+ // Then we've gotten our real id
+ this.id = (int)((ClientRegistrationMessage)m).getId();
+ log.log( Level.INFO, "Connection established, id:{0}.", this.id );
+ connecting.countDown();
+ fireConnected();
+ return;
+ } else if( m instanceof ChannelInfoMessage ) {
+ // This is an interum step in the connection process and
+ // now we need to add a bunch of connections
+ configureChannels( ((ChannelInfoMessage)m).getId(), ((ChannelInfoMessage)m).getPorts() );
+ return;
+ } else if( m instanceof DisconnectMessage ) {
+ // Can't do too much else yet
+ String reason = ((DisconnectMessage)m).getReason();
+ log.log( Level.SEVERE, "Connection terminated, reason:{0}.", reason );
+ DisconnectInfo info = new DisconnectInfo();
+ info.reason = reason;
+ closeConnections(info);
+ }
+
+ // Make sure client MessageListeners are called single-threaded
+ // since it could receive messages from the TCP and UDP
+ // thread simultaneously.
+ synchronized( this ) {
+ messageListeners.messageReceived( this, m );
+ }
+ }
+
+ protected class Redispatch implements MessageListener<Object>, ErrorListener<Object>
+ {
+ public void messageReceived( Object source, Message m )
+ {
+ dispatch( m );
+ }
+
+ public void handleError( Object source, Throwable t )
+ {
+ // Only doing the DefaultClient.this to make the code
+ // checker happy... it compiles fine without it but I
+ // don't like red lines in my editor. :P
+ DefaultClient.this.handleError( t );
+ }
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/base/DefaultServer.java b/engine/src/networking/com/jme3/network/base/DefaultServer.java
new file mode 100644
index 0000000..28c4c80
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/base/DefaultServer.java
@@ -0,0 +1,591 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.base;
+
+import com.jme3.network.*;
+import com.jme3.network.kernel.Endpoint;
+import com.jme3.network.kernel.Kernel;
+import com.jme3.network.message.ChannelInfoMessage;
+import com.jme3.network.message.ClientRegistrationMessage;
+import com.jme3.network.message.DisconnectMessage;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A default implementation of the Server interface that delegates
+ * its network connectivity to kernel.Kernel.
+ *
+ * @version $Revision: 9114 $
+ * @author Paul Speed
+ */
+public class DefaultServer implements Server
+{
+ static Logger log = Logger.getLogger(DefaultServer.class.getName());
+
+ // First two channels are reserved for reliable and
+ // unreliable
+ private static final int CH_RELIABLE = 0;
+ private static final int CH_UNRELIABLE = 1;
+ private static final int CH_FIRST = 2;
+
+ private boolean isRunning = false;
+ private AtomicInteger nextId = new AtomicInteger(0);
+ private String gameName;
+ private int version;
+ private KernelFactory kernelFactory = KernelFactory.DEFAULT;
+ private KernelAdapter reliableAdapter;
+ private KernelAdapter fastAdapter;
+ private List<KernelAdapter> channels = new ArrayList<KernelAdapter>();
+ private List<Integer> alternatePorts = new ArrayList<Integer>();
+ private Redispatch dispatcher = new Redispatch();
+ private Map<Integer,HostedConnection> connections = new ConcurrentHashMap<Integer,HostedConnection>();
+ private Map<Endpoint,HostedConnection> endpointConnections
+ = new ConcurrentHashMap<Endpoint,HostedConnection>();
+
+ // Keeps track of clients for whom we've only received the UDP
+ // registration message
+ private Map<Long,Connection> connecting = new ConcurrentHashMap<Long,Connection>();
+
+ private MessageListenerRegistry<HostedConnection> messageListeners
+ = new MessageListenerRegistry<HostedConnection>();
+ private List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>();
+
+ public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast )
+ {
+ if( reliable == null )
+ throw new IllegalArgumentException( "Default server reqiures a reliable kernel instance." );
+
+ this.gameName = gameName;
+ this.version = version;
+
+ reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true );
+ channels.add( reliableAdapter );
+ if( fast != null ) {
+ fastAdapter = new KernelAdapter( this, fast, dispatcher, false );
+ channels.add( fastAdapter );
+ }
+ }
+
+ public String getGameName()
+ {
+ return gameName;
+ }
+
+ public int getVersion()
+ {
+ return version;
+ }
+
+ public int addChannel( int port )
+ {
+ if( isRunning )
+ throw new IllegalStateException( "Channels cannot be added once server is started." );
+
+ // Note: it does bug me that channels aren't 100% universal and
+ // setup externally but it requires a more invasive set of changes
+ // for "connection types" and some kind of registry of kernel and
+ // connector factories. This really would be the best approach and
+ // would allow all kinds of channel customization maybe... but for
+ // now, we hard-code the standard connections and treat the +2 extras
+ // differently.
+
+ // Check for consistency with the channels list
+ if( channels.size() - CH_FIRST != alternatePorts.size() )
+ throw new IllegalStateException( "Channel and port lists do not match." );
+
+ try {
+ int result = alternatePorts.size();
+ alternatePorts.add(port);
+
+ Kernel kernel = kernelFactory.createKernel(result, port);
+ channels.add( new KernelAdapter(this, kernel, dispatcher, true) );
+
+ return result;
+ } catch( IOException e ) {
+ throw new RuntimeException( "Error adding channel for port:" + port, e );
+ }
+ }
+
+ protected void checkChannel( int channel )
+ {
+ if( channel < 0 || channel >= alternatePorts.size() )
+ throw new IllegalArgumentException( "Channel is undefined:" + channel );
+ }
+
+ public void start()
+ {
+ if( isRunning )
+ throw new IllegalStateException( "Server is already started." );
+
+ // Initialize the kernels
+ for( KernelAdapter ka : channels ) {
+ ka.initialize();
+ }
+
+ // Start em up
+ for( KernelAdapter ka : channels ) {
+ ka.start();
+ }
+
+ isRunning = true;
+ }
+
+ public boolean isRunning()
+ {
+ return isRunning;
+ }
+
+ public void close()
+ {
+ if( !isRunning )
+ throw new IllegalStateException( "Server is not started." );
+
+ try {
+ // Kill the adpaters, they will kill the kernels
+ for( KernelAdapter ka : channels ) {
+ ka.close();
+ }
+
+ isRunning = false;
+ } catch( InterruptedException e ) {
+ throw new RuntimeException( "Interrupted while closing", e );
+ }
+ }
+
+ public void broadcast( Message message )
+ {
+ broadcast( null, message );
+ }
+
+ public void broadcast( Filter<? super HostedConnection> filter, Message message )
+ {
+ if( connections.isEmpty() )
+ return;
+
+ ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
+
+ FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
+
+ if( message.isReliable() || fastAdapter == null ) {
+ // Don't need to copy the data because message protocol is already
+ // giving us a fresh buffer
+ reliableAdapter.broadcast( adapter, buffer, true, false );
+ } else {
+ fastAdapter.broadcast( adapter, buffer, false, false );
+ }
+ }
+
+ public void broadcast( int channel, Filter<? super HostedConnection> filter, Message message )
+ {
+ if( connections.isEmpty() )
+ return;
+
+ checkChannel(channel);
+
+ ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
+
+ FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter);
+
+ channels.get(channel+CH_FIRST).broadcast( adapter, buffer, true, false );
+ }
+
+ public HostedConnection getConnection( int id )
+ {
+ return connections.get(id);
+ }
+
+ public boolean hasConnections()
+ {
+ return !connections.isEmpty();
+ }
+
+ public Collection<HostedConnection> getConnections()
+ {
+ return Collections.unmodifiableCollection((Collection<HostedConnection>)connections.values());
+ }
+
+ public void addConnectionListener( ConnectionListener listener )
+ {
+ connectionListeners.add(listener);
+ }
+
+ public void removeConnectionListener( ConnectionListener listener )
+ {
+ connectionListeners.remove(listener);
+ }
+
+ public void addMessageListener( MessageListener<? super HostedConnection> listener )
+ {
+ messageListeners.addMessageListener( listener );
+ }
+
+ public void addMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
+ {
+ messageListeners.addMessageListener( listener, classes );
+ }
+
+ public void removeMessageListener( MessageListener<? super HostedConnection> listener )
+ {
+ messageListeners.removeMessageListener( listener );
+ }
+
+ public void removeMessageListener( MessageListener<? super HostedConnection> listener, Class... classes )
+ {
+ messageListeners.removeMessageListener( listener, classes );
+ }
+
+ protected void dispatch( HostedConnection source, Message m )
+ {
+ if( source == null ) {
+ messageListeners.messageReceived( source, m );
+ } else {
+
+ // A semi-heavy handed way to make sure the listener
+ // doesn't get called at the same time from two different
+ // threads for the same hosted connection.
+ synchronized( source ) {
+ messageListeners.messageReceived( source, m );
+ }
+ }
+ }
+
+ protected void fireConnectionAdded( HostedConnection conn )
+ {
+ for( ConnectionListener l : connectionListeners ) {
+ l.connectionAdded( this, conn );
+ }
+ }
+
+ protected void fireConnectionRemoved( HostedConnection conn )
+ {
+ for( ConnectionListener l : connectionListeners ) {
+ l.connectionRemoved( this, conn );
+ }
+ }
+
+ protected int getChannel( KernelAdapter ka )
+ {
+ return channels.indexOf(ka);
+ }
+
+ protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m )
+ {
+ Connection addedConnection = null;
+
+ // generally this will only be called by one thread but it's
+ // important enough I won't take chances
+ synchronized( this ) {
+ // Grab the random ID that the client created when creating
+ // its two registration messages
+ long tempId = m.getId();
+
+ // See if we already have one
+ Connection c = connecting.remove(tempId);
+ if( c == null ) {
+ c = new Connection(channels.size());
+ log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p );
+ } else {
+ log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p );
+ }
+
+ // Fill in what we now know
+ int channel = getChannel(ka);
+ c.setChannel(channel, p);
+ log.log( Level.FINE, "Setting up channel:{0}", channel );
+
+ // If it's channel 0 then this is the initial connection
+ // and we will send the connection information
+ if( channel == CH_RELIABLE ) {
+ // Validate the name and version which is only sent
+ // over the reliable connection at this point.
+ if( !getGameName().equals(m.getGameName())
+ || getVersion() != m.getVersion() ) {
+
+ log.log( Level.INFO, "Kicking client due to name/version mismatch:{0}.", c );
+
+ // Need to kick them off... I may regret doing this from within
+ // the sync block but the alternative is more code
+ c.close( "Server client mismatch, server:" + getGameName() + " v" + getVersion()
+ + " client:" + m.getGameName() + " v" + m.getVersion() );
+ return;
+ }
+
+ // Else send the extra channel information to the client
+ if( !alternatePorts.isEmpty() ) {
+ ChannelInfoMessage cim = new ChannelInfoMessage( m.getId(), alternatePorts );
+ c.send(cim);
+ }
+ }
+
+ if( c.isComplete() ) {
+ // Then we are fully connected
+ if( connections.put( c.getId(), c ) == null ) {
+
+ for( Endpoint cp : c.channels ) {
+ if( cp == null )
+ continue;
+ endpointConnections.put( cp, c );
+ }
+
+ addedConnection = c;
+ }
+ } else {
+ // Need to keep getting channels so we'll keep it in
+ // the map
+ connecting.put(tempId, c);
+ }
+ }
+
+ // Best to do this outside of the synch block to avoid
+ // over synchronizing which is the path to deadlocks
+ if( addedConnection != null ) {
+ log.log( Level.INFO, "Client registered:{0}.", addedConnection );
+
+ // Send the ID back to the client letting it know it's
+ // fully connected.
+ m = new ClientRegistrationMessage();
+ m.setId( addedConnection.getId() );
+ m.setReliable(true);
+ addedConnection.send(m);
+
+ // Now we can notify the listeners about the
+ // new connection.
+ fireConnectionAdded( addedConnection );
+ }
+ }
+
+ protected HostedConnection getConnection( Endpoint endpoint )
+ {
+ return endpointConnections.get(endpoint);
+ }
+
+ protected void connectionClosed( Endpoint p )
+ {
+ if( p.isConnected() ) {
+ log.log( Level.INFO, "Connection closed:{0}.", p );
+ } else {
+ log.log( Level.FINE, "Connection closed:{0}.", p );
+ }
+
+ // Try to find the endpoint in all ways that it might
+ // exist. Note: by this point the raw network channel is
+ // closed already.
+
+ // Also note: this method will be called multiple times per
+ // HostedConnection if it has multiple endpoints.
+
+ Connection removed = null;
+ synchronized( this ) {
+ // Just in case the endpoint was still connecting
+ connecting.values().remove(p);
+
+ // And the regular management
+ removed = (Connection)endpointConnections.remove(p);
+ if( removed != null ) {
+ connections.remove( removed.getId() );
+ }
+
+ log.log( Level.FINE, "Connections size:{0}", connections.size() );
+ log.log( Level.FINE, "Endpoint mappings size:{0}", endpointConnections.size() );
+ }
+
+ // Better not to fire events while we hold a lock
+ // so always do this outside the synch block.
+ // Note: checking removed.closed just to avoid spurious log messages
+ // since in general we are called back for every endpoint closing.
+ if( removed != null && !removed.closed ) {
+
+ log.log( Level.INFO, "Client closed:{0}.", removed );
+
+ removed.closeConnection();
+ }
+ }
+
+ protected class Connection implements HostedConnection
+ {
+ private int id;
+ private boolean closed;
+ private Endpoint[] channels;
+ private int setChannelCount = 0;
+
+ private Map<String,Object> sessionData = new ConcurrentHashMap<String,Object>();
+
+ public Connection( int channelCount )
+ {
+ id = nextId.getAndIncrement();
+ channels = new Endpoint[channelCount];
+ }
+
+ void setChannel( int channel, Endpoint p )
+ {
+ if( channels[channel] != null && channels[channel] != p ) {
+ throw new RuntimeException( "Channel has already been set:" + channel
+ + " = " + channels[channel] + ", cannot be set to:" + p );
+ }
+ channels[channel] = p;
+ if( p != null )
+ setChannelCount++;
+ }
+
+ boolean isComplete()
+ {
+ return setChannelCount == channels.length;
+ }
+
+ public Server getServer()
+ {
+ return DefaultServer.this;
+ }
+
+ public int getId()
+ {
+ return id;
+ }
+
+ public String getAddress()
+ {
+ return channels[CH_RELIABLE] == null ? null : channels[CH_RELIABLE].getAddress();
+ }
+
+ public void send( Message message )
+ {
+ ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
+ if( message.isReliable() || channels[CH_UNRELIABLE] == null ) {
+ channels[CH_RELIABLE].send( buffer );
+ } else {
+ channels[CH_UNRELIABLE].send( buffer );
+ }
+ }
+
+ public void send( int channel, Message message )
+ {
+ checkChannel(channel);
+ ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null);
+ channels[channel+CH_FIRST].send(buffer);
+ }
+
+ protected void closeConnection()
+ {
+ if( closed )
+ return;
+ closed = true;
+
+ // Make sure all endpoints are closed. Note: reliable
+ // should always already be closed through all paths that I
+ // can conceive... but it doesn't hurt to be sure.
+ for( Endpoint p : channels ) {
+ if( p == null )
+ continue;
+ p.close();
+ }
+
+ fireConnectionRemoved( this );
+ }
+
+ public void close( String reason )
+ {
+ // Send a reason
+ DisconnectMessage m = new DisconnectMessage();
+ m.setType( DisconnectMessage.KICK );
+ m.setReason( reason );
+ m.setReliable( true );
+ send( m );
+
+ // Just close the reliable endpoint
+ // fast will be cleaned up as a side-effect
+ // when closeConnection() is called by the
+ // connectionClosed() endpoint callback.
+ if( channels[CH_RELIABLE] != null ) {
+ // Close with flush so we make sure our
+ // message gets out
+ channels[CH_RELIABLE].close(true);
+ }
+ }
+
+ public Object setAttribute( String name, Object value )
+ {
+ if( value == null )
+ return sessionData.remove(name);
+ return sessionData.put(name, value);
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T getAttribute( String name )
+ {
+ return (T)sessionData.get(name);
+ }
+
+ public Set<String> attributeNames()
+ {
+ return Collections.unmodifiableSet(sessionData.keySet());
+ }
+
+ public String toString()
+ {
+ return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE]
+ + ", fast=" + channels[CH_UNRELIABLE] + " ]";
+ }
+ }
+
+ protected class Redispatch implements MessageListener<HostedConnection>
+ {
+ public void messageReceived( HostedConnection source, Message m )
+ {
+ dispatch( source, m );
+ }
+ }
+
+ protected class FilterAdapter implements Filter<Endpoint>
+ {
+ private Filter<? super HostedConnection> delegate;
+
+ public FilterAdapter( Filter<? super HostedConnection> delegate )
+ {
+ this.delegate = delegate;
+ }
+
+ public boolean apply( Endpoint input )
+ {
+ HostedConnection conn = getConnection( input );
+ if( conn == null )
+ return false;
+ return delegate.apply(conn);
+ }
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/base/KernelAdapter.java b/engine/src/networking/com/jme3/network/base/KernelAdapter.java
new file mode 100644
index 0000000..8477aad
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/base/KernelAdapter.java
@@ -0,0 +1,296 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.base;
+
+import com.jme3.network.Filter;
+import com.jme3.network.HostedConnection;
+import com.jme3.network.Message;
+import com.jme3.network.MessageListener;
+import com.jme3.network.kernel.Endpoint;
+import com.jme3.network.kernel.EndpointEvent;
+import com.jme3.network.kernel.Envelope;
+import com.jme3.network.kernel.Kernel;
+import com.jme3.network.message.ClientRegistrationMessage;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Wraps a single Kernel and forwards new messages
+ * to the supplied message dispatcher and new endpoint
+ * events to the connection dispatcher. This is used
+ * by DefaultServer to manage its kernel objects.
+ *
+ * <p>This adapter assumes a simple protocol where two
+ * bytes define a (short) object size with the object data
+ * to follow. Note: this limits the size of serialized
+ * objects to 32676 bytes... even though, for example,
+ * datagram packets can hold twice that. :P</p>
+ *
+ * @version $Revision: 8944 $
+ * @author Paul Speed
+ */
+public class KernelAdapter extends Thread
+{
+ static Logger log = Logger.getLogger(KernelAdapter.class.getName());
+
+ private DefaultServer server; // this is unfortunate
+ private Kernel kernel;
+ private MessageListener<HostedConnection> messageDispatcher;
+ private AtomicBoolean go = new AtomicBoolean(true);
+
+ // Keeps track of the in-progress messages that are received
+ // on reliable connections
+ private Map<Endpoint, MessageProtocol> messageBuffers = new ConcurrentHashMap<Endpoint,MessageProtocol>();
+
+ // Marks the messages as reliable or not if they came
+ // through this connector.
+ private boolean reliable;
+
+ public KernelAdapter( DefaultServer server, Kernel kernel, MessageListener<HostedConnection> messageDispatcher,
+ boolean reliable )
+ {
+ super( String.valueOf(kernel) );
+ this.server = server;
+ this.kernel = kernel;
+ this.messageDispatcher = messageDispatcher;
+ this.reliable = reliable;
+ setDaemon(true);
+ }
+
+ public Kernel getKernel()
+ {
+ return kernel;
+ }
+
+ public void initialize()
+ {
+ kernel.initialize();
+ }
+
+ public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable,
+ boolean copy )
+ {
+ kernel.broadcast( filter, data, reliable, copy );
+ }
+
+ public void close() throws InterruptedException
+ {
+ go.set(false);
+
+ // Kill the kernel
+ kernel.terminate();
+ }
+
+ protected void reportError( Endpoint p, Object context, Exception e )
+ {
+ // Should really be queued up so the outer thread can
+ // retrieve them. For now we'll just log it. FIXME
+ log.log( Level.SEVERE, "Unhandled error, endpoint:" + p + ", context:" + context, e );
+
+ // In lieu of other options, at least close the endpoint
+ p.close();
+ }
+
+ protected HostedConnection getConnection( Endpoint p )
+ {
+ return server.getConnection(p);
+ }
+
+ protected void connectionClosed( Endpoint p )
+ {
+ // Remove any message buffer we've been accumulating
+ // on behalf of this endpoing
+ messageBuffers.remove(p);
+
+ log.log( Level.FINE, "Buffers size:{0}", messageBuffers.size() );
+
+ server.connectionClosed(p);
+ }
+
+ /**
+ * Note on threading for those writing their own server
+ * or adapter implementations. The rule that a single connection be
+ * processed by only one thread at a time is more about ensuring that
+ * the messages are delivered in the order that they are received
+ * than for any user-code safety. 99% of the time the user code should
+ * be writing for multithreaded access anyway.
+ *
+ * <p>The issue with the messages is that if a an implementation is
+ * using a general thread pool then it would be possible for a
+ * naive implementation to have one thread grab an Envelope from
+ * connection 1's and another grab the next Envelope. Since an Envelope
+ * may contain several messages, delivering the second thread's messages
+ * before or during the first's would be really confusing and hard
+ * to code for in user code.</p>
+ *
+ * <p>And that's why this note is here. DefaultServer does a rudimentary
+ * per-connection locking but it couldn't possibly guard against
+ * out of order Envelope processing.</p>
+ */
+ protected void dispatch( Endpoint p, Message m )
+ {
+ // Because this class is the only one with the information
+ // to do it... we need to pull of the registration message
+ // here.
+ if( m instanceof ClientRegistrationMessage ) {
+ server.registerClient( this, p, (ClientRegistrationMessage)m );
+ return;
+ }
+
+ try {
+ HostedConnection source = getConnection(p);
+ if( source == null ) {
+ if( reliable ) {
+ // If it's a reliable connection then it's slightly more
+ // concerning but this can happen all the time for a UDP endpoint.
+ log.log( Level.WARNING, "Recieved message from unconnected endpoint:" + p + " message:" + m );
+ }
+ return;
+ }
+ messageDispatcher.messageReceived( source, m );
+ } catch( Exception e ) {
+ reportError(p, m, e);
+ }
+ }
+
+ protected MessageProtocol getMessageBuffer( Endpoint p )
+ {
+ if( !reliable ) {
+ // Since UDP comes in packets and they aren't split
+ // up, there is no reason to buffer. In fact, there would
+ // be a down side because there is no way for us to reliably
+ // clean these up later since we'd create another one for
+ // any random UDP packet that comes to the port.
+ return new MessageProtocol();
+ } else {
+ // See if we already have one
+ MessageProtocol result = messageBuffers.get(p);
+ if( result == null ) {
+ result = new MessageProtocol();
+ messageBuffers.put(p, result);
+ }
+ return result;
+ }
+ }
+
+ protected void createAndDispatch( Envelope env )
+ {
+ MessageProtocol protocol = getMessageBuffer(env.getSource());
+
+ byte[] data = env.getData();
+ ByteBuffer buffer = ByteBuffer.wrap(data);
+
+ int count = protocol.addBuffer( buffer );
+ if( count == 0 ) {
+ // This can happen if there was only a partial message
+ // received. However, this should never happen for unreliable
+ // connections.
+ if( !reliable ) {
+ // Log some additional information about the packet.
+ int len = Math.min( 10, data.length );
+ StringBuilder sb = new StringBuilder();
+ for( int i = 0; i < len; i++ ) {
+ sb.append( "[" + Integer.toHexString(data[i]) + "]" );
+ }
+ log.log( Level.INFO, "First 10 bytes of incomplete nessage:" + sb );
+ throw new RuntimeException( "Envelope contained incomplete data:" + env );
+ }
+ }
+
+ // Should be complete... and maybe we should check but we don't
+ Message m = null;
+ while( (m = protocol.getMessage()) != null ) {
+ m.setReliable(reliable);
+ dispatch( env.getSource(), m );
+ }
+ }
+
+ protected void createAndDispatch( EndpointEvent event )
+ {
+ // Only need to tell the server about disconnects
+ if( event.getType() == EndpointEvent.Type.REMOVE ) {
+ connectionClosed( event.getEndpoint() );
+ }
+ }
+
+ protected void flushEvents()
+ {
+ EndpointEvent event;
+ while( (event = kernel.nextEvent()) != null ) {
+ try {
+ createAndDispatch( event );
+ } catch( Exception e ) {
+ reportError(event.getEndpoint(), event, e);
+ }
+ }
+ }
+
+ public void run()
+ {
+ while( go.get() ) {
+
+ try {
+ // Check for pending events
+ flushEvents();
+
+ // Grab the next envelope
+ Envelope e = kernel.read();
+ if( e == Kernel.EVENTS_PENDING )
+ continue; // We'll catch it up above
+
+ // Check for pending events that might have
+ // come in while we were blocking. This is usually
+ // when the connection add events come through
+ flushEvents();
+
+ try {
+ createAndDispatch( e );
+ } catch( Exception ex ) {
+ reportError(e.getSource(), e, ex);
+ }
+
+ } catch( InterruptedException ex ) {
+ if( !go.get() )
+ return;
+ throw new RuntimeException( "Unexpected interruption", ex );
+ }
+ }
+ }
+
+}
+
+
diff --git a/engine/src/networking/com/jme3/network/base/KernelFactory.java b/engine/src/networking/com/jme3/network/base/KernelFactory.java
new file mode 100644
index 0000000..0074d39
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/base/KernelFactory.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.base;
+
+import com.jme3.network.kernel.Kernel;
+import java.io.IOException;
+
+
+/**
+ * Supplied to the DefaultServer to create any additional
+ * channel kernels that might be required.
+ *
+ * @version $Revision: 8938 $
+ * @author Paul Speed
+ */
+public interface KernelFactory
+{
+ public static final KernelFactory DEFAULT = new NioKernelFactory();
+
+ public Kernel createKernel( int channel, int port ) throws IOException;
+}
diff --git a/engine/src/networking/com/jme3/network/base/MessageListenerRegistry.java b/engine/src/networking/com/jme3/network/base/MessageListenerRegistry.java
new file mode 100644
index 0000000..c861786
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/base/MessageListenerRegistry.java
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.base;
+
+import com.jme3.network.Message;
+import com.jme3.network.MessageListener;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * Keeps track of message listeners registered to specific
+ * types or to any type.
+ *
+ * @version $Revision: 8843 $
+ * @author Paul Speed
+ */
+public class MessageListenerRegistry<S> implements MessageListener<S>
+{
+ static Logger log = Logger.getLogger(MessageListenerRegistry.class.getName());
+
+ private List<MessageListener<? super S>> listeners = new CopyOnWriteArrayList<MessageListener<? super S>>();
+ private Map<Class,List<MessageListener<? super S>>> typeListeners
+ = new ConcurrentHashMap<Class,List<MessageListener<? super S>>>();
+
+ public MessageListenerRegistry()
+ {
+ }
+
+ public void messageReceived( S source, Message m )
+ {
+ boolean delivered = false;
+
+ for( MessageListener<? super S> l : listeners ) {
+ l.messageReceived( source, m );
+ delivered = true;
+ }
+
+ for( MessageListener<? super S> l : getListeners(m.getClass(),false) ) {
+ l.messageReceived( source, m );
+ delivered = true;
+ }
+
+ if( !delivered ) {
+ log.log( Level.INFO, "Received message had no registered listeners: {0}", m );
+ }
+ }
+
+ protected List<MessageListener<? super S>> getListeners( Class c, boolean create )
+ {
+ List<MessageListener<? super S>> result = typeListeners.get(c);
+ if( result == null && create ) {
+ result = new CopyOnWriteArrayList<MessageListener<? super S>>();
+ typeListeners.put( c, result );
+ }
+
+ if( result == null ) {
+ result = Collections.emptyList();
+ }
+ return result;
+ }
+
+ public void addMessageListener( MessageListener<? super S> listener )
+ {
+ listeners.add(listener);
+ }
+
+ public void removeMessageListener( MessageListener<? super S> listener )
+ {
+ listeners.remove(listener);
+ }
+
+ public void addMessageListener( MessageListener<? super S> listener, Class... classes )
+ {
+ for( Class c : classes ) {
+ getListeners(c, true).add(listener);
+ }
+ }
+
+ public void removeMessageListener( MessageListener<? super S> listener, Class... classes )
+ {
+ for( Class c : classes ) {
+ getListeners(c, false).remove(listener);
+ }
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/base/MessageProtocol.java b/engine/src/networking/com/jme3/network/base/MessageProtocol.java
new file mode 100644
index 0000000..eb22302
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/base/MessageProtocol.java
@@ -0,0 +1,191 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.base;
+
+import com.jme3.network.Message;
+import com.jme3.network.serializing.Serializer;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+
+/**
+ * Consolidates the conversion of messages to/from byte buffers
+ * and provides a rolling message buffer. ByteBuffers can be
+ * pushed in and messages will be extracted, accumulated, and
+ * available for retrieval. This is not thread safe and is meant
+ * to be used within a single message processing thread.
+ *
+ * <p>The protocol is based on a simple length + data format
+ * where two bytes represent the (short) length of the data
+ * and the rest is the raw data for the Serializers class.</p>
+ *
+ * @version $Revision: 8843 $
+ * @author Paul Speed
+ */
+public class MessageProtocol
+{
+ private LinkedList<Message> messages = new LinkedList<Message>();
+ private ByteBuffer current;
+ private int size;
+ private Byte carry;
+
+ /**
+ * Converts a message to a ByteBuffer using the Serializer
+ * and the (short length) + data protocol. If target is null
+ * then a 32k byte buffer will be created and filled.
+ */
+ public static ByteBuffer messageToBuffer( Message message, ByteBuffer target )
+ {
+ // Could let the caller pass their own in
+ ByteBuffer buffer = target == null ? ByteBuffer.allocate( 32767 + 2 ) : target;
+
+ try {
+ buffer.position( 2 );
+ Serializer.writeClassAndObject( buffer, message );
+ buffer.flip();
+ short dataLength = (short)(buffer.remaining() - 2);
+ buffer.putShort( dataLength );
+ buffer.position( 0 );
+
+ return buffer;
+ } catch( IOException e ) {
+ throw new RuntimeException( "Error serializing message", e );
+ }
+ }
+
+ /**
+ * Retrieves and removes an extracted message from the accumulated buffer
+ * or returns null if there are no more messages.
+ */
+ public Message getMessage()
+ {
+ if( messages.isEmpty() ) {
+ return null;
+ }
+
+ return messages.removeFirst();
+ }
+
+ /**
+ * Adds the specified buffer, extracting the contained messages
+ * and making them available to getMessage(). The left over
+ * data is buffered to be combined with future data.
+ &
+ * @return The total number of queued messages after this call.
+ */
+ public int addBuffer( ByteBuffer buffer )
+ {
+ // push the data from the buffer into as
+ // many messages as we can
+ while( buffer.remaining() > 0 ) {
+
+ if( current == null ) {
+
+ // If we have a left over carry then we need to
+ // do manual processing to get the short value
+ if( carry != null ) {
+ byte high = carry;
+ byte low = buffer.get();
+
+ size = (high & 0xff) << 8 | (low & 0xff);
+ carry = null;
+ }
+ else if( buffer.remaining() < 2 ) {
+ // It's possible that the supplied buffer only has one
+ // byte in it... and in that case we will get an underflow
+ // when attempting to read the short below.
+
+ // It has to be 1 or we'd never get here... but one
+ // isn't enough so we stash it away.
+ carry = buffer.get();
+ break;
+ } else {
+ // We are not currently reading an object so
+ // grab the size.
+ // Note: this is somewhat limiting... int would
+ // be better.
+ size = buffer.getShort();
+ }
+
+ // Allocate the buffer into which we'll feed the
+ // data as we get it
+ current = ByteBuffer.allocate(size);
+ }
+
+ if( current.remaining() <= buffer.remaining() ) {
+ // We have at least one complete object so
+ // copy what we can into current, create a message,
+ // and then continue pulling from buffer.
+
+ // Artificially set the limit so we don't overflow
+ int extra = buffer.remaining() - current.remaining();
+ buffer.limit( buffer.position() + current.remaining() );
+
+ // Now copy the data
+ current.put( buffer );
+ current.flip();
+
+ // Now set the limit back to a good value
+ buffer.limit( buffer.position() + extra );
+
+ createMessage( current );
+
+ current = null;
+ } else {
+
+ // Not yet a complete object so just copy what we have
+ current.put( buffer );
+ }
+ }
+
+ return messages.size();
+ }
+
+ /**
+ * Creates a message from the properly sized byte buffer
+ * and adds it to the messages queue.
+ */
+ protected void createMessage( ByteBuffer buffer )
+ {
+ try {
+ Object obj = Serializer.readClassAndObject( buffer );
+ Message m = (Message)obj;
+ messages.add(m);
+ } catch( IOException e ) {
+ throw new RuntimeException( "Error deserializing object", e );
+ }
+ }
+}
+
+
+
diff --git a/engine/src/networking/com/jme3/network/base/NioKernelFactory.java b/engine/src/networking/com/jme3/network/base/NioKernelFactory.java
new file mode 100644
index 0000000..c9fab08
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/base/NioKernelFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.base;
+
+import com.jme3.network.kernel.Kernel;
+import com.jme3.network.kernel.tcp.SelectorKernel;
+import java.io.IOException;
+
+
+/**
+ * KernelFactory implemention for creating TCP kernels
+ * using the NIO selector model.
+ *
+ * @version $Revision: 8938 $
+ * @author Paul Speed
+ */
+public class NioKernelFactory implements KernelFactory
+{
+ public Kernel createKernel( int channel, int port ) throws IOException
+ {
+ return new SelectorKernel(port);
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/base/TcpConnectorFactory.java b/engine/src/networking/com/jme3/network/base/TcpConnectorFactory.java
new file mode 100644
index 0000000..e53e62d
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/base/TcpConnectorFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright (c) 2011 jMonkeyEngine
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in the
+ * documentation and/or other materials provided with the distribution.
+ *
+ * * Neither the name of 'jMonkeyEngine' nor the names of its contributors
+ * may be used to endorse or promote products derived from this software
+ * without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
+ * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
+ * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
+ * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
+ * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
+ * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package com.jme3.network.base;
+
+import com.jme3.network.kernel.Connector;
+import com.jme3.network.kernel.tcp.SocketConnector;
+import java.io.IOException;
+import java.net.InetAddress;
+
+
+/**
+ * Creates TCP connectors to a specific remote address.
+ *
+ * @version $Revision: 8938 $
+ * @author Paul Speed
+ */
+public class TcpConnectorFactory implements ConnectorFactory
+{
+ private InetAddress remoteAddress;
+
+ public TcpConnectorFactory( InetAddress remoteAddress )
+ {
+ this.remoteAddress = remoteAddress;
+ }
+
+ public Connector createConnector( int channel, int port ) throws IOException
+ {
+ return new SocketConnector( remoteAddress, port );
+ }
+}
diff --git a/engine/src/networking/com/jme3/network/base/package.html b/engine/src/networking/com/jme3/network/base/package.html
new file mode 100644
index 0000000..a00cb16
--- /dev/null
+++ b/engine/src/networking/com/jme3/network/base/package.html
@@ -0,0 +1,12 @@
+<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
+<html>
+<head>
+<title></title>
+<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
+</head>
+<body>
+The base package contains the default implementations for the
+{@link com.jme3.network.Client} and {@link com.jme3.network.Server}
+interfaces from the public API.
+</body>
+</html>