| /* |
| * Copyright (c) 2011 Oracle and/or its affiliates. All rights reserved. |
| * |
| * Redistribution and use in source and binary forms, with or without |
| * modification, are permitted provided that the following conditions |
| * are met: |
| * |
| * - Redistributions of source code must retain the above copyright |
| * notice, this list of conditions and the following disclaimer. |
| * |
| * - Redistributions in binary form must reproduce the above copyright |
| * notice, this list of conditions and the following disclaimer in the |
| * documentation and/or other materials provided with the distribution. |
| * |
| * - Neither the name of Oracle nor the names of its |
| * contributors may be used to endorse or promote products derived |
| * from this software without specific prior written permission. |
| * |
| * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS |
| * IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, |
| * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR |
| * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR |
| * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
| * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
| * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR |
| * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF |
| * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING |
| * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS |
| * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. |
| */ |
| |
| /* |
| * This source code is provided to illustrate the usage of a given feature |
| * or technique and has been deliberately simplified. Additional steps |
| * required for a production-quality application, such as security checks, |
| * input validation and proper error handling, might not be present in |
| * this sample code. |
| */ |
| |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.AsynchronousSocketChannel; |
| import java.nio.channels.CompletionHandler; |
| import java.util.LinkedList; |
| import java.util.Queue; |
| import java.util.concurrent.atomic.AtomicReference; |
| |
| /** |
| * Client represents a remote connection to the chat server. |
| * It contains methods for reading and writing messages from the |
| * channel. |
| * Messages are considered to be separated by newline, so incomplete |
| * messages are buffered in the {@code Client}. |
| * |
| * All reads and writes are asynchronous and uses the nio2 asynchronous |
| * elements. |
| */ |
| class Client { |
| private final AsynchronousSocketChannel channel; |
| private AtomicReference<ClientReader> reader; |
| private String userName; |
| private final StringBuilder messageBuffer = new StringBuilder(); |
| |
| private final Queue<ByteBuffer> queue = new LinkedList<ByteBuffer>(); |
| private boolean writing = false; |
| |
| public Client(AsynchronousSocketChannel channel, ClientReader reader) { |
| this.channel = channel; |
| this.reader = new AtomicReference<ClientReader>(reader); |
| } |
| |
| /** |
| * Enqueues a write of the buffer to the channel. |
| * The call is asynchronous so the buffer is not safe to modify after |
| * passing the buffer here. |
| * |
| * @param buffer the buffer to send to the channel |
| */ |
| private void writeMessage(final ByteBuffer buffer) { |
| boolean threadShouldWrite = false; |
| |
| synchronized(queue) { |
| queue.add(buffer); |
| // Currently no thread writing, make this thread dispatch a write |
| if (!writing) { |
| writing = true; |
| threadShouldWrite = true; |
| } |
| } |
| |
| if (threadShouldWrite) { |
| writeFromQueue(); |
| } |
| } |
| |
| private void writeFromQueue() { |
| ByteBuffer buffer; |
| |
| synchronized (queue) { |
| buffer = queue.poll(); |
| if (buffer == null) { |
| writing = false; |
| } |
| } |
| |
| // No new data in buffer to write |
| if (writing) { |
| writeBuffer(buffer); |
| } |
| } |
| |
| private void writeBuffer(ByteBuffer buffer) { |
| channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { |
| @Override |
| public void completed(Integer result, ByteBuffer buffer) { |
| if (buffer.hasRemaining()) { |
| channel.write(buffer, buffer, this); |
| } else { |
| // Go back and check if there is new data to write |
| writeFromQueue(); |
| } |
| } |
| |
| @Override |
| public void failed(Throwable exc, ByteBuffer attachment) { |
| } |
| }); |
| } |
| |
| /** |
| * Sends a message |
| * @param string the message |
| */ |
| public void writeStringMessage(String string) { |
| writeMessage(ByteBuffer.wrap(string.getBytes())); |
| } |
| |
| /** |
| * Send a message from a specific client |
| * @param client the message is sent from |
| * @param message to send |
| */ |
| public void writeMessageFrom(Client client, String message) { |
| if (reader.get().acceptsMessages()) { |
| writeStringMessage(client.getUserName() + ": " + message); |
| } |
| } |
| |
| /** |
| * Enqueue a read |
| * @param completionHandler callback on completed read |
| */ |
| public void read(CompletionHandler<Integer, ? super ByteBuffer> completionHandler) { |
| ByteBuffer input = ByteBuffer.allocate(256); |
| if (!channel.isOpen()) { |
| return; |
| } |
| channel.read(input, input, completionHandler); |
| } |
| |
| /** |
| * Closes the channel |
| */ |
| public void close() { |
| try { |
| channel.close(); |
| } catch (IOException e) { |
| e.printStackTrace(); |
| } |
| } |
| |
| /** |
| * Run the current states actions. |
| */ |
| public void run() { |
| reader.get().run(this); |
| } |
| |
| public void setUserName(String userName) { |
| this.userName = userName; |
| } |
| |
| public void setReader(ClientReader reader) { |
| this.reader.set(reader); |
| } |
| |
| public String getUserName() { |
| return userName; |
| } |
| |
| public void appendMessage(String message) { |
| synchronized (messageBuffer) { |
| messageBuffer.append(message); |
| } |
| } |
| |
| /** |
| * @return the next newline separated message in the buffer. null is returned if the buffer |
| * doesn't contain any newline. |
| */ |
| public String nextMessage() { |
| synchronized(messageBuffer) { |
| int nextNewline = messageBuffer.indexOf("\n"); |
| if (nextNewline == -1) { |
| return null; |
| } |
| String message = messageBuffer.substring(0, nextNewline + 1); |
| messageBuffer.delete(0, nextNewline + 1); |
| return message; |
| } |
| } |
| } |