1 /*
2 * Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.FileDescriptor;
29 import java.io.IOException;
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.ProtocolFamily;
33 import java.net.Socket;
34 import java.net.SocketAddress;
35 import java.net.SocketException;
36 import java.net.SocketOption;
37 import java.net.SocketTimeoutException;
38 import java.net.StandardProtocolFamily;
39 import java.net.StandardSocketOptions;
40 import java.nio.ByteBuffer;
41 import java.nio.channels.AlreadyBoundException;
42 import java.nio.channels.AlreadyConnectedException;
43 import java.nio.channels.AsynchronousCloseException;
44 import java.nio.channels.ClosedChannelException;
45 import java.nio.channels.ConnectionPendingException;
46 import java.nio.channels.IllegalBlockingModeException;
47 import java.nio.channels.NoConnectionPendingException;
48 import java.nio.channels.NotYetConnectedException;
49 import java.nio.channels.SelectionKey;
50 import java.nio.channels.SocketChannel;
51 import java.nio.channels.spi.SelectorProvider;
52 import java.util.Collections;
53 import java.util.HashSet;
54 import java.util.Objects;
55 import java.util.Set;
56 import java.util.concurrent.locks.ReentrantLock;
57
58 import sun.net.ConnectionResetException;
59 import sun.net.NetHooks;
60 import sun.net.ext.ExtendedSocketOptions;
61 import sun.net.util.SocketExceptions;
62
63 /**
64 * An implementation of SocketChannels
65 */
66
67 class SocketChannelImpl
68 extends SocketChannel
69 implements SelChImpl
70 {
// Used to make native read and write calls
private static final NativeDispatcher nd = new SocketDispatcher();

// Our file descriptor object, and its value as obtained from IOUtil.fdVal
// when the channel is constructed
private final FileDescriptor fd;
private final int fdVal;

// Lock held by current reading or connecting thread
private final ReentrantLock readLock = new ReentrantLock();

// Lock held by current writing or connecting thread
private final ReentrantLock writeLock = new ReentrantLock();

// Lock held by any thread that modifies the state fields declared below
// DO NOT invoke a blocking I/O operation while holding this lock!
private final Object stateLock = new Object();

// Input/Output closed. Set by shutdownInput/shutdownOutput while holding
// stateLock; volatile because the read and write paths test these flags
// without holding stateLock.
private volatile boolean isInputClosed;
private volatile boolean isOutputClosed;

// Connection reset protected by readLock. Once set by a read method it is
// never cleared, so later reads fail with "Connection reset" immediately.
private boolean connectionReset;

// -- The following fields are protected by stateLock

// set true when exclusive binding is on and SO_REUSEADDR is emulated
private boolean isReuseAddress;

// State, increases monotonically
private static final int ST_UNCONNECTED = 0;
private static final int ST_CONNECTIONPENDING = 1;
private static final int ST_CONNECTED = 2;
private static final int ST_CLOSING = 3;
private static final int ST_CLOSED = 4;
private volatile int state; // need stateLock to change

// IDs of native threads doing reads and writes, for signalling.
// readerThread is also used to record the thread doing a blocking connect
// or finishConnect. A value of 0 means no thread is recorded.
private long readerThread;
private long writerThread;

// Binding
private InetSocketAddress localAddress;
private InetSocketAddress remoteAddress;

// Socket adaptor, created on demand
private Socket socket;

// -- End of fields protected by stateLock
120
121
122 // Constructor for normal connecting sockets
123 //
124 SocketChannelImpl(SelectorProvider sp) throws IOException {
125 super(sp);
126 this.fd = Net.socket(true);
127 this.fdVal = IOUtil.fdVal(fd);
128 }
129
// Constructor for a channel wrapping an existing file descriptor. When the
// caller indicates that the socket is already bound, the local address is
// looked up and recorded.
//
SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
    throws IOException
{
    super(sp);
    this.fd = fd;
    this.fdVal = IOUtil.fdVal(fd);
    if (!bound) {
        // nothing else to record for an unbound socket
        return;
    }
    synchronized (stateLock) {
        InetSocketAddress local = Net.localAddress(fd);
        this.localAddress = local;
    }
}
142
// Constructor for sockets obtained from server sockets: the channel is
// created in the connected state with the given remote address.
//
SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa)
    throws IOException
{
    super(sp);
    this.fd = fd;
    this.fdVal = IOUtil.fdVal(fd);
    synchronized (stateLock) {
        InetSocketAddress local = Net.localAddress(fd);
        this.localAddress = local;
        this.remoteAddress = isa;
        this.state = ST_CONNECTED;
    }
}
157
158 /**
159 * Checks that the channel is open.
160 *
161 * @throws ClosedChannelException if channel is closed (or closing)
162 */
163 private void ensureOpen() throws ClosedChannelException {
164 if (!isOpen())
165 throw new ClosedChannelException();
166 }
167
168 /**
169 * Checks that the channel is open and connected.
170 *
171 * @apiNote This method uses the "state" field to check if the channel is
172 * open. It should never be used in conjuncion with isOpen or ensureOpen
173 * as these methods check AbstractInterruptibleChannel's closed field - that
174 * field is set before implCloseSelectableChannel is called and so before
175 * the state is changed.
176 *
177 * @throws ClosedChannelException if channel is closed (or closing)
178 * @throws NotYetConnectedException if open and not connected
179 */
180 private void ensureOpenAndConnected() throws ClosedChannelException {
181 int state = this.state;
182 if (state < ST_CONNECTED) {
183 throw new NotYetConnectedException();
184 } else if (state > ST_CONNECTED) {
185 throw new ClosedChannelException();
186 }
187 }
188
189 @Override
190 public Socket socket() {
191 synchronized (stateLock) {
192 if (socket == null)
193 socket = SocketAdaptor.create(this);
194 return socket;
195 }
196 }
197
198 @Override
199 public SocketAddress getLocalAddress() throws IOException {
200 synchronized (stateLock) {
201 ensureOpen();
202 return Net.getRevealedLocalAddress(localAddress);
203 }
204 }
205
206 @Override
207 public SocketAddress getRemoteAddress() throws IOException {
208 synchronized (stateLock) {
209 ensureOpen();
210 return remoteAddress;
211 }
212 }
213
214 @Override
215 public <T> SocketChannel setOption(SocketOption<T> name, T value)
216 throws IOException
217 {
218 Objects.requireNonNull(name);
219 if (!supportedOptions().contains(name))
220 throw new UnsupportedOperationException("'" + name + "' not supported");
221 if (!name.type().isInstance(value))
222 throw new IllegalArgumentException("Invalid value '" + value + "'");
223
224 synchronized (stateLock) {
225 ensureOpen();
226
227 if (name == StandardSocketOptions.IP_TOS) {
228 ProtocolFamily family = Net.isIPv6Available() ?
229 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
230 Net.setSocketOption(fd, family, name, value);
231 return this;
232 }
233
234 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
235 // SO_REUSEADDR emulated when using exclusive bind
236 isReuseAddress = (Boolean)value;
237 return this;
238 }
239
240 // no options that require special handling
241 Net.setSocketOption(fd, name, value);
242 return this;
243 }
244 }
245
246 @Override
247 @SuppressWarnings("unchecked")
248 public <T> T getOption(SocketOption<T> name)
249 throws IOException
250 {
251 Objects.requireNonNull(name);
252 if (!supportedOptions().contains(name))
253 throw new UnsupportedOperationException("'" + name + "' not supported");
254
255 synchronized (stateLock) {
256 ensureOpen();
257
258 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
259 // SO_REUSEADDR emulated when using exclusive bind
260 return (T)Boolean.valueOf(isReuseAddress);
261 }
262
263 // special handling for IP_TOS: always return 0 when IPv6
264 if (name == StandardSocketOptions.IP_TOS) {
265 ProtocolFamily family = Net.isIPv6Available() ?
266 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
267 return (T) Net.getSocketOption(fd, family, name);
268 }
269
270 // no options that require special handling
271 return (T) Net.getSocketOption(fd, name);
272 }
273 }
274
275 private static class DefaultOptionsHolder {
276 static final Set<SocketOption<?>> defaultOptions = defaultOptions();
277
278 private static Set<SocketOption<?>> defaultOptions() {
279 HashSet<SocketOption<?>> set = new HashSet<>();
280 set.add(StandardSocketOptions.SO_SNDBUF);
281 set.add(StandardSocketOptions.SO_RCVBUF);
282 set.add(StandardSocketOptions.SO_KEEPALIVE);
283 set.add(StandardSocketOptions.SO_REUSEADDR);
284 if (Net.isReusePortAvailable()) {
285 set.add(StandardSocketOptions.SO_REUSEPORT);
286 }
287 set.add(StandardSocketOptions.SO_LINGER);
288 set.add(StandardSocketOptions.TCP_NODELAY);
289 // additional options required by socket adaptor
290 set.add(StandardSocketOptions.IP_TOS);
291 set.add(ExtendedSocketOption.SO_OOBINLINE);
292 set.addAll(ExtendedSocketOptions.clientSocketOptions());
293 return Collections.unmodifiableSet(set);
294 }
295 }
296
297 @Override
298 public final Set<SocketOption<?>> supportedOptions() {
299 return DefaultOptionsHolder.defaultOptions;
300 }
301
302 /**
303 * Marks the beginning of a read operation that might block.
304 *
305 * @throws ClosedChannelException if the channel is closed
306 * @throws NotYetConnectedException if the channel is not yet connected
307 */
308 private void beginRead(boolean blocking) throws ClosedChannelException {
309 if (blocking) {
310 // set hook for Thread.interrupt
311 begin();
312
313 synchronized (stateLock) {
314 ensureOpenAndConnected();
315 // record thread so it can be signalled if needed
316 readerThread = NativeThread.current();
317 }
318 } else {
319 ensureOpenAndConnected();
320 }
321 }
322
323 /**
324 * Marks the end of a read operation that may have blocked.
325 *
326 * @throws AsynchronousCloseException if the channel was closed due to this
327 * thread being interrupted on a blocking read operation.
328 */
329 private void endRead(boolean blocking, boolean completed)
330 throws AsynchronousCloseException
331 {
332 if (blocking) {
333 synchronized (stateLock) {
334 readerThread = 0;
335 if (state == ST_CLOSING) {
336 tryFinishClose();
337 }
338 }
339 // remove hook for Thread.interrupt
340 end(completed);
341 }
342 }
343
344 private void throwConnectionReset() throws SocketException {
345 throw new SocketException("Connection reset");
346 }
347
/**
 * Reads bytes from the socket into the given buffer.
 *
 * The whole operation runs while holding readLock. In blocking mode the
 * read is retried, parking on Net.POLLIN in between, for as long as the
 * result is one that IOStatus.okayToRetry accepts and the channel is open.
 *
 * @return the normalized result of the read, or IOStatus.EOF if input has
 *         been shut down
 * @throws NullPointerException if buf is null
 * @throws SocketException with message "Connection reset" if a reset was
 *         detected by this or an earlier read
 */
