diff options
Diffstat (limited to 'engine/src/networking/com/jme3/network/base')
11 files changed, 2068 insertions, 0 deletions
diff --git a/engine/src/networking/com/jme3/network/base/ConnectorAdapter.java b/engine/src/networking/com/jme3/network/base/ConnectorAdapter.java new file mode 100644 index 0000000..3827786 --- /dev/null +++ b/engine/src/networking/com/jme3/network/base/ConnectorAdapter.java @@ -0,0 +1,218 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.base; + +import com.jme3.network.ErrorListener; +import com.jme3.network.Message; +import com.jme3.network.MessageListener; +import com.jme3.network.kernel.Connector; +import com.jme3.network.kernel.ConnectorException; +import java.nio.ByteBuffer; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Wraps a single Connector and forwards new messages + * to the supplied message dispatcher. This is used + * by DefaultClient to manage its connector objects. + * This is only responsible for message reading and provides + * no support for buffering writes. + * + * <p>This adapter assumes a simple protocol where two + * bytes define a (short) object size with the object data + * to follow. Note: this limits the size of serialized + * objects to 32676 bytes... even though, for example, + * datagram packets can hold twice that. :P</p> + * + * @version $Revision: 8944 $ + * @author Paul Speed + */ +public class ConnectorAdapter extends Thread +{ + private static final int OUTBOUND_BACKLOG = 16000; + + private Connector connector; + private MessageListener<Object> dispatcher; + private ErrorListener<Object> errorHandler; + private AtomicBoolean go = new AtomicBoolean(true); + + private BlockingQueue<ByteBuffer> outbound; + + // Writes messages out on a background thread + private WriterThread writer; + + // Marks the messages as reliable or not if they came + // through this connector. + private boolean reliable; + + public ConnectorAdapter( Connector connector, MessageListener<Object> dispatcher, + ErrorListener<Object> errorHandler, boolean reliable ) + { + super( String.valueOf(connector) ); + this.connector = connector; + this.dispatcher = dispatcher; + this.errorHandler = errorHandler; + this.reliable = reliable; + setDaemon(true); + + // The backlog makes sure that the outbound channel blocks once + // a certain backlog level is reached. It is set high so that it + // is only reached in the worst cases... which are usually things like + // raw throughput tests. Technically, a saturated TCP channel could + // back up quite a bit if the buffers are full and the socket has + // stalled but 16,000 messages is still a big backlog. + outbound = new ArrayBlockingQueue<ByteBuffer>(OUTBOUND_BACKLOG); + + // Note: this technically adds a potential deadlock case + // with the above code where there wasn't one before. For example, + // if a TCP outbound queue fills to capacity and a client sends + // in such a way that they block TCP message handling then if the HostedConnection + // on the server is similarly blocked then the TCP network buffers may + // all get full and no outbound messages move and we forever block + // on the queue. + // However, in practice this can't really happen... or at least it's + // the sign of other really bad things. + // First, currently the server-side outbound queues are all unbounded and + // so won't ever block the handling of messages if the outbound channel is full. + // Second, there would have to be a huge amount of data backlog for this + // to ever occur anyway. + // Third, it's a sign of a really poor architecture if 16,000 messages + // can go out in a way that blocks reads. + + writer = new WriterThread(); + writer.start(); + } + + public void close() + { + go.set(false); + + // Kill the writer service + writer.shutdown(); + + if( connector.isConnected() ) + { + // Kill the connector + connector.close(); + } + } + + protected void dispatch( Message m ) + { + dispatcher.messageReceived( null, m ); + } + + public void write( ByteBuffer data ) + { + try { + outbound.put( data ); + } catch( InterruptedException e ) { + throw new RuntimeException( "Interrupted while waiting for queue to drain", e ); + } + } + + protected void handleError( Exception e ) + { + if( !go.get() ) + return; + + errorHandler.handleError( this, e ); + } + + public void run() + { + MessageProtocol protocol = new MessageProtocol(); + + try { + while( go.get() ) { + ByteBuffer buffer = connector.read(); + if( buffer == null ) { + if( go.get() ) { + throw new ConnectorException( "Connector closed." ); + } else { + // Just dump out because a null buffer is expected + // from a closed/closing connector + break; + } + } + + protocol.addBuffer( buffer ); + + Message m = null; + while( (m = protocol.getMessage()) != null ) { + m.setReliable( reliable ); + dispatch( m ); + } + } + } catch( Exception e ) { + handleError( e ); + } + } + + protected class WriterThread extends Thread + { + public WriterThread() + { + super( String.valueOf(connector) + "-writer" ); + } + + public void shutdown() + { + interrupt(); + } + + private void write( ByteBuffer data ) + { + try { + connector.write(data); + } catch( Exception e ) { + handleError( e ); + } + } + + public void run() + { + while( go.get() ) { + try { + ByteBuffer data = outbound.take(); + write(data); + } catch( InterruptedException e ) { + if( !go.get() ) + return; + throw new RuntimeException( "Interrupted waiting for data", e ); + } + } + } + } +} diff --git a/engine/src/networking/com/jme3/network/base/ConnectorFactory.java b/engine/src/networking/com/jme3/network/base/ConnectorFactory.java new file mode 100644 index 0000000..d9bb700 --- /dev/null +++ b/engine/src/networking/com/jme3/network/base/ConnectorFactory.java @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.base; + +import com.jme3.network.kernel.Connector; +import java.io.IOException; + + +/** + * Creates Connectors for a specific host. + * + * @version $Revision: 8938 $ + * @author Paul Speed + */ +public interface ConnectorFactory +{ + public Connector createConnector( int channel, int port ) throws IOException; +} diff --git a/engine/src/networking/com/jme3/network/base/DefaultClient.java b/engine/src/networking/com/jme3/network/base/DefaultClient.java new file mode 100644 index 0000000..e03c3e4 --- /dev/null +++ b/engine/src/networking/com/jme3/network/base/DefaultClient.java @@ -0,0 +1,428 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.base; + +import com.jme3.network.ClientStateListener.DisconnectInfo; +import com.jme3.network.*; +import com.jme3.network.kernel.Connector; +import com.jme3.network.message.ChannelInfoMessage; +import com.jme3.network.message.ClientRegistrationMessage; +import com.jme3.network.message.DisconnectMessage; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A default implementation of the Client interface that delegates + * its network connectivity to a kernel.Connector. + * + * @version $Revision: 8938 $ + * @author Paul Speed + */ +public class DefaultClient implements Client +{ + static Logger log = Logger.getLogger(DefaultClient.class.getName()); + + // First two channels are reserved for reliable and + // unreliable. Note: channels are endpoint specific so these + // constants and the handling need not have anything to do with + // the same constants in DefaultServer... which is why they are + // separate. + private static final int CH_RELIABLE = 0; + private static final int CH_UNRELIABLE = 1; + private static final int CH_FIRST = 2; + + private ThreadLocal<ByteBuffer> dataBuffer = new ThreadLocal<ByteBuffer>(); + + private int id = -1; + private boolean isRunning = false; + private CountDownLatch connecting = new CountDownLatch(1); + private String gameName; + private int version; + private MessageListenerRegistry<Client> messageListeners = new MessageListenerRegistry<Client>(); + private List<ClientStateListener> stateListeners = new CopyOnWriteArrayList<ClientStateListener>(); + private List<ErrorListener<? super Client>> errorListeners = new CopyOnWriteArrayList<ErrorListener<? super Client>>(); + private Redispatch dispatcher = new Redispatch(); + private List<ConnectorAdapter> channels = new ArrayList<ConnectorAdapter>(); + + private ConnectorFactory connectorFactory; + + public DefaultClient( String gameName, int version ) + { + this.gameName = gameName; + this.version = version; + } + + public DefaultClient( String gameName, int version, Connector reliable, Connector fast, + ConnectorFactory connectorFactory ) + { + this( gameName, version ); + setPrimaryConnectors( reliable, fast, connectorFactory ); + } + + protected void setPrimaryConnectors( Connector reliable, Connector fast, ConnectorFactory connectorFactory ) + { + if( reliable == null ) + throw new IllegalArgumentException( "The reliable connector cannot be null." ); + if( isRunning ) + throw new IllegalStateException( "Client is already started." ); + if( !channels.isEmpty() ) + throw new IllegalStateException( "Channels already exist." ); + + this.connectorFactory = connectorFactory; + channels.add(new ConnectorAdapter(reliable, dispatcher, dispatcher, true)); + if( fast != null ) { + channels.add(new ConnectorAdapter(fast, dispatcher, dispatcher, false)); + } else { + // Add the null adapter to keep the indexes right + channels.add(null); + } + } + + protected void checkRunning() + { + if( !isRunning ) + throw new IllegalStateException( "Client is not started." ); + } + + public void start() + { + if( isRunning ) + throw new IllegalStateException( "Client is already started." ); + + // Start up the threads and stuff for the + // connectors that we have + for( ConnectorAdapter ca : channels ) { + if( ca == null ) + continue; + ca.start(); + } + + // Send our connection message with a generated ID until + // we get one back from the server. We'll hash time in + // millis and time in nanos. + // This is used to match the TCP and UDP endpoints up on the + // other end since they may take different routes to get there. + // Behind NAT, many game clients may be coming over the same + // IP address from the server's perspective and they may have + // their UDP ports mapped all over the place. + // + // Since currentTimeMillis() is absolute time and nano time + // is roughtly related to system start time, adding these two + // together should be plenty unique for our purposes. It wouldn't + // hurt to reconcile with IP on the server side, though. + long tempId = System.currentTimeMillis() + System.nanoTime(); + + // Set it true here so we can send some messages. + isRunning = true; + + ClientRegistrationMessage reg; + reg = new ClientRegistrationMessage(); + reg.setId(tempId); + reg.setGameName(getGameName()); + reg.setVersion(getVersion()); + reg.setReliable(true); + send(CH_RELIABLE, reg, false); + + // Send registration messages to any other configured + // connectors + reg = new ClientRegistrationMessage(); + reg.setId(tempId); + reg.setReliable(false); + for( int ch = CH_UNRELIABLE; ch < channels.size(); ch++ ) { + if( channels.get(ch) == null ) + continue; + send(ch, reg, false); + } + } + + protected void waitForConnected() + { + if( isConnected() ) + return; + + try { + connecting.await(); + } catch( InterruptedException e ) { + throw new RuntimeException( "Interrupted waiting for connect", e ); + } + } + + public boolean isConnected() + { + return id != -1 && isRunning; + } + + public int getId() + { + return id; + } + + public String getGameName() + { + return gameName; + } + + public int getVersion() + { + return version; + } + + public void send( Message message ) + { + if( message.isReliable() || channels.get(CH_UNRELIABLE) == null ) { + send(CH_RELIABLE, message, true); + } else { + send(CH_UNRELIABLE, message, true); + } + } + + public void send( int channel, Message message ) + { + if( channel < 0 || channel + CH_FIRST >= channels.size() ) + throw new IllegalArgumentException( "Channel is undefined:" + channel ); + send( channel + CH_FIRST, message, true ); + } + + protected void send( int channel, Message message, boolean waitForConnected ) + { + checkRunning(); + + if( waitForConnected ) { + // Make sure we aren't still connecting + waitForConnected(); + } + + ByteBuffer buffer = dataBuffer.get(); + if( buffer == null ) { + buffer = ByteBuffer.allocate( 65536 + 2 ); + dataBuffer.set(buffer); + } + buffer.clear(); + + // Convert the message to bytes + buffer = MessageProtocol.messageToBuffer(message, buffer); + + // Since we share the buffer between invocations, we will need to + // copy this message's part out of it. This is because we actually + // do the send on a background thread. + byte[] temp = new byte[buffer.remaining()]; + System.arraycopy(buffer.array(), buffer.position(), temp, 0, buffer.remaining()); + buffer = ByteBuffer.wrap(temp); + + channels.get(channel).write(buffer); + } + + public void close() + { + checkRunning(); + + closeConnections( null ); + } + + protected void closeConnections( DisconnectInfo info ) + { + if( !isRunning ) + return; + + // Send a close message + + // Tell the thread it's ok to die + for( ConnectorAdapter ca : channels ) { + if( ca == null ) + continue; + ca.close(); + } + + // Wait for the threads? + + // Just in case we never fully connected + connecting.countDown(); + + fireDisconnected(info); + + isRunning = false; + } + + public void addClientStateListener( ClientStateListener listener ) + { + stateListeners.add( listener ); + } + + public void removeClientStateListener( ClientStateListener listener ) + { + stateListeners.remove( listener ); + } + + public void addMessageListener( MessageListener<? super Client> listener ) + { + messageListeners.addMessageListener( listener ); + } + + public void addMessageListener( MessageListener<? super Client> listener, Class... classes ) + { + messageListeners.addMessageListener( listener, classes ); + } + + public void removeMessageListener( MessageListener<? super Client> listener ) + { + messageListeners.removeMessageListener( listener ); + } + + public void removeMessageListener( MessageListener<? super Client> listener, Class... classes ) + { + messageListeners.removeMessageListener( listener, classes ); + } + + public void addErrorListener( ErrorListener<? super Client> listener ) + { + errorListeners.add( listener ); + } + + public void removeErrorListener( ErrorListener<? super Client> listener ) + { + errorListeners.remove( listener ); + } + + protected void fireConnected() + { + for( ClientStateListener l : stateListeners ) { + l.clientConnected( this ); + } + } + + protected void fireDisconnected( DisconnectInfo info ) + { + for( ClientStateListener l : stateListeners ) { + l.clientDisconnected( this, info ); + } + } + + /** + * Either calls the ErrorListener or closes the connection + * if there are no listeners. + */ + protected void handleError( Throwable t ) + { + // If there are no listeners then close the connection with + // a reason + if( errorListeners.isEmpty() ) { + log.log( Level.SEVERE, "Termining connection due to unhandled error", t ); + DisconnectInfo info = new DisconnectInfo(); + info.reason = "Connection Error"; + info.error = t; + closeConnections(info); + return; + } + + for( ErrorListener l : errorListeners ) { + l.handleError( this, t ); + } + } + + protected void configureChannels( long tempId, int[] ports ) { + + try { + for( int i = 0; i < ports.length; i++ ) { + Connector c = connectorFactory.createConnector( i, ports[i] ); + ConnectorAdapter ca = new ConnectorAdapter(c, dispatcher, dispatcher, true); + int ch = channels.size(); + channels.add( ca ); + + // Need to send the connection its hook-up registration + // and start it. + ca.start(); + ClientRegistrationMessage reg; + reg = new ClientRegistrationMessage(); + reg.setId(tempId); + reg.setReliable(true); + send( ch, reg, false ); + } + } catch( IOException e ) { + throw new RuntimeException( "Error configuring channels", e ); + } + } + + protected void dispatch( Message m ) + { + // Pull off the connection management messages we're + // interested in and then pass on the rest. + if( m instanceof ClientRegistrationMessage ) { + // Then we've gotten our real id + this.id = (int)((ClientRegistrationMessage)m).getId(); + log.log( Level.INFO, "Connection established, id:{0}.", this.id ); + connecting.countDown(); + fireConnected(); + return; + } else if( m instanceof ChannelInfoMessage ) { + // This is an interum step in the connection process and + // now we need to add a bunch of connections + configureChannels( ((ChannelInfoMessage)m).getId(), ((ChannelInfoMessage)m).getPorts() ); + return; + } else if( m instanceof DisconnectMessage ) { + // Can't do too much else yet + String reason = ((DisconnectMessage)m).getReason(); + log.log( Level.SEVERE, "Connection terminated, reason:{0}.", reason ); + DisconnectInfo info = new DisconnectInfo(); + info.reason = reason; + closeConnections(info); + } + + // Make sure client MessageListeners are called single-threaded + // since it could receive messages from the TCP and UDP + // thread simultaneously. + synchronized( this ) { + messageListeners.messageReceived( this, m ); + } + } + + protected class Redispatch implements MessageListener<Object>, ErrorListener<Object> + { + public void messageReceived( Object source, Message m ) + { + dispatch( m ); + } + + public void handleError( Object source, Throwable t ) + { + // Only doing the DefaultClient.this to make the code + // checker happy... it compiles fine without it but I + // don't like red lines in my editor. :P + DefaultClient.this.handleError( t ); + } + } +} diff --git a/engine/src/networking/com/jme3/network/base/DefaultServer.java b/engine/src/networking/com/jme3/network/base/DefaultServer.java new file mode 100644 index 0000000..28c4c80 --- /dev/null +++ b/engine/src/networking/com/jme3/network/base/DefaultServer.java @@ -0,0 +1,591 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.base; + +import com.jme3.network.*; +import com.jme3.network.kernel.Endpoint; +import com.jme3.network.kernel.Kernel; +import com.jme3.network.message.ChannelInfoMessage; +import com.jme3.network.message.ClientRegistrationMessage; +import com.jme3.network.message.DisconnectMessage; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A default implementation of the Server interface that delegates + * its network connectivity to kernel.Kernel. + * + * @version $Revision: 9114 $ + * @author Paul Speed + */ +public class DefaultServer implements Server +{ + static Logger log = Logger.getLogger(DefaultServer.class.getName()); + + // First two channels are reserved for reliable and + // unreliable + private static final int CH_RELIABLE = 0; + private static final int CH_UNRELIABLE = 1; + private static final int CH_FIRST = 2; + + private boolean isRunning = false; + private AtomicInteger nextId = new AtomicInteger(0); + private String gameName; + private int version; + private KernelFactory kernelFactory = KernelFactory.DEFAULT; + private KernelAdapter reliableAdapter; + private KernelAdapter fastAdapter; + private List<KernelAdapter> channels = new ArrayList<KernelAdapter>(); + private List<Integer> alternatePorts = new ArrayList<Integer>(); + private Redispatch dispatcher = new Redispatch(); + private Map<Integer,HostedConnection> connections = new ConcurrentHashMap<Integer,HostedConnection>(); + private Map<Endpoint,HostedConnection> endpointConnections + = new ConcurrentHashMap<Endpoint,HostedConnection>(); + + // Keeps track of clients for whom we've only received the UDP + // registration message + private Map<Long,Connection> connecting = new ConcurrentHashMap<Long,Connection>(); + + private MessageListenerRegistry<HostedConnection> messageListeners + = new MessageListenerRegistry<HostedConnection>(); + private List<ConnectionListener> connectionListeners = new CopyOnWriteArrayList<ConnectionListener>(); + + public DefaultServer( String gameName, int version, Kernel reliable, Kernel fast ) + { + if( reliable == null ) + throw new IllegalArgumentException( "Default server reqiures a reliable kernel instance." ); + + this.gameName = gameName; + this.version = version; + + reliableAdapter = new KernelAdapter( this, reliable, dispatcher, true ); + channels.add( reliableAdapter ); + if( fast != null ) { + fastAdapter = new KernelAdapter( this, fast, dispatcher, false ); + channels.add( fastAdapter ); + } + } + + public String getGameName() + { + return gameName; + } + + public int getVersion() + { + return version; + } + + public int addChannel( int port ) + { + if( isRunning ) + throw new IllegalStateException( "Channels cannot be added once server is started." ); + + // Note: it does bug me that channels aren't 100% universal and + // setup externally but it requires a more invasive set of changes + // for "connection types" and some kind of registry of kernel and + // connector factories. This really would be the best approach and + // would allow all kinds of channel customization maybe... but for + // now, we hard-code the standard connections and treat the +2 extras + // differently. + + // Check for consistency with the channels list + if( channels.size() - CH_FIRST != alternatePorts.size() ) + throw new IllegalStateException( "Channel and port lists do not match." ); + + try { + int result = alternatePorts.size(); + alternatePorts.add(port); + + Kernel kernel = kernelFactory.createKernel(result, port); + channels.add( new KernelAdapter(this, kernel, dispatcher, true) ); + + return result; + } catch( IOException e ) { + throw new RuntimeException( "Error adding channel for port:" + port, e ); + } + } + + protected void checkChannel( int channel ) + { + if( channel < 0 || channel >= alternatePorts.size() ) + throw new IllegalArgumentException( "Channel is undefined:" + channel ); + } + + public void start() + { + if( isRunning ) + throw new IllegalStateException( "Server is already started." ); + + // Initialize the kernels + for( KernelAdapter ka : channels ) { + ka.initialize(); + } + + // Start em up + for( KernelAdapter ka : channels ) { + ka.start(); + } + + isRunning = true; + } + + public boolean isRunning() + { + return isRunning; + } + + public void close() + { + if( !isRunning ) + throw new IllegalStateException( "Server is not started." ); + + try { + // Kill the adpaters, they will kill the kernels + for( KernelAdapter ka : channels ) { + ka.close(); + } + + isRunning = false; + } catch( InterruptedException e ) { + throw new RuntimeException( "Interrupted while closing", e ); + } + } + + public void broadcast( Message message ) + { + broadcast( null, message ); + } + + public void broadcast( Filter<? super HostedConnection> filter, Message message ) + { + if( connections.isEmpty() ) + return; + + ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null); + + FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter); + + if( message.isReliable() || fastAdapter == null ) { + // Don't need to copy the data because message protocol is already + // giving us a fresh buffer + reliableAdapter.broadcast( adapter, buffer, true, false ); + } else { + fastAdapter.broadcast( adapter, buffer, false, false ); + } + } + + public void broadcast( int channel, Filter<? super HostedConnection> filter, Message message ) + { + if( connections.isEmpty() ) + return; + + checkChannel(channel); + + ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null); + + FilterAdapter adapter = filter == null ? null : new FilterAdapter(filter); + + channels.get(channel+CH_FIRST).broadcast( adapter, buffer, true, false ); + } + + public HostedConnection getConnection( int id ) + { + return connections.get(id); + } + + public boolean hasConnections() + { + return !connections.isEmpty(); + } + + public Collection<HostedConnection> getConnections() + { + return Collections.unmodifiableCollection((Collection<HostedConnection>)connections.values()); + } + + public void addConnectionListener( ConnectionListener listener ) + { + connectionListeners.add(listener); + } + + public void removeConnectionListener( ConnectionListener listener ) + { + connectionListeners.remove(listener); + } + + public void addMessageListener( MessageListener<? super HostedConnection> listener ) + { + messageListeners.addMessageListener( listener ); + } + + public void addMessageListener( MessageListener<? super HostedConnection> listener, Class... classes ) + { + messageListeners.addMessageListener( listener, classes ); + } + + public void removeMessageListener( MessageListener<? super HostedConnection> listener ) + { + messageListeners.removeMessageListener( listener ); + } + + public void removeMessageListener( MessageListener<? super HostedConnection> listener, Class... classes ) + { + messageListeners.removeMessageListener( listener, classes ); + } + + protected void dispatch( HostedConnection source, Message m ) + { + if( source == null ) { + messageListeners.messageReceived( source, m ); + } else { + + // A semi-heavy handed way to make sure the listener + // doesn't get called at the same time from two different + // threads for the same hosted connection. + synchronized( source ) { + messageListeners.messageReceived( source, m ); + } + } + } + + protected void fireConnectionAdded( HostedConnection conn ) + { + for( ConnectionListener l : connectionListeners ) { + l.connectionAdded( this, conn ); + } + } + + protected void fireConnectionRemoved( HostedConnection conn ) + { + for( ConnectionListener l : connectionListeners ) { + l.connectionRemoved( this, conn ); + } + } + + protected int getChannel( KernelAdapter ka ) + { + return channels.indexOf(ka); + } + + protected void registerClient( KernelAdapter ka, Endpoint p, ClientRegistrationMessage m ) + { + Connection addedConnection = null; + + // generally this will only be called by one thread but it's + // important enough I won't take chances + synchronized( this ) { + // Grab the random ID that the client created when creating + // its two registration messages + long tempId = m.getId(); + + // See if we already have one + Connection c = connecting.remove(tempId); + if( c == null ) { + c = new Connection(channels.size()); + log.log( Level.FINE, "Registering client for endpoint, pass 1:{0}.", p ); + } else { + log.log( Level.FINE, "Refining client registration for endpoint:{0}.", p ); + } + + // Fill in what we now know + int channel = getChannel(ka); + c.setChannel(channel, p); + log.log( Level.FINE, "Setting up channel:{0}", channel ); + + // If it's channel 0 then this is the initial connection + // and we will send the connection information + if( channel == CH_RELIABLE ) { + // Validate the name and version which is only sent + // over the reliable connection at this point. + if( !getGameName().equals(m.getGameName()) + || getVersion() != m.getVersion() ) { + + log.log( Level.INFO, "Kicking client due to name/version mismatch:{0}.", c ); + + // Need to kick them off... I may regret doing this from within + // the sync block but the alternative is more code + c.close( "Server client mismatch, server:" + getGameName() + " v" + getVersion() + + " client:" + m.getGameName() + " v" + m.getVersion() ); + return; + } + + // Else send the extra channel information to the client + if( !alternatePorts.isEmpty() ) { + ChannelInfoMessage cim = new ChannelInfoMessage( m.getId(), alternatePorts ); + c.send(cim); + } + } + + if( c.isComplete() ) { + // Then we are fully connected + if( connections.put( c.getId(), c ) == null ) { + + for( Endpoint cp : c.channels ) { + if( cp == null ) + continue; + endpointConnections.put( cp, c ); + } + + addedConnection = c; + } + } else { + // Need to keep getting channels so we'll keep it in + // the map + connecting.put(tempId, c); + } + } + + // Best to do this outside of the synch block to avoid + // over synchronizing which is the path to deadlocks + if( addedConnection != null ) { + log.log( Level.INFO, "Client registered:{0}.", addedConnection ); + + // Send the ID back to the client letting it know it's + // fully connected. + m = new ClientRegistrationMessage(); + m.setId( addedConnection.getId() ); + m.setReliable(true); + addedConnection.send(m); + + // Now we can notify the listeners about the + // new connection. + fireConnectionAdded( addedConnection ); + } + } + + protected HostedConnection getConnection( Endpoint endpoint ) + { + return endpointConnections.get(endpoint); + } + + protected void connectionClosed( Endpoint p ) + { + if( p.isConnected() ) { + log.log( Level.INFO, "Connection closed:{0}.", p ); + } else { + log.log( Level.FINE, "Connection closed:{0}.", p ); + } + + // Try to find the endpoint in all ways that it might + // exist. Note: by this point the raw network channel is + // closed already. + + // Also note: this method will be called multiple times per + // HostedConnection if it has multiple endpoints. + + Connection removed = null; + synchronized( this ) { + // Just in case the endpoint was still connecting + connecting.values().remove(p); + + // And the regular management + removed = (Connection)endpointConnections.remove(p); + if( removed != null ) { + connections.remove( removed.getId() ); + } + + log.log( Level.FINE, "Connections size:{0}", connections.size() ); + log.log( Level.FINE, "Endpoint mappings size:{0}", endpointConnections.size() ); + } + + // Better not to fire events while we hold a lock + // so always do this outside the synch block. + // Note: checking removed.closed just to avoid spurious log messages + // since in general we are called back for every endpoint closing. + if( removed != null && !removed.closed ) { + + log.log( Level.INFO, "Client closed:{0}.", removed ); + + removed.closeConnection(); + } + } + + protected class Connection implements HostedConnection + { + private int id; + private boolean closed; + private Endpoint[] channels; + private int setChannelCount = 0; + + private Map<String,Object> sessionData = new ConcurrentHashMap<String,Object>(); + + public Connection( int channelCount ) + { + id = nextId.getAndIncrement(); + channels = new Endpoint[channelCount]; + } + + void setChannel( int channel, Endpoint p ) + { + if( channels[channel] != null && channels[channel] != p ) { + throw new RuntimeException( "Channel has already been set:" + channel + + " = " + channels[channel] + ", cannot be set to:" + p ); + } + channels[channel] = p; + if( p != null ) + setChannelCount++; + } + + boolean isComplete() + { + return setChannelCount == channels.length; + } + + public Server getServer() + { + return DefaultServer.this; + } + + public int getId() + { + return id; + } + + public String getAddress() + { + return channels[CH_RELIABLE] == null ? null : channels[CH_RELIABLE].getAddress(); + } + + public void send( Message message ) + { + ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null); + if( message.isReliable() || channels[CH_UNRELIABLE] == null ) { + channels[CH_RELIABLE].send( buffer ); + } else { + channels[CH_UNRELIABLE].send( buffer ); + } + } + + public void send( int channel, Message message ) + { + checkChannel(channel); + ByteBuffer buffer = MessageProtocol.messageToBuffer(message, null); + channels[channel+CH_FIRST].send(buffer); + } + + protected void closeConnection() + { + if( closed ) + return; + closed = true; + + // Make sure all endpoints are closed. Note: reliable + // should always already be closed through all paths that I + // can conceive... but it doesn't hurt to be sure. + for( Endpoint p : channels ) { + if( p == null ) + continue; + p.close(); + } + + fireConnectionRemoved( this ); + } + + public void close( String reason ) + { + // Send a reason + DisconnectMessage m = new DisconnectMessage(); + m.setType( DisconnectMessage.KICK ); + m.setReason( reason ); + m.setReliable( true ); + send( m ); + + // Just close the reliable endpoint + // fast will be cleaned up as a side-effect + // when closeConnection() is called by the + // connectionClosed() endpoint callback. + if( channels[CH_RELIABLE] != null ) { + // Close with flush so we make sure our + // message gets out + channels[CH_RELIABLE].close(true); + } + } + + public Object setAttribute( String name, Object value ) + { + if( value == null ) + return sessionData.remove(name); + return sessionData.put(name, value); + } + + @SuppressWarnings("unchecked") + public <T> T getAttribute( String name ) + { + return (T)sessionData.get(name); + } + + public Set<String> attributeNames() + { + return Collections.unmodifiableSet(sessionData.keySet()); + } + + public String toString() + { + return "Connection[ id=" + id + ", reliable=" + channels[CH_RELIABLE] + + ", fast=" + channels[CH_UNRELIABLE] + " ]"; + } + } + + protected class Redispatch implements MessageListener<HostedConnection> + { + public void messageReceived( HostedConnection source, Message m ) + { + dispatch( source, m ); + } + } + + protected class FilterAdapter implements Filter<Endpoint> + { + private Filter<? super HostedConnection> delegate; + + public FilterAdapter( Filter<? super HostedConnection> delegate ) + { + this.delegate = delegate; + } + + public boolean apply( Endpoint input ) + { + HostedConnection conn = getConnection( input ); + if( conn == null ) + return false; + return delegate.apply(conn); + } + } +} diff --git a/engine/src/networking/com/jme3/network/base/KernelAdapter.java b/engine/src/networking/com/jme3/network/base/KernelAdapter.java new file mode 100644 index 0000000..8477aad --- /dev/null +++ b/engine/src/networking/com/jme3/network/base/KernelAdapter.java @@ -0,0 +1,296 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.base; + +import com.jme3.network.Filter; +import com.jme3.network.HostedConnection; +import com.jme3.network.Message; +import com.jme3.network.MessageListener; +import com.jme3.network.kernel.Endpoint; +import com.jme3.network.kernel.EndpointEvent; +import com.jme3.network.kernel.Envelope; +import com.jme3.network.kernel.Kernel; +import com.jme3.network.message.ClientRegistrationMessage; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Wraps a single Kernel and forwards new messages + * to the supplied message dispatcher and new endpoint + * events to the connection dispatcher. This is used + * by DefaultServer to manage its kernel objects. + * + * <p>This adapter assumes a simple protocol where two + * bytes define a (short) object size with the object data + * to follow. Note: this limits the size of serialized + * objects to 32676 bytes... even though, for example, + * datagram packets can hold twice that. :P</p> + * + * @version $Revision: 8944 $ + * @author Paul Speed + */ +public class KernelAdapter extends Thread +{ + static Logger log = Logger.getLogger(KernelAdapter.class.getName()); + + private DefaultServer server; // this is unfortunate + private Kernel kernel; + private MessageListener<HostedConnection> messageDispatcher; + private AtomicBoolean go = new AtomicBoolean(true); + + // Keeps track of the in-progress messages that are received + // on reliable connections + private Map<Endpoint, MessageProtocol> messageBuffers = new ConcurrentHashMap<Endpoint,MessageProtocol>(); + + // Marks the messages as reliable or not if they came + // through this connector. + private boolean reliable; + + public KernelAdapter( DefaultServer server, Kernel kernel, MessageListener<HostedConnection> messageDispatcher, + boolean reliable ) + { + super( String.valueOf(kernel) ); + this.server = server; + this.kernel = kernel; + this.messageDispatcher = messageDispatcher; + this.reliable = reliable; + setDaemon(true); + } + + public Kernel getKernel() + { + return kernel; + } + + public void initialize() + { + kernel.initialize(); + } + + public void broadcast( Filter<? super Endpoint> filter, ByteBuffer data, boolean reliable, + boolean copy ) + { + kernel.broadcast( filter, data, reliable, copy ); + } + + public void close() throws InterruptedException + { + go.set(false); + + // Kill the kernel + kernel.terminate(); + } + + protected void reportError( Endpoint p, Object context, Exception e ) + { + // Should really be queued up so the outer thread can + // retrieve them. For now we'll just log it. FIXME + log.log( Level.SEVERE, "Unhandled error, endpoint:" + p + ", context:" + context, e ); + + // In lieu of other options, at least close the endpoint + p.close(); + } + + protected HostedConnection getConnection( Endpoint p ) + { + return server.getConnection(p); + } + + protected void connectionClosed( Endpoint p ) + { + // Remove any message buffer we've been accumulating + // on behalf of this endpoing + messageBuffers.remove(p); + + log.log( Level.FINE, "Buffers size:{0}", messageBuffers.size() ); + + server.connectionClosed(p); + } + + /** + * Note on threading for those writing their own server + * or adapter implementations. The rule that a single connection be + * processed by only one thread at a time is more about ensuring that + * the messages are delivered in the order that they are received + * than for any user-code safety. 99% of the time the user code should + * be writing for multithreaded access anyway. + * + * <p>The issue with the messages is that if a an implementation is + * using a general thread pool then it would be possible for a + * naive implementation to have one thread grab an Envelope from + * connection 1's and another grab the next Envelope. Since an Envelope + * may contain several messages, delivering the second thread's messages + * before or during the first's would be really confusing and hard + * to code for in user code.</p> + * + * <p>And that's why this note is here. DefaultServer does a rudimentary + * per-connection locking but it couldn't possibly guard against + * out of order Envelope processing.</p> + */ + protected void dispatch( Endpoint p, Message m ) + { + // Because this class is the only one with the information + // to do it... we need to pull of the registration message + // here. + if( m instanceof ClientRegistrationMessage ) { + server.registerClient( this, p, (ClientRegistrationMessage)m ); + return; + } + + try { + HostedConnection source = getConnection(p); + if( source == null ) { + if( reliable ) { + // If it's a reliable connection then it's slightly more + // concerning but this can happen all the time for a UDP endpoint. + log.log( Level.WARNING, "Recieved message from unconnected endpoint:" + p + " message:" + m ); + } + return; + } + messageDispatcher.messageReceived( source, m ); + } catch( Exception e ) { + reportError(p, m, e); + } + } + + protected MessageProtocol getMessageBuffer( Endpoint p ) + { + if( !reliable ) { + // Since UDP comes in packets and they aren't split + // up, there is no reason to buffer. In fact, there would + // be a down side because there is no way for us to reliably + // clean these up later since we'd create another one for + // any random UDP packet that comes to the port. + return new MessageProtocol(); + } else { + // See if we already have one + MessageProtocol result = messageBuffers.get(p); + if( result == null ) { + result = new MessageProtocol(); + messageBuffers.put(p, result); + } + return result; + } + } + + protected void createAndDispatch( Envelope env ) + { + MessageProtocol protocol = getMessageBuffer(env.getSource()); + + byte[] data = env.getData(); + ByteBuffer buffer = ByteBuffer.wrap(data); + + int count = protocol.addBuffer( buffer ); + if( count == 0 ) { + // This can happen if there was only a partial message + // received. However, this should never happen for unreliable + // connections. + if( !reliable ) { + // Log some additional information about the packet. + int len = Math.min( 10, data.length ); + StringBuilder sb = new StringBuilder(); + for( int i = 0; i < len; i++ ) { + sb.append( "[" + Integer.toHexString(data[i]) + "]" ); + } + log.log( Level.INFO, "First 10 bytes of incomplete nessage:" + sb ); + throw new RuntimeException( "Envelope contained incomplete data:" + env ); + } + } + + // Should be complete... and maybe we should check but we don't + Message m = null; + while( (m = protocol.getMessage()) != null ) { + m.setReliable(reliable); + dispatch( env.getSource(), m ); + } + } + + protected void createAndDispatch( EndpointEvent event ) + { + // Only need to tell the server about disconnects + if( event.getType() == EndpointEvent.Type.REMOVE ) { + connectionClosed( event.getEndpoint() ); + } + } + + protected void flushEvents() + { + EndpointEvent event; + while( (event = kernel.nextEvent()) != null ) { + try { + createAndDispatch( event ); + } catch( Exception e ) { + reportError(event.getEndpoint(), event, e); + } + } + } + + public void run() + { + while( go.get() ) { + + try { + // Check for pending events + flushEvents(); + + // Grab the next envelope + Envelope e = kernel.read(); + if( e == Kernel.EVENTS_PENDING ) + continue; // We'll catch it up above + + // Check for pending events that might have + // come in while we were blocking. This is usually + // when the connection add events come through + flushEvents(); + + try { + createAndDispatch( e ); + } catch( Exception ex ) { + reportError(e.getSource(), e, ex); + } + + } catch( InterruptedException ex ) { + if( !go.get() ) + return; + throw new RuntimeException( "Unexpected interruption", ex ); + } + } + } + +} + + diff --git a/engine/src/networking/com/jme3/network/base/KernelFactory.java b/engine/src/networking/com/jme3/network/base/KernelFactory.java new file mode 100644 index 0000000..0074d39 --- /dev/null +++ b/engine/src/networking/com/jme3/network/base/KernelFactory.java @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.base; + +import com.jme3.network.kernel.Kernel; +import java.io.IOException; + + +/** + * Supplied to the DefaultServer to create any additional + * channel kernels that might be required. + * + * @version $Revision: 8938 $ + * @author Paul Speed + */ +public interface KernelFactory +{ + public static final KernelFactory DEFAULT = new NioKernelFactory(); + + public Kernel createKernel( int channel, int port ) throws IOException; +} diff --git a/engine/src/networking/com/jme3/network/base/MessageListenerRegistry.java b/engine/src/networking/com/jme3/network/base/MessageListenerRegistry.java new file mode 100644 index 0000000..c861786 --- /dev/null +++ b/engine/src/networking/com/jme3/network/base/MessageListenerRegistry.java @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.base; + +import com.jme3.network.Message; +import com.jme3.network.MessageListener; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * Keeps track of message listeners registered to specific + * types or to any type. + * + * @version $Revision: 8843 $ + * @author Paul Speed + */ +public class MessageListenerRegistry<S> implements MessageListener<S> +{ + static Logger log = Logger.getLogger(MessageListenerRegistry.class.getName()); + + private List<MessageListener<? super S>> listeners = new CopyOnWriteArrayList<MessageListener<? super S>>(); + private Map<Class,List<MessageListener<? super S>>> typeListeners + = new ConcurrentHashMap<Class,List<MessageListener<? super S>>>(); + + public MessageListenerRegistry() + { + } + + public void messageReceived( S source, Message m ) + { + boolean delivered = false; + + for( MessageListener<? super S> l : listeners ) { + l.messageReceived( source, m ); + delivered = true; + } + + for( MessageListener<? super S> l : getListeners(m.getClass(),false) ) { + l.messageReceived( source, m ); + delivered = true; + } + + if( !delivered ) { + log.log( Level.INFO, "Received message had no registered listeners: {0}", m ); + } + } + + protected List<MessageListener<? super S>> getListeners( Class c, boolean create ) + { + List<MessageListener<? super S>> result = typeListeners.get(c); + if( result == null && create ) { + result = new CopyOnWriteArrayList<MessageListener<? super S>>(); + typeListeners.put( c, result ); + } + + if( result == null ) { + result = Collections.emptyList(); + } + return result; + } + + public void addMessageListener( MessageListener<? super S> listener ) + { + listeners.add(listener); + } + + public void removeMessageListener( MessageListener<? super S> listener ) + { + listeners.remove(listener); + } + + public void addMessageListener( MessageListener<? super S> listener, Class... classes ) + { + for( Class c : classes ) { + getListeners(c, true).add(listener); + } + } + + public void removeMessageListener( MessageListener<? super S> listener, Class... classes ) + { + for( Class c : classes ) { + getListeners(c, false).remove(listener); + } + } +} diff --git a/engine/src/networking/com/jme3/network/base/MessageProtocol.java b/engine/src/networking/com/jme3/network/base/MessageProtocol.java new file mode 100644 index 0000000..eb22302 --- /dev/null +++ b/engine/src/networking/com/jme3/network/base/MessageProtocol.java @@ -0,0 +1,191 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.base; + +import com.jme3.network.Message; +import com.jme3.network.serializing.Serializer; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.LinkedList; + +/** + * Consolidates the conversion of messages to/from byte buffers + * and provides a rolling message buffer. ByteBuffers can be + * pushed in and messages will be extracted, accumulated, and + * available for retrieval. This is not thread safe and is meant + * to be used within a single message processing thread. + * + * <p>The protocol is based on a simple length + data format + * where two bytes represent the (short) length of the data + * and the rest is the raw data for the Serializers class.</p> + * + * @version $Revision: 8843 $ + * @author Paul Speed + */ +public class MessageProtocol +{ + private LinkedList<Message> messages = new LinkedList<Message>(); + private ByteBuffer current; + private int size; + private Byte carry; + + /** + * Converts a message to a ByteBuffer using the Serializer + * and the (short length) + data protocol. If target is null + * then a 32k byte buffer will be created and filled. + */ + public static ByteBuffer messageToBuffer( Message message, ByteBuffer target ) + { + // Could let the caller pass their own in + ByteBuffer buffer = target == null ? ByteBuffer.allocate( 32767 + 2 ) : target; + + try { + buffer.position( 2 ); + Serializer.writeClassAndObject( buffer, message ); + buffer.flip(); + short dataLength = (short)(buffer.remaining() - 2); + buffer.putShort( dataLength ); + buffer.position( 0 ); + + return buffer; + } catch( IOException e ) { + throw new RuntimeException( "Error serializing message", e ); + } + } + + /** + * Retrieves and removes an extracted message from the accumulated buffer + * or returns null if there are no more messages. + */ + public Message getMessage() + { + if( messages.isEmpty() ) { + return null; + } + + return messages.removeFirst(); + } + + /** + * Adds the specified buffer, extracting the contained messages + * and making them available to getMessage(). The left over + * data is buffered to be combined with future data. + & + * @return The total number of queued messages after this call. + */ + public int addBuffer( ByteBuffer buffer ) + { + // push the data from the buffer into as + // many messages as we can + while( buffer.remaining() > 0 ) { + + if( current == null ) { + + // If we have a left over carry then we need to + // do manual processing to get the short value + if( carry != null ) { + byte high = carry; + byte low = buffer.get(); + + size = (high & 0xff) << 8 | (low & 0xff); + carry = null; + } + else if( buffer.remaining() < 2 ) { + // It's possible that the supplied buffer only has one + // byte in it... and in that case we will get an underflow + // when attempting to read the short below. + + // It has to be 1 or we'd never get here... but one + // isn't enough so we stash it away. + carry = buffer.get(); + break; + } else { + // We are not currently reading an object so + // grab the size. + // Note: this is somewhat limiting... int would + // be better. + size = buffer.getShort(); + } + + // Allocate the buffer into which we'll feed the + // data as we get it + current = ByteBuffer.allocate(size); + } + + if( current.remaining() <= buffer.remaining() ) { + // We have at least one complete object so + // copy what we can into current, create a message, + // and then continue pulling from buffer. + + // Artificially set the limit so we don't overflow + int extra = buffer.remaining() - current.remaining(); + buffer.limit( buffer.position() + current.remaining() ); + + // Now copy the data + current.put( buffer ); + current.flip(); + + // Now set the limit back to a good value + buffer.limit( buffer.position() + extra ); + + createMessage( current ); + + current = null; + } else { + + // Not yet a complete object so just copy what we have + current.put( buffer ); + } + } + + return messages.size(); + } + + /** + * Creates a message from the properly sized byte buffer + * and adds it to the messages queue. + */ + protected void createMessage( ByteBuffer buffer ) + { + try { + Object obj = Serializer.readClassAndObject( buffer ); + Message m = (Message)obj; + messages.add(m); + } catch( IOException e ) { + throw new RuntimeException( "Error deserializing object", e ); + } + } +} + + + diff --git a/engine/src/networking/com/jme3/network/base/NioKernelFactory.java b/engine/src/networking/com/jme3/network/base/NioKernelFactory.java new file mode 100644 index 0000000..c9fab08 --- /dev/null +++ b/engine/src/networking/com/jme3/network/base/NioKernelFactory.java @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.base; + +import com.jme3.network.kernel.Kernel; +import com.jme3.network.kernel.tcp.SelectorKernel; +import java.io.IOException; + + +/** + * KernelFactory implemention for creating TCP kernels + * using the NIO selector model. + * + * @version $Revision: 8938 $ + * @author Paul Speed + */ +public class NioKernelFactory implements KernelFactory +{ + public Kernel createKernel( int channel, int port ) throws IOException + { + return new SelectorKernel(port); + } +} diff --git a/engine/src/networking/com/jme3/network/base/TcpConnectorFactory.java b/engine/src/networking/com/jme3/network/base/TcpConnectorFactory.java new file mode 100644 index 0000000..e53e62d --- /dev/null +++ b/engine/src/networking/com/jme3/network/base/TcpConnectorFactory.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2011 jMonkeyEngine + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * + * * Neither the name of 'jMonkeyEngine' nor the names of its contributors + * may be used to endorse or promote products derived from this software + * without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR + * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR + * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, + * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, + * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR + * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING + * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS + * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package com.jme3.network.base; + +import com.jme3.network.kernel.Connector; +import com.jme3.network.kernel.tcp.SocketConnector; +import java.io.IOException; +import java.net.InetAddress; + + +/** + * Creates TCP connectors to a specific remote address. + * + * @version $Revision: 8938 $ + * @author Paul Speed + */ +public class TcpConnectorFactory implements ConnectorFactory +{ + private InetAddress remoteAddress; + + public TcpConnectorFactory( InetAddress remoteAddress ) + { + this.remoteAddress = remoteAddress; + } + + public Connector createConnector( int channel, int port ) throws IOException + { + return new SocketConnector( remoteAddress, port ); + } +} diff --git a/engine/src/networking/com/jme3/network/base/package.html b/engine/src/networking/com/jme3/network/base/package.html new file mode 100644 index 0000000..a00cb16 --- /dev/null +++ b/engine/src/networking/com/jme3/network/base/package.html @@ -0,0 +1,12 @@ +<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd"> +<html> +<head> +<title></title> +<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"> +</head> +<body> +The base package contains the default implementations for the +{@link com.jme3.network.Client} and {@link com.jme3.network.Server} +interfaces from the public API. +</body> +</html> |