*/
-public class Client extends Connection implements EndPoint {
- static {
- try {
- // Needed for NIO selectors on Android 2.2.
- System.setProperty("java.net.preferIPv6Addresses", "false");
- } catch (AccessControlException ignored) {
- }
- }
-
- private final Serialization serialization;
- private Selector selector;
- private int emptySelects;
- private volatile boolean tcpRegistered, udpRegistered;
- private Object tcpRegistrationLock = new Object();
- private Object udpRegistrationLock = new Object();
- private volatile boolean shutdown;
- private final Object updateLock = new Object();
- private Thread updateThread;
- private int connectTimeout;
- private InetAddress connectHost;
- private int connectTcpPort;
- private int connectUdpPort;
- private boolean isClosed;
- private ClientDiscoveryHandler discoveryHandler;
-
- /** Creates a Client with a write buffer size of 8192 and an object buffer size of 2048. */
- public Client () {
- this(8192, 2048);
- }
-
- /** @param writeBufferSize One buffer of this size is allocated. Objects are serialized to the write buffer where the bytes are
- * queued until they can be written to the TCP socket.
- *
- * Normally the socket is writable and the bytes are written immediately. If the socket cannot be written to and
- * enough serialized objects are queued to overflow the buffer, then the connection will be closed.
- *
- * The write buffer should be sized at least as large as the largest object that will be sent, plus some head room to
- * allow for some serialized objects to be queued in case the buffer is temporarily not writable. The amount of head
- * room needed is dependent upon the size of objects being sent and how often they are sent.
- * @param objectBufferSize One (using only TCP) or three (using both TCP and UDP) buffers of this size are allocated. These
- * buffers are used to hold the bytes for a single object graph until it can be sent over the network or
- * deserialized.
- *
- * The object buffers should be sized at least as large as the largest object that will be sent or received. */
- public Client (int writeBufferSize, int objectBufferSize) {
- this(writeBufferSize, objectBufferSize, new KryoSerialization());
- }
-
- public Client (int writeBufferSize, int objectBufferSize, Serialization serialization) {
- super();
- endPoint = this;
-
- this.serialization = serialization;
-
- this.discoveryHandler = ClientDiscoveryHandler.DEFAULT;
-
- initialize(serialization, writeBufferSize, objectBufferSize);
-
- try {
- selector = Selector.open();
- } catch (IOException ex) {
- throw new RuntimeException("Error opening selector.", ex);
- }
- }
-
- public void setDiscoveryHandler (ClientDiscoveryHandler newDiscoveryHandler) {
- discoveryHandler = newDiscoveryHandler;
- }
-
- public Serialization getSerialization () {
- return serialization;
- }
-
- public Kryo getKryo () {
- return ((KryoSerialization)serialization).getKryo();
- }
-
- /** Opens a TCP only client.
- * @see #connect(int, InetAddress, int, int) */
- public void connect (int timeout, String host, int tcpPort) throws IOException {
- connect(timeout, InetAddress.getByName(host), tcpPort, -1);
- }
-
- /** Opens a TCP and UDP client.
- * @see #connect(int, InetAddress, int, int) */
- public void connect (int timeout, String host, int tcpPort, int udpPort) throws IOException {
- connect(timeout, InetAddress.getByName(host), tcpPort, udpPort);
- }
-
- /** Opens a TCP only client.
- * @see #connect(int, InetAddress, int, int) */
- public void connect (int timeout, InetAddress host, int tcpPort) throws IOException {
- connect(timeout, host, tcpPort, -1);
- }
-
- /** Opens a TCP and UDP client. Blocks until the connection is complete or the timeout is reached.
- *
- * Because the framework must perform some minimal communication before the connection is considered successful,
- * {@link #update(int)} must be called on a separate thread during the connection process.
- * @throws IllegalStateException if called from the connection's update thread.
- * @throws IOException if the client could not be opened or connecting times out. */
- public void connect (int timeout, InetAddress host, int tcpPort, int udpPort) throws IOException {
- if (host == null) throw new IllegalArgumentException("host cannot be null.");
- if (Thread.currentThread() == getUpdateThread())
- throw new IllegalStateException("Cannot connect on the connection's update thread.");
- this.connectTimeout = timeout;
- this.connectHost = host;
- this.connectTcpPort = tcpPort;
- this.connectUdpPort = udpPort;
- close();
- if (INFO) {
- if (udpPort != -1)
- info("kryonet", "Connecting: " + host + ":" + tcpPort + "/" + udpPort);
- else
- info("kryonet", "Connecting: " + host + ":" + tcpPort);
- }
- id = -1;
- try {
- if (udpPort != -1) udp = new UdpConnection(serialization, tcp.readBuffer.capacity());
-
- long endTime;
- synchronized (updateLock) {
- tcpRegistered = false;
- selector.wakeup();
- endTime = System.currentTimeMillis() + timeout;
- tcp.connect(selector, new InetSocketAddress(host, tcpPort), 5000);
- }
-
- // Wait for RegisterTCP.
- synchronized (tcpRegistrationLock) {
- while (!tcpRegistered && System.currentTimeMillis() < endTime) {
- try {
- tcpRegistrationLock.wait(100);
- } catch (InterruptedException ignored) {
- }
- }
- if (!tcpRegistered) {
- throw new SocketTimeoutException("Connected, but timed out during TCP registration.\n"
- + "Note: Client#update must be called in a separate thread during connect.");
- }
- }
-
- if (udpPort != -1) {
- InetSocketAddress udpAddress = new InetSocketAddress(host, udpPort);
- synchronized (updateLock) {
- udpRegistered = false;
- selector.wakeup();
- udp.connect(selector, udpAddress);
- }
-
- // Wait for RegisterUDP reply.
- synchronized (udpRegistrationLock) {
- while (!udpRegistered && System.currentTimeMillis() < endTime) {
- RegisterUDP registerUDP = new RegisterUDP();
- registerUDP.connectionID = id;
- udp.send(this, registerUDP, udpAddress);
- try {
- udpRegistrationLock.wait(100);
- } catch (InterruptedException ignored) {
- }
- }
- if (!udpRegistered)
- throw new SocketTimeoutException("Connected, but timed out during UDP registration: " + host + ":" + udpPort);
- }
- }
- } catch (IOException ex) {
- close();
- throw ex;
- }
- }
-
- /** Calls {@link #connect(int, InetAddress, int, int) connect} with the values last passed to connect.
- * @throws IllegalStateException if connect has never been called. */
- public void reconnect () throws IOException {
- reconnect(connectTimeout);
- }
-
- /** Calls {@link #connect(int, InetAddress, int, int) connect} with the specified timeout and the other values last passed to
- * connect.
- * @throws IllegalStateException if connect has never been called. */
- public void reconnect (int timeout) throws IOException {
- if (connectHost == null) throw new IllegalStateException("This client has never been connected.");
- connect(timeout, connectHost, connectTcpPort, connectUdpPort);
- }
-
- /** Reads or writes any pending data for this client. Multiple threads should not call this method at the same time.
- * @param timeout Wait for up to the specified milliseconds for data to be ready to process. May be zero to return immediately
- * if there is no data to process. */
- public void update (int timeout) throws IOException {
- updateThread = Thread.currentThread();
- synchronized (updateLock) { // Blocks to avoid a select while the selector is used to bind the server connection.
- }
- long startTime = System.currentTimeMillis();
- int select = 0;
- if (timeout > 0) {
- select = selector.select(timeout);
- } else {
- select = selector.selectNow();
- }
- if (select == 0) {
- emptySelects++;
- if (emptySelects == 100) {
- emptySelects = 0;
- // NIO freaks and returns immediately with 0 sometimes, so try to keep from hogging the CPU.
- long elapsedTime = System.currentTimeMillis() - startTime;
- try {
- if (elapsedTime < 25) Thread.sleep(25 - elapsedTime);
- } catch (InterruptedException ex) {
- }
- }
- } else {
- emptySelects = 0;
- isClosed = false;
- Set keys = selector.selectedKeys();
- synchronized (keys) {
- for (Iterator iter = keys.iterator(); iter.hasNext();) {
- keepAlive();
- SelectionKey selectionKey = iter.next();
- iter.remove();
- try {
- int ops = selectionKey.readyOps();
- if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
- if (selectionKey.attachment() == tcp) {
- while (true) {
- Object object = tcp.readObject(this);
- if (object == null) break;
- if (!tcpRegistered) {
- if (object instanceof RegisterTCP) {
- id = ((RegisterTCP)object).connectionID;
- synchronized (tcpRegistrationLock) {
- tcpRegistered = true;
- tcpRegistrationLock.notifyAll();
- if (TRACE) trace("kryonet", this + " received TCP: RegisterTCP");
- if (udp == null) setConnected(true);
- }
- if (udp == null) notifyConnected();
- }
- continue;
- }
- if (udp != null && !udpRegistered) {
- if (object instanceof RegisterUDP) {
- synchronized (udpRegistrationLock) {
- udpRegistered = true;
- udpRegistrationLock.notifyAll();
- if (TRACE) trace("kryonet", this + " received UDP: RegisterUDP");
- if (DEBUG) {
- debug("kryonet", "Port " + udp.datagramChannel.socket().getLocalPort()
- + "/UDP connected to: " + udp.connectedAddress);
- }
- setConnected(true);
- }
- notifyConnected();
- }
- continue;
- }
- if (!isConnected) continue;
- if (DEBUG) {
- String objectString = object == null ? "null" : object.getClass().getSimpleName();
- if (!(object instanceof FrameworkMessage)) {
- debug("kryonet", this + " received TCP: " + objectString);
- } else if (TRACE) {
- trace("kryonet", this + " received TCP: " + objectString);
- }
- }
- notifyReceived(object);
- }
- } else {
- if (udp.readFromAddress() == null) continue;
- Object object = udp.readObject(this);
- if (object == null) continue;
- if (DEBUG) {
- String objectString = object == null ? "null" : object.getClass().getSimpleName();
- debug("kryonet", this + " received UDP: " + objectString);
- }
- notifyReceived(object);
- }
- }
- if ((ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) tcp.writeOperation();
- } catch (CancelledKeyException ignored) {
- // Connection is closed.
- }
- }
- }
- }
- if (isConnected) {
- long time = System.currentTimeMillis();
- if (tcp.isTimedOut(time)) {
- if (DEBUG) debug("kryonet", this + " timed out.");
- close();
- } else
- keepAlive();
- if (isIdle()) notifyIdle();
- }
- }
-
- void keepAlive () {
- if (!isConnected) return;
- long time = System.currentTimeMillis();
- if (tcp.needsKeepAlive(time)) sendTCP(FrameworkMessage.keepAlive);
- if (udp != null && udpRegistered && udp.needsKeepAlive(time)) sendUDP(FrameworkMessage.keepAlive);
- }
-
- public void run () {
- if (TRACE) trace("kryonet", "Client thread started.");
- shutdown = false;
- while (!shutdown) {
- try {
- update(250);
- } catch (IOException ex) {
- if (TRACE) {
- if (isConnected)
- trace("kryonet", "Unable to update connection: " + this, ex);
- else
- trace("kryonet", "Unable to update connection.", ex);
- } else if (DEBUG) {
- if (isConnected)
- debug("kryonet", this + " update: " + ex.getMessage());
- else
- debug("kryonet", "Unable to update connection: " + ex.getMessage());
- }
- close();
- } catch (KryoNetException ex) {
- lastProtocolError = ex;
- if (ERROR) {
- if (isConnected)
- error("kryonet", "Error updating connection: " + this, ex);
- else
- error("kryonet", "Error updating connection.", ex);
- }
- close();
- throw ex;
- }
- }
- if (TRACE) trace("kryonet", "Client thread stopped.");
- }
-
- public void start () {
- // Try to let any previous update thread stop.
- if (updateThread != null) {
- shutdown = true;
- try {
- updateThread.join(5000);
- } catch (InterruptedException ignored) {
- }
- }
- updateThread = new Thread(this, "Client");
- updateThread.setDaemon(true);
- updateThread.start();
- }
-
- public void stop () {
- if (shutdown) return;
- close();
- if (TRACE) trace("kryonet", "Client thread stopping.");
- shutdown = true;
- selector.wakeup();
- }
-
- public void close () {
- super.close();
- synchronized (updateLock) { // Blocks to avoid a select while the selector is used to bind the server connection.
- }
- // Select one last time to complete closing the socket.
- if (!isClosed) {
- isClosed = true;
- selector.wakeup();
- try {
- selector.selectNow();
- } catch (IOException ignored) {
- }
- }
- }
-
- /** Releases the resources used by this client, which may no longer be used. */
- public void dispose () throws IOException {
- close();
- selector.close();
- }
-
- public void addListener (Listener listener) {
- super.addListener(listener);
- if (TRACE) trace("kryonet", "Client listener added.");
- }
-
- public void removeListener (Listener listener) {
- super.removeListener(listener);
- if (TRACE) trace("kryonet", "Client listener removed.");
- }
-
- /** An empty object will be sent if the UDP connection is inactive more than the specified milliseconds. Network hardware may
- * keep a translation table of inside to outside IP addresses and a UDP keep alive keeps this table entry from expiring. Set to
- * zero to disable. Defaults to 19000. */
- public void setKeepAliveUDP (int keepAliveMillis) {
- if (udp == null) throw new IllegalStateException("Not connected via UDP.");
- udp.keepAliveMillis = keepAliveMillis;
- }
-
- public Thread getUpdateThread () {
- return updateThread;
- }
-
- private void broadcast (int udpPort, DatagramSocket socket) throws IOException {
- ByteBuffer dataBuffer = ByteBuffer.allocate(64);
- serialization.write(null, dataBuffer, new DiscoverHost());
- dataBuffer.flip();
- byte[] data = new byte[dataBuffer.limit()];
- dataBuffer.get(data);
- for (NetworkInterface iface : Collections.list(NetworkInterface.getNetworkInterfaces())) {
- for (InetAddress address : Collections.list(iface.getInetAddresses())) {
- // Java 1.5 doesn't support getting the subnet mask, so try the two most common.
- byte[] ip = address.getAddress();
- ip[3] = -1; // 255.255.255.0
- try {
- socket.send(new DatagramPacket(data, data.length, InetAddress.getByAddress(ip), udpPort));
- } catch (Exception ignored) {
- }
- ip[2] = -1; // 255.255.0.0
- try {
- socket.send(new DatagramPacket(data, data.length, InetAddress.getByAddress(ip), udpPort));
- } catch (Exception ignored) {
- }
- }
- }
- if (DEBUG) debug("kryonet", "Broadcasted host discovery on port: " + udpPort);
- }
-
- /** Broadcasts a UDP message on the LAN to discover any running servers. The address of the first server to respond is returned.
- * @param udpPort The UDP port of the server.
- * @param timeoutMillis The number of milliseconds to wait for a response.
- * @return the first server found, or null if no server responded. */
- public InetAddress discoverHost (int udpPort, int timeoutMillis) {
- DatagramSocket socket = null;
- try {
- socket = new DatagramSocket();
- broadcast(udpPort, socket);
- socket.setSoTimeout(timeoutMillis);
- DatagramPacket packet = discoveryHandler.onRequestNewDatagramPacket();
- try {
- socket.receive(packet);
- } catch (SocketTimeoutException ex) {
- if (INFO) info("kryonet", "Host discovery timed out.");
- return null;
- }
- if (INFO) info("kryonet", "Discovered server: " + packet.getAddress());
- discoveryHandler.onDiscoveredHost(packet, getKryo());
- return packet.getAddress();
- } catch (IOException ex) {
- if (ERROR) error("kryonet", "Host discovery failed.", ex);
- return null;
- } finally {
- if (socket != null) socket.close();
- discoveryHandler.onFinally();
- }
- }
-
- /** Broadcasts a UDP message on the LAN to discover any running servers.
- * @param udpPort The UDP port of the server.
- * @param timeoutMillis The number of milliseconds to wait for a response. */
- public List discoverHosts (int udpPort, int timeoutMillis) {
- List hosts = new ArrayList();
- DatagramSocket socket = null;
- try {
- socket = new DatagramSocket();
- broadcast(udpPort, socket);
- socket.setSoTimeout(timeoutMillis);
- while (true) {
- DatagramPacket packet = discoveryHandler.onRequestNewDatagramPacket();
- try {
- socket.receive(packet);
- } catch (SocketTimeoutException ex) {
- if (INFO) info("kryonet", "Host discovery timed out.");
- return hosts;
- }
- if (INFO) info("kryonet", "Discovered server: " + packet.getAddress());
- discoveryHandler.onDiscoveredHost(packet, getKryo());
- hosts.add(packet.getAddress());
- }
- } catch (IOException ex) {
- if (ERROR) error("kryonet", "Host discovery failed.", ex);
- return hosts;
- } finally {
- if (socket != null) socket.close();
- discoveryHandler.onFinally();
- }
- }
-}
+/* Copyright (c) 2008, Nathan Sweet
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the distribution.
+ * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+ * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
+
+package com.esotericsoftware.kryonet;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.DatagramSocket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.security.AccessControlException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryonet.FrameworkMessage.DiscoverHost;
+import com.esotericsoftware.kryonet.FrameworkMessage.RegisterTCP;
+import com.esotericsoftware.kryonet.FrameworkMessage.RegisterUDP;
+
+import static com.esotericsoftware.minlog.Log.*;
+
+/** Represents a TCP and optionally a UDP connection to a {@link Server}.
+ * @author Nathan Sweet */
+public class Client extends Connection implements EndPoint {
+ static {
+ try {
+ // Needed for NIO selectors on Android 2.2.
+ System.setProperty("java.net.preferIPv6Addresses", "false");
+ } catch (AccessControlException ignored) {
+ }
+ }
+
+ private final Serialization serialization;
+ private Selector selector;
+ private int emptySelects;
+ private volatile boolean tcpRegistered, udpRegistered;
+ private Object tcpRegistrationLock = new Object();
+ private Object udpRegistrationLock = new Object();
+ private volatile boolean shutdown;
+ private final Object updateLock = new Object();
+ private Thread updateThread;
+ private int connectTimeout;
+ private InetAddress connectHost;
+ private int connectTcpPort;
+ private int connectUdpPort;
+ private boolean isClosed;
+ private ClientDiscoveryHandler discoveryHandler;
+
+ /** Creates a Client with a write buffer size of 8192 and an object buffer size of 2048. */
+ public Client () {
+ this(8192, 2048);
+ }
+
+ /** @param writeBufferSize One buffer of this size is allocated. Objects are serialized to the write buffer where the bytes are
+ * queued until they can be written to the TCP socket.
+ *
+ * Normally the socket is writable and the bytes are written immediately. If the socket cannot be written to and
+ * enough serialized objects are queued to overflow the buffer, then the connection will be closed.
+ *
+ * The write buffer should be sized at least as large as the largest object that will be sent, plus some head room to
+ * allow for some serialized objects to be queued in case the buffer is temporarily not writable. The amount of head
+ * room needed is dependent upon the size of objects being sent and how often they are sent.
+ * @param objectBufferSize One (using only TCP) or three (using both TCP and UDP) buffers of this size are allocated. These
+ * buffers are used to hold the bytes for a single object graph until it can be sent over the network or
+ * deserialized.
+ *
+ * The object buffers should be sized at least as large as the largest object that will be sent or received. */
+ public Client (int writeBufferSize, int objectBufferSize) {
+ this(writeBufferSize, objectBufferSize, new KryoSerialization());
+ }
+
+ public Client (int writeBufferSize, int objectBufferSize, Serialization serialization) {
+ super();
+ endPoint = this;
+
+ this.serialization = serialization;
+
+ this.discoveryHandler = new ClientDiscoveryHandler() {
+ };
+
+ initialize(serialization, writeBufferSize, objectBufferSize);
+
+ try {
+ selector = Selector.open();
+ } catch (IOException ex) {
+ throw new RuntimeException("Error opening selector.", ex);
+ }
+ }
+
+ public void setDiscoveryHandler (ClientDiscoveryHandler newDiscoveryHandler) {
+ discoveryHandler = newDiscoveryHandler;
+ }
+
+ public Serialization getSerialization () {
+ return serialization;
+ }
+
+ public Kryo getKryo () {
+ return ((KryoSerialization)serialization).getKryo();
+ }
+
+ /** Opens a TCP only client.
+ * @see #connect(int, InetAddress, int, int) */
+ public void connect (int timeout, String host, int tcpPort) throws IOException {
+ connect(timeout, InetAddress.getByName(host), tcpPort, -1);
+ }
+
+ /** Opens a TCP and UDP client.
+ * @see #connect(int, InetAddress, int, int) */
+ public void connect (int timeout, String host, int tcpPort, int udpPort) throws IOException {
+ connect(timeout, InetAddress.getByName(host), tcpPort, udpPort);
+ }
+
+ /** Opens a TCP only client.
+ * @see #connect(int, InetAddress, int, int) */
+ public void connect (int timeout, InetAddress host, int tcpPort) throws IOException {
+ connect(timeout, host, tcpPort, -1);
+ }
+
+ /** Opens a TCP and UDP client. Blocks until the connection is complete or the timeout is reached.
+ *
+ * Because the framework must perform some minimal communication before the connection is considered successful,
+ * {@link #update(int)} must be called on a separate thread during the connection process.
+ * @throws IllegalStateException if called from the connection's update thread.
+ * @throws IOException if the client could not be opened or connecting times out. */
+ public void connect (int timeout, InetAddress host, int tcpPort, int udpPort) throws IOException {
+ if (host == null) throw new IllegalArgumentException("host cannot be null.");
+ if (Thread.currentThread() == getUpdateThread())
+ throw new IllegalStateException("Cannot connect on the connection's update thread.");
+ this.connectTimeout = timeout;
+ this.connectHost = host;
+ this.connectTcpPort = tcpPort;
+ this.connectUdpPort = udpPort;
+ close();
+ if (INFO) {
+ if (udpPort != -1)
+ info("kryonet", "Connecting: " + host + ":" + tcpPort + "/" + udpPort);
+ else
+ info("kryonet", "Connecting: " + host + ":" + tcpPort);
+ }
+ id = -1;
+ try {
+ if (udpPort != -1) udp = new UdpConnection(serialization, tcp.readBuffer.capacity());
+
+ long endTime;
+ synchronized (updateLock) {
+ tcpRegistered = false;
+ selector.wakeup();
+ endTime = System.currentTimeMillis() + timeout;
+ tcp.connect(selector, new InetSocketAddress(host, tcpPort), 5000);
+ }
+
+ // Wait for RegisterTCP.
+ synchronized (tcpRegistrationLock) {
+ while (!tcpRegistered && System.currentTimeMillis() < endTime) {
+ try {
+ tcpRegistrationLock.wait(100);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ if (!tcpRegistered) {
+ throw new SocketTimeoutException("Connected, but timed out during TCP registration.\n"
+ + "Note: Client#update must be called in a separate thread during connect.");
+ }
+ }
+
+ if (udpPort != -1) {
+ InetSocketAddress udpAddress = new InetSocketAddress(host, udpPort);
+ synchronized (updateLock) {
+ udpRegistered = false;
+ selector.wakeup();
+ udp.connect(selector, udpAddress);
+ }
+
+ // Wait for RegisterUDP reply.
+ synchronized (udpRegistrationLock) {
+ while (!udpRegistered && System.currentTimeMillis() < endTime) {
+ RegisterUDP registerUDP = new RegisterUDP();
+ registerUDP.connectionID = id;
+ udp.send(this, registerUDP, udpAddress);
+ try {
+ udpRegistrationLock.wait(100);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ if (!udpRegistered)
+ throw new SocketTimeoutException("Connected, but timed out during UDP registration: " + host + ":" + udpPort);
+ }
+ }
+ } catch (IOException ex) {
+ close();
+ throw ex;
+ }
+ }
+
+ /** Calls {@link #connect(int, InetAddress, int, int) connect} with the values last passed to connect.
+ * @throws IllegalStateException if connect has never been called. */
+ public void reconnect () throws IOException {
+ reconnect(connectTimeout);
+ }
+
+ /** Calls {@link #connect(int, InetAddress, int, int) connect} with the specified timeout and the other values last passed to
+ * connect.
+ * @throws IllegalStateException if connect has never been called. */
+ public void reconnect (int timeout) throws IOException {
+ if (connectHost == null) throw new IllegalStateException("This client has never been connected.");
+ connect(timeout, connectHost, connectTcpPort, connectUdpPort);
+ }
+
+ /** Reads or writes any pending data for this client. Multiple threads should not call this method at the same time.
+ * @param timeout Wait for up to the specified milliseconds for data to be ready to process. May be zero to return immediately
+ * if there is no data to process. */
+ public void update (int timeout) throws IOException {
+ updateThread = Thread.currentThread();
+ synchronized (updateLock) { // Blocks to avoid a select while the selector is used to bind the server connection.
+ }
+ long startTime = System.currentTimeMillis();
+ int select = 0;
+ if (timeout > 0) {
+ select = selector.select(timeout);
+ } else {
+ select = selector.selectNow();
+ }
+ if (select == 0) {
+ emptySelects++;
+ if (emptySelects == 100) {
+ emptySelects = 0;
+ // NIO freaks and returns immediately with 0 sometimes, so try to keep from hogging the CPU.
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ try {
+ if (elapsedTime < 25) Thread.sleep(25 - elapsedTime);
+ } catch (InterruptedException ex) {
+ }
+ }
+ } else {
+ emptySelects = 0;
+ isClosed = false;
+ Set keys = selector.selectedKeys();
+ synchronized (keys) {
+ for (Iterator iter = keys.iterator(); iter.hasNext();) {
+ keepAlive();
+ SelectionKey selectionKey = iter.next();
+ iter.remove();
+ try {
+ int ops = selectionKey.readyOps();
+ if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
+ if (selectionKey.attachment() == tcp) {
+ while (true) {
+ Object object = tcp.readObject(this);
+ if (object == null) break;
+ if (!tcpRegistered) {
+ if (object instanceof RegisterTCP) {
+ id = ((RegisterTCP)object).connectionID;
+ synchronized (tcpRegistrationLock) {
+ tcpRegistered = true;
+ tcpRegistrationLock.notifyAll();
+ if (TRACE) trace("kryonet", this + " received TCP: RegisterTCP");
+ if (udp == null) setConnected(true);
+ }
+ if (udp == null) notifyConnected();
+ }
+ continue;
+ }
+ if (udp != null && !udpRegistered) {
+ if (object instanceof RegisterUDP) {
+ synchronized (udpRegistrationLock) {
+ udpRegistered = true;
+ udpRegistrationLock.notifyAll();
+ if (TRACE) trace("kryonet", this + " received UDP: RegisterUDP");
+ if (DEBUG) {
+ debug("kryonet", "Port " + udp.datagramChannel.socket().getLocalPort()
+ + "/UDP connected to: " + udp.connectedAddress);
+ }
+ setConnected(true);
+ }
+ notifyConnected();
+ }
+ continue;
+ }
+ if (!isConnected) continue;
+ if (DEBUG) {
+ String objectString = object == null ? "null" : object.getClass().getSimpleName();
+ if (!(object instanceof FrameworkMessage)) {
+ debug("kryonet", this + " received TCP: " + objectString);
+ } else if (TRACE) {
+ trace("kryonet", this + " received TCP: " + objectString);
+ }
+ }
+ notifyReceived(object);
+ }
+ } else {
+ if (udp.readFromAddress() == null) continue;
+ Object object = udp.readObject(this);
+ if (object == null) continue;
+ if (DEBUG) {
+ String objectString = object == null ? "null" : object.getClass().getSimpleName();
+ debug("kryonet", this + " received UDP: " + objectString);
+ }
+ notifyReceived(object);
+ }
+ }
+ if ((ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) tcp.writeOperation();
+ } catch (CancelledKeyException ignored) {
+ // Connection is closed.
+ }
+ }
+ }
+ }
+ if (isConnected) {
+ long time = System.currentTimeMillis();
+ if (tcp.isTimedOut(time)) {
+ if (DEBUG) debug("kryonet", this + " timed out.");
+ close();
+ } else
+ keepAlive();
+ if (isIdle()) notifyIdle();
+ }
+ }
+
+ void keepAlive () {
+ if (!isConnected) return;
+ long time = System.currentTimeMillis();
+ if (tcp.needsKeepAlive(time)) sendTCP(FrameworkMessage.keepAlive);
+ if (udp != null && udpRegistered && udp.needsKeepAlive(time)) sendUDP(FrameworkMessage.keepAlive);
+ }
+
+ public void run () {
+ if (TRACE) trace("kryonet", "Client thread started.");
+ shutdown = false;
+ while (!shutdown) {
+ try {
+ update(250);
+ } catch (IOException ex) {
+ if (TRACE) {
+ if (isConnected)
+ trace("kryonet", "Unable to update connection: " + this, ex);
+ else
+ trace("kryonet", "Unable to update connection.", ex);
+ } else if (DEBUG) {
+ if (isConnected)
+ debug("kryonet", this + " update: " + ex.getMessage());
+ else
+ debug("kryonet", "Unable to update connection: " + ex.getMessage());
+ }
+ close();
+ } catch (KryoNetException ex) {
+ lastProtocolError = ex;
+ if (ERROR) {
+ if (isConnected)
+ error("kryonet", "Error updating connection: " + this, ex);
+ else
+ error("kryonet", "Error updating connection.", ex);
+ }
+ close();
+ throw ex;
+ }
+ }
+ if (TRACE) trace("kryonet", "Client thread stopped.");
+ }
+
+ public void start () {
+ // Try to let any previous update thread stop.
+ if (updateThread != null) {
+ shutdown = true;
+ try {
+ updateThread.join(5000);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ updateThread = new Thread(this, "Client");
+ updateThread.setDaemon(true);
+ updateThread.start();
+ }
+
+ public void stop () {
+ if (shutdown) return;
+ close();
+ if (TRACE) trace("kryonet", "Client thread stopping.");
+ shutdown = true;
+ selector.wakeup();
+ }
+
+ public void close () {
+ super.close();
+ synchronized (updateLock) { // Blocks to avoid a select while the selector is used to bind the server connection.
+ }
+ // Select one last time to complete closing the socket.
+ if (!isClosed) {
+ isClosed = true;
+ selector.wakeup();
+ try {
+ selector.selectNow();
+ } catch (IOException ignored) {
+ }
+ }
+ }
+
+ /** Releases the resources used by this client, which may no longer be used. */
+ public void dispose () throws IOException {
+ close();
+ selector.close();
+ }
+
+ public void addListener (Listener listener) {
+ super.addListener(listener);
+ if (TRACE) trace("kryonet", "Client listener added.");
+ }
+
+ public void removeListener (Listener listener) {
+ super.removeListener(listener);
+ if (TRACE) trace("kryonet", "Client listener removed.");
+ }
+
+ /** An empty object will be sent if the UDP connection is inactive more than the specified milliseconds. Network hardware may
+ * keep a translation table of inside to outside IP addresses and a UDP keep alive keeps this table entry from expiring. Set to
+ * zero to disable. Defaults to 19000. */
+ public void setKeepAliveUDP (int keepAliveMillis) {
+ if (udp == null) throw new IllegalStateException("Not connected via UDP.");
+ udp.keepAliveMillis = keepAliveMillis;
+ }
+
+ public Thread getUpdateThread () {
+ return updateThread;
+ }
+
+ private void broadcast (int udpPort, DatagramSocket socket) throws IOException {
+ ByteBuffer dataBuffer = ByteBuffer.allocate(64);
+ serialization.write(null, dataBuffer, new DiscoverHost());
+ dataBuffer.flip();
+ byte[] data = new byte[dataBuffer.limit()];
+ dataBuffer.get(data);
+ for (NetworkInterface iface : Collections.list(NetworkInterface.getNetworkInterfaces())) {
+ for (InetAddress address : Collections.list(iface.getInetAddresses())) {
+ // Java 1.5 doesn't support getting the subnet mask, so try the two most common.
+ byte[] ip = address.getAddress();
+ ip[3] = -1; // 255.255.255.0
+ try {
+ socket.send(new DatagramPacket(data, data.length, InetAddress.getByAddress(ip), udpPort));
+ } catch (Exception ignored) {
+ }
+ ip[2] = -1; // 255.255.0.0
+ try {
+ socket.send(new DatagramPacket(data, data.length, InetAddress.getByAddress(ip), udpPort));
+ } catch (Exception ignored) {
+ }
+ }
+ }
+ if (DEBUG) debug("kryonet", "Broadcasted host discovery on port: " + udpPort);
+ }
+
+ /** Broadcasts a UDP message on the LAN to discover any running servers. The address of the first server to respond is returned.
+ * @param udpPort The UDP port of the server.
+ * @param timeoutMillis The number of milliseconds to wait for a response.
+ * @return the first server found, or null if no server responded. */
+ public InetAddress discoverHost (int udpPort, int timeoutMillis) {
+ DatagramSocket socket = null;
+ try {
+ socket = new DatagramSocket();
+ broadcast(udpPort, socket);
+ socket.setSoTimeout(timeoutMillis);
+ DatagramPacket packet = discoveryHandler.onRequestNewDatagramPacket();
+ try {
+ socket.receive(packet);
+ } catch (SocketTimeoutException ex) {
+ if (INFO) info("kryonet", "Host discovery timed out.");
+ return null;
+ }
+ if (INFO) info("kryonet", "Discovered server: " + packet.getAddress());
+ discoveryHandler.onDiscoveredHost(packet, getKryo());
+ return packet.getAddress();
+ } catch (IOException ex) {
+ if (ERROR) error("kryonet", "Host discovery failed.", ex);
+ return null;
+ } finally {
+ if (socket != null) socket.close();
+ discoveryHandler.onFinally();
+ }
+ }
+
+ /** Broadcasts a UDP message on the LAN to discover any running servers.
+ * @param udpPort The UDP port of the server.
+ * @param timeoutMillis The number of milliseconds to wait for a response. */
+ public List discoverHosts (int udpPort, int timeoutMillis) {
+ List hosts = new ArrayList();
+ DatagramSocket socket = null;
+ try {
+ socket = new DatagramSocket();
+ broadcast(udpPort, socket);
+ socket.setSoTimeout(timeoutMillis);
+ while (true) {
+ DatagramPacket packet = discoveryHandler.onRequestNewDatagramPacket();
+ try {
+ socket.receive(packet);
+ } catch (SocketTimeoutException ex) {
+ if (INFO) info("kryonet", "Host discovery timed out.");
+ return hosts;
+ }
+ if (INFO) info("kryonet", "Discovered server: " + packet.getAddress());
+ discoveryHandler.onDiscoveredHost(packet, getKryo());
+ hosts.add(packet.getAddress());
+ }
+ } catch (IOException ex) {
+ if (ERROR) error("kryonet", "Host discovery failed.", ex);
+ return hosts;
+ } finally {
+ if (socket != null) socket.close();
+ discoveryHandler.onFinally();
+ }
+ }
+}
diff --git a/src/com/esotericsoftware/kryonet/ClientDiscoveryHandler.java b/src/com/esotericsoftware/kryonet/ClientDiscoveryHandler.java
index e2f28ac4..d31cb1cd 100644
--- a/src/com/esotericsoftware/kryonet/ClientDiscoveryHandler.java
+++ b/src/com/esotericsoftware/kryonet/ClientDiscoveryHandler.java
@@ -1,15 +1,15 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
- *
+ *
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
- *
+ *
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
- *
+ *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
@@ -26,41 +26,20 @@
public interface ClientDiscoveryHandler {
- /**
- * This implementation of the {@link ClientDiscoveryHandler} is responsible
- * for providing the {@link Client} with it's default behavior.
- */
- public static final ClientDiscoveryHandler DEFAULT = new ClientDiscoveryHandler() {
-
- @Override
- public DatagramPacket onRequestNewDatagramPacket() {
- return new DatagramPacket(new byte[0], 0);
- }
-
- @Override
- public void onDiscoveredHost(DatagramPacket datagramPacket, Kryo kryo) {
- //
- }
-
- @Override
- public void onFinally() {
- //
- }
-
- };
-
/**
* Implementations of this method should return a new {@link DatagramPacket}
* that the {@link Client} will use to fill with the incoming packet data
* sent by the {@link ServerDiscoveryHandler}.
- *
+ *
* @return a new {@link DatagramPacket}
*/
- public DatagramPacket onRequestNewDatagramPacket();
+ public default DatagramPacket onRequestNewDatagramPacket() {
+ return new DatagramPacket(new byte[0], 0);
+ };
/**
* Called when the {@link Client} discovers a host.
- *
+ *
* @param datagramPacket
* the same {@link DatagramPacket} from
* {@link #onRequestNewDatagramPacket()}, after being filled with
@@ -68,13 +47,15 @@ public void onFinally() {
* @param kryo
* the {@link Kryo} instance
*/
- public void onDiscoveredHost(DatagramPacket datagramPacket, Kryo kryo);
+ public default void onDiscoveredHost(DatagramPacket datagramPacket, Kryo kryo) {
+ };
/**
* Called right before the {@link Client#discoverHost(int, int)} or
* {@link Client#discoverHosts(int, int)} method exits. This allows the
* implementation to clean up any resources used, i.e. an {@link Input}.
*/
- public void onFinally();
+ public default void onFinally() {
+ };
}
diff --git a/src/com/esotericsoftware/kryonet/Connection.java b/src/com/esotericsoftware/kryonet/Connection.java
index 13934ed5..fff75caf 100644
--- a/src/com/esotericsoftware/kryonet/Connection.java
+++ b/src/com/esotericsoftware/kryonet/Connection.java
@@ -1,340 +1,366 @@
-/* Copyright (c) 2008, Nathan Sweet
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
- * conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials provided with the distribution.
- * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
- * from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
- * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
-
-package com.esotericsoftware.kryonet;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketAddress;
-import java.net.SocketException;
-import java.nio.channels.SocketChannel;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryonet.FrameworkMessage.Ping;
-
-import static com.esotericsoftware.minlog.Log.*;
-
-// BOZO - Layer to handle handshake state.
-
-/** Represents a TCP and optionally a UDP connection between a {@link Client} and a {@link Server}. If either underlying connection
- * is closed or errors, both connections are closed.
- * @author Nathan Sweet */
-public class Connection {
- int id = -1;
- private String name;
- EndPoint endPoint;
- TcpConnection tcp;
- UdpConnection udp;
- InetSocketAddress udpRemoteAddress;
- private Listener[] listeners = {};
- private Object listenerLock = new Object();
- private int lastPingID;
- private long lastPingSendTime;
- private int returnTripTime;
- volatile boolean isConnected;
- volatile KryoNetException lastProtocolError;
-
- protected Connection () {
- }
-
- void initialize (Serialization serialization, int writeBufferSize, int objectBufferSize) {
- tcp = new TcpConnection(serialization, writeBufferSize, objectBufferSize);
- }
-
- /** Returns the server assigned ID. Will return -1 if this connection has never been connected or the last assigned ID if this
- * connection has been disconnected. */
- public int getID () {
- return id;
- }
-
- /** Returns true if this connection is connected to the remote end. Note that a connection can become disconnected at any time. */
- public boolean isConnected () {
- return isConnected;
- }
-
- /**
- * Returns the last protocol error that occured on the connection.
- *
- * @return The last protocol error or null if none error occured.
- */
- public KryoNetException getLastProtocolError() {
- return lastProtocolError;
- }
-
- /** Sends the object over the network using TCP.
- * @return The number of bytes sent.
- * @see Kryo#register(Class, com.esotericsoftware.kryo.Serializer) */
- public int sendTCP (Object object) {
- if (object == null) throw new IllegalArgumentException("object cannot be null.");
- try {
- int length = tcp.send(this, object);
- if (length == 0) {
- if (TRACE) trace("kryonet", this + " TCP had nothing to send.");
- } else if (DEBUG) {
- String objectString = object == null ? "null" : object.getClass().getSimpleName();
- if (!(object instanceof FrameworkMessage)) {
- debug("kryonet", this + " sent TCP: " + objectString + " (" + length + ")");
- } else if (TRACE) {
- trace("kryonet", this + " sent TCP: " + objectString + " (" + length + ")");
- }
- }
- return length;
- } catch (IOException ex) {
- if (DEBUG) debug("kryonet", "Unable to send TCP with connection: " + this, ex);
- close();
- return 0;
- } catch (KryoNetException ex) {
- if (ERROR) error("kryonet", "Unable to send TCP with connection: " + this, ex);
- close();
- return 0;
- }
- }
-
- /** Sends the object over the network using UDP.
- * @return The number of bytes sent.
- * @see Kryo#register(Class, com.esotericsoftware.kryo.Serializer)
- * @throws IllegalStateException if this connection was not opened with both TCP and UDP. */
- public int sendUDP (Object object) {
- if (object == null) throw new IllegalArgumentException("object cannot be null.");
- SocketAddress address = udpRemoteAddress;
- if (address == null && udp != null) address = udp.connectedAddress;
- if (address == null && isConnected) throw new IllegalStateException("Connection is not connected via UDP.");
-
- try {
- if (address == null) throw new SocketException("Connection is closed.");
-
- int length = udp.send(this, object, address);
- if (length == 0) {
- if (TRACE) trace("kryonet", this + " UDP had nothing to send.");
- } else if (DEBUG) {
- if (length != -1) {
- String objectString = object == null ? "null" : object.getClass().getSimpleName();
- if (!(object instanceof FrameworkMessage)) {
- debug("kryonet", this + " sent UDP: " + objectString + " (" + length + ")");
- } else if (TRACE) {
- trace("kryonet", this + " sent UDP: " + objectString + " (" + length + ")");
- }
- } else
- debug("kryonet", this + " was unable to send, UDP socket buffer full.");
- }
- return length;
- } catch (IOException ex) {
- if (DEBUG) debug("kryonet", "Unable to send UDP with connection: " + this, ex);
- close();
- return 0;
- } catch (KryoNetException ex) {
- if (ERROR) error("kryonet", "Unable to send UDP with connection: " + this, ex);
- close();
- return 0;
- }
- }
-
- public void close () {
- boolean wasConnected = isConnected;
- isConnected = false;
- tcp.close();
- if (udp != null && udp.connectedAddress != null) udp.close();
- if (wasConnected) {
- notifyDisconnected();
- if (INFO) info("kryonet", this + " disconnected.");
- }
- setConnected(false);
- }
-
- /** Requests the connection to communicate with the remote computer to determine a new value for the
- * {@link #getReturnTripTime() return trip time}. When the connection receives a {@link FrameworkMessage.Ping} object with
- * {@link Ping#isReply isReply} set to true, the new return trip time is available. */
- public void updateReturnTripTime () {
- Ping ping = new Ping();
- ping.id = lastPingID++;
- lastPingSendTime = System.currentTimeMillis();
- sendTCP(ping);
- }
-
- /** Returns the last calculated TCP return trip time, or -1 if {@link #updateReturnTripTime()} has never been called or the
- * {@link FrameworkMessage.Ping} response has not yet been received. */
- public int getReturnTripTime () {
- return returnTripTime;
- }
-
- /** An empty object will be sent if the TCP connection has not sent an object within the specified milliseconds. Periodically
- * sending a keep alive ensures that an abnormal close is detected in a reasonable amount of time (see {@link #setTimeout(int)}
- * ). Also, some network hardware will close a TCP connection that ceases to transmit for a period of time (typically 1+
- * minutes). Set to zero to disable. Defaults to 8000. */
- public void setKeepAliveTCP (int keepAliveMillis) {
- tcp.keepAliveMillis = keepAliveMillis;
- }
-
- /** If the specified amount of time passes without receiving an object over TCP, the connection is considered closed. When a TCP
- * socket is closed normally, the remote end is notified immediately and this timeout is not needed. However, if a socket is
- * closed abnormally (eg, power loss), KryoNet uses this timeout to detect the problem. The timeout should be set higher than
- * the {@link #setKeepAliveTCP(int) TCP keep alive} for the remote end of the connection. The keep alive ensures that the remote
- * end of the connection will be constantly sending objects, and setting the timeout higher than the keep alive allows for
- * network latency. Set to zero to disable. Defaults to 12000. */
- public void setTimeout (int timeoutMillis) {
- tcp.timeoutMillis = timeoutMillis;
- }
-
- /** If the listener already exists, it is not added again. */
- public void addListener (Listener listener) {
- if (listener == null) throw new IllegalArgumentException("listener cannot be null.");
- synchronized (listenerLock) {
- Listener[] listeners = this.listeners;
- int n = listeners.length;
- for (int i = 0; i < n; i++)
- if (listener == listeners[i]) return;
- Listener[] newListeners = new Listener[n + 1];
- newListeners[0] = listener;
- System.arraycopy(listeners, 0, newListeners, 1, n);
- this.listeners = newListeners;
- }
- if (TRACE) trace("kryonet", "Connection listener added: " + listener.getClass().getName());
- }
-
- public void removeListener (Listener listener) {
- if (listener == null) throw new IllegalArgumentException("listener cannot be null.");
- synchronized (listenerLock) {
- Listener[] listeners = this.listeners;
- int n = listeners.length;
- if (n == 0) return;
- Listener[] newListeners = new Listener[n - 1];
- for (int i = 0, ii = 0; i < n; i++) {
- Listener copyListener = listeners[i];
- if (listener == copyListener) continue;
- if (ii == n - 1) return;
- newListeners[ii++] = copyListener;
- }
- this.listeners = newListeners;
- }
- if (TRACE) trace("kryonet", "Connection listener removed: " + listener.getClass().getName());
- }
-
- void notifyConnected () {
- if (INFO) {
- SocketChannel socketChannel = tcp.socketChannel;
- if (socketChannel != null) {
- Socket socket = tcp.socketChannel.socket();
- if (socket != null) {
- InetSocketAddress remoteSocketAddress = (InetSocketAddress)socket.getRemoteSocketAddress();
- if (remoteSocketAddress != null) info("kryonet", this + " connected: " + remoteSocketAddress.getAddress());
- }
- }
- }
- Listener[] listeners = this.listeners;
- for (int i = 0, n = listeners.length; i < n; i++)
- listeners[i].connected(this);
- }
-
- void notifyDisconnected () {
- Listener[] listeners = this.listeners;
- for (int i = 0, n = listeners.length; i < n; i++)
- listeners[i].disconnected(this);
- }
-
- void notifyIdle () {
- Listener[] listeners = this.listeners;
- for (int i = 0, n = listeners.length; i < n; i++) {
- listeners[i].idle(this);
- if (!isIdle()) break;
- }
- }
-
- void notifyReceived (Object object) {
- if (object instanceof Ping) {
- Ping ping = (Ping)object;
- if (ping.isReply) {
- if (ping.id == lastPingID - 1) {
- returnTripTime = (int)(System.currentTimeMillis() - lastPingSendTime);
- if (TRACE) trace("kryonet", this + " return trip time: " + returnTripTime);
- }
- } else {
- ping.isReply = true;
- sendTCP(ping);
- }
- }
- Listener[] listeners = this.listeners;
- for (int i = 0, n = listeners.length; i < n; i++)
- listeners[i].received(this, object);
- }
-
- /** Returns the local {@link Client} or {@link Server} to which this connection belongs. */
- public EndPoint getEndPoint () {
- return endPoint;
- }
-
- /** Returns the IP address and port of the remote end of the TCP connection, or null if this connection is not connected. */
- public InetSocketAddress getRemoteAddressTCP () {
- SocketChannel socketChannel = tcp.socketChannel;
- if (socketChannel != null) {
- Socket socket = tcp.socketChannel.socket();
- if (socket != null) {
- return (InetSocketAddress)socket.getRemoteSocketAddress();
- }
- }
- return null;
- }
-
- /** Returns the IP address and port of the remote end of the UDP connection, or null if this connection is not connected. */
- public InetSocketAddress getRemoteAddressUDP () {
- InetSocketAddress connectedAddress = udp.connectedAddress;
- if (connectedAddress != null) return connectedAddress;
- return udpRemoteAddress;
- }
-
- /** Workaround for broken NIO networking on Android 1.6. If true, the underlying NIO buffer is always copied to the beginning of
- * the buffer before being given to the SocketChannel for sending. The Harmony SocketChannel implementation in Android 1.6
- * ignores the buffer position, always copying from the beginning of the buffer. This is fixed in Android 2.0+. */
- public void setBufferPositionFix (boolean bufferPositionFix) {
- tcp.bufferPositionFix = bufferPositionFix;
- }
-
- /** Sets the friendly name of this connection. This is returned by {@link #toString()} and is useful for providing application
- * specific identifying information in the logging. May be null for the default name of "Connection X", where X is the
- * connection ID. */
- public void setName (String name) {
- this.name = name;
- }
-
- /** Returns the number of bytes that are waiting to be written to the TCP socket, if any. */
- public int getTcpWriteBufferSize () {
- return tcp.writeBuffer.position();
- }
-
- /** @see #setIdleThreshold(float) */
- public boolean isIdle () {
- return tcp.writeBuffer.position() / (float)tcp.writeBuffer.capacity() < tcp.idleThreshold;
- }
-
- /** If the percent of the TCP write buffer that is filled is less than the specified threshold,
- * {@link Listener#idle(Connection)} will be called for each network thread update. Default is 0.1. */
- public void setIdleThreshold (float idleThreshold) {
- tcp.idleThreshold = idleThreshold;
- }
-
- public String toString () {
- if (name != null) return name;
- return "Connection " + id;
- }
-
- void setConnected (boolean isConnected) {
- this.isConnected = isConnected;
- if (isConnected && name == null) name = "Connection " + id;
- }
-}
+/* Copyright (c) 2008, Nathan Sweet
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the distribution.
+ * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+ * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
+
+package com.esotericsoftware.kryonet;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.channels.SocketChannel;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryonet.FrameworkMessage.Ping;
+
+import static com.esotericsoftware.minlog.Log.*;
+
+// BOZO - Layer to handle handshake state.
+
+/** Represents a TCP and optionally a UDP connection between a {@link Client} and a {@link Server}. If either underlying connection
+ * is closed or errors, both connections are closed.
+ * @author Nathan Sweet */
+public class Connection {
+ int id = -1;
+ private String name;
+ EndPoint endPoint;
+ TcpConnection tcp;
+ UdpConnection udp;
+ InetSocketAddress udpRemoteAddress;
+ private Listener[] listeners = {};
+ private Object listenerLock = new Object();
+ private int lastPingID;
+ private long lastPingSendTime;
+ private int returnTripTime;
+ volatile boolean isConnected;
+ volatile KryoNetException lastProtocolError;
+
+ protected Connection () {
+ }
+
+ void initialize (Serialization serialization, int writeBufferSize, int objectBufferSize) {
+ tcp = new TcpConnection(serialization, writeBufferSize, objectBufferSize);
+ }
+
+ /** Returns the server assigned ID. Will return -1 if this connection has never been connected or the last assigned ID if this
+ * connection has been disconnected. */
+ public int getID () {
+ return id;
+ }
+
+ /** Returns true if this connection is connected to the remote end. Note that a connection can become disconnected at any time. */
+ public boolean isConnected () {
+ return isConnected;
+ }
+
+ /**
+ * Returns the last protocol error that occured on the connection.
+ *
+ * @return The last protocol error or null if none error occured.
+ */
+ public KryoNetException getLastProtocolError() {
+ return lastProtocolError;
+ }
+
+ /** Sends the object over the network using TCP.
+ * @return The number of bytes sent.
+ * @see Kryo#register(Class, com.esotericsoftware.kryo.Serializer) */
+ public int sendTCP (Object object) {
+ if (object == null) throw new IllegalArgumentException("object cannot be null.");
+ try {
+ int length = tcp.send(this, object);
+ if (length == 0) {
+ if (TRACE) trace("kryonet", this + " TCP had nothing to send.");
+ } else if (DEBUG) {
+ String objectString = object == null ? "null" : object.getClass().getSimpleName();
+ if (!(object instanceof FrameworkMessage)) {
+ debug("kryonet", this + " sent TCP: " + objectString + " (" + length + ")");
+ } else if (TRACE) {
+ trace("kryonet", this + " sent TCP: " + objectString + " (" + length + ")");
+ }
+ }
+ return length;
+ } catch (IOException ex) {
+ if (DEBUG) debug("kryonet", "Unable to send TCP with connection: " + this, ex);
+ close();
+ return 0;
+ } catch (KryoNetException ex) {
+ if (ERROR) error("kryonet", "Unable to send TCP with connection: " + this, ex);
+ close();
+ return 0;
+ }
+ }
+
+ /** Sends the object over the network using UDP.
+ * @return The number of bytes sent.
+ * @see Kryo#register(Class, com.esotericsoftware.kryo.Serializer)
+ * @throws IllegalStateException if this connection was not opened with both TCP and UDP. */
+ public int sendUDP (Object object) {
+ if (object == null) throw new IllegalArgumentException("object cannot be null.");
+ SocketAddress address = udpRemoteAddress;
+ if (address == null && udp != null) address = udp.connectedAddress;
+ if (address == null && isConnected) throw new IllegalStateException("Connection is not connected via UDP.");
+
+ try {
+ if (address == null) throw new SocketException("Connection is closed.");
+
+ int length = udp.send(this, object, address);
+ if (length == 0) {
+ if (TRACE) trace("kryonet", this + " UDP had nothing to send.");
+ } else if (DEBUG) {
+ if (length != -1) {
+ String objectString = object == null ? "null" : object.getClass().getSimpleName();
+ if (!(object instanceof FrameworkMessage)) {
+ debug("kryonet", this + " sent UDP: " + objectString + " (" + length + ")");
+ } else if (TRACE) {
+ trace("kryonet", this + " sent UDP: " + objectString + " (" + length + ")");
+ }
+ } else
+ debug("kryonet", this + " was unable to send, UDP socket buffer full.");
+ }
+ return length;
+ } catch (IOException ex) {
+ if (DEBUG) debug("kryonet", "Unable to send UDP with connection: " + this, ex);
+ close();
+ return 0;
+ } catch (KryoNetException ex) {
+ if (ERROR) error("kryonet", "Unable to send UDP with connection: " + this, ex);
+ close();
+ return 0;
+ }
+ }
+
+ public void close () {
+ boolean wasConnected = isConnected;
+ isConnected = false;
+ tcp.close();
+ if (udp != null && udp.connectedAddress != null) udp.close();
+ if (wasConnected) {
+ notifyDisconnected();
+ if (INFO) info("kryonet", this + " disconnected.");
+ }
+ setConnected(false);
+ }
+
+ /** Requests the connection to communicate with the remote computer to determine a new value for the
+ * {@link #getReturnTripTime() return trip time}. When the connection receives a {@link FrameworkMessage.Ping} object with
+ * {@link Ping#isReply isReply} set to true, the new return trip time is available. */
+ public void updateReturnTripTime () {
+ Ping ping = new Ping();
+ ping.id = lastPingID++;
+ lastPingSendTime = System.currentTimeMillis();
+ sendTCP(ping);
+ }
+
+ /** Returns the last calculated TCP return trip time, or -1 if {@link #updateReturnTripTime()} has never been called or the
+ * {@link FrameworkMessage.Ping} response has not yet been received. */
+ public int getReturnTripTime () {
+ return returnTripTime;
+ }
+
+ /** An empty object will be sent if the TCP connection has not sent an object within the specified milliseconds. Periodically
+ * sending a keep alive ensures that an abnormal close is detected in a reasonable amount of time (see {@link #setTimeout(int)}
+ * ). Also, some network hardware will close a TCP connection that ceases to transmit for a period of time (typically 1+
+ * minutes). Set to zero to disable. Defaults to 8000. */
+ public void setKeepAliveTCP (int keepAliveMillis) {
+ tcp.keepAliveMillis = keepAliveMillis;
+ }
+
+ /** If the specified amount of time passes without receiving an object over TCP, the connection is considered closed. When a TCP
+ * socket is closed normally, the remote end is notified immediately and this timeout is not needed. However, if a socket is
+ * closed abnormally (eg, power loss), KryoNet uses this timeout to detect the problem. The timeout should be set higher than
+ * the {@link #setKeepAliveTCP(int) TCP keep alive} for the remote end of the connection. The keep alive ensures that the remote
+ * end of the connection will be constantly sending objects, and setting the timeout higher than the keep alive allows for
+ * network latency. Set to zero to disable. Defaults to 12000. */
+ public void setTimeout (int timeoutMillis) {
+ tcp.timeoutMillis = timeoutMillis;
+ }
+
+ /** If the listener already exists, it is not added again. */
+ public void addListener (Listener listener) {
+ if (listener == null) throw new IllegalArgumentException("listener cannot be null.");
+ synchronized (listenerLock) {
+ Listener[] listeners = this.listeners;
+ int n = listeners.length;
+ for (int i = 0; i < n; i++)
+ if (listener == listeners[i]) return;
+ Listener[] newListeners = new Listener[n + 1];
+ newListeners[0] = listener;
+ System.arraycopy(listeners, 0, newListeners, 1, n);
+ this.listeners = newListeners;
+ }
+ if (TRACE) trace("kryonet", "Connection listener added: " + listener.getClass().getName());
+ }
+
+ public void removeListener (Listener listener) {
+ if (listener == null) throw new IllegalArgumentException("listener cannot be null.");
+ synchronized (listenerLock) {
+ Listener[] listeners = this.listeners;
+ int n = listeners.length;
+ if (n == 0) return;
+ Listener[] newListeners = new Listener[n - 1];
+ for (int i = 0, ii = 0; i < n; i++) {
+ Listener copyListener = listeners[i];
+ if (listener == copyListener) continue;
+ if (ii == n - 1) return;
+ newListeners[ii++] = copyListener;
+ }
+ this.listeners = newListeners;
+ }
+ if (TRACE) trace("kryonet", "Connection listener removed: " + listener.getClass().getName());
+ }
+
+ void notifyConnected () {
+ if (INFO) {
+ SocketChannel socketChannel = tcp.socketChannel;
+ if (socketChannel != null) {
+ Socket socket = tcp.socketChannel.socket();
+ if (socket != null) {
+ InetSocketAddress remoteSocketAddress = (InetSocketAddress)socket.getRemoteSocketAddress();
+ if (remoteSocketAddress != null) info("kryonet", this + " connected: " + remoteSocketAddress.getAddress());
+ }
+ }
+ }
+ Listener[] listeners = this.listeners;
+ for (int i = 0, n = listeners.length; i < n; i++)
+ listeners[i].connected(this);
+ }
+
+ void notifyDisconnected () {
+ Listener[] listeners = this.listeners;
+ for (int i = 0, n = listeners.length; i < n; i++)
+ listeners[i].disconnected(this);
+ }
+
+ void notifyIdle () {
+ Listener[] listeners = this.listeners;
+ for (int i = 0, n = listeners.length; i < n; i++) {
+ listeners[i].idle(this);
+ if (!isIdle()) break;
+ }
+ }
+
+ void notifyReceived (Object object) {
+ if (object instanceof Ping) {
+ Ping ping = (Ping)object;
+ if (ping.isReply) {
+ if (ping.id == lastPingID - 1) {
+ returnTripTime = (int)(System.currentTimeMillis() - lastPingSendTime);
+ if (TRACE) trace("kryonet", this + " return trip time: " + returnTripTime);
+ }
+ } else {
+ ping.isReply = true;
+ sendTCP(ping);
+ }
+ }
+ Listener[] listeners = this.listeners;
+ for (int i = 0, n = listeners.length; i < n; i++)
+ listeners[i].received(this, object);
+ }
+
+ /** Returns the local {@link Client} or {@link Server} to which this connection belongs. */
+ public EndPoint getEndPoint () {
+ return endPoint;
+ }
+
+ /** Returns the IP address and port of the remote end of the TCP connection, or null if this connection is not connected. */
+ public InetSocketAddress getRemoteAddressTCP () {
+ SocketChannel socketChannel = tcp.socketChannel;
+ if (socketChannel != null) {
+ Socket socket = tcp.socketChannel.socket();
+ if (socket != null) {
+ return (InetSocketAddress)socket.getRemoteSocketAddress();
+ }
+ }
+ return null;
+ }
+
+ /** Returns the IP address and port of the remote end of the UDP connection, or null if this connection is not connected. */
+ public InetSocketAddress getRemoteAddressUDP () {
+ InetSocketAddress connectedAddress = udp.connectedAddress;
+ if (connectedAddress != null) return connectedAddress;
+ return udpRemoteAddress;
+ }
+
+ /** Workaround for broken NIO networking on Android 1.6. If true, the underlying NIO buffer is always copied to the beginning of
+ * the buffer before being given to the SocketChannel for sending. The Harmony SocketChannel implementation in Android 1.6
+ * ignores the buffer position, always copying from the beginning of the buffer. This is fixed in Android 2.0+. */
+ public void setBufferPositionFix (boolean bufferPositionFix) {
+ tcp.bufferPositionFix = bufferPositionFix;
+ }
+
+ /** Sets the friendly name of this connection. This is returned by {@link #toString()} and is useful for providing application
+ * specific identifying information in the logging. May be null for the default name of "Connection X", where X is the
+ * connection ID. */
+ public void setName (String name) {
+ this.name = name;
+ }
+
+ /** Returns the number of bytes that are waiting to be written to the TCP socket, if any. */
+ public int getTcpWriteBufferSize () {
+ return tcp.writeBuffer.position();
+ }
+
+ /** @see #setIdleThreshold(float) */
+ public boolean isIdle () {
+ return tcp.writeBuffer.position() / (float)tcp.writeBuffer.capacity() < tcp.idleThreshold;
+ }
+
+ /** If the percent of the TCP write buffer that is filled is less than the specified threshold,
+ * {@link Listener#idle(Connection)} will be called for each network thread update. Default is 0.1. */
+ public void setIdleThreshold (float idleThreshold) {
+ tcp.idleThreshold = idleThreshold;
+ }
+
+ public String toString () {
+ if (name != null) return name;
+ return "Connection " + id;
+ }
+
+ void setConnected (boolean isConnected) {
+ this.isConnected = isConnected;
+ if (isConnected && name == null) name = "Connection " + id;
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + id;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ Connection other = (Connection) obj;
+ if (id != other.id) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/src/com/esotericsoftware/kryonet/Listener.java b/src/com/esotericsoftware/kryonet/Listener.java
index bd9ab148..f07a554d 100644
--- a/src/com/esotericsoftware/kryonet/Listener.java
+++ b/src/com/esotericsoftware/kryonet/Listener.java
@@ -1,189 +1,189 @@
-/* Copyright (c) 2008, Nathan Sweet
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
- * conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials provided with the distribution.
- * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
- * from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
- * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
-
-package com.esotericsoftware.kryonet;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static com.esotericsoftware.minlog.Log.*;
-
-/** Used to be notified about connection events. */
-public class Listener {
- /** Called when the remote end has been connected. This will be invoked before any objects are received by
- * {@link #received(Connection, Object)}. This will be invoked on the same thread as {@link Client#update(int)} and
- * {@link Server#update(int)}. This method should not block for long periods as other network activity will not be processed
- * until it returns. */
- public void connected (Connection connection) {
- }
-
- /** Called when the remote end is no longer connected. There is no guarantee as to what thread will invoke this method. */
- public void disconnected (Connection connection) {
- }
-
- /** Called when an object has been received from the remote end of the connection. This will be invoked on the same thread as
- * {@link Client#update(int)} and {@link Server#update(int)}. This method should not block for long periods as other network
- * activity will not be processed until it returns. */
- public void received (Connection connection, Object object) {
- }
-
- /** Called when the connection is below the {@link Connection#setIdleThreshold(float) idle threshold}. */
- public void idle (Connection connection) {
- }
-
- /** Uses reflection to called "received(Connection, XXX)" on the listener, where XXX is the received object type. Note this
- * class uses a HashMap lookup and (cached) reflection, so is not as efficient as writing a series of "instanceof" statements. */
- static public class ReflectionListener extends Listener {
- private final HashMap classToMethod = new HashMap();
-
- public void received (Connection connection, Object object) {
- Class type = object.getClass();
- Method method = classToMethod.get(type);
- if (method == null) {
- if (classToMethod.containsKey(type)) return; // Only fail on the first attempt to find the method.
- try {
- method = getClass().getMethod("received", new Class[] {Connection.class, type});
- method.setAccessible(true);
- } catch (SecurityException ex) {
- if (ERROR) error("kryonet", "Unable to access method: received(Connection, " + type.getName() + ")", ex);
- return;
- } catch (NoSuchMethodException ex) {
- if (DEBUG)
- debug("kryonet",
- "Unable to find listener method: " + getClass().getName() + "#received(Connection, " + type.getName() + ")");
- return;
- } finally {
- classToMethod.put(type, method);
- }
- }
- try {
- method.invoke(this, connection, object);
- } catch (Throwable ex) {
- if (ex instanceof InvocationTargetException && ex.getCause() != null) ex = ex.getCause();
- if (ex instanceof RuntimeException) throw (RuntimeException)ex;
- throw new RuntimeException("Error invoking method: " + getClass().getName() + "#received(Connection, "
- + type.getName() + ")", ex);
- }
- }
- }
-
- /** Wraps a listener and queues notifications as {@link Runnable runnables}. This allows the runnables to be processed on a
- * different thread, preventing the connection's update thread from being blocked. */
- static public abstract class QueuedListener extends Listener {
- final Listener listener;
-
- public QueuedListener (Listener listener) {
- if (listener == null) throw new IllegalArgumentException("listener cannot be null.");
- this.listener = listener;
- }
-
- public void connected (final Connection connection) {
- queue(new Runnable() {
- public void run () {
- listener.connected(connection);
- }
- });
- }
-
- public void disconnected (final Connection connection) {
- queue(new Runnable() {
- public void run () {
- listener.disconnected(connection);
- }
- });
- }
-
- public void received (final Connection connection, final Object object) {
- queue(new Runnable() {
- public void run () {
- listener.received(connection, object);
- }
- });
- }
-
- public void idle (final Connection connection) {
- queue(new Runnable() {
- public void run () {
- listener.idle(connection);
- }
- });
- }
-
- abstract protected void queue (Runnable runnable);
- }
-
- /** Wraps a listener and processes notification events on a separate thread. */
- static public class ThreadedListener extends QueuedListener {
- protected final ExecutorService threadPool;
-
- /** Creates a single thread to process notification events. */
- public ThreadedListener (Listener listener) {
- this(listener, Executors.newFixedThreadPool(1));
- }
-
- /** Uses the specified threadPool to process notification events. */
- public ThreadedListener (Listener listener, ExecutorService threadPool) {
- super(listener);
- if (threadPool == null) throw new IllegalArgumentException("threadPool cannot be null.");
- this.threadPool = threadPool;
- }
-
- public void queue (Runnable runnable) {
- threadPool.execute(runnable);
- }
- }
-
- /** Delays the notification of the wrapped listener to simulate lag on incoming objects. Notification events are processed on a
- * separate thread after a delay. Note that only incoming objects are delayed. To delay outgoing objects, use a LagListener at
- * the other end of the connection. */
- static public class LagListener extends QueuedListener {
- private final ScheduledExecutorService threadPool;
- private final int lagMillisMin, lagMillisMax;
- final LinkedList runnables = new LinkedList();
-
- public LagListener (int lagMillisMin, int lagMillisMax, Listener listener) {
- super(listener);
- this.lagMillisMin = lagMillisMin;
- this.lagMillisMax = lagMillisMax;
- threadPool = Executors.newScheduledThreadPool(1);
- }
-
- public void queue (Runnable runnable) {
- synchronized (runnables) {
- runnables.addFirst(runnable);
- }
- int lag = lagMillisMin + (int)(Math.random() * (lagMillisMax - lagMillisMin));
- threadPool.schedule(new Runnable() {
- public void run () {
- Runnable runnable;
- synchronized (runnables) {
- runnable = runnables.removeLast();
- }
- runnable.run();
- }
- }, lag, TimeUnit.MILLISECONDS);
- }
- }
-}
+/* Copyright (c) 2008, Nathan Sweet
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the distribution.
+ * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+ * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
+
+package com.esotericsoftware.kryonet;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static com.esotericsoftware.minlog.Log.*;
+
+/** Used to be notified about connection events. */
+public interface Listener {
+ /** Called when the remote end has been connected. This will be invoked before any objects are received by
+ * {@link #received(Connection, Object)}. This will be invoked on the same thread as {@link Client#update(int)} and
+ * {@link Server#update(int)}. This method should not block for long periods as other network activity will not be processed
+ * until it returns. */
+ public default void connected (Connection connection) {
+ };
+
+ /** Called when the remote end is no longer connected. There is no guarantee as to what thread will invoke this method. */
+ public default void disconnected (Connection connection) {
+ };
+
+ /** Called when an object has been received from the remote end of the connection. This will be invoked on the same thread as
+ * {@link Client#update(int)} and {@link Server#update(int)}. This method should not block for long periods as other network
+ * activity will not be processed until it returns. */
+ public default void received (Connection connection, Object object) {
+ };
+
+ /** Called when the connection is below the {@link Connection#setIdleThreshold(float) idle threshold}. */
+ public default void idle (Connection connection) {
+ };
+
+ /** Uses reflection to called "received(Connection, XXX)" on the listener, where XXX is the received object type. Note this
+ * class uses a HashMap lookup and (cached) reflection, so is not as efficient as writing a series of "instanceof" statements. */
+ static public class ReflectionListener implements Listener {
+ private final HashMap classToMethod = new HashMap();
+
+ public void received (Connection connection, Object object) {
+ Class type = object.getClass();
+ Method method = classToMethod.get(type);
+ if (method == null) {
+ if (classToMethod.containsKey(type)) return; // Only fail on the first attempt to find the method.
+ try {
+ method = getClass().getMethod("received", new Class[] {Connection.class, type});
+ method.setAccessible(true);
+ } catch (SecurityException ex) {
+ if (ERROR) error("kryonet", "Unable to access method: received(Connection, " + type.getName() + ")", ex);
+ return;
+ } catch (NoSuchMethodException ex) {
+ if (DEBUG)
+ debug("kryonet",
+ "Unable to find listener method: " + getClass().getName() + "#received(Connection, " + type.getName() + ")");
+ return;
+ } finally {
+ classToMethod.put(type, method);
+ }
+ }
+ try {
+ method.invoke(this, connection, object);
+ } catch (Throwable ex) {
+ if (ex instanceof InvocationTargetException && ex.getCause() != null) ex = ex.getCause();
+ if (ex instanceof RuntimeException) throw (RuntimeException)ex;
+ throw new RuntimeException("Error invoking method: " + getClass().getName() + "#received(Connection, "
+ + type.getName() + ")", ex);
+ }
+ }
+ }
+
+ /** Wraps a listener and queues notifications as {@link Runnable runnables}. This allows the runnables to be processed on a
+ * different thread, preventing the connection's update thread from being blocked. */
+ static public abstract class QueuedListener implements Listener {
+ final Listener listener;
+
+ public QueuedListener (Listener listener) {
+ if (listener == null) throw new IllegalArgumentException("listener cannot be null.");
+ this.listener = listener;
+ }
+
+ public void connected (final Connection connection) {
+ queue(new Runnable() {
+ public void run () {
+ listener.connected(connection);
+ }
+ });
+ }
+
+ public void disconnected (final Connection connection) {
+ queue(new Runnable() {
+ public void run () {
+ listener.disconnected(connection);
+ }
+ });
+ }
+
+ public void received (final Connection connection, final Object object) {
+ queue(new Runnable() {
+ public void run () {
+ listener.received(connection, object);
+ }
+ });
+ }
+
+ public void idle (final Connection connection) {
+ queue(new Runnable() {
+ public void run () {
+ listener.idle(connection);
+ }
+ });
+ }
+
+ abstract protected void queue (Runnable runnable);
+ }
+
+ /** Wraps a listener and processes notification events on a separate thread. */
+ static public class ThreadedListener extends QueuedListener {
+ protected final ExecutorService threadPool;
+
+ /** Creates a single thread to process notification events. */
+ public ThreadedListener (Listener listener) {
+ this(listener, Executors.newFixedThreadPool(1));
+ }
+
+ /** Uses the specified threadPool to process notification events. */
+ public ThreadedListener (Listener listener, ExecutorService threadPool) {
+ super(listener);
+ if (threadPool == null) throw new IllegalArgumentException("threadPool cannot be null.");
+ this.threadPool = threadPool;
+ }
+
+ public void queue (Runnable runnable) {
+ threadPool.execute(runnable);
+ }
+ }
+
+ /** Delays the notification of the wrapped listener to simulate lag on incoming objects. Notification events are processed on a
+ * separate thread after a delay. Note that only incoming objects are delayed. To delay outgoing objects, use a LagListener at
+ * the other end of the connection. */
+ static public class LagListener extends QueuedListener {
+ private final ScheduledExecutorService threadPool;
+ private final int lagMillisMin, lagMillisMax;
+ final LinkedList runnables = new LinkedList();
+
+ public LagListener (int lagMillisMin, int lagMillisMax, Listener listener) {
+ super(listener);
+ this.lagMillisMin = lagMillisMin;
+ this.lagMillisMax = lagMillisMax;
+ threadPool = Executors.newScheduledThreadPool(1);
+ }
+
+ public void queue (Runnable runnable) {
+ synchronized (runnables) {
+ runnables.addFirst(runnable);
+ }
+ int lag = lagMillisMin + (int)(Math.random() * (lagMillisMax - lagMillisMin));
+ threadPool.schedule(new Runnable() {
+ public void run () {
+ Runnable runnable;
+ synchronized (runnables) {
+ runnable = runnables.removeLast();
+ }
+ runnable.run();
+ }
+ }, lag, TimeUnit.MILLISECONDS);
+ }
+ }
+}
diff --git a/src/com/esotericsoftware/kryonet/Server.java b/src/com/esotericsoftware/kryonet/Server.java
index 3bdd7f43..74d1016e 100644
--- a/src/com/esotericsoftware/kryonet/Server.java
+++ b/src/com/esotericsoftware/kryonet/Server.java
@@ -1,601 +1,602 @@
-/* Copyright (c) 2008, Nathan Sweet
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
- * conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials provided with the distribution.
- * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
- * from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
- * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
-
-package com.esotericsoftware.kryonet;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.Set;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.util.IntMap;
-import com.esotericsoftware.kryonet.FrameworkMessage.DiscoverHost;
-import com.esotericsoftware.kryonet.FrameworkMessage.RegisterTCP;
-import com.esotericsoftware.kryonet.FrameworkMessage.RegisterUDP;
-
-import static com.esotericsoftware.minlog.Log.*;
-
-/** Manages TCP and optionally UDP connections from many {@link Client Clients}.
- * @author Nathan Sweet */
-public class Server implements EndPoint {
- private final Serialization serialization;
- private final int writeBufferSize, objectBufferSize;
- private final Selector selector;
- private int emptySelects;
- private ServerSocketChannel serverChannel;
- private UdpConnection udp;
- private Connection[] connections = {};
- private IntMap pendingConnections = new IntMap();
- Listener[] listeners = {};
- private Object listenerLock = new Object();
- private int nextConnectionID = 1;
- private volatile boolean shutdown;
- private Object updateLock = new Object();
- private Thread updateThread;
- private ServerDiscoveryHandler discoveryHandler;
-
- private Listener dispatchListener = new Listener() {
- public void connected (Connection connection) {
- Listener[] listeners = Server.this.listeners;
- for (int i = 0, n = listeners.length; i < n; i++)
- listeners[i].connected(connection);
- }
-
- public void disconnected (Connection connection) {
- removeConnection(connection);
- Listener[] listeners = Server.this.listeners;
- for (int i = 0, n = listeners.length; i < n; i++)
- listeners[i].disconnected(connection);
- }
-
- public void received (Connection connection, Object object) {
- Listener[] listeners = Server.this.listeners;
- for (int i = 0, n = listeners.length; i < n; i++)
- listeners[i].received(connection, object);
- }
-
- public void idle (Connection connection) {
- Listener[] listeners = Server.this.listeners;
- for (int i = 0, n = listeners.length; i < n; i++)
- listeners[i].idle(connection);
- }
- };
-
- /** Creates a Server with a write buffer size of 16384 and an object buffer size of 2048. */
- public Server () {
- this(16384, 2048);
- }
-
- /** @param writeBufferSize One buffer of this size is allocated for each connected client. Objects are serialized to the write
- * buffer where the bytes are queued until they can be written to the TCP socket.
- *
- * Normally the socket is writable and the bytes are written immediately. If the socket cannot be written to and
- * enough serialized objects are queued to overflow the buffer, then the connection will be closed.
- *
- * The write buffer should be sized at least as large as the largest object that will be sent, plus some head room to
- * allow for some serialized objects to be queued in case the buffer is temporarily not writable. The amount of head
- * room needed is dependent upon the size of objects being sent and how often they are sent.
- * @param objectBufferSize One (using only TCP) or three (using both TCP and UDP) buffers of this size are allocated. These
- * buffers are used to hold the bytes for a single object graph until it can be sent over the network or
- * deserialized.
- *
- * The object buffers should be sized at least as large as the largest object that will be sent or received. */
- public Server (int writeBufferSize, int objectBufferSize) {
- this(writeBufferSize, objectBufferSize, new KryoSerialization());
- }
-
- public Server (int writeBufferSize, int objectBufferSize, Serialization serialization) {
- this.writeBufferSize = writeBufferSize;
- this.objectBufferSize = objectBufferSize;
-
- this.serialization = serialization;
-
- this.discoveryHandler = ServerDiscoveryHandler.DEFAULT;
-
- try {
- selector = Selector.open();
- } catch (IOException ex) {
- throw new RuntimeException("Error opening selector.", ex);
- }
- }
-
- public void setDiscoveryHandler (ServerDiscoveryHandler newDiscoveryHandler) {
- discoveryHandler = newDiscoveryHandler;
- }
-
- public Serialization getSerialization () {
- return serialization;
- }
-
- public Kryo getKryo () {
- return ((KryoSerialization)serialization).getKryo();
- }
-
- /** Opens a TCP only server.
- * @throws IOException if the server could not be opened. */
- public void bind (int tcpPort) throws IOException {
- bind(new InetSocketAddress(tcpPort), null);
- }
-
- /** Opens a TCP and UDP server.
- * @throws IOException if the server could not be opened. */
- public void bind (int tcpPort, int udpPort) throws IOException {
- bind(new InetSocketAddress(tcpPort), new InetSocketAddress(udpPort));
- }
-
- /** @param udpPort May be null. */
- public void bind (InetSocketAddress tcpPort, InetSocketAddress udpPort) throws IOException {
- close();
- synchronized (updateLock) {
- selector.wakeup();
- try {
- serverChannel = selector.provider().openServerSocketChannel();
- serverChannel.socket().bind(tcpPort);
- serverChannel.configureBlocking(false);
- serverChannel.register(selector, SelectionKey.OP_ACCEPT);
- if (DEBUG) debug("kryonet", "Accepting connections on port: " + tcpPort + "/TCP");
-
- if (udpPort != null) {
- udp = new UdpConnection(serialization, objectBufferSize);
- udp.bind(selector, udpPort);
- if (DEBUG) debug("kryonet", "Accepting connections on port: " + udpPort + "/UDP");
- }
- } catch (IOException ex) {
- close();
- throw ex;
- }
- }
- if (INFO) info("kryonet", "Server opened.");
- }
-
- /** Accepts any new connections and reads or writes any pending data for the current connections.
- * @param timeout Wait for up to the specified milliseconds for a connection to be ready to process. May be zero to return
- * immediately if there are no connections to process. */
- public void update (int timeout) throws IOException {
- updateThread = Thread.currentThread();
- synchronized (updateLock) { // Blocks to avoid a select while the selector is used to bind the server connection.
- }
- long startTime = System.currentTimeMillis();
- int select = 0;
- if (timeout > 0) {
- select = selector.select(timeout);
- } else {
- select = selector.selectNow();
- }
- if (select == 0) {
- emptySelects++;
- if (emptySelects == 100) {
- emptySelects = 0;
- // NIO freaks and returns immediately with 0 sometimes, so try to keep from hogging the CPU.
- long elapsedTime = System.currentTimeMillis() - startTime;
- try {
- if (elapsedTime < 25) Thread.sleep(25 - elapsedTime);
- } catch (InterruptedException ex) {
- }
- }
- } else {
- emptySelects = 0;
- Set keys = selector.selectedKeys();
- synchronized (keys) {
- UdpConnection udp = this.udp;
- outer:
- for (Iterator iter = keys.iterator(); iter.hasNext();) {
- keepAlive();
- SelectionKey selectionKey = iter.next();
- iter.remove();
- Connection fromConnection = (Connection)selectionKey.attachment();
- try {
- int ops = selectionKey.readyOps();
-
- if (fromConnection != null) { // Must be a TCP read or write operation.
- if (udp != null && fromConnection.udpRemoteAddress == null) {
- fromConnection.close();
- continue;
- }
- if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
- try {
- while (true) {
- Object object = fromConnection.tcp.readObject(fromConnection);
- if (object == null) break;
- if (DEBUG) {
- String objectString = object == null ? "null" : object.getClass().getSimpleName();
- if (!(object instanceof FrameworkMessage)) {
- debug("kryonet", fromConnection + " received TCP: " + objectString);
- } else if (TRACE) {
- trace("kryonet", fromConnection + " received TCP: " + objectString);
- }
- }
- fromConnection.notifyReceived(object);
- }
- } catch (IOException ex) {
- if (TRACE) {
- trace("kryonet", "Unable to read TCP from: " + fromConnection, ex);
- } else if (DEBUG) {
- debug("kryonet", fromConnection + " update: " + ex.getMessage());
- }
- fromConnection.close();
- } catch (KryoNetException ex) {
- if (ERROR) error("kryonet", "Error reading TCP from connection: " + fromConnection, ex);
- fromConnection.close();
- }
- }
- if ((ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
- try {
- fromConnection.tcp.writeOperation();
- } catch (IOException ex) {
- if (TRACE) {
- trace("kryonet", "Unable to write TCP to connection: " + fromConnection, ex);
- } else if (DEBUG) {
- debug("kryonet", fromConnection + " update: " + ex.getMessage());
- }
- fromConnection.close();
- }
- }
- continue;
- }
-
- if ((ops & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
- ServerSocketChannel serverChannel = this.serverChannel;
- if (serverChannel == null) continue;
- try {
- SocketChannel socketChannel = serverChannel.accept();
- if (socketChannel != null) acceptOperation(socketChannel);
- } catch (IOException ex) {
- if (DEBUG) debug("kryonet", "Unable to accept new connection.", ex);
- }
- continue;
- }
-
- // Must be a UDP read operation.
- if (udp == null) {
- selectionKey.channel().close();
- continue;
- }
- InetSocketAddress fromAddress;
- try {
- fromAddress = udp.readFromAddress();
- } catch (IOException ex) {
- if (WARN) warn("kryonet", "Error reading UDP data.", ex);
- continue;
- }
- if (fromAddress == null) continue;
-
- Connection[] connections = this.connections;
- for (int i = 0, n = connections.length; i < n; i++) {
- Connection connection = connections[i];
- if (fromAddress.equals(connection.udpRemoteAddress)) {
- fromConnection = connection;
- break;
- }
- }
-
- Object object;
- try {
- object = udp.readObject(fromConnection);
- } catch (KryoNetException ex) {
- if (WARN) {
- if (fromConnection != null) {
- if (ERROR) error("kryonet", "Error reading UDP from connection: " + fromConnection, ex);
- } else
- warn("kryonet", "Error reading UDP from unregistered address: " + fromAddress, ex);
- }
- continue;
- }
-
- if (object instanceof FrameworkMessage) {
- if (object instanceof RegisterUDP) {
- // Store the fromAddress on the connection and reply over TCP with a RegisterUDP to indicate success.
- int fromConnectionID = ((RegisterUDP)object).connectionID;
- Connection connection = pendingConnections.remove(fromConnectionID);
- if (connection != null) {
- if (connection.udpRemoteAddress != null) continue outer;
- connection.udpRemoteAddress = fromAddress;
- addConnection(connection);
- connection.sendTCP(new RegisterUDP());
- if (DEBUG)
- debug("kryonet", "Port " + udp.datagramChannel.socket().getLocalPort() + "/UDP connected to: "
- + fromAddress);
- connection.notifyConnected();
- continue;
- }
- if (DEBUG)
- debug("kryonet", "Ignoring incoming RegisterUDP with invalid connection ID: " + fromConnectionID);
- continue;
- }
- if (object instanceof DiscoverHost) {
- try {
- boolean responseSent = discoveryHandler
- .onDiscoverHost(udp.datagramChannel, fromAddress, serialization);
- if (DEBUG && responseSent) debug("kryonet", "Responded to host discovery from: " + fromAddress);
- } catch (IOException ex) {
- if (WARN) warn("kryonet", "Error replying to host discovery from: " + fromAddress, ex);
- }
- continue;
- }
- }
-
- if (fromConnection != null) {
- if (DEBUG) {
- String objectString = object == null ? "null" : object.getClass().getSimpleName();
- if (object instanceof FrameworkMessage) {
- if (TRACE) trace("kryonet", fromConnection + " received UDP: " + objectString);
- } else
- debug("kryonet", fromConnection + " received UDP: " + objectString);
- }
- fromConnection.notifyReceived(object);
- continue;
- }
- if (DEBUG) debug("kryonet", "Ignoring UDP from unregistered address: " + fromAddress);
- } catch (CancelledKeyException ex) {
- if (fromConnection != null)
- fromConnection.close();
- else
- selectionKey.channel().close();
- }
- }
- }
- }
- long time = System.currentTimeMillis();
- Connection[] connections = this.connections;
- for (int i = 0, n = connections.length; i < n; i++) {
- Connection connection = connections[i];
- if (connection.tcp.isTimedOut(time)) {
- if (DEBUG) debug("kryonet", connection + " timed out.");
- connection.close();
- } else {
- if (connection.tcp.needsKeepAlive(time)) connection.sendTCP(FrameworkMessage.keepAlive);
- }
- if (connection.isIdle()) connection.notifyIdle();
- }
- }
-
- private void keepAlive () {
- long time = System.currentTimeMillis();
- Connection[] connections = this.connections;
- for (int i = 0, n = connections.length; i < n; i++) {
- Connection connection = connections[i];
- if (connection.tcp.needsKeepAlive(time)) connection.sendTCP(FrameworkMessage.keepAlive);
- }
- }
-
- public void run () {
- if (TRACE) trace("kryonet", "Server thread started.");
- shutdown = false;
- while (!shutdown) {
- try {
- update(250);
- } catch (IOException ex) {
- if (ERROR) error("kryonet", "Error updating server connections.", ex);
- close();
- }
- }
- if (TRACE) trace("kryonet", "Server thread stopped.");
- }
-
- public void start () {
- new Thread(this, "Server").start();
- }
-
- public void stop () {
- if (shutdown) return;
- close();
- if (TRACE) trace("kryonet", "Server thread stopping.");
- shutdown = true;
- }
-
- private void acceptOperation (SocketChannel socketChannel) {
- Connection connection = newConnection();
- connection.initialize(serialization, writeBufferSize, objectBufferSize);
- connection.endPoint = this;
- UdpConnection udp = this.udp;
- if (udp != null) connection.udp = udp;
- try {
- SelectionKey selectionKey = connection.tcp.accept(selector, socketChannel);
- selectionKey.attach(connection);
-
- int id = nextConnectionID++;
- if (nextConnectionID == -1) nextConnectionID = 1;
- connection.id = id;
- connection.setConnected(true);
- connection.addListener(dispatchListener);
-
- if (udp == null)
- addConnection(connection);
- else
- pendingConnections.put(id, connection);
-
- RegisterTCP registerConnection = new RegisterTCP();
- registerConnection.connectionID = id;
- connection.sendTCP(registerConnection);
-
- if (udp == null) connection.notifyConnected();
- } catch (IOException ex) {
- connection.close();
- if (DEBUG) debug("kryonet", "Unable to accept TCP connection.", ex);
- }
- }
-
- /** Allows the connections used by the server to be subclassed. This can be useful for storage per connection without an
- * additional lookup. */
- protected Connection newConnection () {
- return new Connection();
- }
-
- private void addConnection (Connection connection) {
- Connection[] newConnections = new Connection[connections.length + 1];
- newConnections[0] = connection;
- System.arraycopy(connections, 0, newConnections, 1, connections.length);
- connections = newConnections;
- }
-
- void removeConnection (Connection connection) {
- ArrayList temp = new ArrayList(Arrays.asList(connections));
- temp.remove(connection);
- connections = temp.toArray(new Connection[temp.size()]);
-
- pendingConnections.remove(connection.id);
- }
-
- // BOZO - Provide mechanism for sending to multiple clients without serializing multiple times.
-
- public void sendToAllTCP (Object object) {
- Connection[] connections = this.connections;
- for (int i = 0, n = connections.length; i < n; i++) {
- Connection connection = connections[i];
- connection.sendTCP(object);
- }
- }
-
- public void sendToAllExceptTCP (int connectionID, Object object) {
- Connection[] connections = this.connections;
- for (int i = 0, n = connections.length; i < n; i++) {
- Connection connection = connections[i];
- if (connection.id != connectionID) connection.sendTCP(object);
- }
- }
-
- public void sendToTCP (int connectionID, Object object) {
- Connection[] connections = this.connections;
- for (int i = 0, n = connections.length; i < n; i++) {
- Connection connection = connections[i];
- if (connection.id == connectionID) {
- connection.sendTCP(object);
- break;
- }
- }
- }
-
- public void sendToAllUDP (Object object) {
- Connection[] connections = this.connections;
- for (int i = 0, n = connections.length; i < n; i++) {
- Connection connection = connections[i];
- connection.sendUDP(object);
- }
- }
-
- public void sendToAllExceptUDP (int connectionID, Object object) {
- Connection[] connections = this.connections;
- for (int i = 0, n = connections.length; i < n; i++) {
- Connection connection = connections[i];
- if (connection.id != connectionID) connection.sendUDP(object);
- }
- }
-
- public void sendToUDP (int connectionID, Object object) {
- Connection[] connections = this.connections;
- for (int i = 0, n = connections.length; i < n; i++) {
- Connection connection = connections[i];
- if (connection.id == connectionID) {
- connection.sendUDP(object);
- break;
- }
- }
- }
-
- public void addListener (Listener listener) {
- if (listener == null) throw new IllegalArgumentException("listener cannot be null.");
- synchronized (listenerLock) {
- Listener[] listeners = this.listeners;
- int n = listeners.length;
- for (int i = 0; i < n; i++)
- if (listener == listeners[i]) return;
- Listener[] newListeners = new Listener[n + 1];
- newListeners[0] = listener;
- System.arraycopy(listeners, 0, newListeners, 1, n);
- this.listeners = newListeners;
- }
- if (TRACE) trace("kryonet", "Server listener added: " + listener.getClass().getName());
- }
-
- public void removeListener (Listener listener) {
- if (listener == null) throw new IllegalArgumentException("listener cannot be null.");
- synchronized (listenerLock) {
- Listener[] listeners = this.listeners;
- int n = listeners.length;
- Listener[] newListeners = new Listener[n - 1];
- for (int i = 0, ii = 0; i < n; i++) {
- Listener copyListener = listeners[i];
- if (listener == copyListener) continue;
- if (ii == n - 1) return;
- newListeners[ii++] = copyListener;
- }
- this.listeners = newListeners;
- }
- if (TRACE) trace("kryonet", "Server listener removed: " + listener.getClass().getName());
- }
-
- /** Closes all open connections and the server port(s). */
- public void close () {
- Connection[] connections = this.connections;
- if (INFO && connections.length > 0) info("kryonet", "Closing server connections...");
- for (int i = 0, n = connections.length; i < n; i++)
- connections[i].close();
- connections = new Connection[0];
-
- ServerSocketChannel serverChannel = this.serverChannel;
- if (serverChannel != null) {
- try {
- serverChannel.close();
- if (INFO) info("kryonet", "Server closed.");
- } catch (IOException ex) {
- if (DEBUG) debug("kryonet", "Unable to close server.", ex);
- }
- this.serverChannel = null;
- }
-
- UdpConnection udp = this.udp;
- if (udp != null) {
- udp.close();
- this.udp = null;
- }
-
- synchronized (updateLock) { // Blocks to avoid a select while the selector is used to bind the server connection.
- }
- // Select one last time to complete closing the socket.
- selector.wakeup();
- try {
- selector.selectNow();
- } catch (IOException ignored) {
- }
- }
-
- /** Releases the resources used by this server, which may no longer be used. */
- public void dispose () throws IOException {
- close();
- selector.close();
- }
-
- public Thread getUpdateThread () {
- return updateThread;
- }
-
- /** Returns the current connections. The array returned should not be modified. */
- public Connection[] getConnections () {
- return connections;
- }
-}
+/* Copyright (c) 2008, Nathan Sweet
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the distribution.
+ * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+ * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
+
+package com.esotericsoftware.kryonet;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Set;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.util.IntMap;
+import com.esotericsoftware.kryonet.FrameworkMessage.DiscoverHost;
+import com.esotericsoftware.kryonet.FrameworkMessage.RegisterTCP;
+import com.esotericsoftware.kryonet.FrameworkMessage.RegisterUDP;
+
+import static com.esotericsoftware.minlog.Log.*;
+
+/** Manages TCP and optionally UDP connections from many {@link Client Clients}.
+ * @author Nathan Sweet */
+public class Server implements EndPoint {
+ private final Serialization serialization;
+ private final int writeBufferSize, objectBufferSize;
+ private final Selector selector;
+ private int emptySelects;
+ private ServerSocketChannel serverChannel;
+ private UdpConnection udp;
+ private Connection[] connections = {};
+ private IntMap pendingConnections = new IntMap();
+ Listener[] listeners = {};
+ private Object listenerLock = new Object();
+ private int nextConnectionID = 1;
+ private volatile boolean shutdown;
+ private Object updateLock = new Object();
+ private Thread updateThread;
+ private ServerDiscoveryHandler discoveryHandler;
+
+ private Listener dispatchListener = new Listener() {
+ public void connected (Connection connection) {
+ Listener[] listeners = Server.this.listeners;
+ for (int i = 0, n = listeners.length; i < n; i++)
+ listeners[i].connected(connection);
+ }
+
+ public void disconnected (Connection connection) {
+ removeConnection(connection);
+ Listener[] listeners = Server.this.listeners;
+ for (int i = 0, n = listeners.length; i < n; i++)
+ listeners[i].disconnected(connection);
+ }
+
+ public void received (Connection connection, Object object) {
+ Listener[] listeners = Server.this.listeners;
+ for (int i = 0, n = listeners.length; i < n; i++)
+ listeners[i].received(connection, object);
+ }
+
+ public void idle (Connection connection) {
+ Listener[] listeners = Server.this.listeners;
+ for (int i = 0, n = listeners.length; i < n; i++)
+ listeners[i].idle(connection);
+ }
+ };
+
+ /** Creates a Server with a write buffer size of 16384 and an object buffer size of 2048. */
+ public Server () {
+ this(16384, 2048);
+ }
+
+ /** @param writeBufferSize One buffer of this size is allocated for each connected client. Objects are serialized to the write
+ * buffer where the bytes are queued until they can be written to the TCP socket.
+ *
+ * Normally the socket is writable and the bytes are written immediately. If the socket cannot be written to and
+ * enough serialized objects are queued to overflow the buffer, then the connection will be closed.
+ *
+ * The write buffer should be sized at least as large as the largest object that will be sent, plus some head room to
+ * allow for some serialized objects to be queued in case the buffer is temporarily not writable. The amount of head
+ * room needed is dependent upon the size of objects being sent and how often they are sent.
+ * @param objectBufferSize One (using only TCP) or three (using both TCP and UDP) buffers of this size are allocated. These
+ * buffers are used to hold the bytes for a single object graph until it can be sent over the network or
+ * deserialized.
+ *
+ * The object buffers should be sized at least as large as the largest object that will be sent or received. */
+ public Server (int writeBufferSize, int objectBufferSize) {
+ this(writeBufferSize, objectBufferSize, new KryoSerialization());
+ }
+
+ public Server (int writeBufferSize, int objectBufferSize, Serialization serialization) {
+ this.writeBufferSize = writeBufferSize;
+ this.objectBufferSize = objectBufferSize;
+
+ this.serialization = serialization;
+
+ this.discoveryHandler = new ServerDiscoveryHandler() {
+ };
+
+ try {
+ selector = Selector.open();
+ } catch (IOException ex) {
+ throw new RuntimeException("Error opening selector.", ex);
+ }
+ }
+
+ public void setDiscoveryHandler (ServerDiscoveryHandler newDiscoveryHandler) {
+ discoveryHandler = newDiscoveryHandler;
+ }
+
+ public Serialization getSerialization () {
+ return serialization;
+ }
+
+ public Kryo getKryo () {
+ return ((KryoSerialization)serialization).getKryo();
+ }
+
+ /** Opens a TCP only server.
+ * @throws IOException if the server could not be opened. */
+ public void bind (int tcpPort) throws IOException {
+ bind(new InetSocketAddress(tcpPort), null);
+ }
+
+ /** Opens a TCP and UDP server.
+ * @throws IOException if the server could not be opened. */
+ public void bind (int tcpPort, int udpPort) throws IOException {
+ bind(new InetSocketAddress(tcpPort), new InetSocketAddress(udpPort));
+ }
+
+ /** @param udpPort May be null. */
+ public void bind (InetSocketAddress tcpPort, InetSocketAddress udpPort) throws IOException {
+ close();
+ synchronized (updateLock) {
+ selector.wakeup();
+ try {
+ serverChannel = selector.provider().openServerSocketChannel();
+ serverChannel.socket().bind(tcpPort);
+ serverChannel.configureBlocking(false);
+ serverChannel.register(selector, SelectionKey.OP_ACCEPT);
+ if (DEBUG) debug("kryonet", "Accepting connections on port: " + tcpPort + "/TCP");
+
+ if (udpPort != null) {
+ udp = new UdpConnection(serialization, objectBufferSize);
+ udp.bind(selector, udpPort);
+ if (DEBUG) debug("kryonet", "Accepting connections on port: " + udpPort + "/UDP");
+ }
+ } catch (IOException ex) {
+ close();
+ throw ex;
+ }
+ }
+ if (INFO) info("kryonet", "Server opened.");
+ }
+
+ /** Accepts any new connections and reads or writes any pending data for the current connections.
+ * @param timeout Wait for up to the specified milliseconds for a connection to be ready to process. May be zero to return
+ * immediately if there are no connections to process. */
+ public void update (int timeout) throws IOException {
+ updateThread = Thread.currentThread();
+ synchronized (updateLock) { // Blocks to avoid a select while the selector is used to bind the server connection.
+ }
+ long startTime = System.currentTimeMillis();
+ int select = 0;
+ if (timeout > 0) {
+ select = selector.select(timeout);
+ } else {
+ select = selector.selectNow();
+ }
+ if (select == 0) {
+ emptySelects++;
+ if (emptySelects == 100) {
+ emptySelects = 0;
+ // NIO freaks and returns immediately with 0 sometimes, so try to keep from hogging the CPU.
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ try {
+ if (elapsedTime < 25) Thread.sleep(25 - elapsedTime);
+ } catch (InterruptedException ex) {
+ }
+ }
+ } else {
+ emptySelects = 0;
+ Set keys = selector.selectedKeys();
+ synchronized (keys) {
+ UdpConnection udp = this.udp;
+ outer:
+ for (Iterator iter = keys.iterator(); iter.hasNext();) {
+ keepAlive();
+ SelectionKey selectionKey = iter.next();
+ iter.remove();
+ Connection fromConnection = (Connection)selectionKey.attachment();
+ try {
+ int ops = selectionKey.readyOps();
+
+ if (fromConnection != null) { // Must be a TCP read or write operation.
+ if (udp != null && fromConnection.udpRemoteAddress == null) {
+ fromConnection.close();
+ continue;
+ }
+ if ((ops & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
+ try {
+ while (true) {
+ Object object = fromConnection.tcp.readObject(fromConnection);
+ if (object == null) break;
+ if (DEBUG) {
+ String objectString = object == null ? "null" : object.getClass().getSimpleName();
+ if (!(object instanceof FrameworkMessage)) {
+ debug("kryonet", fromConnection + " received TCP: " + objectString);
+ } else if (TRACE) {
+ trace("kryonet", fromConnection + " received TCP: " + objectString);
+ }
+ }
+ fromConnection.notifyReceived(object);
+ }
+ } catch (IOException ex) {
+ if (TRACE) {
+ trace("kryonet", "Unable to read TCP from: " + fromConnection, ex);
+ } else if (DEBUG) {
+ debug("kryonet", fromConnection + " update: " + ex.getMessage());
+ }
+ fromConnection.close();
+ } catch (KryoNetException ex) {
+ if (ERROR) error("kryonet", "Error reading TCP from connection: " + fromConnection, ex);
+ fromConnection.close();
+ }
+ }
+ if ((ops & SelectionKey.OP_WRITE) == SelectionKey.OP_WRITE) {
+ try {
+ fromConnection.tcp.writeOperation();
+ } catch (IOException ex) {
+ if (TRACE) {
+ trace("kryonet", "Unable to write TCP to connection: " + fromConnection, ex);
+ } else if (DEBUG) {
+ debug("kryonet", fromConnection + " update: " + ex.getMessage());
+ }
+ fromConnection.close();
+ }
+ }
+ continue;
+ }
+
+ if ((ops & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
+ ServerSocketChannel serverChannel = this.serverChannel;
+ if (serverChannel == null) continue;
+ try {
+ SocketChannel socketChannel = serverChannel.accept();
+ if (socketChannel != null) acceptOperation(socketChannel);
+ } catch (IOException ex) {
+ if (DEBUG) debug("kryonet", "Unable to accept new connection.", ex);
+ }
+ continue;
+ }
+
+ // Must be a UDP read operation.
+ if (udp == null) {
+ selectionKey.channel().close();
+ continue;
+ }
+ InetSocketAddress fromAddress;
+ try {
+ fromAddress = udp.readFromAddress();
+ } catch (IOException ex) {
+ if (WARN) warn("kryonet", "Error reading UDP data.", ex);
+ continue;
+ }
+ if (fromAddress == null) continue;
+
+ Connection[] connections = this.connections;
+ for (int i = 0, n = connections.length; i < n; i++) {
+ Connection connection = connections[i];
+ if (fromAddress.equals(connection.udpRemoteAddress)) {
+ fromConnection = connection;
+ break;
+ }
+ }
+
+ Object object;
+ try {
+ object = udp.readObject(fromConnection);
+ } catch (KryoNetException ex) {
+ if (WARN) {
+ if (fromConnection != null) {
+ if (ERROR) error("kryonet", "Error reading UDP from connection: " + fromConnection, ex);
+ } else
+ warn("kryonet", "Error reading UDP from unregistered address: " + fromAddress, ex);
+ }
+ continue;
+ }
+
+ if (object instanceof FrameworkMessage) {
+ if (object instanceof RegisterUDP) {
+ // Store the fromAddress on the connection and reply over TCP with a RegisterUDP to indicate success.
+ int fromConnectionID = ((RegisterUDP)object).connectionID;
+ Connection connection = pendingConnections.remove(fromConnectionID);
+ if (connection != null) {
+ if (connection.udpRemoteAddress != null) continue outer;
+ connection.udpRemoteAddress = fromAddress;
+ addConnection(connection);
+ connection.sendTCP(new RegisterUDP());
+ if (DEBUG)
+ debug("kryonet", "Port " + udp.datagramChannel.socket().getLocalPort() + "/UDP connected to: "
+ + fromAddress);
+ connection.notifyConnected();
+ continue;
+ }
+ if (DEBUG)
+ debug("kryonet", "Ignoring incoming RegisterUDP with invalid connection ID: " + fromConnectionID);
+ continue;
+ }
+ if (object instanceof DiscoverHost) {
+ try {
+ boolean responseSent = discoveryHandler
+ .onDiscoverHost(udp.datagramChannel, fromAddress, serialization);
+ if (DEBUG && responseSent) debug("kryonet", "Responded to host discovery from: " + fromAddress);
+ } catch (IOException ex) {
+ if (WARN) warn("kryonet", "Error replying to host discovery from: " + fromAddress, ex);
+ }
+ continue;
+ }
+ }
+
+ if (fromConnection != null) {
+ if (DEBUG) {
+ String objectString = object == null ? "null" : object.getClass().getSimpleName();
+ if (object instanceof FrameworkMessage) {
+ if (TRACE) trace("kryonet", fromConnection + " received UDP: " + objectString);
+ } else
+ debug("kryonet", fromConnection + " received UDP: " + objectString);
+ }
+ fromConnection.notifyReceived(object);
+ continue;
+ }
+ if (DEBUG) debug("kryonet", "Ignoring UDP from unregistered address: " + fromAddress);
+ } catch (CancelledKeyException ex) {
+ if (fromConnection != null)
+ fromConnection.close();
+ else
+ selectionKey.channel().close();
+ }
+ }
+ }
+ }
+ long time = System.currentTimeMillis();
+ Connection[] connections = this.connections;
+ for (int i = 0, n = connections.length; i < n; i++) {
+ Connection connection = connections[i];
+ if (connection.tcp.isTimedOut(time)) {
+ if (DEBUG) debug("kryonet", connection + " timed out.");
+ connection.close();
+ } else {
+ if (connection.tcp.needsKeepAlive(time)) connection.sendTCP(FrameworkMessage.keepAlive);
+ }
+ if (connection.isIdle()) connection.notifyIdle();
+ }
+ }
+
+ private void keepAlive () {
+ long time = System.currentTimeMillis();
+ Connection[] connections = this.connections;
+ for (int i = 0, n = connections.length; i < n; i++) {
+ Connection connection = connections[i];
+ if (connection.tcp.needsKeepAlive(time)) connection.sendTCP(FrameworkMessage.keepAlive);
+ }
+ }
+
+ public void run () {
+ if (TRACE) trace("kryonet", "Server thread started.");
+ shutdown = false;
+ while (!shutdown) {
+ try {
+ update(250);
+ } catch (IOException ex) {
+ if (ERROR) error("kryonet", "Error updating server connections.", ex);
+ close();
+ }
+ }
+ if (TRACE) trace("kryonet", "Server thread stopped.");
+ }
+
+ public void start () {
+ new Thread(this, "Server").start();
+ }
+
+ public void stop () {
+ if (shutdown) return;
+ close();
+ if (TRACE) trace("kryonet", "Server thread stopping.");
+ shutdown = true;
+ }
+
+ private void acceptOperation (SocketChannel socketChannel) {
+ Connection connection = newConnection();
+ connection.initialize(serialization, writeBufferSize, objectBufferSize);
+ connection.endPoint = this;
+ UdpConnection udp = this.udp;
+ if (udp != null) connection.udp = udp;
+ try {
+ SelectionKey selectionKey = connection.tcp.accept(selector, socketChannel);
+ selectionKey.attach(connection);
+
+ int id = nextConnectionID++;
+ if (nextConnectionID == -1) nextConnectionID = 1;
+ connection.id = id;
+ connection.setConnected(true);
+ connection.addListener(dispatchListener);
+
+ if (udp == null)
+ addConnection(connection);
+ else
+ pendingConnections.put(id, connection);
+
+ RegisterTCP registerConnection = new RegisterTCP();
+ registerConnection.connectionID = id;
+ connection.sendTCP(registerConnection);
+
+ if (udp == null) connection.notifyConnected();
+ } catch (IOException ex) {
+ connection.close();
+ if (DEBUG) debug("kryonet", "Unable to accept TCP connection.", ex);
+ }
+ }
+
+ /** Allows the connections used by the server to be subclassed. This can be useful for storage per connection without an
+ * additional lookup. */
+ protected Connection newConnection () {
+ return new Connection();
+ }
+
+ private void addConnection (Connection connection) {
+ Connection[] newConnections = new Connection[connections.length + 1];
+ newConnections[0] = connection;
+ System.arraycopy(connections, 0, newConnections, 1, connections.length);
+ connections = newConnections;
+ }
+
+ void removeConnection (Connection connection) {
+ ArrayList temp = new ArrayList(Arrays.asList(connections));
+ temp.remove(connection);
+ connections = temp.toArray(new Connection[temp.size()]);
+
+ pendingConnections.remove(connection.id);
+ }
+
+ // BOZO - Provide mechanism for sending to multiple clients without serializing multiple times.
+
+ public void sendToAllTCP (Object object) {
+ Connection[] connections = this.connections;
+ for (int i = 0, n = connections.length; i < n; i++) {
+ Connection connection = connections[i];
+ connection.sendTCP(object);
+ }
+ }
+
+ public void sendToAllExceptTCP (int connectionID, Object object) {
+ Connection[] connections = this.connections;
+ for (int i = 0, n = connections.length; i < n; i++) {
+ Connection connection = connections[i];
+ if (connection.id != connectionID) connection.sendTCP(object);
+ }
+ }
+
+ public void sendToTCP (int connectionID, Object object) {
+ Connection[] connections = this.connections;
+ for (int i = 0, n = connections.length; i < n; i++) {
+ Connection connection = connections[i];
+ if (connection.id == connectionID) {
+ connection.sendTCP(object);
+ break;
+ }
+ }
+ }
+
+ public void sendToAllUDP (Object object) {
+ Connection[] connections = this.connections;
+ for (int i = 0, n = connections.length; i < n; i++) {
+ Connection connection = connections[i];
+ connection.sendUDP(object);
+ }
+ }
+
+ public void sendToAllExceptUDP (int connectionID, Object object) {
+ Connection[] connections = this.connections;
+ for (int i = 0, n = connections.length; i < n; i++) {
+ Connection connection = connections[i];
+ if (connection.id != connectionID) connection.sendUDP(object);
+ }
+ }
+
+ public void sendToUDP (int connectionID, Object object) {
+ Connection[] connections = this.connections;
+ for (int i = 0, n = connections.length; i < n; i++) {
+ Connection connection = connections[i];
+ if (connection.id == connectionID) {
+ connection.sendUDP(object);
+ break;
+ }
+ }
+ }
+
+ public void addListener (Listener listener) {
+ if (listener == null) throw new IllegalArgumentException("listener cannot be null.");
+ synchronized (listenerLock) {
+ Listener[] listeners = this.listeners;
+ int n = listeners.length;
+ for (int i = 0; i < n; i++)
+ if (listener == listeners[i]) return;
+ Listener[] newListeners = new Listener[n + 1];
+ newListeners[0] = listener;
+ System.arraycopy(listeners, 0, newListeners, 1, n);
+ this.listeners = newListeners;
+ }
+ if (TRACE) trace("kryonet", "Server listener added: " + listener.getClass().getName());
+ }
+
+ public void removeListener (Listener listener) {
+ if (listener == null) throw new IllegalArgumentException("listener cannot be null.");
+ synchronized (listenerLock) {
+ Listener[] listeners = this.listeners;
+ int n = listeners.length;
+ Listener[] newListeners = new Listener[n - 1];
+ for (int i = 0, ii = 0; i < n; i++) {
+ Listener copyListener = listeners[i];
+ if (listener == copyListener) continue;
+ if (ii == n - 1) return;
+ newListeners[ii++] = copyListener;
+ }
+ this.listeners = newListeners;
+ }
+ if (TRACE) trace("kryonet", "Server listener removed: " + listener.getClass().getName());
+ }
+
+ /** Closes all open connections and the server port(s). */
+ public void close () {
+ Connection[] connections = this.connections;
+ if (INFO && connections.length > 0) info("kryonet", "Closing server connections...");
+ for (int i = 0, n = connections.length; i < n; i++)
+ connections[i].close();
+ connections = new Connection[0];
+
+ ServerSocketChannel serverChannel = this.serverChannel;
+ if (serverChannel != null) {
+ try {
+ serverChannel.close();
+ if (INFO) info("kryonet", "Server closed.");
+ } catch (IOException ex) {
+ if (DEBUG) debug("kryonet", "Unable to close server.", ex);
+ }
+ this.serverChannel = null;
+ }
+
+ UdpConnection udp = this.udp;
+ if (udp != null) {
+ udp.close();
+ this.udp = null;
+ }
+
+ synchronized (updateLock) { // Blocks to avoid a select while the selector is used to bind the server connection.
+ }
+ // Select one last time to complete closing the socket.
+ selector.wakeup();
+ try {
+ selector.selectNow();
+ } catch (IOException ignored) {
+ }
+ }
+
+ /** Releases the resources used by this server, which may no longer be used. */
+ public void dispose () throws IOException {
+ close();
+ selector.close();
+ }
+
+ public Thread getUpdateThread () {
+ return updateThread;
+ }
+
+ /** Returns the current connections. The array returned should not be modified. */
+ public Connection[] getConnections () {
+ return connections;
+ }
+}
diff --git a/src/com/esotericsoftware/kryonet/ServerDiscoveryHandler.java b/src/com/esotericsoftware/kryonet/ServerDiscoveryHandler.java
index 42fa6864..be7ea98c 100644
--- a/src/com/esotericsoftware/kryonet/ServerDiscoveryHandler.java
+++ b/src/com/esotericsoftware/kryonet/ServerDiscoveryHandler.java
@@ -1,15 +1,15 @@
/* Copyright (c) 2008, Nathan Sweet
* All rights reserved.
- *
+ *
* Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
* conditions are met:
- *
+ *
* - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
* - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
* disclaimer in the documentation and/or other materials provided with the distribution.
* - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
* from this software without specific prior written permission.
- *
+ *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
* BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
* SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
@@ -27,24 +27,15 @@
import com.esotericsoftware.kryonet.FrameworkMessage.DiscoverHost;
public interface ServerDiscoveryHandler {
- /** This implementation of {@link ServerDiscoveryHandler} is responsible for providing the {@link Server} with it's default
- * behavior. */
- public static final ServerDiscoveryHandler DEFAULT = new ServerDiscoveryHandler() {
- private ByteBuffer emptyBuffer = ByteBuffer.allocate(0);
-
- @Override
- public boolean onDiscoverHost (DatagramChannel datagramChannel, InetSocketAddress fromAddress, Serialization serialization)
- throws IOException {
- datagramChannel.send(emptyBuffer, fromAddress);
- return true;
- }
- };
/** Called when the {@link Server} receives a {@link DiscoverHost} packet.
* @param fromAddress {@link InetSocketAddress} the {@link DiscoverHost} came from
* @param serialization the {@link Server}'s {@link Serialization} instance
* @return true if a response was sent to {@code fromAddress}, false otherwise
* @throws IOException from the use of {@link DatagramChannel#send(ByteBuffer, java.net.SocketAddress)} */
- public boolean onDiscoverHost (DatagramChannel datagramChannel, InetSocketAddress fromAddress, Serialization serialization)
- throws IOException;
+ public default boolean onDiscoverHost (DatagramChannel datagramChannel, InetSocketAddress fromAddress, Serialization serialization)
+ throws IOException {
+ datagramChannel.send(ByteBuffer.allocate(0), fromAddress);
+ return true;
+ };
}
diff --git a/src/com/esotericsoftware/kryonet/UdpConnection.java b/src/com/esotericsoftware/kryonet/UdpConnection.java
index 732a4f07..64a423d7 100644
--- a/src/com/esotericsoftware/kryonet/UdpConnection.java
+++ b/src/com/esotericsoftware/kryonet/UdpConnection.java
@@ -91,7 +91,10 @@ public InetSocketAddress readFromAddress () throws IOException {
DatagramChannel datagramChannel = this.datagramChannel;
if (datagramChannel == null) throw new SocketException("Connection is closed.");
lastCommunicationTime = System.currentTimeMillis();
- return (InetSocketAddress)datagramChannel.receive(readBuffer);
+ if(!datagramChannel.isConnected())
+ return (InetSocketAddress)datagramChannel.receive(readBuffer); // always null on Android >= 5.0
+ datagramChannel.read(readBuffer);
+ return connectedAddress;
}
public Object readObject (Connection connection) {
diff --git a/src/com/esotericsoftware/kryonet/util/TcpIdleSender.java b/src/com/esotericsoftware/kryonet/util/TcpIdleSender.java
index fc43cb31..f9bbb911 100644
--- a/src/com/esotericsoftware/kryonet/util/TcpIdleSender.java
+++ b/src/com/esotericsoftware/kryonet/util/TcpIdleSender.java
@@ -1,47 +1,47 @@
-/* Copyright (c) 2008, Nathan Sweet
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
- * conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials provided with the distribution.
- * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
- * from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
- * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
-
-package com.esotericsoftware.kryonet.util;
-
-import com.esotericsoftware.kryonet.Connection;
-import com.esotericsoftware.kryonet.Listener;
-
-abstract public class TcpIdleSender extends Listener {
- boolean started;
-
- public void idle (Connection connection) {
- if (!started) {
- started = true;
- start();
- }
- Object object = next();
- if (object == null)
- connection.removeListener(this);
- else
- connection.sendTCP(object);
- }
-
- /** Called once, before the first send. Subclasses can override this method to send something so the receiving side expects
- * subsequent objects. */
- protected void start () {
- }
-
- /** Returns the next object to send, or null if no more objects will be sent. */
- abstract protected Object next ();
-}
+/* Copyright (c) 2008, Nathan Sweet
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the distribution.
+ * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+ * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
+
+package com.esotericsoftware.kryonet.util;
+
+import com.esotericsoftware.kryonet.Connection;
+import com.esotericsoftware.kryonet.Listener;
+
+abstract public class TcpIdleSender implements Listener {
+ boolean started;
+
+ public void idle (Connection connection) {
+ if (!started) {
+ started = true;
+ start();
+ }
+ Object object = next();
+ if (object == null)
+ connection.removeListener(this);
+ else
+ connection.sendTCP(object);
+ }
+
+ /** Called once, before the first send. Subclasses can override this method to send something so the receiving side expects
+ * subsequent objects. */
+ protected void start () {
+ }
+
+ /** Returns the next object to send, or null if no more objects will be sent. */
+ abstract protected Object next ();
+}
diff --git a/test/com/esotericsoftware/kryonet/DiscoverHostTest.java b/test/com/esotericsoftware/kryonet/DiscoverHostTest.java
index 3934d540..6bd0aba8 100644
--- a/test/com/esotericsoftware/kryonet/DiscoverHostTest.java
+++ b/test/com/esotericsoftware/kryonet/DiscoverHostTest.java
@@ -1,179 +1,175 @@
-/* Copyright (c) 2008, Nathan Sweet
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
- * conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials provided with the distribution.
- * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
- * from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
- * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
-
-package com.esotericsoftware.kryonet;
-
-import static com.esotericsoftware.minlog.Log.*;
-
-import java.io.IOException;
-import java.net.DatagramPacket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.DatagramChannel;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-
-public class DiscoverHostTest extends KryoNetTestCase {
-
- public void testBroadcast () throws IOException {
- // This server exists solely to reply to Client#discoverHost.
- // It wouldn't be needed if the real server was using UDP.
- final Server broadcastServer = new Server();
- startEndPoint(broadcastServer);
- broadcastServer.bind(0, udpPort);
-
- final Server server = new Server();
- startEndPoint(server);
- server.bind(54555);
- server.addListener(new Listener() {
- public void disconnected (Connection connection) {
- broadcastServer.stop();
- server.stop();
- }
- });
-
- // ----
-
- Client client = new Client();
- InetAddress host = client.discoverHost(udpPort, 2000);
- if (host == null) {
- stopEndPoints();
- fail("No servers found.");
- return;
- }
-
- startEndPoint(client);
- client.connect(2000, host, tcpPort);
- client.stop();
-
- waitForThreads();
- }
-
- public void testCustomBroadcast () throws IOException {
-
- ServerDiscoveryHandler serverDiscoveryHandler = new ServerDiscoveryHandler() {
- @Override
- public boolean onDiscoverHost (DatagramChannel datagramChannel, InetSocketAddress fromAddress,
- Serialization serialization) throws IOException {
-
- DiscoveryResponsePacket packet = new DiscoveryResponsePacket();
- packet.id = 42;
- packet.gameName = "gameName";
- packet.playerName = "playerName";
-
- ByteBuffer buffer = ByteBuffer.allocate(256);
- serialization.write(null, buffer, packet);
- buffer.flip();
-
- datagramChannel.send(buffer, fromAddress);
-
- return true;
- }
- };
-
- ClientDiscoveryHandler clientDiscoveryHandler = new ClientDiscoveryHandler() {
- private Input input = null;
-
- @Override
- public DatagramPacket onRequestNewDatagramPacket () {
- byte[] buffer = new byte[1024];
- input = new Input(buffer);
- return new DatagramPacket(buffer, buffer.length);
- }
-
- @Override
- public void onDiscoveredHost (DatagramPacket datagramPacket, Kryo kryo) {
- if (input != null) {
- DiscoveryResponsePacket packet;
- packet = (DiscoveryResponsePacket)kryo.readClassAndObject(input);
- info("test", "packet.id = " + packet.id);
- info("test", "packet.gameName = " + packet.gameName);
- info("test", "packet.playerName = " + packet.playerName);
- info("test", "datagramPacket.getAddress() = " + datagramPacket.getAddress());
- info("test", "datagramPacket.getPort() = " + datagramPacket.getPort());
- assertEquals(42, packet.id);
- assertEquals("gameName", packet.gameName);
- assertEquals("playerName", packet.playerName);
- assertEquals(udpPort, datagramPacket.getPort());
- }
- }
-
- @Override
- public void onFinally () {
- if (input != null) {
- input.close();
- }
- }
- };
-
- // This server exists solely to reply to Client#discoverHost.
- // It wouldn't be needed if the real server was using UDP.
- final Server broadcastServer = new Server();
-
- broadcastServer.getKryo().register(DiscoveryResponsePacket.class);
- broadcastServer.setDiscoveryHandler(serverDiscoveryHandler);
-
- startEndPoint(broadcastServer);
- broadcastServer.bind(0, udpPort);
-
- final Server server = new Server();
- startEndPoint(server);
- server.bind(54555);
- server.addListener(new Listener() {
- public void disconnected (Connection connection) {
- broadcastServer.stop();
- server.stop();
- }
- });
-
- // ----
-
- Client client = new Client();
-
- client.getKryo().register(DiscoveryResponsePacket.class);
- client.setDiscoveryHandler(clientDiscoveryHandler);
-
- InetAddress host = client.discoverHost(udpPort, 2000);
- if (host == null) {
- stopEndPoints();
- fail("No servers found.");
- return;
- }
-
- startEndPoint(client);
- client.connect(2000, host, tcpPort);
- client.stop();
-
- waitForThreads();
- }
-
- public static class DiscoveryResponsePacket {
-
- public DiscoveryResponsePacket () {
- //
- }
-
- public int id;
- public String gameName;
- public String playerName;
- }
-
-}
+/* Copyright (c) 2008, Nathan Sweet
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the distribution.
+ * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+ * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
+
+package com.esotericsoftware.kryonet;
+
+import static com.esotericsoftware.minlog.Log.*;
+
+import java.io.IOException;
+import java.net.DatagramPacket;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.DatagramChannel;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+
+public class DiscoverHostTest extends KryoNetTestCase {
+
+ public void testBroadcast () throws IOException {
+ // This server exists solely to reply to Client#discoverHost.
+ // It wouldn't be needed if the real server was using UDP.
+ final Server broadcastServer = new Server();
+ startEndPoint(broadcastServer);
+ broadcastServer.bind(0, udpPort);
+
+ final Server server = new Server();
+ startEndPoint(server);
+ server.bind(54555);
+ server.addListener(new Listener() {
+ public void disconnected (Connection connection) {
+ broadcastServer.stop();
+ server.stop();
+ }
+ });
+
+ // ----
+
+ Client client = new Client();
+ InetAddress host = client.discoverHost(udpPort, 2000);
+ if (host == null) {
+ stopEndPoints();
+ fail("No servers found.");
+ return;
+ }
+
+ startEndPoint(client);
+ client.connect(2000, host, tcpPort);
+ client.stop();
+
+ waitForThreads();
+ }
+
+ public void testCustomBroadcast () throws IOException {
+
+ ServerDiscoveryHandler serverDiscoveryHandler = new ServerDiscoveryHandler() {
+ public boolean onDiscoverHost (DatagramChannel datagramChannel, InetSocketAddress fromAddress,
+ Serialization serialization) throws IOException {
+
+ DiscoveryResponsePacket packet = new DiscoveryResponsePacket();
+ packet.id = 42;
+ packet.gameName = "gameName";
+ packet.playerName = "playerName";
+
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ serialization.write(null, buffer, packet);
+ buffer.flip();
+
+ datagramChannel.send(buffer, fromAddress);
+
+ return true;
+ }
+ };
+
+ ClientDiscoveryHandler clientDiscoveryHandler = new ClientDiscoveryHandler() {
+ private Input input = null;
+
+ public DatagramPacket onRequestNewDatagramPacket () {
+ byte[] buffer = new byte[1024];
+ input = new Input(buffer);
+ return new DatagramPacket(buffer, buffer.length);
+ }
+
+ public void onDiscoveredHost (DatagramPacket datagramPacket, Kryo kryo) {
+ if (input != null) {
+ DiscoveryResponsePacket packet;
+ packet = (DiscoveryResponsePacket)kryo.readClassAndObject(input);
+ info("test", "packet.id = " + packet.id);
+ info("test", "packet.gameName = " + packet.gameName);
+ info("test", "packet.playerName = " + packet.playerName);
+ info("test", "datagramPacket.getAddress() = " + datagramPacket.getAddress());
+ info("test", "datagramPacket.getPort() = " + datagramPacket.getPort());
+ assertEquals(42, packet.id);
+ assertEquals("gameName", packet.gameName);
+ assertEquals("playerName", packet.playerName);
+ assertEquals(udpPort, datagramPacket.getPort());
+ }
+ }
+
+ public void onFinally () {
+ if (input != null) {
+ input.close();
+ }
+ }
+ };
+
+ // This server exists solely to reply to Client#discoverHost.
+ // It wouldn't be needed if the real server was using UDP.
+ final Server broadcastServer = new Server();
+
+ broadcastServer.getKryo().register(DiscoveryResponsePacket.class);
+ broadcastServer.setDiscoveryHandler(serverDiscoveryHandler);
+
+ startEndPoint(broadcastServer);
+ broadcastServer.bind(0, udpPort);
+
+ final Server server = new Server();
+ startEndPoint(server);
+ server.bind(54555);
+ server.addListener(new Listener() {
+ public void disconnected (Connection connection) {
+ broadcastServer.stop();
+ server.stop();
+ }
+ });
+
+ // ----
+
+ Client client = new Client();
+
+ client.getKryo().register(DiscoveryResponsePacket.class);
+ client.setDiscoveryHandler(clientDiscoveryHandler);
+
+ InetAddress host = client.discoverHost(udpPort, 2000);
+ if (host == null) {
+ stopEndPoints();
+ fail("No servers found.");
+ return;
+ }
+
+ startEndPoint(client);
+ client.connect(2000, host, tcpPort);
+ client.stop();
+
+ waitForThreads();
+ }
+
+ public static class DiscoveryResponsePacket {
+
+ public DiscoveryResponsePacket () {
+ //
+ }
+
+ public int id;
+ public String gameName;
+ public String playerName;
+ }
+
+}
diff --git a/test/com/esotericsoftware/kryonet/KryoNetBufferUnderflowTest.java b/test/com/esotericsoftware/kryonet/KryoNetBufferUnderflowTest.java
index 622df42a..4f2dcc49 100644
--- a/test/com/esotericsoftware/kryonet/KryoNetBufferUnderflowTest.java
+++ b/test/com/esotericsoftware/kryonet/KryoNetBufferUnderflowTest.java
@@ -1,89 +1,88 @@
-/* Copyright (c) 2008, Nathan Sweet
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
- * conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials provided with the distribution.
- * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
- * from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
- * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
-
-package com.esotericsoftware.kryonet;
-
-import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class KryoNetBufferUnderflowTest {
- public static void main (String[] args) throws IOException, InterruptedException {
- final int port = 7000;
- final int writeBufferSize = 16384;
- final int objectBufferSize = 2048;
- final AtomicBoolean received = new AtomicBoolean();
-
- // Creating server
- final Server server = new Server(writeBufferSize, objectBufferSize);
- server.bind(port);
- server.start();
- System.out.println("Server listening on port " + port);
-
- // Creating client
- final Client client = new Client(writeBufferSize, objectBufferSize);
- client.start();
- client.addListener(new Listener() {
- @Override
- public void received (Connection connection, Object object) {
- if (object instanceof String) {
- System.out.println("Received: " + object);
- received.set(true);
- } else
- System.err.println("Received unexpected object");
- }
- });
- client.connect(5000, "localhost", port);
- System.out.println("Client connected");
-
- // Catching exception
- Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
- @Override
- public void uncaughtException (Thread t, Throwable e) {
- e.printStackTrace();
- received.set(true);
- // Stopping it all
- System.out.println("Stopping client and server");
- client.stop();
- server.stop();
- }
- });
-
- // Sending small messages
- for (int i = 0; i < 5; i++) {
- String smallMessage = "RandomStringUtils.randomAlphanumeric(256)";
- System.out.println("Sending: " + smallMessage);
- received.set(false);
- server.sendToAllTCP(smallMessage);
- while (!received.get()) {
- Thread.sleep(100);
- }
- }
-
- // Sending large message
- String bigMessage = "RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)";
- bigMessage = bigMessage + bigMessage + bigMessage + bigMessage + bigMessage + bigMessage + bigMessage;
- System.out.println("Sending: " + bigMessage);
- received.set(false);
- server.sendToAllTCP(bigMessage);
- while (!received.get()) {
- Thread.sleep(100);
- }
- }
-}
+/* Copyright (c) 2008, Nathan Sweet
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the distribution.
+ * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+ * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
+
+package com.esotericsoftware.kryonet;
+
+import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class KryoNetBufferUnderflowTest {
+ public static void main (String[] args) throws IOException, InterruptedException {
+ final int port = 7000;
+ final int writeBufferSize = 16384;
+ final int objectBufferSize = 2048;
+ final AtomicBoolean received = new AtomicBoolean();
+
+ // Creating server
+ final Server server = new Server(writeBufferSize, objectBufferSize);
+ server.bind(port);
+ server.start();
+ System.out.println("Server listening on port " + port);
+
+ // Creating client
+ final Client client = new Client(writeBufferSize, objectBufferSize);
+ client.start();
+ client.addListener(new Listener() {
+ @Override
+ public void received (Connection connection, Object object) {
+ if (object instanceof String) {
+ System.out.println("Received: " + object);
+ received.set(true);
+ } else
+ System.err.println("Received unexpected object");
+ }
+ });
+ client.connect(5000, "localhost", port);
+ System.out.println("Client connected");
+
+ // Catching exception
+ Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ public void uncaughtException (Thread t, Throwable e) {
+ e.printStackTrace();
+ received.set(true);
+ // Stopping it all
+ System.out.println("Stopping client and server");
+ client.stop();
+ server.stop();
+ }
+ });
+
+ // Sending small messages
+ for (int i = 0; i < 5; i++) {
+ String smallMessage = "RandomStringUtils.randomAlphanumeric(256)";
+ System.out.println("Sending: " + smallMessage);
+ received.set(false);
+ server.sendToAllTCP(smallMessage);
+ while (!received.get()) {
+ Thread.sleep(100);
+ }
+ }
+
+ // Sending large message
+ String bigMessage = "RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)RandomStringUtils.randomAlphanumeric(532)";
+ bigMessage = bigMessage + bigMessage + bigMessage + bigMessage + bigMessage + bigMessage + bigMessage;
+ System.out.println("Sending: " + bigMessage);
+ received.set(false);
+ server.sendToAllTCP(bigMessage);
+ while (!received.get()) {
+ Thread.sleep(100);
+ }
+ }
+}
diff --git a/test/com/esotericsoftware/kryonet/KryoNetTestCase.java b/test/com/esotericsoftware/kryonet/KryoNetTestCase.java
index 036a742b..908b8855 100644
--- a/test/com/esotericsoftware/kryonet/KryoNetTestCase.java
+++ b/test/com/esotericsoftware/kryonet/KryoNetTestCase.java
@@ -1,116 +1,116 @@
-/* Copyright (c) 2008, Nathan Sweet
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
- * conditions are met:
- *
- * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
- * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
- * disclaimer in the documentation and/or other materials provided with the distribution.
- * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
- * from this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
- * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
- * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
- * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
- * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
-
-package com.esotericsoftware.kryonet;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.Timer;
-import java.util.TimerTask;
-
-import junit.framework.TestCase;
-
-import com.esotericsoftware.minlog.Log;
-import com.esotericsoftware.minlog.Log.Logger;
-
-abstract public class KryoNetTestCase extends TestCase {
- static public String host = "localhost";
- static public int tcpPort = 54555, udpPort = 54777;
-
- private ArrayList threads = new ArrayList();
- ArrayList endPoints = new ArrayList();
- private Timer timer;
- boolean fail;
-
- public KryoNetTestCase () {
- // Log.TRACE();
- // Log.DEBUG();
- Log.setLogger(new Logger() {
- public void log (int level, String category, String message, Throwable ex) {
- // if (category == null || category.equals("kryonet")) //
- super.log(level, category, message, ex);
- }
- });
- }
-
- protected void setUp () throws Exception {
- System.out.println("---- " + getClass().getSimpleName());
- timer = new Timer();
- }
-
- protected void tearDown () throws Exception {
- timer.cancel();
- }
-
- public void startEndPoint (EndPoint endPoint) {
- endPoints.add(endPoint);
- Thread thread = new Thread(endPoint, endPoint.getClass().getSimpleName());
- threads.add(thread);
- thread.start();
- }
-
- public void stopEndPoints () {
- stopEndPoints(0);
- }
-
- public void stopEndPoints (int stopAfterMillis) {
- timer.schedule(new TimerTask() {
- public void run () {
- for (EndPoint endPoint : endPoints)
- endPoint.stop();
- endPoints.clear();
- }
- }, stopAfterMillis);
- }
-
- public void waitForThreads (int stopAfterMillis) {
- if (stopAfterMillis > 10000) throw new IllegalArgumentException("stopAfterMillis must be < 10000");
- stopEndPoints(stopAfterMillis);
- waitForThreads();
- }
-
- public void waitForThreads () {
- fail = false;
- TimerTask failTask = new TimerTask() {
- public void run () {
- stopEndPoints();
- fail = true;
- }
- };
- timer.schedule(failTask, 11000);
- while (true) {
- for (Iterator iter = threads.iterator(); iter.hasNext();) {
- Thread thread = (Thread)iter.next();
- if (!thread.isAlive()) iter.remove();
- }
- if (threads.isEmpty()) break;
- try {
- Thread.sleep(100);
- } catch (InterruptedException ignored) {
- }
- }
- failTask.cancel();
- if (fail) fail("Test did not complete in a timely manner.");
- // Give sockets a chance to close before starting the next test.
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignored) {
- }
- }
-}
+/* Copyright (c) 2008, Nathan Sweet
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following
+ * disclaimer in the documentation and/or other materials provided with the distribution.
+ * - Neither the name of Esoteric Software nor the names of its contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING,
+ * BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT
+ * SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */
+
+package com.esotericsoftware.kryonet;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import junit.framework.TestCase;
+
+import com.esotericsoftware.minlog.Log;
+import com.esotericsoftware.minlog.Log.Logger;
+
+abstract public class KryoNetTestCase extends TestCase {
+ static public String host = "localhost";
+ static public int tcpPort = 54555, udpPort = 54777;
+
+ private ArrayList threads = new ArrayList();
+ ArrayList endPoints = new ArrayList();
+ private Timer timer;
+ boolean fail;
+
+ public KryoNetTestCase () {
+ // Log.TRACE();
+ // Log.DEBUG();
+ Log.setLogger(new Logger() {
+ public void log (int level, String category, String message, Throwable ex) {
+ // if (category == null || category.equals("kryonet")) //
+ super.log(level, category, message, ex);
+ }
+ });
+ }
+
+ protected void setUp () throws Exception {
+ System.out.println("---- " + getClass().getSimpleName());
+ timer = new Timer();
+ }
+
+ protected void tearDown () throws Exception {
+ timer.cancel();
+ }
+
+ public void startEndPoint (EndPoint endPoint) {
+ endPoints.add(endPoint);
+ Thread thread = new Thread(endPoint, endPoint.getClass().getSimpleName());
+ threads.add(thread);
+ thread.start();
+ }
+
+ public void stopEndPoints () {
+ stopEndPoints(0);
+ }
+
+ public void stopEndPoints (int stopAfterMillis) {
+ timer.schedule(new TimerTask() {
+ public void run () {
+ for (EndPoint endPoint : endPoints)
+ endPoint.stop();
+ endPoints.clear();
+ }
+ }, stopAfterMillis);
+ }
+
+ public void waitForThreads (int stopAfterMillis) {
+ if (stopAfterMillis > 10000) throw new IllegalArgumentException("stopAfterMillis must be < 10000");
+ stopEndPoints(stopAfterMillis);
+ waitForThreads();
+ }
+
+ public void waitForThreads () {
+ fail = false;
+ TimerTask failTask = new TimerTask() {
+ public void run () {
+ stopEndPoints();
+ fail = true;
+ }
+ };
+ timer.schedule(failTask, 11000);
+ while (true) {
+ for (Iterator iter = threads.iterator(); iter.hasNext();) {
+ Thread thread = (Thread)iter.next();
+ if (!thread.isAlive()) iter.remove();
+ }
+ if (threads.isEmpty()) break;
+ try {
+ Thread.sleep(100);
+ } catch (InterruptedException ignored) {
+ }
+ }
+ failTask.cancel();
+ if (fail) fail("Test did not complete in a timely manner.");
+ // Give sockets a chance to close before starting the next test.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+ }
+ }
+}