@Override
public int read(ByteBuffer buf) throws IOException {
    Objects.requireNonNull(buf);

    readLock.lock();
    try {
        // blocking mode is sampled once so beginRead/endRead agree
        boolean blocking = isBlocking();
        int n = 0;
        try {
            beginRead(blocking);

            // check if connection has been reset
            if (connectionReset)
                throwConnectionReset();

            // check if input is shutdown
            if (isInputClosed)
                return IOStatus.EOF;

            n = IOUtil.read(fd, buf, -1, nd);
            if (blocking) {
                while (IOStatus.okayToRetry(n) && isOpen()) {
                    park(Net.POLLIN);
                    n = IOUtil.read(fd, buf, -1, nd);
                }
            }
        } catch (ConnectionResetException e) {
            // remember the reset so that subsequent reads fail immediately
            connectionReset = true;
            throwConnectionReset();
        } finally {
            endRead(blocking, n > 0);
            // NOTE: this return is inside finally, so when input has been
            // shut down and no bytes were read it supersedes any exception
            // propagating from the try block and EOF is returned instead.
            if (n <= 0 && isInputClosed)
                return IOStatus.EOF;
        }
        return IOStatus.normalize(n);
    } finally {
        readLock.unlock();
    }
}
387
/**
 * Scattering read: reads bytes from the socket into a subsequence of the
 * given buffers.
 *
 * Follows the same protocol as the single-buffer read: readLock is held
 * throughout, and in blocking mode the read is retried (parking on
 * Net.POLLIN) while IOStatus.okayToRetry accepts the result and the
 * channel is open.
 *
 * @return the normalized result of the read, or IOStatus.EOF if input has
 *         been shut down
 * @throws IndexOutOfBoundsException if offset/length are out of bounds for
 *         dsts
 * @throws SocketException with message "Connection reset" if a reset was
 *         detected by this or an earlier read
 */
