1 /*
2 * Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved.
3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4 *
5 * This code is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License version 2 only, as
7 * published by the Free Software Foundation. Oracle designates this
8 * particular file as subject to the "Classpath" exception as provided
9 * by Oracle in the LICENSE file that accompanied this code.
10 *
11 * This code is distributed in the hope that it will be useful, but WITHOUT
12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14 * version 2 for more details (a copy is included in the LICENSE file that
15 * accompanied this code).
16 *
17 * You should have received a copy of the GNU General Public License version
18 * 2 along with this work; if not, write to the Free Software Foundation,
19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20 *
21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22 * or visit www.oracle.com if you need additional information or have any
23 * questions.
24 */
25
26 package sun.nio.ch;
27
28 import java.io.FileDescriptor;
29 import java.io.IOException;
30 import java.net.InetAddress;
31 import java.net.InetSocketAddress;
32 import java.net.ProtocolFamily;
33 import java.net.Socket;
34 import java.net.SocketAddress;
35 import java.net.SocketException;
36 import java.net.SocketOption;
37 import java.net.SocketTimeoutException;
38 import java.net.StandardProtocolFamily;
39 import java.net.StandardSocketOptions;
40 import java.net.UnixSocketAddress;
41 import java.nio.ByteBuffer;
42 import java.nio.channels.AlreadyBoundException;
43 import java.nio.channels.AlreadyConnectedException;
44 import java.nio.channels.AsynchronousCloseException;
45 import java.nio.channels.ClosedChannelException;
46 import java.nio.channels.ConnectionPendingException;
47 import java.nio.channels.IllegalBlockingModeException;
48 import java.nio.channels.NoConnectionPendingException;
49 import java.nio.channels.NotYetConnectedException;
50 import java.nio.channels.SelectionKey;
51 import java.nio.channels.SocketChannel;
52 import java.nio.channels.spi.SelectorProvider;
53 import java.util.Collections;
54 import java.util.HashSet;
55 import java.util.Objects;
56 import java.util.Set;
57 import java.util.concurrent.locks.ReentrantLock;
58
59 import sun.net.ConnectionResetException;
60 import sun.net.NetHooks;
61 import sun.net.ext.ExtendedSocketOptions;
62 import sun.net.util.SocketExceptions;
63
64 /**
65 * An implementation of SocketChannels
66 */
67
68 class SocketChannelImpl
69 extends SocketChannel
70 implements SelChImpl
71 {
72 // Used to make native read and write calls
73 private static final NativeDispatcher nd = new SocketDispatcher();
74
75 // Our file descriptor object
76 private final FileDescriptor fd;
77 private final int fdVal;
78
79 // Lock held by current reading or connecting thread
80 private final ReentrantLock readLock = new ReentrantLock();
81
82 // Lock held by current writing or connecting thread
83 private final ReentrantLock writeLock = new ReentrantLock();
84
85 // Lock held by any thread that modifies the state fields declared below
86 // DO NOT invoke a blocking I/O operation while holding this lock!
87 private final Object stateLock = new Object();
88
89 // Input/Output closed
90 private volatile boolean isInputClosed;
91 private volatile boolean isOutputClosed;
92
93 // Connection reset protected by readLock
94 private boolean connectionReset;
95
96 // -- The following fields are protected by stateLock
97
98 // set true when exclusive binding is on and SO_REUSEADDR is emulated
99 private boolean isReuseAddress;
100
101 // State, increases monotonically
102 private static final int ST_UNCONNECTED = 0;
103 private static final int ST_CONNECTIONPENDING = 1;
104 private static final int ST_CONNECTED = 2;
105 private static final int ST_CLOSING = 3;
106 private static final int ST_CLOSED = 4;
107 private volatile int state; // need stateLock to change
108
109 // IDs of native threads doing reads and writes, for signalling
110 private long readerThread;
111 private long writerThread;
112
113 // Binding
114 private SocketAddress localAddress;
115 private SocketAddress remoteAddress;
116
117 // Socket adaptor, created on demand
118 private Socket socket;
119
120 // -- End of fields protected by stateLock
121
122 SocketChannelImpl(SelectorProvider sp, ProtocolFamily family) throws IOException {
123 super(sp);
124 this.fd = Net.socket(family, true);
125 this.fdVal = IOUtil.fdVal(fd);
126 }
127
128 // Constructor for normal connecting sockets
129 //
130 SocketChannelImpl(SelectorProvider sp) throws IOException {
131 super(sp);
132 this.fd = Net.socket(true);
133 this.fdVal = IOUtil.fdVal(fd);
134 }
135
136 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound)
137 throws IOException
138 {
139 super(sp);
140 this.fd = fd;
141 this.fdVal = IOUtil.fdVal(fd);
142 if (bound) {
143 synchronized (stateLock) {
144 this.localAddress = Net.localAddress(fd);
145 }
146 }
147 }
148
149 // Constructor for sockets obtained from server sockets
150 //
151 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, SocketAddress isa)
152 throws IOException
153 {
154 super(sp);
155 this.fd = fd;
156 this.fdVal = IOUtil.fdVal(fd);
157 synchronized (stateLock) {
158 this.localAddress = Net.localSocketAddress(fd);
159 this.remoteAddress = isa;
160 this.state = ST_CONNECTED;
161 }
162 }
163
164 /**
165 * Checks that the channel is open.
166 *
167 * @throws ClosedChannelException if channel is closed (or closing)
168 */
169 private void ensureOpen() throws ClosedChannelException {
170 if (!isOpen())
171 throw new ClosedChannelException();
172 }
173
174 /**
175 * Checks that the channel is open and connected.
176 *
177 * @apiNote This method uses the "state" field to check if the channel is
178 * open. It should never be used in conjuncion with isOpen or ensureOpen
179 * as these methods check AbstractInterruptibleChannel's closed field - that
180 * field is set before implCloseSelectableChannel is called and so before
181 * the state is changed.
182 *
183 * @throws ClosedChannelException if channel is closed (or closing)
184 * @throws NotYetConnectedException if open and not connected
185 */
186 private void ensureOpenAndConnected() throws ClosedChannelException {
187 int state = this.state;
188 if (state < ST_CONNECTED) {
189 throw new NotYetConnectedException();
190 } else if (state > ST_CONNECTED) {
191 throw new ClosedChannelException();
192 }
193 }
194
195 @Override
196 public Socket socket() {
197 synchronized (stateLock) {
198 if (socket == null)
199 socket = SocketAdaptor.create(this);
200 return socket;
201 }
202 }
203
204 @Override
205 public SocketAddress getLocalAddress() throws IOException {
206 synchronized (stateLock) {
207 ensureOpen();
208 return localAddress instanceof InetSocketAddress ? Net.getRevealedLocalAddress((InetSocketAddress) localAddress) : localAddress;
209 }
210 }
211
212 @Override
213 public SocketAddress getRemoteAddress() throws IOException {
214 synchronized (stateLock) {
215 ensureOpen();
216 return remoteAddress;
217 }
218 }
219
220 @Override
221 public <T> SocketChannel setOption(SocketOption<T> name, T value)
222 throws IOException
223 {
224 Objects.requireNonNull(name);
225 if (!supportedOptions().contains(name))
226 throw new UnsupportedOperationException("'" + name + "' not supported");
227 if (!name.type().isInstance(value))
228 throw new IllegalArgumentException("Invalid value '" + value + "'");
229
230 synchronized (stateLock) {
231 ensureOpen();
232
233 if (name == StandardSocketOptions.IP_TOS) {
234 ProtocolFamily family = Net.isIPv6Available() ?
235 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
236 Net.setSocketOption(fd, family, name, value);
237 return this;
238 }
239
240 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
241 // SO_REUSEADDR emulated when using exclusive bind
242 isReuseAddress = (Boolean)value;
243 return this;
244 }
245
246 // no options that require special handling
247 Net.setSocketOption(fd, name, value);
248 return this;
249 }
250 }
251
252 @Override
253 @SuppressWarnings("unchecked")
254 public <T> T getOption(SocketOption<T> name)
255 throws IOException
256 {
257 Objects.requireNonNull(name);
258 if (!supportedOptions().contains(name))
259 throw new UnsupportedOperationException("'" + name + "' not supported");
260
261 synchronized (stateLock) {
262 ensureOpen();
263
264 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) {
265 // SO_REUSEADDR emulated when using exclusive bind
266 return (T)Boolean.valueOf(isReuseAddress);
267 }
268
269 // special handling for IP_TOS: always return 0 when IPv6
270 if (name == StandardSocketOptions.IP_TOS) {
271 ProtocolFamily family = Net.isIPv6Available() ?
272 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET;
273 return (T) Net.getSocketOption(fd, family, name);
274 }
275
276 // no options that require special handling
277 return (T) Net.getSocketOption(fd, name);
278 }
279 }
280
281 private static class DefaultOptionsHolder {
282 static final Set<SocketOption<?>> defaultOptions = defaultOptions();
283
284 private static Set<SocketOption<?>> defaultOptions() {
285 HashSet<SocketOption<?>> set = new HashSet<>();
286 set.add(StandardSocketOptions.SO_SNDBUF);
287 set.add(StandardSocketOptions.SO_RCVBUF);
288 set.add(StandardSocketOptions.SO_KEEPALIVE);
289 set.add(StandardSocketOptions.SO_REUSEADDR);
290 if (Net.isReusePortAvailable()) {
291 set.add(StandardSocketOptions.SO_REUSEPORT);
292 }
293 set.add(StandardSocketOptions.SO_LINGER);
294 set.add(StandardSocketOptions.TCP_NODELAY);
295 // additional options required by socket adaptor
296 set.add(StandardSocketOptions.IP_TOS);
297 set.add(ExtendedSocketOption.SO_OOBINLINE);
298 set.addAll(ExtendedSocketOptions.clientSocketOptions());
299 return Collections.unmodifiableSet(set);
300 }
301 }
302
303 @Override
304 public final Set<SocketOption<?>> supportedOptions() {
305 return DefaultOptionsHolder.defaultOptions;
306 }
307
308 /**
309 * Marks the beginning of a read operation that might block.
310 *
311 * @throws ClosedChannelException if the channel is closed
312 * @throws NotYetConnectedException if the channel is not yet connected
313 */
314 private void beginRead(boolean blocking) throws ClosedChannelException {
315 if (blocking) {
316 // set hook for Thread.interrupt
317 begin();
318
319 synchronized (stateLock) {
320 ensureOpenAndConnected();
321 // record thread so it can be signalled if needed
322 readerThread = NativeThread.current();
323 }
324 } else {
325 ensureOpenAndConnected();
326 }
327 }
328
329 /**
330 * Marks the end of a read operation that may have blocked.
331 *
332 * @throws AsynchronousCloseException if the channel was closed due to this
333 * thread being interrupted on a blocking read operation.
334 */
335 private void endRead(boolean blocking, boolean completed)
336 throws AsynchronousCloseException
337 {
338 if (blocking) {
339 synchronized (stateLock) {
340 readerThread = 0;
341 if (state == ST_CLOSING) {
342 tryFinishClose();
343 }
344 }
345 // remove hook for Thread.interrupt
346 end(completed);
347 }
348 }
349
350 private void throwConnectionReset() throws SocketException {
351 throw new SocketException("Connection reset");
352 }
353
354 @Override
355 public int read(ByteBuffer buf) throws IOException {
356 Objects.requireNonNull(buf);
357
358 readLock.lock();
359 try {
360 boolean blocking = isBlocking();
361 int n = 0;
362 try {
363 beginRead(blocking);
364
365 // check if connection has been reset
366 if (connectionReset)
367 throwConnectionReset();
368
369 // check if input is shutdown
370 if (isInputClosed)
371 return IOStatus.EOF;
372
373 n = IOUtil.read(fd, buf, -1, nd);
374 if (blocking) {
375 while (IOStatus.okayToRetry(n) && isOpen()) {
376 park(Net.POLLIN);
377 n = IOUtil.read(fd, buf, -1, nd);
378 }
379 }
380 } catch (ConnectionResetException e) {
381 connectionReset = true;
382 throwConnectionReset();
383 } finally {
384 endRead(blocking, n > 0);
385 if (n <= 0 && isInputClosed)
386 return IOStatus.EOF;
387 }
388 return IOStatus.normalize(n);
389 } finally {
390 readLock.unlock();
391 }
392 }
393
394 @Override
395 public long read(ByteBuffer[] dsts, int offset, int length)
396 throws IOException
397 {
398 Objects.checkFromIndexSize(offset, length, dsts.length);
399
400 readLock.lock();
401 try {
402 boolean blocking = isBlocking();
403 long n = 0;
404 try {
405 beginRead(blocking);
406
407 // check if connection has been reset
408 if (connectionReset)
409 throwConnectionReset();
410
411 // check if input is shutdown
412 if (isInputClosed)
413 return IOStatus.EOF;
414
415 n = IOUtil.read(fd, dsts, offset, length, nd);
416 if (blocking) {
417 while (IOStatus.okayToRetry(n) && isOpen()) {
418 park(Net.POLLIN);
419 n = IOUtil.read(fd, dsts, offset, length, nd);
420 }
421 }
422 } catch (ConnectionResetException e) {
423 connectionReset = true;
424 throwConnectionReset();
425 } finally {
426 endRead(blocking, n > 0);
427 if (n <= 0 && isInputClosed)
428 return IOStatus.EOF;
429 }
430 return IOStatus.normalize(n);
431 } finally {
432 readLock.unlock();
433 }
434 }
435
436 /**
437 * Marks the beginning of a write operation that might block.
438 *
439 * @throws ClosedChannelException if the channel is closed or output shutdown
440 * @throws NotYetConnectedException if the channel is not yet connected
441 */
442 private void beginWrite(boolean blocking) throws ClosedChannelException {
443 if (blocking) {
444 // set hook for Thread.interrupt
445 begin();
446
447 synchronized (stateLock) {
448 ensureOpenAndConnected();
449 if (isOutputClosed)
450 throw new ClosedChannelException();
451 // record thread so it can be signalled if needed
452 writerThread = NativeThread.current();
453 }
454 } else {
455 ensureOpenAndConnected();
456 }
457 }
458
459 /**
460 * Marks the end of a write operation that may have blocked.
461 *
462 * @throws AsynchronousCloseException if the channel was closed due to this
463 * thread being interrupted on a blocking write operation.
464 */
465 private void endWrite(boolean blocking, boolean completed)
466 throws AsynchronousCloseException
467 {
468 if (blocking) {
469 synchronized (stateLock) {
470 writerThread = 0;
471 if (state == ST_CLOSING) {
472 tryFinishClose();
473 }
474 }
475 // remove hook for Thread.interrupt
476 end(completed);
477 }
478 }
479
480 @Override
481 public int write(ByteBuffer buf) throws IOException {
482 Objects.requireNonNull(buf);
483
484 writeLock.lock();
485 try {
486 boolean blocking = isBlocking();
487 int n = 0;
488 try {
489 beginWrite(blocking);
490 n = IOUtil.write(fd, buf, -1, nd);
491 if (blocking) {
492 while (IOStatus.okayToRetry(n) && isOpen()) {
493 park(Net.POLLOUT);
494 n = IOUtil.write(fd, buf, -1, nd);
495 }
496 }
497 } finally {
498 endWrite(blocking, n > 0);
499 if (n <= 0 && isOutputClosed)
500 throw new AsynchronousCloseException();
501 }
502 return IOStatus.normalize(n);
503 } finally {
504 writeLock.unlock();
505 }
506 }
507
508 @Override
509 public long write(ByteBuffer[] srcs, int offset, int length)
510 throws IOException
511 {
512 Objects.checkFromIndexSize(offset, length, srcs.length);
513
514 writeLock.lock();
515 try {
516 boolean blocking = isBlocking();
517 long n = 0;
518 try {
519 beginWrite(blocking);
520 n = IOUtil.write(fd, srcs, offset, length, nd);
521 if (blocking) {
522 while (IOStatus.okayToRetry(n) && isOpen()) {
523 park(Net.POLLOUT);
524 n = IOUtil.write(fd, srcs, offset, length, nd);
525 }
526 }
527 } finally {
528 endWrite(blocking, n > 0);
529 if (n <= 0 && isOutputClosed)
530 throw new AsynchronousCloseException();
531 }
532 return IOStatus.normalize(n);
533 } finally {
534 writeLock.unlock();
535 }
536 }
537
538 /**
539 * Writes a byte of out of band data.
540 */
541 int sendOutOfBandData(byte b) throws IOException {
542 writeLock.lock();
543 try {
544 boolean blocking = isBlocking();
545 int n = 0;
546 try {
547 beginWrite(blocking);
548 if (blocking) {
549 do {
550 n = Net.sendOOB(fd, b);
551 } while (n == IOStatus.INTERRUPTED && isOpen());
552 } else {
553 n = Net.sendOOB(fd, b);
554 }
555 } finally {
556 endWrite(blocking, n > 0);
557 if (n <= 0 && isOutputClosed)
558 throw new AsynchronousCloseException();
559 }
560 return IOStatus.normalize(n);
561 } finally {
562 writeLock.unlock();
563 }
564 }
565
566 @Override
567 protected void implConfigureBlocking(boolean block) throws IOException {
568 readLock.lock();
569 try {
570 writeLock.lock();
571 try {
572 lockedConfigureBlocking(block);
573 } finally {
574 writeLock.unlock();
575 }
576 } finally {
577 readLock.unlock();
578 }
579 }
580
581 /**
582 * Adjust the blocking mode while holding the readLock or writeLock.
583 */
584 private void lockedConfigureBlocking(boolean block) throws IOException {
585 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread();
586 synchronized (stateLock) {
587 ensureOpen();
588 IOUtil.configureBlocking(fd, block);
589 }
590 }
591
592 /**
593 * Returns the local address, or null if not bound
594 */
595 InetSocketAddress localAddress() {
596 synchronized (stateLock) {
597 return localAddress instanceof InetSocketAddress ? (InetSocketAddress) localAddress : null;
598 }
599 }
600
601 /**
602 * Returns the remote address, or null if not connected
603 */
604 InetSocketAddress remoteAddress() {
605 synchronized (stateLock) {
606 return remoteAddress instanceof InetSocketAddress ? (InetSocketAddress) remoteAddress : null;
607 }
608 }
609
610 @Override
611 public SocketChannel bind(SocketAddress local) throws IOException {
612 readLock.lock();
613 try {
614 writeLock.lock();
615 try {
616 synchronized (stateLock) {
617 ensureOpen();
618 if (state == ST_CONNECTIONPENDING)
619 throw new ConnectionPendingException();
620 if (localAddress != null)
621 throw new AlreadyBoundException();
622 InetSocketAddress isa = (local == null) ?
623 new InetSocketAddress(0) : Net.checkAddress(local);
624 SecurityManager sm = System.getSecurityManager();
625 if (sm != null) {
626 sm.checkListen(isa.getPort());
627 }
628 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort());
629 Net.bind(fd, isa.getAddress(), isa.getPort());
630 localAddress = Net.localAddress(fd);
631 }
632 } finally {
633 writeLock.unlock();
634 }
635 } finally {
636 readLock.unlock();
637 }
638 return this;
639 }
640
641 @Override
642 public boolean isConnected() {
643 return (state == ST_CONNECTED);
644 }
645
646 @Override
647 public boolean isConnectionPending() {
648 return (state == ST_CONNECTIONPENDING);
649 }
650
651 /**
652 * Marks the beginning of a connect operation that might block.
653 * @param blocking true if configured blocking
654 * @param isa the remote address
655 * @throws ClosedChannelException if the channel is closed
656 * @throws AlreadyConnectedException if already connected
657 * @throws ConnectionPendingException is a connection is pending
658 * @throws IOException if the pre-connect hook fails
659 */
660 private void beginConnect(boolean blocking, InetSocketAddress isa)
661 throws IOException
662 {
663 if (blocking) {
664 // set hook for Thread.interrupt
665 begin();
666 }
667 synchronized (stateLock) {
668 ensureOpen();
669 int state = this.state;
670 if (state == ST_CONNECTED)
671 throw new AlreadyConnectedException();
672 if (state == ST_CONNECTIONPENDING)
673 throw new ConnectionPendingException();
674 assert state == ST_UNCONNECTED;
675 this.state = ST_CONNECTIONPENDING;
676
677 if (localAddress == null)
678 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
679 remoteAddress = isa;
680
681 if (blocking) {
682 // record thread so it can be signalled if needed
683 readerThread = NativeThread.current();
684 }
685 }
686 }
687
688 /**
689 * TODO: refactoring, combine with InetSocketAddress method?
690 *
691 * @param blocking
692 * @param usa
693 * @throws IOException
694 */
695 private void beginConnect(boolean blocking, UnixSocketAddress usa)
696 throws IOException
697 {
698 if (blocking) {
699 // set hook for Thread.interrupt
700 begin();
701 }
702 synchronized (stateLock) {
703 ensureOpen();
704 int state = this.state;
705 if (state == ST_CONNECTED)
706 throw new AlreadyConnectedException();
707 if (state == ST_CONNECTIONPENDING)
708 throw new ConnectionPendingException();
709 assert state == ST_UNCONNECTED;
710 this.state = ST_CONNECTIONPENDING;
711
712 remoteAddress = usa;
713
714 if (blocking) {
715 // record thread so it can be signalled if needed
716 readerThread = NativeThread.current();
717 }
718 }
719 }
720
721 /**
722 * Marks the end of a connect operation that may have blocked.
723 *
724 * @throws AsynchronousCloseException if the channel was closed due to this
725 * thread being interrupted on a blocking connect operation.
726 * @throws IOException if completed and unable to obtain the local address
727 */
728 private void endConnect(boolean blocking, boolean completed)
729 throws IOException
730 {
731 endRead(blocking, completed);
732
733 if (completed) {
734 synchronized (stateLock) {
735 if (state == ST_CONNECTIONPENDING) {
736 localAddress = Net.localAddress(fd);
737 state = ST_CONNECTED;
738 }
739 }
740 }
741 }
742
743 /**
744 * Checks the remote address to which this channel is to be connected.
745 */
746 private InetSocketAddress checkRemote(SocketAddress sa) throws IOException {
747 if (sa instanceof UnixSocketAddress) {
748 // TODO: SecurityManager, refactor null
749 return null;
750 }
751
752 InetSocketAddress isa = Net.checkAddress(sa);
753 SecurityManager sm = System.getSecurityManager();
754 if (sm != null) {
755 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());
756 }
757 if (isa.getAddress().isAnyLocalAddress()) {
758 return new InetSocketAddress(InetAddress.getLocalHost(), isa.getPort());
759 } else {
760 return isa;
761 }
762 }
763
764 @Override
765 public boolean connect(SocketAddress remote) throws IOException {
766 InetSocketAddress isa = checkRemote(remote);
767 try {
768 readLock.lock();
769 try {
770 writeLock.lock();
771 try {
772 boolean blocking = isBlocking();
773 boolean connected = false;
774 try {
775 int n;
776 if (remote instanceof UnixSocketAddress) {
777 UnixSocketAddress usa = (UnixSocketAddress)remote;
778 beginConnect(blocking, usa);
779 n = Net.connect(fd, usa);
780 } else {
781 beginConnect(blocking, isa);
782 n = Net.connect(fd, isa.getAddress(), isa.getPort());
783 }
784 if (n > 0) {
785 connected = true;
786 } else if (blocking) {
787 assert IOStatus.okayToRetry(n);
788 boolean polled = false;
789 while (!polled && isOpen()) {
790 park(Net.POLLOUT);
791 polled = Net.pollConnectNow(fd);
792 }
793 connected = polled && isOpen();
794 }
795 } finally {
796 endConnect(blocking, connected);
797 }
798 return connected;
799 } finally {
800 writeLock.unlock();
801 }
802 } finally {
803 readLock.unlock();
804 }
805 } catch (IOException ioe) {
806 // connect failed, close the channel
807 close();
808 throw SocketExceptions.of(ioe, remote);
809 }
810 }
811
812 /**
813 * Marks the beginning of a finishConnect operation that might block.
814 *
815 * @throws ClosedChannelException if the channel is closed
816 * @throws NoConnectionPendingException if no connection is pending
817 */
818 private void beginFinishConnect(boolean blocking) throws ClosedChannelException {
819 if (blocking) {
820 // set hook for Thread.interrupt
821 begin();
822 }
823 synchronized (stateLock) {
824 ensureOpen();
825 if (state != ST_CONNECTIONPENDING)
826 throw new NoConnectionPendingException();
827 if (blocking) {
828 // record thread so it can be signalled if needed
829 readerThread = NativeThread.current();
830 }
831 }
832 }
833
834 /**
835 * Marks the end of a finishConnect operation that may have blocked.
836 *
837 * @throws AsynchronousCloseException if the channel was closed due to this
838 * thread being interrupted on a blocking connect operation.
839 * @throws IOException if completed and unable to obtain the local address
840 */
841 private void endFinishConnect(boolean blocking, boolean completed)
842 throws IOException
843 {
844 endRead(blocking, completed);
845
846 if (completed) {
847 synchronized (stateLock) {
848 if (state == ST_CONNECTIONPENDING) {
849 localAddress = Net.localAddress(fd);
850 state = ST_CONNECTED;
851 }
852 }
853 }
854 }
855
856 @Override
857 public boolean finishConnect() throws IOException {
858 try {
859 readLock.lock();
860 try {
861 writeLock.lock();
862 try {
863 // no-op if already connected
864 if (isConnected())
865 return true;
866
867 boolean blocking = isBlocking();
868 boolean connected = false;
869 try {
870 beginFinishConnect(blocking);
871 boolean polled = Net.pollConnectNow(fd);
872 if (blocking) {
873 while (!polled && isOpen()) {
874 park(Net.POLLOUT);
875 polled = Net.pollConnectNow(fd);
876 }
877 }
878 connected = polled && isOpen();
879 } finally {
880 endFinishConnect(blocking, connected);
881 }
882 assert (blocking && connected) ^ !blocking;
883 return connected;
884 } finally {
885 writeLock.unlock();
886 }
887 } finally {
888 readLock.unlock();
889 }
890 } catch (IOException ioe) {
891 // connect failed, close the channel
892 close();
893 throw SocketExceptions.of(ioe, remoteAddress);
894 }
895 }
896
897 /**
898 * Closes the socket if there are no I/O operations in progress and the
899 * channel is not registered with a Selector.
900 */
901 private boolean tryClose() throws IOException {
902 assert Thread.holdsLock(stateLock) && state == ST_CLOSING;
903 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) {
904 state = ST_CLOSED;
905 nd.close(fd);
906 return true;
907 } else {
908 return false;
909 }
910 }
911
912 /**
913 * Invokes tryClose to attempt to close the socket.
914 *
915 * This method is used for deferred closing by I/O and Selector operations.
916 */
917 private void tryFinishClose() {
918 try {
919 tryClose();
920 } catch (IOException ignore) { }
921 }
922
923 /**
924 * Closes this channel when configured in blocking mode.
925 *
926 * If there is an I/O operation in progress then the socket is pre-closed
927 * and the I/O threads signalled, in which case the final close is deferred
928 * until all I/O operations complete.
929 *
930 * Note that a channel configured blocking may be registered with a Selector
931 * This arises when a key is canceled and the channel configured to blocking
932 * mode before the key is flushed from the Selector.
933 */
934 private void implCloseBlockingMode() throws IOException {
935 synchronized (stateLock) {
936 assert state < ST_CLOSING;
937 state = ST_CLOSING;
938 if (!tryClose()) {
939 long reader = readerThread;
940 long writer = writerThread;
941 if (reader != 0 || writer != 0) {
942 nd.preClose(fd);
943 if (reader != 0)
944 NativeThread.signal(reader);
945 if (writer != 0)
946 NativeThread.signal(writer);
947 }
948 }
949 }
950 }
951
952 /**
953 * Closes this channel when configured in non-blocking mode.
954 *
955 * If the channel is registered with a Selector then the close is deferred
956 * until the channel is flushed from all Selectors.
957 *
958 * If the socket is connected and the channel is registered with a Selector
959 * then the socket is shutdown for writing so that the peer reads EOF. In
960 * addition, if SO_LINGER is set to a non-zero value then it is disabled so
961 * that the deferred close does not wait.
962 */
963 private void implCloseNonBlockingMode() throws IOException {
964 boolean connected;
965 synchronized (stateLock) {
966 assert state < ST_CLOSING;
967 connected = (state == ST_CONNECTED);
968 state = ST_CLOSING;
969 }
970
971 // wait for any read/write operations to complete
972 readLock.lock();
973 readLock.unlock();
974 writeLock.lock();
975 writeLock.unlock();
976
977 // if the socket cannot be closed because it's registered with a Selector
978 // then shutdown the socket for writing.
979 synchronized (stateLock) {
980 if (state == ST_CLOSING && !tryClose() && connected && isRegistered()) {
981 try {
982 SocketOption<Integer> opt = StandardSocketOptions.SO_LINGER;
983 int interval = (int) Net.getSocketOption(fd, Net.UNSPEC, opt);
984 if (interval != 0) {
985 if (interval > 0) {
986 // disable SO_LINGER
987 Net.setSocketOption(fd, Net.UNSPEC, opt, -1);
988 }
989 Net.shutdown(fd, Net.SHUT_WR);
990 }
991 } catch (IOException ignore) { }
992 }
993 }
994 }
995
996 /**
997 * Invoked by implCloseChannel to close the channel.
998 */
999 @Override
1000 protected void implCloseSelectableChannel() throws IOException {
1001 assert !isOpen();
1002 if (isBlocking()) {
1003 implCloseBlockingMode();
1004 } else {
1005 implCloseNonBlockingMode();
1006 }
1007 }
1008
1009 @Override
1010 public void kill() {
1011 synchronized (stateLock) {
1012 if (state == ST_CLOSING) {
1013 tryFinishClose();
1014 }
1015 }
1016 }
1017
1018 @Override
1019 public SocketChannel shutdownInput() throws IOException {
1020 synchronized (stateLock) {
1021 ensureOpen();
1022 if (!isConnected())
1023 throw new NotYetConnectedException();
1024 if (!isInputClosed) {
1025 Net.shutdown(fd, Net.SHUT_RD);
1026 long thread = readerThread;
1027 if (thread != 0)
1028 NativeThread.signal(thread);
1029 isInputClosed = true;
1030 }
1031 return this;
1032 }
1033 }
1034
1035 @Override
1036 public SocketChannel shutdownOutput() throws IOException {
1037 synchronized (stateLock) {
1038 ensureOpen();
1039 if (!isConnected())
1040 throw new NotYetConnectedException();
1041 if (!isOutputClosed) {
1042 Net.shutdown(fd, Net.SHUT_WR);
1043 long thread = writerThread;
1044 if (thread != 0)
1045 NativeThread.signal(thread);
1046 isOutputClosed = true;
1047 }
1048 return this;
1049 }
1050 }
1051
1052 boolean isInputOpen() {
1053 return !isInputClosed;
1054 }
1055
1056 boolean isOutputOpen() {
1057 return !isOutputClosed;
1058 }
1059
1060 /**
1061 * Waits for a connection attempt to finish with a timeout
1062 * @throws SocketTimeoutException if the connect timeout elapses
1063 */
1064 private boolean finishTimedConnect(long nanos) throws IOException {
1065 long startNanos = System.nanoTime();
1066 boolean polled = Net.pollConnectNow(fd);
1067 while (!polled && isOpen()) {
1068 long remainingNanos = nanos - (System.nanoTime() - startNanos);
1069 if (remainingNanos <= 0) {
1070 throw new SocketTimeoutException("Connect timed out");
1071 }
1072 park(Net.POLLOUT, remainingNanos);
1073 polled = Net.pollConnectNow(fd);
1074 }
1075 return polled && isOpen();
1076 }
1077
1078 /**
1079 * Attempts to establish a connection to the given socket address with a
1080 * timeout. Closes the socket if connection cannot be established.
1081 *
1082 * @apiNote This method is for use by the socket adaptor.
1083 *
1084 * @throws IllegalBlockingModeException if the channel is non-blocking
1085 * @throws SocketTimeoutException if the read timeout elapses
1086 */
1087 void blockingConnect(SocketAddress remote, long nanos) throws IOException {
1088 InetSocketAddress isa = checkRemote(remote);
1089 try {
1090 readLock.lock();
1091 try {
1092 writeLock.lock();
1093 try {
1094 if (!isBlocking())
1095 throw new IllegalBlockingModeException();
1096 boolean connected = false;
1097 try {
1098 if (remote instanceof UnixSocketAddress) {
1099 UnixSocketAddress usa = (UnixSocketAddress)remote;
1100 beginConnect(true, usa);
1101 } else {
1102 beginConnect(true, isa);
1103 }
1104 // change socket to non-blocking
1105 lockedConfigureBlocking(false);
1106 try {
1107 int n;
1108 if (remote instanceof UnixSocketAddress) {
1109 n = Net.connect(fd, (UnixSocketAddress) remote); // TODO: refactor null/remote/isa
1110 } else {
1111 n = Net.connect(fd, isa.getAddress(), isa.getPort());
1112 }
1113 connected = (n > 0) ? true : finishTimedConnect(nanos);
1114 } finally {
1115 // restore socket to blocking mode
1116 lockedConfigureBlocking(true);
1117 }
1118 } finally {
1119 endConnect(true, connected);
1120 }
1121 } finally {
1122 writeLock.unlock();
1123 }
1124 } finally {
1125 readLock.unlock();
1126 }
1127 } catch (IOException ioe) {
1128 // connect failed, close the channel
1129 close();
1130 throw SocketExceptions.of(ioe, isa);
1131 }
1132 }
1133
1134 /**
1135 * Attempts to read bytes from the socket into the given byte array.
1136 */
1137 private int tryRead(byte[] b, int off, int len) throws IOException {
1138 ByteBuffer dst = Util.getTemporaryDirectBuffer(len);
1139 assert dst.position() == 0;
1140 try {
1141 int n = nd.read(fd, ((DirectBuffer)dst).address(), len);
1142 if (n > 0) {
1143 dst.get(b, off, n);
1144 }
1145 return n;
1146 } finally{
1147 Util.offerFirstTemporaryDirectBuffer(dst);
1148 }
1149 }
1150
1151 /**
1152 * Reads bytes from the socket into the given byte array with a timeout.
1153 * @throws SocketTimeoutException if the read timeout elapses
1154 */
1155 private int timedRead(byte[] b, int off, int len, long nanos) throws IOException {
1156 long startNanos = System.nanoTime();
1157 int n = tryRead(b, off, len);
1158 while (n == IOStatus.UNAVAILABLE && isOpen()) {
1159 long remainingNanos = nanos - (System.nanoTime() - startNanos);
1160 if (remainingNanos <= 0) {
1161 throw new SocketTimeoutException("Read timed out");
1162 }
1163 park(Net.POLLIN, remainingNanos);
1164 n = tryRead(b, off, len);
1165 }
1166 return n;
1167 }
1168
1169 /**
1170 * Reads bytes from the socket into the given byte array.
1171 *
1172 * @apiNote This method is for use by the socket adaptor.
1173 *
1174 * @throws IllegalBlockingModeException if the channel is non-blocking
1175 * @throws SocketTimeoutException if the read timeout elapses
1176 */
1177 int blockingRead(byte[] b, int off, int len, long nanos) throws IOException {
1178 Objects.checkFromIndexSize(off, len, b.length);
1179 if (len == 0) {
1180 // nothing to do
1181 return 0;
1182 }
1183
1184 readLock.lock();
1185 try {
1186 // check that channel is configured blocking
1187 if (!isBlocking())
1188 throw new IllegalBlockingModeException();
1189
1190 int n = 0;
1191 try {
1192 beginRead(true);
1193
1194 // check if connection has been reset
1195 if (connectionReset)
1196 throwConnectionReset();
1197
1198 // check if input is shutdown
1199 if (isInputClosed)
1200 return IOStatus.EOF;
1201
1202 if (nanos > 0) {
1203 // change socket to non-blocking
1204 lockedConfigureBlocking(false);
1205 try {
1206 n = timedRead(b, off, len, nanos);
1207 } finally {
1208 // restore socket to blocking mode
1209 lockedConfigureBlocking(true);
1210 }
1211 } else {
1212 // read, no timeout
1213 n = tryRead(b, off, len);
1214 while (IOStatus.okayToRetry(n) && isOpen()) {
1215 park(Net.POLLIN);
1216 n = tryRead(b, off, len);
1217 }
1218 }
1219 } catch (ConnectionResetException e) {
1220 connectionReset = true;
1221 throwConnectionReset();
1222 } finally {
1223 endRead(true, n > 0);
1224 if (n <= 0 && isInputClosed)
1225 return IOStatus.EOF;
1226 }
1227 assert n > 0 || n == -1;
1228 return n;
1229 } finally {
1230 readLock.unlock();
1231 }
1232 }
1233
1234 /**
1235 * Attempts to write a sequence of bytes to the socket from the given
1236 * byte array.
1237 */
1238 private int tryWrite(byte[] b, int off, int len) throws IOException {
1239 ByteBuffer src = Util.getTemporaryDirectBuffer(len);
1240 assert src.position() == 0;
1241 try {
1242 src.put(b, off, len);
1243 return nd.write(fd, ((DirectBuffer)src).address(), len);
1244 } finally {
1245 Util.offerFirstTemporaryDirectBuffer(src);
1246 }
1247 }
1248
1249 /**
1250 * Writes a sequence of bytes to the socket from the given byte array.
1251 *
1252 * @apiNote This method is for use by the socket adaptor.
1253 */
1254 void blockingWriteFully(byte[] b, int off, int len) throws IOException {
1255 Objects.checkFromIndexSize(off, len, b.length);
1256 if (len == 0) {
1257 // nothing to do
1258 return;
1259 }
1260
1261 writeLock.lock();
1262 try {
1263 // check that channel is configured blocking
1264 if (!isBlocking())
1265 throw new IllegalBlockingModeException();
1266
1267 // loop until all bytes have been written
1268 int pos = off;
1269 int end = off + len;
1270 beginWrite(true);
1271 try {
1272 while (pos < end && isOpen()) {
1273 int size = end - pos;
1274 int n = tryWrite(b, pos, size);
1275 while (IOStatus.okayToRetry(n) && isOpen()) {
1276 park(Net.POLLOUT);
1277 n = tryWrite(b, pos, size);
1278 }
1279 if (n > 0) {
1280 pos += n;
1281 }
1282 }
1283 } finally {
1284 endWrite(true, pos >= end);
1285 }
1286 } finally {
1287 writeLock.unlock();
1288 }
1289 }
1290
1291 /**
1292 * Return the number of bytes in the socket input buffer.
1293 */
1294 int available() throws IOException {
1295 synchronized (stateLock) {
1296 ensureOpenAndConnected();
1297 if (isInputClosed) {
1298 return 0;
1299 } else {
1300 return Net.available(fd);
1301 }
1302 }
1303 }
1304
1305 /**
1306 * Translates native poll revent ops into a ready operation ops
1307 */
1308 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
1309 int intOps = ski.nioInterestOps();
1310 int oldOps = ski.nioReadyOps();
1311 int newOps = initialOps;
1312
1313 if ((ops & Net.POLLNVAL) != 0) {
1314 // This should only happen if this channel is pre-closed while a
1315 // selection operation is in progress
1316 // ## Throw an error if this channel has not been pre-closed
1317 return false;
1318 }
1319
1320 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
1321 newOps = intOps;
1322 ski.nioReadyOps(newOps);
1323 return (newOps & ~oldOps) != 0;
1324 }
1325
1326 boolean connected = isConnected();
1327 if (((ops & Net.POLLIN) != 0) &&
1328 ((intOps & SelectionKey.OP_READ) != 0) && connected)
1329 newOps |= SelectionKey.OP_READ;
1330
1331 if (((ops & Net.POLLCONN) != 0) &&
1332 ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())
1333 newOps |= SelectionKey.OP_CONNECT;
1334
1335 if (((ops & Net.POLLOUT) != 0) &&
1336 ((intOps & SelectionKey.OP_WRITE) != 0) && connected)
1337 newOps |= SelectionKey.OP_WRITE;
1338
1339 ski.nioReadyOps(newOps);
1340 return (newOps & ~oldOps) != 0;
1341 }
1342
1343 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) {
1344 return translateReadyOps(ops, ski.nioReadyOps(), ski);
1345 }
1346
1347 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
1348 return translateReadyOps(ops, 0, ski);
1349 }
1350
1351 /**
1352 * Translates an interest operation set into a native poll event set
1353 */
1354 public int translateInterestOps(int ops) {
1355 int newOps = 0;
1356 if ((ops & SelectionKey.OP_READ) != 0)
1357 newOps |= Net.POLLIN;
1358 if ((ops & SelectionKey.OP_WRITE) != 0)
1359 newOps |= Net.POLLOUT;
1360 if ((ops & SelectionKey.OP_CONNECT) != 0)
1361 newOps |= Net.POLLCONN;
1362 return newOps;
1363 }
1364
1365 public FileDescriptor getFD() {
1366 return fd;
1367 }
1368
1369 public int getFDVal() {
1370 return fdVal;
1371 }
1372
1373 @Override
1374 public String toString() {
1375 StringBuilder sb = new StringBuilder();
1376 sb.append(this.getClass().getSuperclass().getName());
1377 sb.append('[');
1378 if (!isOpen())
1379 sb.append("closed");
1380 else {
1381 synchronized (stateLock) {
1382 switch (state) {
1383 case ST_UNCONNECTED:
1384 sb.append("unconnected");
1385 break;
1386 case ST_CONNECTIONPENDING:
1387 sb.append("connection-pending");
1388 break;
1389 case ST_CONNECTED:
1390 sb.append("connected");
1391 if (isInputClosed)
1392 sb.append(" ishut");
1393 if (isOutputClosed)
1394 sb.append(" oshut");
1395 break;
1396 }
1397 InetSocketAddress addr = localAddress();
1398 if (addr != null) {
1399 sb.append(" local=");
1400 sb.append(Net.getRevealedLocalAddressAsString(addr));
1401 }
1402 if (remoteAddress() != null) {
1403 sb.append(" remote=");
1404 sb.append(remoteAddress().toString());
1405 }
1406 }
1407 }
1408 sb.append(']');
1409 return sb.toString();
1410 }
1411 }