1 /* 2 * Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.net.InetAddress; 31 import java.net.InetSocketAddress; 32 import java.net.ProtocolFamily; 33 import java.net.Socket; 34 import java.net.SocketAddress; 35 import java.net.SocketException; 36 import java.net.SocketOption; 37 import java.net.SocketTimeoutException; 38 import java.net.StandardProtocolFamily; 39 import java.net.StandardSocketOptions; 40 import java.nio.ByteBuffer; 41 import java.nio.channels.AlreadyBoundException; 42 import java.nio.channels.AlreadyConnectedException; 43 import java.nio.channels.AsynchronousCloseException; 44 import java.nio.channels.ClosedChannelException; 45 import java.nio.channels.ConnectionPendingException; 46 import java.nio.channels.IllegalBlockingModeException; 47 import java.nio.channels.NoConnectionPendingException; 48 import java.nio.channels.NotYetConnectedException; 49 import java.nio.channels.SelectionKey; 50 import java.nio.channels.SocketChannel; 51 import java.nio.channels.spi.SelectorProvider; 52 import java.util.Collections; 53 import java.util.HashSet; 54 import java.util.Objects; 55 import java.util.Set; 56 import java.util.concurrent.locks.ReentrantLock; 57 58 import sun.net.ConnectionResetException; 59 import sun.net.NetHooks; 60 import sun.net.ext.ExtendedSocketOptions; 61 import sun.net.util.SocketExceptions; 62 63 /** 64 * An implementation of SocketChannels 65 */ 66 67 class SocketChannelImpl 68 extends SocketChannel 69 implements SelChImpl 70 { 71 // Used to make native read and write calls 72 private static final NativeDispatcher nd = new SocketDispatcher(); 73 74 // Our file descriptor object 75 private final FileDescriptor fd; 76 private final int fdVal; 77 78 // Lock held by current reading or connecting thread 79 private final ReentrantLock readLock = new ReentrantLock(); 80 81 // Lock held by current writing or connecting thread 82 private final ReentrantLock writeLock = new ReentrantLock(); 83 84 // Lock held by any thread that modifies the state fields declared below 85 // DO NOT invoke a blocking I/O operation while holding this lock! 86 private final Object stateLock = new Object(); 87 88 // Input/Output closed 89 private volatile boolean isInputClosed; 90 private volatile boolean isOutputClosed; 91 92 // Connection reset protected by readLock 93 private boolean connectionReset; 94 95 // -- The following fields are protected by stateLock 96 97 // set true when exclusive binding is on and SO_REUSEADDR is emulated 98 private boolean isReuseAddress; 99 100 // State, increases monotonically 101 private static final int ST_UNCONNECTED = 0; 102 private static final int ST_CONNECTIONPENDING = 1; 103 private static final int ST_CONNECTED = 2; 104 private static final int ST_CLOSING = 3; 105 private static final int ST_CLOSED = 4; 106 private volatile int state; // need stateLock to change 107 108 // IDs of native threads doing reads and writes, for signalling 109 private long readerThread; 110 private long writerThread; 111 112 // Binding 113 private InetSocketAddress localAddress; 114 private InetSocketAddress remoteAddress; 115 116 // Socket adaptor, created on demand 117 private Socket socket; 118 119 // -- End of fields protected by stateLock 120 121 122 // Constructor for normal connecting sockets 123 // 124 SocketChannelImpl(SelectorProvider sp) throws IOException { 125 super(sp); 126 this.fd = Net.socket(true); 127 this.fdVal = IOUtil.fdVal(fd); 128 } 129 130 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) 131 throws IOException 132 { 133 super(sp); 134 this.fd = fd; 135 this.fdVal = IOUtil.fdVal(fd); 136 if (bound) { 137 synchronized (stateLock) { 138 this.localAddress = Net.localAddress(fd); 139 } 140 } 141 } 142 143 // Constructor for sockets obtained from server sockets 144 // 145 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, InetSocketAddress isa) 146 throws IOException 147 { 148 super(sp); 149 this.fd = fd; 150 this.fdVal = IOUtil.fdVal(fd); 151 synchronized (stateLock) { 152 this.localAddress = Net.localAddress(fd); 153 this.remoteAddress = isa; 154 this.state = ST_CONNECTED; 155 } 156 } 157 158 /** 159 * Checks that the channel is open. 160 * 161 * @throws ClosedChannelException if channel is closed (or closing) 162 */ 163 private void ensureOpen() throws ClosedChannelException { 164 if (!isOpen()) 165 throw new ClosedChannelException(); 166 } 167 168 /** 169 * Checks that the channel is open and connected. 170 * 171 * @apiNote This method uses the "state" field to check if the channel is 172 * open. It should never be used in conjuncion with isOpen or ensureOpen 173 * as these methods check AbstractInterruptibleChannel's closed field - that 174 * field is set before implCloseSelectableChannel is called and so before 175 * the state is changed. 176 * 177 * @throws ClosedChannelException if channel is closed (or closing) 178 * @throws NotYetConnectedException if open and not connected 179 */ 180 private void ensureOpenAndConnected() throws ClosedChannelException { 181 int state = this.state; 182 if (state < ST_CONNECTED) { 183 throw new NotYetConnectedException(); 184 } else if (state > ST_CONNECTED) { 185 throw new ClosedChannelException(); 186 } 187 } 188 189 @Override 190 public Socket socket() { 191 synchronized (stateLock) { 192 if (socket == null) 193 socket = SocketAdaptor.create(this); 194 return socket; 195 } 196 } 197 198 @Override 199 public SocketAddress getLocalAddress() throws IOException { 200 synchronized (stateLock) { 201 ensureOpen(); 202 return Net.getRevealedLocalAddress(localAddress); 203 } 204 } 205 206 @Override 207 public SocketAddress getRemoteAddress() throws IOException { 208 synchronized (stateLock) { 209 ensureOpen(); 210 return remoteAddress; 211 } 212 } 213 214 @Override 215 public <T> SocketChannel setOption(SocketOption<T> name, T value) 216 throws IOException 217 { 218 Objects.requireNonNull(name); 219 if (!supportedOptions().contains(name)) 220 throw new UnsupportedOperationException("'" + name + "' not supported"); 221 if (!name.type().isInstance(value)) 222 throw new IllegalArgumentException("Invalid value '" + value + "'"); 223 224 synchronized (stateLock) { 225 ensureOpen(); 226 227 if (name == StandardSocketOptions.IP_TOS) { 228 ProtocolFamily family = Net.isIPv6Available() ? 229 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; 230 Net.setSocketOption(fd, family, name, value); 231 return this; 232 } 233 234 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { 235 // SO_REUSEADDR emulated when using exclusive bind 236 isReuseAddress = (Boolean)value; 237 return this; 238 } 239 240 // no options that require special handling 241 Net.setSocketOption(fd, name, value); 242 return this; 243 } 244 } 245 246 @Override 247 @SuppressWarnings("unchecked") 248 public <T> T getOption(SocketOption<T> name) 249 throws IOException 250 { 251 Objects.requireNonNull(name); 252 if (!supportedOptions().contains(name)) 253 throw new UnsupportedOperationException("'" + name + "' not supported"); 254 255 synchronized (stateLock) { 256 ensureOpen(); 257 258 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { 259 // SO_REUSEADDR emulated when using exclusive bind 260 return (T)Boolean.valueOf(isReuseAddress); 261 } 262 263 // special handling for IP_TOS: always return 0 when IPv6 264 if (name == StandardSocketOptions.IP_TOS) { 265 ProtocolFamily family = Net.isIPv6Available() ? 266 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; 267 return (T) Net.getSocketOption(fd, family, name); 268 } 269 270 // no options that require special handling 271 return (T) Net.getSocketOption(fd, name); 272 } 273 } 274 275 private static class DefaultOptionsHolder { 276 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 277 278 private static Set<SocketOption<?>> defaultOptions() { 279 HashSet<SocketOption<?>> set = new HashSet<>(); 280 set.add(StandardSocketOptions.SO_SNDBUF); 281 set.add(StandardSocketOptions.SO_RCVBUF); 282 set.add(StandardSocketOptions.SO_KEEPALIVE); 283 set.add(StandardSocketOptions.SO_REUSEADDR); 284 if (Net.isReusePortAvailable()) { 285 set.add(StandardSocketOptions.SO_REUSEPORT); 286 } 287 set.add(StandardSocketOptions.SO_LINGER); 288 set.add(StandardSocketOptions.TCP_NODELAY); 289 // additional options required by socket adaptor 290 set.add(StandardSocketOptions.IP_TOS); 291 set.add(ExtendedSocketOption.SO_OOBINLINE); 292 set.addAll(ExtendedSocketOptions.clientSocketOptions()); 293 return Collections.unmodifiableSet(set); 294 } 295 } 296 297 @Override 298 public final Set<SocketOption<?>> supportedOptions() { 299 return DefaultOptionsHolder.defaultOptions; 300 } 301 302 /** 303 * Marks the beginning of a read operation that might block. 304 * 305 * @throws ClosedChannelException if the channel is closed 306 * @throws NotYetConnectedException if the channel is not yet connected 307 */ 308 private void beginRead(boolean blocking) throws ClosedChannelException { 309 if (blocking) { 310 // set hook for Thread.interrupt 311 begin(); 312 313 synchronized (stateLock) { 314 ensureOpenAndConnected(); 315 // record thread so it can be signalled if needed 316 readerThread = NativeThread.current(); 317 } 318 } else { 319 ensureOpenAndConnected(); 320 } 321 } 322 323 /** 324 * Marks the end of a read operation that may have blocked. 325 * 326 * @throws AsynchronousCloseException if the channel was closed due to this 327 * thread being interrupted on a blocking read operation. 328 */ 329 private void endRead(boolean blocking, boolean completed) 330 throws AsynchronousCloseException 331 { 332 if (blocking) { 333 synchronized (stateLock) { 334 readerThread = 0; 335 if (state == ST_CLOSING) { 336 tryFinishClose(); 337 } 338 } 339 // remove hook for Thread.interrupt 340 end(completed); 341 } 342 } 343 344 private void throwConnectionReset() throws SocketException { 345 throw new SocketException("Connection reset"); 346 } 347 348 @Override 349 public int read(ByteBuffer buf) throws IOException { 350 Objects.requireNonNull(buf); 351 352 readLock.lock(); 353 try { 354 boolean blocking = isBlocking(); 355 int n = 0; 356 try { 357 beginRead(blocking); 358 359 // check if connection has been reset 360 if (connectionReset) 361 throwConnectionReset(); 362 363 // check if input is shutdown 364 if (isInputClosed) 365 return IOStatus.EOF; 366 367 n = IOUtil.read(fd, buf, -1, nd); 368 if (blocking) { 369 while (IOStatus.okayToRetry(n) && isOpen()) { 370 park(Net.POLLIN); 371 n = IOUtil.read(fd, buf, -1, nd); 372 } 373 } 374 } catch (ConnectionResetException e) { 375 connectionReset = true; 376 throwConnectionReset(); 377 } finally { 378 endRead(blocking, n > 0); 379 if (n <= 0 && isInputClosed) 380 return IOStatus.EOF; 381 } 382 return IOStatus.normalize(n); 383 } finally { 384 readLock.unlock(); 385 } 386 } 387 388 @Override 389 public long read(ByteBuffer[] dsts, int offset, int length) 390 throws IOException 391 { 392 Objects.checkFromIndexSize(offset, length, dsts.length); 393 394 readLock.lock(); 395 try { 396 boolean blocking = isBlocking(); 397 long n = 0; 398 try { 399 beginRead(blocking); 400 401 // check if connection has been reset 402 if (connectionReset) 403 throwConnectionReset(); 404 405 // check if input is shutdown 406 if (isInputClosed) 407 return IOStatus.EOF; 408 409 n = IOUtil.read(fd, dsts, offset, length, nd); 410 if (blocking) { 411 while (IOStatus.okayToRetry(n) && isOpen()) { 412 park(Net.POLLIN); 413 n = IOUtil.read(fd, dsts, offset, length, nd); 414 } 415 } 416 } catch (ConnectionResetException e) { 417 connectionReset = true; 418 throwConnectionReset(); 419 } finally { 420 endRead(blocking, n > 0); 421 if (n <= 0 && isInputClosed) 422 return IOStatus.EOF; 423 } 424 return IOStatus.normalize(n); 425 } finally { 426 readLock.unlock(); 427 } 428 } 429 430 /** 431 * Marks the beginning of a write operation that might block. 432 * 433 * @throws ClosedChannelException if the channel is closed or output shutdown 434 * @throws NotYetConnectedException if the channel is not yet connected 435 */ 436 private void beginWrite(boolean blocking) throws ClosedChannelException { 437 if (blocking) { 438 // set hook for Thread.interrupt 439 begin(); 440 441 synchronized (stateLock) { 442 ensureOpenAndConnected(); 443 if (isOutputClosed) 444 throw new ClosedChannelException(); 445 // record thread so it can be signalled if needed 446 writerThread = NativeThread.current(); 447 } 448 } else { 449 ensureOpenAndConnected(); 450 } 451 } 452 453 /** 454 * Marks the end of a write operation that may have blocked. 455 * 456 * @throws AsynchronousCloseException if the channel was closed due to this 457 * thread being interrupted on a blocking write operation. 458 */ 459 private void endWrite(boolean blocking, boolean completed) 460 throws AsynchronousCloseException 461 { 462 if (blocking) { 463 synchronized (stateLock) { 464 writerThread = 0; 465 if (state == ST_CLOSING) { 466 tryFinishClose(); 467 } 468 } 469 // remove hook for Thread.interrupt 470 end(completed); 471 } 472 } 473 474 @Override 475 public int write(ByteBuffer buf) throws IOException { 476 Objects.requireNonNull(buf); 477 478 writeLock.lock(); 479 try { 480 boolean blocking = isBlocking(); 481 int n = 0; 482 try { 483 beginWrite(blocking); 484 n = IOUtil.write(fd, buf, -1, nd); 485 if (blocking) { 486 while (IOStatus.okayToRetry(n) && isOpen()) { 487 park(Net.POLLOUT); 488 n = IOUtil.write(fd, buf, -1, nd); 489 } 490 } 491 } finally { 492 endWrite(blocking, n > 0); 493 if (n <= 0 && isOutputClosed) 494 throw new AsynchronousCloseException(); 495 } 496 return IOStatus.normalize(n); 497 } finally { 498 writeLock.unlock(); 499 } 500 } 501 502 @Override 503 public long write(ByteBuffer[] srcs, int offset, int length) 504 throws IOException 505 { 506 Objects.checkFromIndexSize(offset, length, srcs.length); 507 508 writeLock.lock(); 509 try { 510 boolean blocking = isBlocking(); 511 long n = 0; 512 try { 513 beginWrite(blocking); 514 n = IOUtil.write(fd, srcs, offset, length, nd); 515 if (blocking) { 516 while (IOStatus.okayToRetry(n) && isOpen()) { 517 park(Net.POLLOUT); 518 n = IOUtil.write(fd, srcs, offset, length, nd); 519 } 520 } 521 } finally { 522 endWrite(blocking, n > 0); 523 if (n <= 0 && isOutputClosed) 524 throw new AsynchronousCloseException(); 525 } 526 return IOStatus.normalize(n); 527 } finally { 528 writeLock.unlock(); 529 } 530 } 531 532 /** 533 * Writes a byte of out of band data. 534 */ 535 int sendOutOfBandData(byte b) throws IOException { 536 writeLock.lock(); 537 try { 538 boolean blocking = isBlocking(); 539 int n = 0; 540 try { 541 beginWrite(blocking); 542 if (blocking) { 543 do { 544 n = Net.sendOOB(fd, b); 545 } while (n == IOStatus.INTERRUPTED && isOpen()); 546 } else { 547 n = Net.sendOOB(fd, b); 548 } 549 } finally { 550 endWrite(blocking, n > 0); 551 if (n <= 0 && isOutputClosed) 552 throw new AsynchronousCloseException(); 553 } 554 return IOStatus.normalize(n); 555 } finally { 556 writeLock.unlock(); 557 } 558 } 559 560 @Override 561 protected void implConfigureBlocking(boolean block) throws IOException { 562 readLock.lock(); 563 try { 564 writeLock.lock(); 565 try { 566 lockedConfigureBlocking(block); 567 } finally { 568 writeLock.unlock(); 569 } 570 } finally { 571 readLock.unlock(); 572 } 573 } 574 575 /** 576 * Adjust the blocking mode while holding the readLock or writeLock. 577 */ 578 private void lockedConfigureBlocking(boolean block) throws IOException { 579 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); 580 synchronized (stateLock) { 581 ensureOpen(); 582 IOUtil.configureBlocking(fd, block); 583 } 584 } 585 586 /** 587 * Returns the local address, or null if not bound 588 */ 589 InetSocketAddress localAddress() { 590 synchronized (stateLock) { 591 return localAddress; 592 } 593 } 594 595 /** 596 * Returns the remote address, or null if not connected 597 */ 598 InetSocketAddress remoteAddress() { 599 synchronized (stateLock) { 600 return remoteAddress; 601 } 602 } 603 604 @Override 605 public SocketChannel bind(SocketAddress local) throws IOException { 606 readLock.lock(); 607 try { 608 writeLock.lock(); 609 try { 610 synchronized (stateLock) { 611 ensureOpen(); 612 if (state == ST_CONNECTIONPENDING) 613 throw new ConnectionPendingException(); 614 if (localAddress != null) 615 throw new AlreadyBoundException(); 616 InetSocketAddress isa = (local == null) ? 617 new InetSocketAddress(0) : Net.checkAddress(local); 618 SecurityManager sm = System.getSecurityManager(); 619 if (sm != null) { 620 sm.checkListen(isa.getPort()); 621 } 622 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort()); 623 Net.bind(fd, isa.getAddress(), isa.getPort()); 624 localAddress = Net.localAddress(fd); 625 } 626 } finally { 627 writeLock.unlock(); 628 } 629 } finally { 630 readLock.unlock(); 631 } 632 return this; 633 } 634 635 @Override 636 public boolean isConnected() { 637 return (state == ST_CONNECTED); 638 } 639 640 @Override 641 public boolean isConnectionPending() { 642 return (state == ST_CONNECTIONPENDING); 643 } 644 645 /** 646 * Marks the beginning of a connect operation that might block. 647 * @param blocking true if configured blocking 648 * @param isa the remote address 649 * @throws ClosedChannelException if the channel is closed 650 * @throws AlreadyConnectedException if already connected 651 * @throws ConnectionPendingException is a connection is pending 652 * @throws IOException if the pre-connect hook fails 653 */ 654 private void beginConnect(boolean blocking, InetSocketAddress isa) 655 throws IOException 656 { 657 if (blocking) { 658 // set hook for Thread.interrupt 659 begin(); 660 } 661 synchronized (stateLock) { 662 ensureOpen(); 663 int state = this.state; 664 if (state == ST_CONNECTED) 665 throw new AlreadyConnectedException(); 666 if (state == ST_CONNECTIONPENDING) 667 throw new ConnectionPendingException(); 668 assert state == ST_UNCONNECTED; 669 this.state = ST_CONNECTIONPENDING; 670 671 if (localAddress == null) 672 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort()); 673 remoteAddress = isa; 674 675 if (blocking) { 676 // record thread so it can be signalled if needed 677 readerThread = NativeThread.current(); 678 } 679 } 680 } 681 682 /** 683 * Marks the end of a connect operation that may have blocked. 684 * 685 * @throws AsynchronousCloseException if the channel was closed due to this 686 * thread being interrupted on a blocking connect operation. 687 * @throws IOException if completed and unable to obtain the local address 688 */ 689 private void endConnect(boolean blocking, boolean completed) 690 throws IOException 691 { 692 endRead(blocking, completed); 693 694 if (completed) { 695 synchronized (stateLock) { 696 if (state == ST_CONNECTIONPENDING) { 697 localAddress = Net.localAddress(fd); 698 state = ST_CONNECTED; 699 } 700 } 701 } 702 } 703 704 /** 705 * Checks the remote address to which this channel is to be connected. 706 */ 707 private InetSocketAddress checkRemote(SocketAddress sa) throws IOException { 708 InetSocketAddress isa = Net.checkAddress(sa); 709 SecurityManager sm = System.getSecurityManager(); 710 if (sm != null) { 711 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 712 } 713 if (isa.getAddress().isAnyLocalAddress()) { 714 return new InetSocketAddress(InetAddress.getLocalHost(), isa.getPort()); 715 } else { 716 return isa; 717 } 718 } 719 720 @Override 721 public boolean connect(SocketAddress remote) throws IOException { 722 InetSocketAddress isa = checkRemote(remote); 723 try { 724 readLock.lock(); 725 try { 726 writeLock.lock(); 727 try { 728 boolean blocking = isBlocking(); 729 boolean connected = false; 730 try { 731 beginConnect(blocking, isa); 732 int n = Net.connect(fd, isa.getAddress(), isa.getPort()); 733 if (n > 0) { 734 connected = true; 735 } else if (blocking) { 736 assert IOStatus.okayToRetry(n); 737 boolean polled = false; 738 while (!polled && isOpen()) { 739 park(Net.POLLOUT); 740 polled = Net.pollConnectNow(fd); 741 } 742 connected = polled && isOpen(); 743 } 744 } finally { 745 endConnect(blocking, connected); 746 } 747 return connected; 748 } finally { 749 writeLock.unlock(); 750 } 751 } finally { 752 readLock.unlock(); 753 } 754 } catch (IOException ioe) { 755 // connect failed, close the channel 756 close(); 757 throw SocketExceptions.of(ioe, isa); 758 } 759 } 760 761 /** 762 * Marks the beginning of a finishConnect operation that might block. 763 * 764 * @throws ClosedChannelException if the channel is closed 765 * @throws NoConnectionPendingException if no connection is pending 766 */ 767 private void beginFinishConnect(boolean blocking) throws ClosedChannelException { 768 if (blocking) { 769 // set hook for Thread.interrupt 770 begin(); 771 } 772 synchronized (stateLock) { 773 ensureOpen(); 774 if (state != ST_CONNECTIONPENDING) 775 throw new NoConnectionPendingException(); 776 if (blocking) { 777 // record thread so it can be signalled if needed 778 readerThread = NativeThread.current(); 779 } 780 } 781 } 782 783 /** 784 * Marks the end of a finishConnect operation that may have blocked. 785 * 786 * @throws AsynchronousCloseException if the channel was closed due to this 787 * thread being interrupted on a blocking connect operation. 788 * @throws IOException if completed and unable to obtain the local address 789 */ 790 private void endFinishConnect(boolean blocking, boolean completed) 791 throws IOException 792 { 793 endRead(blocking, completed); 794 795 if (completed) { 796 synchronized (stateLock) { 797 if (state == ST_CONNECTIONPENDING) { 798 localAddress = Net.localAddress(fd); 799 state = ST_CONNECTED; 800 } 801 } 802 } 803 } 804 805 @Override 806 public boolean finishConnect() throws IOException { 807 try { 808 readLock.lock(); 809 try { 810 writeLock.lock(); 811 try { 812 // no-op if already connected 813 if (isConnected()) 814 return true; 815 816 boolean blocking = isBlocking(); 817 boolean connected = false; 818 try { 819 beginFinishConnect(blocking); 820 boolean polled = Net.pollConnectNow(fd); 821 if (blocking) { 822 while (!polled && isOpen()) { 823 park(Net.POLLOUT); 824 polled = Net.pollConnectNow(fd); 825 } 826 } 827 connected = polled && isOpen(); 828 } finally { 829 endFinishConnect(blocking, connected); 830 } 831 assert (blocking && connected) ^ !blocking; 832 return connected; 833 } finally { 834 writeLock.unlock(); 835 } 836 } finally { 837 readLock.unlock(); 838 } 839 } catch (IOException ioe) { 840 // connect failed, close the channel 841 close(); 842 throw SocketExceptions.of(ioe, remoteAddress); 843 } 844 } 845 846 /** 847 * Closes the socket if there are no I/O operations in progress and the 848 * channel is not registered with a Selector. 849 */ 850 private boolean tryClose() throws IOException { 851 assert Thread.holdsLock(stateLock) && state == ST_CLOSING; 852 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) { 853 state = ST_CLOSED; 854 nd.close(fd); 855 return true; 856 } else { 857 return false; 858 } 859 } 860 861 /** 862 * Invokes tryClose to attempt to close the socket. 863 * 864 * This method is used for deferred closing by I/O and Selector operations. 865 */ 866 private void tryFinishClose() { 867 try { 868 tryClose(); 869 } catch (IOException ignore) { } 870 } 871 872 /** 873 * Closes this channel when configured in blocking mode. 874 * 875 * If there is an I/O operation in progress then the socket is pre-closed 876 * and the I/O threads signalled, in which case the final close is deferred 877 * until all I/O operations complete. 878 * 879 * Note that a channel configured blocking may be registered with a Selector 880 * This arises when a key is canceled and the channel configured to blocking 881 * mode before the key is flushed from the Selector. 882 */ 883 private void implCloseBlockingMode() throws IOException { 884 synchronized (stateLock) { 885 assert state < ST_CLOSING; 886 state = ST_CLOSING; 887 if (!tryClose()) { 888 long reader = readerThread; 889 long writer = writerThread; 890 if (reader != 0 || writer != 0) { 891 nd.preClose(fd); 892 if (reader != 0) 893 NativeThread.signal(reader); 894 if (writer != 0) 895 NativeThread.signal(writer); 896 } 897 } 898 } 899 } 900 901 /** 902 * Closes this channel when configured in non-blocking mode. 903 * 904 * If the channel is registered with a Selector then the close is deferred 905 * until the channel is flushed from all Selectors. 906 * 907 * If the socket is connected and the channel is registered with a Selector 908 * then the socket is shutdown for writing so that the peer reads EOF. In 909 * addition, if SO_LINGER is set to a non-zero value then it is disabled so 910 * that the deferred close does not wait. 911 */ 912 private void implCloseNonBlockingMode() throws IOException { 913 boolean connected; 914 synchronized (stateLock) { 915 assert state < ST_CLOSING; 916 connected = (state == ST_CONNECTED); 917 state = ST_CLOSING; 918 } 919 920 // wait for any read/write operations to complete 921 readLock.lock(); 922 readLock.unlock(); 923 writeLock.lock(); 924 writeLock.unlock(); 925 926 // if the socket cannot be closed because it's registered with a Selector 927 // then shutdown the socket for writing. 928 synchronized (stateLock) { 929 if (state == ST_CLOSING && !tryClose() && connected && isRegistered()) { 930 try { 931 SocketOption<Integer> opt = StandardSocketOptions.SO_LINGER; 932 int interval = (int) Net.getSocketOption(fd, Net.UNSPEC, opt); 933 if (interval != 0) { 934 if (interval > 0) { 935 // disable SO_LINGER 936 Net.setSocketOption(fd, Net.UNSPEC, opt, -1); 937 } 938 Net.shutdown(fd, Net.SHUT_WR); 939 } 940 } catch (IOException ignore) { } 941 } 942 } 943 } 944 945 /** 946 * Invoked by implCloseChannel to close the channel. 947 */ 948 @Override 949 protected void implCloseSelectableChannel() throws IOException { 950 assert !isOpen(); 951 if (isBlocking()) { 952 implCloseBlockingMode(); 953 } else { 954 implCloseNonBlockingMode(); 955 } 956 } 957 958 @Override 959 public void kill() { 960 synchronized (stateLock) { 961 if (state == ST_CLOSING) { 962 tryFinishClose(); 963 } 964 } 965 } 966 967 @Override 968 public SocketChannel shutdownInput() throws IOException { 969 synchronized (stateLock) { 970 ensureOpen(); 971 if (!isConnected()) 972 throw new NotYetConnectedException(); 973 if (!isInputClosed) { 974 Net.shutdown(fd, Net.SHUT_RD); 975 long thread = readerThread; 976 if (thread != 0) 977 NativeThread.signal(thread); 978 isInputClosed = true; 979 } 980 return this; 981 } 982 } 983 984 @Override 985 public SocketChannel shutdownOutput() throws IOException { 986 synchronized (stateLock) { 987 ensureOpen(); 988 if (!isConnected()) 989 throw new NotYetConnectedException(); 990 if (!isOutputClosed) { 991 Net.shutdown(fd, Net.SHUT_WR); 992 long thread = writerThread; 993 if (thread != 0) 994 NativeThread.signal(thread); 995 isOutputClosed = true; 996 } 997 return this; 998 } 999 } 1000 1001 boolean isInputOpen() { 1002 return !isInputClosed; 1003 } 1004 1005 boolean isOutputOpen() { 1006 return !isOutputClosed; 1007 } 1008 1009 /** 1010 * Waits for a connection attempt to finish with a timeout 1011 * @throws SocketTimeoutException if the connect timeout elapses 1012 */ 1013 private boolean finishTimedConnect(long nanos) throws IOException { 1014 long startNanos = System.nanoTime(); 1015 boolean polled = Net.pollConnectNow(fd); 1016 while (!polled && isOpen()) { 1017 long remainingNanos = nanos - (System.nanoTime() - startNanos); 1018 if (remainingNanos <= 0) { 1019 throw new SocketTimeoutException("Connect timed out"); 1020 } 1021 park(Net.POLLOUT, remainingNanos); 1022 polled = Net.pollConnectNow(fd); 1023 } 1024 return polled && isOpen(); 1025 } 1026 1027 /** 1028 * Attempts to establish a connection to the given socket address with a 1029 * timeout. Closes the socket if connection cannot be established. 1030 * 1031 * @apiNote This method is for use by the socket adaptor. 1032 * 1033 * @throws IllegalBlockingModeException if the channel is non-blocking 1034 * @throws SocketTimeoutException if the read timeout elapses 1035 */ 1036 void blockingConnect(SocketAddress remote, long nanos) throws IOException { 1037 InetSocketAddress isa = checkRemote(remote); 1038 try { 1039 readLock.lock(); 1040 try { 1041 writeLock.lock(); 1042 try { 1043 if (!isBlocking()) 1044 throw new IllegalBlockingModeException(); 1045 boolean connected = false; 1046 try { 1047 beginConnect(true, isa); 1048 // change socket to non-blocking 1049 lockedConfigureBlocking(false); 1050 try { 1051 int n = Net.connect(fd, isa.getAddress(), isa.getPort()); 1052 connected = (n > 0) ? true : finishTimedConnect(nanos); 1053 } finally { 1054 // restore socket to blocking mode 1055 lockedConfigureBlocking(true); 1056 } 1057 } finally { 1058 endConnect(true, connected); 1059 } 1060 } finally { 1061 writeLock.unlock(); 1062 } 1063 } finally { 1064 readLock.unlock(); 1065 } 1066 } catch (IOException ioe) { 1067 // connect failed, close the channel 1068 close(); 1069 throw SocketExceptions.of(ioe, isa); 1070 } 1071 } 1072 1073 /** 1074 * Attempts to read bytes from the socket into the given byte array. 1075 */ 1076 private int tryRead(byte[] b, int off, int len) throws IOException { 1077 ByteBuffer dst = Util.getTemporaryDirectBuffer(len); 1078 assert dst.position() == 0; 1079 try { 1080 int n = nd.read(fd, ((DirectBuffer)dst).address(), len); 1081 if (n > 0) { 1082 dst.get(b, off, n); 1083 } 1084 return n; 1085 } finally{ 1086 Util.offerFirstTemporaryDirectBuffer(dst); 1087 } 1088 } 1089 1090 /** 1091 * Reads bytes from the socket into the given byte array with a timeout. 1092 * @throws SocketTimeoutException if the read timeout elapses 1093 */ 1094 private int timedRead(byte[] b, int off, int len, long nanos) throws IOException { 1095 long startNanos = System.nanoTime(); 1096 int n = tryRead(b, off, len); 1097 while (n == IOStatus.UNAVAILABLE && isOpen()) { 1098 long remainingNanos = nanos - (System.nanoTime() - startNanos); 1099 if (remainingNanos <= 0) { 1100 throw new SocketTimeoutException("Read timed out"); 1101 } 1102 park(Net.POLLIN, remainingNanos); 1103 n = tryRead(b, off, len); 1104 } 1105 return n; 1106 } 1107 1108 /** 1109 * Reads bytes from the socket into the given byte array. 1110 * 1111 * @apiNote This method is for use by the socket adaptor. 1112 * 1113 * @throws IllegalBlockingModeException if the channel is non-blocking 1114 * @throws SocketTimeoutException if the read timeout elapses 1115 */ 1116 int blockingRead(byte[] b, int off, int len, long nanos) throws IOException { 1117 Objects.checkFromIndexSize(off, len, b.length); 1118 if (len == 0) { 1119 // nothing to do 1120 return 0; 1121 } 1122 1123 readLock.lock(); 1124 try { 1125 // check that channel is configured blocking 1126 if (!isBlocking()) 1127 throw new IllegalBlockingModeException(); 1128 1129 int n = 0; 1130 try { 1131 beginRead(true); 1132 1133 // check if connection has been reset 1134 if (connectionReset) 1135 throwConnectionReset(); 1136 1137 // check if input is shutdown 1138 if (isInputClosed) 1139 return IOStatus.EOF; 1140 1141 if (nanos > 0) { 1142 // change socket to non-blocking 1143 lockedConfigureBlocking(false); 1144 try { 1145 n = timedRead(b, off, len, nanos); 1146 } finally { 1147 // restore socket to blocking mode 1148 lockedConfigureBlocking(true); 1149 } 1150 } else { 1151 // read, no timeout 1152 n = tryRead(b, off, len); 1153 while (IOStatus.okayToRetry(n) && isOpen()) { 1154 park(Net.POLLIN); 1155 n = tryRead(b, off, len); 1156 } 1157 } 1158 } catch (ConnectionResetException e) { 1159 connectionReset = true; 1160 throwConnectionReset(); 1161 } finally { 1162 endRead(true, n > 0); 1163 if (n <= 0 && isInputClosed) 1164 return IOStatus.EOF; 1165 } 1166 assert n > 0 || n == -1; 1167 return n; 1168 } finally { 1169 readLock.unlock(); 1170 } 1171 } 1172 1173 /** 1174 * Attempts to write a sequence of bytes to the socket from the given 1175 * byte array. 1176 */ 1177 private int tryWrite(byte[] b, int off, int len) throws IOException { 1178 ByteBuffer src = Util.getTemporaryDirectBuffer(len); 1179 assert src.position() == 0; 1180 try { 1181 src.put(b, off, len); 1182 return nd.write(fd, ((DirectBuffer)src).address(), len); 1183 } finally { 1184 Util.offerFirstTemporaryDirectBuffer(src); 1185 } 1186 } 1187 1188 /** 1189 * Writes a sequence of bytes to the socket from the given byte array. 1190 * 1191 * @apiNote This method is for use by the socket adaptor. 1192 */ 1193 void blockingWriteFully(byte[] b, int off, int len) throws IOException { 1194 Objects.checkFromIndexSize(off, len, b.length); 1195 if (len == 0) { 1196 // nothing to do 1197 return; 1198 } 1199 1200 writeLock.lock(); 1201 try { 1202 // check that channel is configured blocking 1203 if (!isBlocking()) 1204 throw new IllegalBlockingModeException(); 1205 1206 // loop until all bytes have been written 1207 int pos = off; 1208 int end = off + len; 1209 beginWrite(true); 1210 try { 1211 while (pos < end && isOpen()) { 1212 int size = end - pos; 1213 int n = tryWrite(b, pos, size); 1214 while (IOStatus.okayToRetry(n) && isOpen()) { 1215 park(Net.POLLOUT); 1216 n = tryWrite(b, pos, size); 1217 } 1218 if (n > 0) { 1219 pos += n; 1220 } 1221 } 1222 } finally { 1223 endWrite(true, pos >= end); 1224 } 1225 } finally { 1226 writeLock.unlock(); 1227 } 1228 } 1229 1230 /** 1231 * Return the number of bytes in the socket input buffer. 1232 */ 1233 int available() throws IOException { 1234 synchronized (stateLock) { 1235 ensureOpenAndConnected(); 1236 if (isInputClosed) { 1237 return 0; 1238 } else { 1239 return Net.available(fd); 1240 } 1241 } 1242 } 1243 1244 /** 1245 * Translates native poll revent ops into a ready operation ops 1246 */ 1247 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 1248 int intOps = ski.nioInterestOps(); 1249 int oldOps = ski.nioReadyOps(); 1250 int newOps = initialOps; 1251 1252 if ((ops & Net.POLLNVAL) != 0) { 1253 // This should only happen if this channel is pre-closed while a 1254 // selection operation is in progress 1255 // ## Throw an error if this channel has not been pre-closed 1256 return false; 1257 } 1258 1259 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 1260 newOps = intOps; 1261 ski.nioReadyOps(newOps); 1262 return (newOps & ~oldOps) != 0; 1263 } 1264 1265 boolean connected = isConnected(); 1266 if (((ops & Net.POLLIN) != 0) && 1267 ((intOps & SelectionKey.OP_READ) != 0) && connected) 1268 newOps |= SelectionKey.OP_READ; 1269 1270 if (((ops & Net.POLLCONN) != 0) && 1271 ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending()) 1272 newOps |= SelectionKey.OP_CONNECT; 1273 1274 if (((ops & Net.POLLOUT) != 0) && 1275 ((intOps & SelectionKey.OP_WRITE) != 0) && connected) 1276 newOps |= SelectionKey.OP_WRITE; 1277 1278 ski.nioReadyOps(newOps); 1279 return (newOps & ~oldOps) != 0; 1280 } 1281 1282 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 1283 return translateReadyOps(ops, ski.nioReadyOps(), ski); 1284 } 1285 1286 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 1287 return translateReadyOps(ops, 0, ski); 1288 } 1289 1290 /** 1291 * Translates an interest operation set into a native poll event set 1292 */ 1293 public int translateInterestOps(int ops) { 1294 int newOps = 0; 1295 if ((ops & SelectionKey.OP_READ) != 0) 1296 newOps |= Net.POLLIN; 1297 if ((ops & SelectionKey.OP_WRITE) != 0) 1298 newOps |= Net.POLLOUT; 1299 if ((ops & SelectionKey.OP_CONNECT) != 0) 1300 newOps |= Net.POLLCONN; 1301 return newOps; 1302 } 1303 1304 public FileDescriptor getFD() { 1305 return fd; 1306 } 1307 1308 public int getFDVal() { 1309 return fdVal; 1310 } 1311 1312 @Override 1313 public String toString() { 1314 StringBuilder sb = new StringBuilder(); 1315 sb.append(this.getClass().getSuperclass().getName()); 1316 sb.append('['); 1317 if (!isOpen()) 1318 sb.append("closed"); 1319 else { 1320 synchronized (stateLock) { 1321 switch (state) { 1322 case ST_UNCONNECTED: 1323 sb.append("unconnected"); 1324 break; 1325 case ST_CONNECTIONPENDING: 1326 sb.append("connection-pending"); 1327 break; 1328 case ST_CONNECTED: 1329 sb.append("connected"); 1330 if (isInputClosed) 1331 sb.append(" ishut"); 1332 if (isOutputClosed) 1333 sb.append(" oshut"); 1334 break; 1335 } 1336 InetSocketAddress addr = localAddress(); 1337 if (addr != null) { 1338 sb.append(" local="); 1339 sb.append(Net.getRevealedLocalAddressAsString(addr)); 1340 } 1341 if (remoteAddress() != null) { 1342 sb.append(" remote="); 1343 sb.append(remoteAddress().toString()); 1344 } 1345 } 1346 } 1347 sb.append(']'); 1348 return sb.toString(); 1349 } 1350 }