@Override
public long read(ByteBuffer[] dsts, int offset, int length)
    throws IOException
{
    Objects.checkFromIndexSize(offset, length, dsts.length);

    readLock.lock();
    try {
        // blocking mode is sampled once so beginRead/endRead agree
        boolean blocking = isBlocking();
        long n = 0;
        try {
            beginRead(blocking);

            // check if connection has been reset
            if (connectionReset)
                throwConnectionReset();

            // check if input is shutdown
            if (isInputClosed)
                return IOStatus.EOF;

            n = IOUtil.read(fd, dsts, offset, length, nd);
            if (blocking) {
                while (IOStatus.okayToRetry(n) && isOpen()) {
                    park(Net.POLLIN);
                    n = IOUtil.read(fd, dsts, offset, length, nd);
                }
            }
        } catch (ConnectionResetException e) {
            // remember the reset so that subsequent reads fail immediately
            connectionReset = true;
            throwConnectionReset();
        } finally {
            endRead(blocking, n > 0);
            // NOTE: this return is inside finally, so when input has been
            // shut down and no bytes were read it supersedes any exception
            // propagating from the try block and EOF is returned instead.
            if (n <= 0 && isInputClosed)
                return IOStatus.EOF;
        }
        return IOStatus.normalize(n);
    } finally {
        readLock.unlock();
    }
}
429
430 /**
431 * Marks the beginning of a write operation that might block.
432 *
433 * @throws ClosedChannelException if the channel is closed or output shutdown
434 * @throws NotYetConnectedException if the channel is not yet connected
435 */
436 private void beginWrite(boolean blocking) throws ClosedChannelException {
437 if (blocking) {
438 // set hook for Thread.interrupt
439 begin();
440
441 synchronized (stateLock) {
442 ensureOpenAndConnected();
443 if (isOutputClosed)
444 throw new ClosedChannelException();
445 // record thread so it can be signalled if needed
446 writerThread = NativeThread.current();
447 }
448 } else {
449 ensureOpenAndConnected();
450 }
451 }
452
453 /**
454 * Marks the end of a write operation that may have blocked.
455 *
456 * @throws AsynchronousCloseException if the channel was closed due to this
457 * thread being interrupted on a blocking write operation.
458 */
459 private void endWrite(boolean blocking, boolean completed)
460 throws AsynchronousCloseException
461 {
462 if (blocking) {
463 synchronized (stateLock) {
464 writerThread = 0;
465 if (state == ST_CLOSING) {
466 tryFinishClose();
467 }
468 }
469 // remove hook for Thread.interrupt
470 end(completed);
471 }
472 }
473
/**
 * Writes bytes from the given buffer to the socket.
 *
 * The whole operation runs while holding writeLock. In blocking mode the
 * write is retried, parking on Net.POLLOUT in between, for as long as the
 * result is one that IOStatus.okayToRetry accepts and the channel is open.
 *
 * @return the normalized result of the write
 * @throws NullPointerException if buf is null
 * @throws AsynchronousCloseException if nothing was written and output has
 *         been shut down
 */
@Override
public int write(ByteBuffer buf) throws IOException {
    Objects.requireNonNull(buf);

    writeLock.lock();
    try {
        // blocking mode is sampled once so beginWrite/endWrite agree
        boolean blocking = isBlocking();
        int n = 0;
        try {
            beginWrite(blocking);
            n = IOUtil.write(fd, buf, -1, nd);
            if (blocking) {
                while (IOStatus.okayToRetry(n) && isOpen()) {
                    park(Net.POLLOUT);
                    n = IOUtil.write(fd, buf, -1, nd);
                }
            }
        } finally {
            endWrite(blocking, n > 0);
            // NOTE: thrown from finally, so it replaces any exception
            // already propagating from the try block.
            if (n <= 0 && isOutputClosed)
                throw new AsynchronousCloseException();
        }
        return IOStatus.normalize(n);
    } finally {
        writeLock.unlock();
    }
}
501
/**
 * Gathering write: writes bytes from a subsequence of the given buffers to
 * the socket.
 *
 * Follows the same protocol as the single-buffer write: writeLock is held
 * throughout, and in blocking mode the write is retried (parking on
 * Net.POLLOUT) while IOStatus.okayToRetry accepts the result and the
 * channel is open.
 *
 * @return the normalized result of the write
 * @throws IndexOutOfBoundsException if offset/length are out of bounds for
 *         srcs
 * @throws AsynchronousCloseException if nothing was written and output has
 *         been shut down
 */
@Override
public long write(ByteBuffer[] srcs, int offset, int length)
    throws IOException
{
    Objects.checkFromIndexSize(offset, length, srcs.length);

    writeLock.lock();
    try {
        // blocking mode is sampled once so beginWrite/endWrite agree
        boolean blocking = isBlocking();
        long n = 0;
        try {
            beginWrite(blocking);
            n = IOUtil.write(fd, srcs, offset, length, nd);
            if (blocking) {
                while (IOStatus.okayToRetry(n) && isOpen()) {
                    park(Net.POLLOUT);
                    n = IOUtil.write(fd, srcs, offset, length, nd);
                }
            }
        } finally {
            endWrite(blocking, n > 0);
            // NOTE: thrown from finally, so it replaces any exception
            // already propagating from the try block.
            if (n <= 0 && isOutputClosed)
                throw new AsynchronousCloseException();
        }
        return IOStatus.normalize(n);
    } finally {
        writeLock.unlock();
    }
}
531
/**
 * Writes a byte of out of band data.
 *
 * Runs while holding writeLock. In blocking mode the send is repeated for
 * as long as it reports IOStatus.INTERRUPTED and the channel is open; in
 * non-blocking mode it is attempted once.
 *
 * @return the normalized result of Net.sendOOB
 * @throws AsynchronousCloseException if nothing was sent and output has
 *         been shut down
 */
int sendOutOfBandData(byte b) throws IOException {
    writeLock.lock();
    try {
        // blocking mode is sampled once so beginWrite/endWrite agree
        boolean blocking = isBlocking();
        int n = 0;
        try {
            beginWrite(blocking);
            if (blocking) {
                do {
                    n = Net.sendOOB(fd, b);
                } while (n == IOStatus.INTERRUPTED && isOpen());
            } else {
                n = Net.sendOOB(fd, b);
            }
        } finally {
            endWrite(blocking, n > 0);
            // NOTE: thrown from finally, so it replaces any exception
            // already propagating from the try block.
            if (n <= 0 && isOutputClosed)
                throw new AsynchronousCloseException();
        }
        return IOStatus.normalize(n);
    } finally {
        writeLock.unlock();
    }
}
559
560 @Override
561 protected void implConfigureBlocking(boolean block) throws IOException {
562 readLock.lock();
563 try {
564 writeLock.lock();
565 try {
566 lockedConfigureBlocking(block);
567 } finally {
568 writeLock.unlock();
569 }
570 } finally {
571 readLock.unlock();
572 }
573 }
574
575 /**
576 * Adjust the blocking mode while holding the readLock or writeLock.
577 */
578 private void lockedConfigureBlocking(boolean block) throws IOException {
579 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
580 synchronized (stateLock) {
581 ensureOpen();
582 IOUtil.configureBlocking(fd, block);
583 }
584 }
585
586 /**
587 * Returns the local address, or null if not bound
588 */
589 InetSocketAddress localAddress() {
590 synchronized (stateLock) {
591 return localAddress;
592 }
593 }
594
595 /**
596 * Returns the remote address, or null if not connected
597 */
598 InetSocketAddress remoteAddress() {
599 synchronized (stateLock) {
600 return remoteAddress;
601 }
602 }
603
604 @Override
605 public SocketChannel bind(SocketAddress local) throws IOException {
606 readLock.lock();
607 try {
608 writeLock.lock();
609 try {
610 synchronized (stateLock) {
611 ensureOpen();
612 if (state == ST_CONNECTIONPENDING)
613 throw new ConnectionPendingException();
614 if (localAddress != null)
615 throw new AlreadyBoundException();
616 InetSocketAddress isa = (local == null) ?
617 new InetSocketAddress(0) : Net.checkAddress(local);
618 SecurityManager sm = System.getSecurityManager();
619 if (sm != null) {
620 sm.checkListen(isa.getPort());
621 }
622 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
623 Net.bind(fd, isa.getAddress(), isa.getPort());
624 localAddress = Net.localAddress(fd);
625 }
626 } finally {
627 writeLock.unlock();
628 }
629 } finally {
630 readLock.unlock();
631 }
632 return this;
633 }
634
635 @Override
636 public boolean isConnected() {
637 return (state == ST_CONNECTED);
638 }
639
640 @Override
641 public boolean isConnectionPending() {
642 return (state == ST_CONNECTIONPENDING);
643 }
644
645 /**
646 * Marks the beginning of a connect operation that might block.
647 * @param blocking true if configured blocking
648 * @param isa the remote address
649 * @throws ClosedChannelException if the channel is closed
650 * @throws AlreadyConnectedException if already connected
651 * @throws ConnectionPendingException is a connection is pending
652 * @throws IOException if the pre-connect hook fails
653 */
654 private void beginConnect(boolean blocking, InetSocketAddress isa)
655 throws IOException
656 {
657 if (blocking) {
658 // set hook for Thread.interrupt
659 begin();
660 }
661 synchronized (stateLock) {
662 ensureOpen();
663 int state = this.state;
664 if (state == ST_CONNECTED)
665 throw new AlreadyConnectedException();
666 if (state == ST_CONNECTIONPENDING)
667 throw new ConnectionPendingException();
668 assert state == ST_UNCONNECTED;
669 this.state = ST_CONNECTIONPENDING;
670
671 if (localAddress == null)
672 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
673 remoteAddress = isa;
674
675 if (blocking) {
676 // record thread so it can be signalled if needed
677 readerThread = NativeThread.current();
678 }
679 }
680 }
681
682 /**
683 * Marks the end of a connect operation that may have blocked.
684 *
685 * @throws AsynchronousCloseException if the channel was closed due to this
686 * thread being interrupted on a blocking connect operation.
687 * @throws IOException if completed and unable to obtain the local address
688 */
689 private void endConnect(boolean blocking, boolean completed)
690 throws IOException
691 {
692 endRead(blocking, completed);
693
694 if (completed) {
695 synchronized (stateLock) {
696 if (state == ST_CONNECTIONPENDING) {
697 localAddress = Net.localAddress(fd);
698 state = ST_CONNECTED;
699 }
700 }
701 }
702 }
703
704 /**
705 * Checks the remote address to which this channel is to be connected.
706 */
707 private InetSocketAddress checkRemote(SocketAddress sa) throws IOException {
708 InetSocketAddress isa = Net.checkAddress(sa);
709 SecurityManager sm = System.getSecurityManager();
710 if (sm != null) {
711 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
712 }
713 if (isa.getAddress().isAnyLocalAddress()) {
714 return new InetSocketAddress(InetAddress.getLocalHost(), isa.getPort());
715 } else {
716 return isa;
717 }
718 }
719
/**
 * Connects this channel's socket to the given remote address.
 *
 * The address is checked by checkRemote before any lock is taken; the
 * connect itself runs while holding both readLock and writeLock. In
 * blocking mode the method parks on Net.POLLOUT until Net.pollConnectNow
 * reports completion or the channel is closed.
 *
 * @return true if the connection was established; false if the channel is
 *         non-blocking and Net.connect did not complete immediately
 * @throws IOException if the connect fails; the channel is closed before
 *         the exception (as mapped by SocketExceptions.of) is thrown
 */
@Override
public boolean connect(SocketAddress remote) throws IOException {
    InetSocketAddress isa = checkRemote(remote);
    try {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                boolean blocking = isBlocking();
                boolean connected = false;
                try {
                    beginConnect(blocking, isa);
                    int n = Net.connect(fd, isa.getAddress(), isa.getPort());
                    if (n > 0) {
                        // connection established immediately
                        connected = true;
                    } else if (blocking) {
                        assert IOStatus.okayToRetry(n);
                        // wait for the connection to finish or the channel
                        // to be closed
                        boolean polled = false;
                        while (!polled && isOpen()) {
                            park(Net.POLLOUT);
                            polled = Net.pollConnectNow(fd);
                        }
                        connected = polled && isOpen();
                    }
                    // non-blocking and not yet established: connected stays
                    // false and the state remains connection-pending
                } finally {
                    endConnect(blocking, connected);
                }
                return connected;
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    } catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throw SocketExceptions.of(ioe, isa);
    }
}
760
761 /**
762 * Marks the beginning of a finishConnect operation that might block.
763 *
764 * @throws ClosedChannelException if the channel is closed
765 * @throws NoConnectionPendingException if no connection is pending
766 */
767 private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
768 if (blocking) {
769 // set hook for Thread.interrupt
770 begin();
771 }
772 synchronized (stateLock) {
773 ensureOpen();
774 if (state != ST_CONNECTIONPENDING)
775 throw new NoConnectionPendingException();
776 if (blocking) {
777 // record thread so it can be signalled if needed
778 readerThread = NativeThread.current();
779 }
780 }
781 }
782
783 /**
784 * Marks the end of a finishConnect operation that may have blocked.
785 *
786 * @throws AsynchronousCloseException if the channel was closed due to this
787 * thread being interrupted on a blocking connect operation.
788 * @throws IOException if completed and unable to obtain the local address
789 */
790 private void endFinishConnect(boolean blocking, boolean completed)
791 throws IOException
792 {
793 endRead(blocking, completed);
794
795 if (completed) {
796 synchronized (stateLock) {
797 if (state == ST_CONNECTIONPENDING) {
798 localAddress = Net.localAddress(fd);
799 state = ST_CONNECTED;
800 }
801 }
802 }
803 }
804
/**
 * Finishes the process of connecting the channel's socket.
 *
 * Runs while holding both readLock and writeLock. In blocking mode the
 * method parks on Net.POLLOUT until Net.pollConnectNow reports completion
 * or the channel is closed; in non-blocking mode the socket is polled once.
 *
 * @return true if the channel is (now) connected; false if the channel is
 *         non-blocking and the connection is not yet established
 * @throws NoConnectionPendingException if no connection is pending
 * @throws IOException if the connect fails; the channel is closed before
 *         the exception (as mapped by SocketExceptions.of) is thrown
 */
@Override
public boolean finishConnect() throws IOException {
    try {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                // no-op if already connected
                if (isConnected())
                    return true;

                boolean blocking = isBlocking();
                boolean connected = false;
                try {
                    beginFinishConnect(blocking);
                    boolean polled = Net.pollConnectNow(fd);
                    if (blocking) {
                        while (!polled && isOpen()) {
                            park(Net.POLLOUT);
                            polled = Net.pollConnectNow(fd);
                        }
                    }
                    // a successful poll only counts if the channel was not
                    // closed in the meantime
                    connected = polled && isOpen();
                } finally {
                    endFinishConnect(blocking, connected);
                }
                // in blocking mode, reaching this point implies connected
                assert (blocking && connected) ^ !blocking;
                return connected;
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    } catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throw SocketExceptions.of(ioe, remoteAddress);
    }
}
845
846 /**
847 * Closes the socket if there are no I/O operations in progress and the
848 * channel is not registered with a Selector.
849 */
850 private boolean tryClose() throws IOException {
851 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
852 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) {
853 state = ST_CLOSED;
854 nd.close(fd);
855 return true;
856 } else {
857 return false;
858 }
859 }
860
861 /**
862 * Invokes tryClose to attempt to close the socket.
863 *
864 * This method is used for deferred closing by I/O and Selector operations.
865 */
866 private void tryFinishClose() {
867 try {
868 tryClose();
869 } catch (IOException ignore) { }
870 }
871
/**
 * Closes this channel when configured in blocking mode.
 *
 * If there is an I/O operation in progress then the socket is pre-closed
 * and the I/O threads signalled, in which case the final close is deferred
 * until all I/O operations complete.
 *
 * Note that a channel configured blocking may be registered with a Selector
 * This arises when a key is canceled and the channel configured to blocking
 * mode before the key is flushed from the Selector.
 */
private void implCloseBlockingMode() throws IOException {
    synchronized (stateLock) {
        assert state < ST_CLOSING;
        state = ST_CLOSING;
        // tryClose fails when a reader/writer thread is recorded or the
        // channel is still registered with a Selector
        if (!tryClose()) {
            long reader = readerThread;
            long writer = writerThread;
            if (reader != 0 || writer != 0) {
                // pre-close before signalling; the final close is done by
                // endRead/endWrite once the I/O threads have finished
                nd.preClose(fd);
                if (reader != 0)
                    NativeThread.signal(reader);
                if (writer != 0)
                    NativeThread.signal(writer);
            }
        }
    }
}
900
/**
 * Closes this channel when configured in non-blocking mode.
 *
 * If the channel is registered with a Selector then the close is deferred
 * until the channel is flushed from all Selectors.
 *
 * If the socket is connected and the channel is registered with a Selector
 * then the socket is shutdown for writing so that the peer reads EOF. In
 * addition, if SO_LINGER is set to a non-zero value then it is disabled so
 * that the deferred close does not wait.
 */
private void implCloseNonBlockingMode() throws IOException {
    boolean connected;
    synchronized (stateLock) {
        assert state < ST_CLOSING;
        // remember whether the socket was connected before the state changes
        connected = (state == ST_CONNECTED);
        state = ST_CLOSING;
    }

    // wait for any read/write operations to complete
    // (each lock is acquired and released immediately; the locks are not
    // held past this point)
    readLock.lock();
    readLock.unlock();
    writeLock.lock();
    writeLock.unlock();

    // if the socket cannot be closed because it's registered with a Selector
    // then shutdown the socket for writing.
    synchronized (stateLock) {
        // state is re-checked because stateLock was released above
        if (state == ST_CLOSING && !tryClose() && connected && isRegistered()) {
            try {
                SocketOption<Integer> opt = StandardSocketOptions.SO_LINGER;
                int interval = (int) Net.getSocketOption(fd, Net.UNSPEC, opt);
                // NOTE(review): a linger interval of exactly 0 skips the
                // shutdown altogether - presumably so that the eventual
                // close behaves as a zero-linger close; confirm
                if (interval != 0) {
                    if (interval > 0) {
                        // disable SO_LINGER
                        Net.setSocketOption(fd, Net.UNSPEC, opt, -1);
                    }
                    Net.shutdown(fd, Net.SHUT_WR);
                }
            } catch (IOException ignore) { }
        }
    }
}
944
945 /**
946 * Invoked by implCloseChannel to close the channel.
947 */
948 @Override
949 protected void implCloseSelectableChannel() throws IOException {
950 assert !isOpen();
951 if (isBlocking()) {
952 implCloseBlockingMode();
953 } else {
954 implCloseNonBlockingMode();
955 }
956 }
957
958 @Override
959 public void kill() {
960 synchronized (stateLock) {
961 if (state == ST_CLOSING) {
962 tryFinishClose();
963 }
964 }
965 }
966
967 @Override
968 public SocketChannel shutdownInput() throws IOException {
969 synchronized (stateLock) {
970 ensureOpen();
971 if (!isConnected())
972 throw new NotYetConnectedException();
973 if (!isInputClosed) {
974 Net.shutdown(fd, Net.SHUT_RD);
975 long thread = readerThread;
976 if (thread != 0)
977 NativeThread.signal(thread);
978 isInputClosed = true;
979 }
980 return this;
981 }
982 }
983
984 @Override
985 public SocketChannel shutdownOutput() throws IOException {
986 synchronized (stateLock) {
987 ensureOpen();
988 if (!isConnected())
989 throw new NotYetConnectedException();
990 if (!isOutputClosed) {
991 Net.shutdown(fd, Net.SHUT_WR);
992 long thread = writerThread;
993 if (thread != 0)
994 NativeThread.signal(thread);
995 isOutputClosed = true;
996 }
997 return this;
998 }
999 }
1000
1001 boolean isInputOpen() {
1002 return !isInputClosed;
1003 }
1004
1005 boolean isOutputOpen() {
1006 return !isOutputClosed;
1007 }
1008
1009 /**
1010 * Waits for a connection attempt to finish with a timeout
1011 * @throws SocketTimeoutException if the connect timeout elapses
1012 */
1013 private boolean finishTimedConnect(long nanos) throws IOException {
1014 long startNanos = System.nanoTime();
1015 boolean polled = Net.pollConnectNow(fd);
1016 while (!polled && isOpen()) {
1017 long remainingNanos = nanos - (System.nanoTime() - startNanos);
1018 if (remainingNanos <= 0) {
1019 throw new SocketTimeoutException("Connect timed out");
1020 }
1021 park(Net.POLLOUT, remainingNanos);
1022 polled = Net.pollConnectNow(fd);
1023 }
1024 return polled && isOpen();
1025 }
1026
/**
 * Attempts to establish a connection to the given socket address with a
 * timeout. Closes the socket if connection cannot be established.
 *
 * The socket is switched to non-blocking mode for the duration of the
 * connect so that the wait can be bounded, and is restored to blocking
 * mode afterwards.
 *
 * @apiNote This method is for use by the socket adaptor.
 *
 * @param remote the address to connect to
 * @param nanos the connect timeout in nanoseconds
 * @throws IllegalBlockingModeException if the channel is non-blocking
 * @throws SocketTimeoutException if the connect timeout elapses
 */
void blockingConnect(SocketAddress remote, long nanos) throws IOException {
    InetSocketAddress isa = checkRemote(remote);
    try {
        readLock.lock();
        try {
            writeLock.lock();
            try {
                if (!isBlocking())
                    throw new IllegalBlockingModeException();
                boolean connected = false;
                try {
                    beginConnect(true, isa);
                    // change socket to non-blocking
                    lockedConfigureBlocking(false);
                    try {
                        int n = Net.connect(fd, isa.getAddress(), isa.getPort());
                        // n > 0 means connected immediately; otherwise wait
                        // for completion, bounded by the timeout
                        connected = (n > 0) ? true : finishTimedConnect(nanos);
                    } finally {
                        // restore socket to blocking mode
                        lockedConfigureBlocking(true);
                    }
                } finally {
                    endConnect(true, connected);
                }
            } finally {
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    } catch (IOException ioe) {
        // connect failed, close the channel
        close();
        throw SocketExceptions.of(ioe, isa);
    }
}
1072
1073 /**
1074 * Attempts to read bytes from the socket into the given byte array.
1075 */
1076 private int tryRead(byte[] b, int off, int len) throws IOException {
1077 ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
1078 assert dst.position() == 0;
1079 try {
1080 int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
1081 if (n > 0) {
1082 dst.get(b, off, n);
1083 }
1084 return n;
1085 } finally{
1086 Util.offerFirstTemporaryDirectBuffer(dst);
1087 }
1088 }
1089
1090 /**
1091 * Reads bytes from the socket into the given byte array with a timeout.
1092 * @throws SocketTimeoutException if the read timeout elapses
1093 */
1094 private int timedRead(byte[] b, int off, int len, long nanos) throws IOException {
1095 long startNanos = System.nanoTime();
1096 int n = tryRead(b, off, len);
1097 while (n == IOStatus.UNAVAILABLE && isOpen()) {
1098 long remainingNanos = nanos - (System.nanoTime() - startNanos);
1099 if (remainingNanos <= 0) {
1100 throw new SocketTimeoutException("Read timed out");
1101 }
1102 park(Net.POLLIN, remainingNanos);
1103 n = tryRead(b, off, len);
1104 }
1105 return n;
1106 }
1107
/**
 * Reads bytes from the socket into the given byte array.
 *
 * Runs while holding readLock. With a positive timeout the socket is
 * switched to non-blocking mode for the duration of the read and restored
 * afterwards; otherwise the read is retried, parking on Net.POLLIN, while
 * IOStatus.okayToRetry accepts the result and the channel is open.
 *
 * @apiNote This method is for use by the socket adaptor.
 *
 * @param b the destination array
 * @param off the offset in b at which to store bytes
 * @param len the maximum number of bytes to read
 * @param nanos the read timeout in nanoseconds; a value that is not
 *        positive means no timeout
 * @return the number of bytes read, 0 if len is 0, or IOStatus.EOF if
 *         input has been shut down
 * @throws IndexOutOfBoundsException if off/len are out of bounds for b
 * @throws IllegalBlockingModeException if the channel is non-blocking
 * @throws SocketTimeoutException if the read timeout elapses
 */
int blockingRead(byte[] b, int off, int len, long nanos) throws IOException {
    Objects.checkFromIndexSize(off, len, b.length);
    if (len == 0) {
        // nothing to do
        return 0;
    }

    readLock.lock();
    try {
        // check that channel is configured blocking
        if (!isBlocking())
            throw new IllegalBlockingModeException();

        int n = 0;
        try {
            beginRead(true);

            // check if connection has been reset
            if (connectionReset)
                throwConnectionReset();

            // check if input is shutdown
            if (isInputClosed)
                return IOStatus.EOF;

            if (nanos > 0) {
                // change socket to non-blocking
                lockedConfigureBlocking(false);
                try {
                    n = timedRead(b, off, len, nanos);
                } finally {
                    // restore socket to blocking mode
                    lockedConfigureBlocking(true);
                }
            } else {
                // read, no timeout
                n = tryRead(b, off, len);
                while (IOStatus.okayToRetry(n) && isOpen()) {
                    park(Net.POLLIN);
                    n = tryRead(b, off, len);
                }
            }
        } catch (ConnectionResetException e) {
            // remember the reset so that subsequent reads fail immediately
            connectionReset = true;
            throwConnectionReset();
        } finally {
            endRead(true, n > 0);
            // NOTE: this return is inside finally, so when input has been
            // shut down and no bytes were read it supersedes any exception
            // propagating from the try block and EOF is returned instead.
            if (n <= 0 && isInputClosed)
                return IOStatus.EOF;
        }
        assert n > 0 || n == -1;
        return n;
    } finally {
        readLock.unlock();
    }
}
1172
1173 /**
1174 * Attempts to write a sequence of bytes to the socket from the given
1175 * byte array.
1176 */
1177 private int tryWrite(byte[] b, int off, int len) throws IOException {
1178 ByteBuffer src = Util.getTemporaryDirectBuffer(len);
1179 assert src.position() == 0;
1180 try {
1181 src.put(b, off, len);
1182 return nd.write(fd, ((DirectBuffer)src).address(), len);
1183 } finally {
1184 Util.offerFirstTemporaryDirectBuffer(src);
1185 }
1186 }
1187
1188 /**
1189 * Writes a sequence of bytes to the socket from the given byte array.
1190 *
1191 * @apiNote This method is for use by the socket adaptor.
1192 */
1193 void blockingWriteFully(byte[] b, int off, int len) throws IOException {
1194 Objects.checkFromIndexSize(off, len, b.length);
1195 if (len == 0) {
1196 // nothing to do
1197 return;
1198 }
1199
1200 writeLock.lock();
1201 try {
1202 // check that channel is configured blocking
1203 if (!isBlocking())
1204 throw new IllegalBlockingModeException();
1205
1206 // loop until all bytes have been written
1207 int pos = off;
1208 int end = off + len;
1209 beginWrite(true);
1210 try {
1211 while (pos < end && isOpen()) {
1212 int size = end - pos;
1213 int n = tryWrite(b, pos, size);
1214 while (IOStatus.okayToRetry(n) && isOpen()) {
1215 park(Net.POLLOUT);
1216 n = tryWrite(b, pos, size);
1217 }
1218 if (n > 0) {
1219 pos += n;
1220 }
1221 }
1222 } finally {
1223 endWrite(true, pos >= end);
1224 }
1225 } finally {
1226 writeLock.unlock();
1227 }
1228 }
1229
1230 /**
1231 * Return the number of bytes in the socket input buffer.
1232 */
1233 int available() throws IOException {
1234 synchronized (stateLock) {
1235 ensureOpenAndConnected();
1236 if (isInputClosed) {
1237 return 0;
1238 } else {
1239 return Net.available(fd);
1240 }
1241 }
1242 }
1243
1244 /**
1245 * Translates native poll revent ops into a ready operation ops
1246 */
1247 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
1248 int intOps = ski.nioInterestOps();
1249 int oldOps = ski.nioReadyOps();
1250 int newOps = initialOps;
1251
1252 if ((ops & Net.POLLNVAL) != 0) {
1253 // This should only happen if this channel is pre-closed while a
1254 // selection operation is in progress
1255 // ## Throw an error if this channel has not been pre-closed
1256 return false;
1257 }
1258
1259 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
1260 newOps = intOps;
1261 ski.nioReadyOps(newOps);
1262 return (newOps & ~oldOps) != 0;
1263 }
1264
1265 boolean connected = isConnected();
1266 if (((ops & Net.POLLIN) != 0) &&
1267 ((intOps & SelectionKey.OP_READ) != 0) && connected)
1268 newOps |= SelectionKey.OP_READ;
1269
1270 if (((ops & Net.POLLCONN) != 0) &&
1271 ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())
1272 newOps |= SelectionKey.OP_CONNECT;
1273
1274 if (((ops & Net.POLLOUT) != 0) &&
1275 ((intOps & SelectionKey.OP_WRITE) != 0) && connected)
1276 newOps |= SelectionKey.OP_WRITE;
1277
1278 ski.nioReadyOps(newOps);
1279 return (newOps & ~oldOps) != 0;
1280 }
1281
1282 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
1283 return translateReadyOps(ops, ski.nioReadyOps(), ski);
1284 }
1285
1286 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
1287 return translateReadyOps(ops, 0, ski);
1288 }
1289
1290 /**
1291 * Translates an interest operation set into a native poll event set
1292 */
1293 public int translateInterestOps(int ops) {
1294 int newOps = 0;
1295 if ((ops & SelectionKey.OP_READ) != 0)
1296 newOps |= Net.POLLIN;
1297 if ((ops & SelectionKey.OP_WRITE) != 0)
1298 newOps |= Net.POLLOUT;
1299 if ((ops & SelectionKey.OP_CONNECT) != 0)
1300 newOps |= Net.POLLCONN;
1301 return newOps;
1302 }
1303
1304 public FileDescriptor getFD() {
1305 return fd;
1306 }
1307
1308 public int getFDVal() {
1309 return fdVal;
1310 }
1311
1312 @Override
1313 public String toString() {
1314 StringBuilder sb = new StringBuilder();
1315 sb.append(this.getClass().getSuperclass().getName());
1316 sb.append('[');
1317 if (!isOpen())
1318 sb.append("closed");
1319 else {
1320 synchronized (stateLock) {
1321 switch (state) {
1322 case ST_UNCONNECTED:
1323 sb.append("unconnected");
1324 break;
1325 case ST_CONNECTIONPENDING:
1326 sb.append("connection-pending");
1327 break;
1328 case ST_CONNECTED:
1329 sb.append("connected");
1330 if (isInputClosed)
1331 sb.append(" ishut");
1332 if (isOutputClosed)
1333 sb.append(" oshut");
1334 break;
1335 }
1336 InetSocketAddress addr = localAddress();
1337 if (addr != null) {
1338 sb.append(" local=");
1339 sb.append(Net.getRevealedLocalAddressAsString(addr));
1340 }
1341 if (remoteAddress() != null) {
1342 sb.append(" remote=");
1343 sb.append(remoteAddress().toString());
1344 }
1345 }
1346 }
1347 sb.append(']');
1348 return sb.toString();
1349 }
1350 }