1 /* 2 * Copyright (c) 2000, 2019, Oracle and/or its affiliates. All rights reserved. 3 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 4 * 5 * This code is free software; you can redistribute it and/or modify it 6 * under the terms of the GNU General Public License version 2 only, as 7 * published by the Free Software Foundation. Oracle designates this 8 * particular file as subject to the "Classpath" exception as provided 9 * by Oracle in the LICENSE file that accompanied this code. 10 * 11 * This code is distributed in the hope that it will be useful, but WITHOUT 12 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or 13 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License 14 * version 2 for more details (a copy is included in the LICENSE file that 15 * accompanied this code). 16 * 17 * You should have received a copy of the GNU General Public License version 18 * 2 along with this work; if not, write to the Free Software Foundation, 19 * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA. 20 * 21 * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA 22 * or visit www.oracle.com if you need additional information or have any 23 * questions. 24 */ 25 26 package sun.nio.ch; 27 28 import java.io.FileDescriptor; 29 import java.io.IOException; 30 import java.net.InetAddress; 31 import java.net.InetSocketAddress; 32 import java.net.ProtocolFamily; 33 import java.net.Socket; 34 import java.net.SocketAddress; 35 import java.net.SocketException; 36 import java.net.SocketOption; 37 import java.net.SocketTimeoutException; 38 import java.net.StandardProtocolFamily; 39 import java.net.StandardSocketOptions; 40 import java.net.UnixSocketAddress; 41 import java.nio.ByteBuffer; 42 import java.nio.channels.AlreadyBoundException; 43 import java.nio.channels.AlreadyConnectedException; 44 import java.nio.channels.AsynchronousCloseException; 45 import java.nio.channels.ClosedChannelException; 46 import java.nio.channels.ConnectionPendingException; 47 import java.nio.channels.IllegalBlockingModeException; 48 import java.nio.channels.NoConnectionPendingException; 49 import java.nio.channels.NotYetConnectedException; 50 import java.nio.channels.SelectionKey; 51 import java.nio.channels.SocketChannel; 52 import java.nio.channels.spi.SelectorProvider; 53 import java.util.Collections; 54 import java.util.HashSet; 55 import java.util.Objects; 56 import java.util.Set; 57 import java.util.concurrent.locks.ReentrantLock; 58 59 import sun.net.ConnectionResetException; 60 import sun.net.NetHooks; 61 import sun.net.ext.ExtendedSocketOptions; 62 import sun.net.util.SocketExceptions; 63 64 /** 65 * An implementation of SocketChannels 66 */ 67 68 class SocketChannelImpl 69 extends SocketChannel 70 implements SelChImpl 71 { 72 // Used to make native read and write calls 73 private static final NativeDispatcher nd = new SocketDispatcher(); 74 75 // Our file descriptor object 76 private final FileDescriptor fd; 77 private final int fdVal; 78 79 // Lock held by current reading or connecting thread 80 private final ReentrantLock readLock = new ReentrantLock(); 81 82 // Lock held by current writing or connecting thread 83 private final ReentrantLock writeLock = new ReentrantLock(); 84 85 // Lock held by any thread that modifies the state fields declared below 86 // DO NOT invoke a blocking I/O operation while holding this lock! 87 private final Object stateLock = new Object(); 88 89 // Input/Output closed 90 private volatile boolean isInputClosed; 91 private volatile boolean isOutputClosed; 92 93 // Connection reset protected by readLock 94 private boolean connectionReset; 95 96 // -- The following fields are protected by stateLock 97 98 // set true when exclusive binding is on and SO_REUSEADDR is emulated 99 private boolean isReuseAddress; 100 101 // State, increases monotonically 102 private static final int ST_UNCONNECTED = 0; 103 private static final int ST_CONNECTIONPENDING = 1; 104 private static final int ST_CONNECTED = 2; 105 private static final int ST_CLOSING = 3; 106 private static final int ST_CLOSED = 4; 107 private volatile int state; // need stateLock to change 108 109 // IDs of native threads doing reads and writes, for signalling 110 private long readerThread; 111 private long writerThread; 112 113 // Binding 114 private SocketAddress localAddress; 115 private SocketAddress remoteAddress; 116 117 // Socket adaptor, created on demand 118 private Socket socket; 119 120 // -- End of fields protected by stateLock 121 122 SocketChannelImpl(SelectorProvider sp, ProtocolFamily family) throws IOException { 123 super(sp); 124 this.fd = Net.socket(family, true); 125 this.fdVal = IOUtil.fdVal(fd); 126 } 127 128 // Constructor for normal connecting sockets 129 // 130 SocketChannelImpl(SelectorProvider sp) throws IOException { 131 super(sp); 132 this.fd = Net.socket(true); 133 this.fdVal = IOUtil.fdVal(fd); 134 } 135 136 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, boolean bound) 137 throws IOException 138 { 139 super(sp); 140 this.fd = fd; 141 this.fdVal = IOUtil.fdVal(fd); 142 if (bound) { 143 synchronized (stateLock) { 144 this.localAddress = Net.localAddress(fd); 145 } 146 } 147 } 148 149 // Constructor for sockets obtained from server sockets 150 // 151 SocketChannelImpl(SelectorProvider sp, FileDescriptor fd, SocketAddress isa) 152 throws IOException 153 { 154 super(sp); 155 this.fd = fd; 156 this.fdVal = IOUtil.fdVal(fd); 157 synchronized (stateLock) { 158 this.localAddress = Net.localSocketAddress(fd); 159 this.remoteAddress = isa; 160 this.state = ST_CONNECTED; 161 } 162 } 163 164 /** 165 * Checks that the channel is open. 166 * 167 * @throws ClosedChannelException if channel is closed (or closing) 168 */ 169 private void ensureOpen() throws ClosedChannelException { 170 if (!isOpen()) 171 throw new ClosedChannelException(); 172 } 173 174 /** 175 * Checks that the channel is open and connected. 176 * 177 * @apiNote This method uses the "state" field to check if the channel is 178 * open. It should never be used in conjuncion with isOpen or ensureOpen 179 * as these methods check AbstractInterruptibleChannel's closed field - that 180 * field is set before implCloseSelectableChannel is called and so before 181 * the state is changed. 182 * 183 * @throws ClosedChannelException if channel is closed (or closing) 184 * @throws NotYetConnectedException if open and not connected 185 */ 186 private void ensureOpenAndConnected() throws ClosedChannelException { 187 int state = this.state; 188 if (state < ST_CONNECTED) { 189 throw new NotYetConnectedException(); 190 } else if (state > ST_CONNECTED) { 191 throw new ClosedChannelException(); 192 } 193 } 194 195 @Override 196 public Socket socket() { 197 synchronized (stateLock) { 198 if (socket == null) 199 socket = SocketAdaptor.create(this); 200 return socket; 201 } 202 } 203 204 @Override 205 public SocketAddress getLocalAddress() throws IOException { 206 synchronized (stateLock) { 207 ensureOpen(); 208 return localAddress instanceof InetSocketAddress ? Net.getRevealedLocalAddress((InetSocketAddress) localAddress) : localAddress; 209 } 210 } 211 212 @Override 213 public SocketAddress getRemoteAddress() throws IOException { 214 synchronized (stateLock) { 215 ensureOpen(); 216 return remoteAddress; 217 } 218 } 219 220 @Override 221 public <T> SocketChannel setOption(SocketOption<T> name, T value) 222 throws IOException 223 { 224 Objects.requireNonNull(name); 225 if (!supportedOptions().contains(name)) 226 throw new UnsupportedOperationException("'" + name + "' not supported"); 227 if (!name.type().isInstance(value)) 228 throw new IllegalArgumentException("Invalid value '" + value + "'"); 229 230 synchronized (stateLock) { 231 ensureOpen(); 232 233 if (name == StandardSocketOptions.IP_TOS) { 234 ProtocolFamily family = Net.isIPv6Available() ? 235 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; 236 Net.setSocketOption(fd, family, name, value); 237 return this; 238 } 239 240 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { 241 // SO_REUSEADDR emulated when using exclusive bind 242 isReuseAddress = (Boolean)value; 243 return this; 244 } 245 246 // no options that require special handling 247 Net.setSocketOption(fd, name, value); 248 return this; 249 } 250 } 251 252 @Override 253 @SuppressWarnings("unchecked") 254 public <T> T getOption(SocketOption<T> name) 255 throws IOException 256 { 257 Objects.requireNonNull(name); 258 if (!supportedOptions().contains(name)) 259 throw new UnsupportedOperationException("'" + name + "' not supported"); 260 261 synchronized (stateLock) { 262 ensureOpen(); 263 264 if (name == StandardSocketOptions.SO_REUSEADDR && Net.useExclusiveBind()) { 265 // SO_REUSEADDR emulated when using exclusive bind 266 return (T)Boolean.valueOf(isReuseAddress); 267 } 268 269 // special handling for IP_TOS: always return 0 when IPv6 270 if (name == StandardSocketOptions.IP_TOS) { 271 ProtocolFamily family = Net.isIPv6Available() ? 272 StandardProtocolFamily.INET6 : StandardProtocolFamily.INET; 273 return (T) Net.getSocketOption(fd, family, name); 274 } 275 276 // no options that require special handling 277 return (T) Net.getSocketOption(fd, name); 278 } 279 } 280 281 private static class DefaultOptionsHolder { 282 static final Set<SocketOption<?>> defaultOptions = defaultOptions(); 283 284 private static Set<SocketOption<?>> defaultOptions() { 285 HashSet<SocketOption<?>> set = new HashSet<>(); 286 set.add(StandardSocketOptions.SO_SNDBUF); 287 set.add(StandardSocketOptions.SO_RCVBUF); 288 set.add(StandardSocketOptions.SO_KEEPALIVE); 289 set.add(StandardSocketOptions.SO_REUSEADDR); 290 if (Net.isReusePortAvailable()) { 291 set.add(StandardSocketOptions.SO_REUSEPORT); 292 } 293 set.add(StandardSocketOptions.SO_LINGER); 294 set.add(StandardSocketOptions.TCP_NODELAY); 295 // additional options required by socket adaptor 296 set.add(StandardSocketOptions.IP_TOS); 297 set.add(ExtendedSocketOption.SO_OOBINLINE); 298 set.addAll(ExtendedSocketOptions.clientSocketOptions()); 299 return Collections.unmodifiableSet(set); 300 } 301 } 302 303 @Override 304 public final Set<SocketOption<?>> supportedOptions() { 305 return DefaultOptionsHolder.defaultOptions; 306 } 307 308 /** 309 * Marks the beginning of a read operation that might block. 310 * 311 * @throws ClosedChannelException if the channel is closed 312 * @throws NotYetConnectedException if the channel is not yet connected 313 */ 314 private void beginRead(boolean blocking) throws ClosedChannelException { 315 if (blocking) { 316 // set hook for Thread.interrupt 317 begin(); 318 319 synchronized (stateLock) { 320 ensureOpenAndConnected(); 321 // record thread so it can be signalled if needed 322 readerThread = NativeThread.current(); 323 } 324 } else { 325 ensureOpenAndConnected(); 326 } 327 } 328 329 /** 330 * Marks the end of a read operation that may have blocked. 331 * 332 * @throws AsynchronousCloseException if the channel was closed due to this 333 * thread being interrupted on a blocking read operation. 334 */ 335 private void endRead(boolean blocking, boolean completed) 336 throws AsynchronousCloseException 337 { 338 if (blocking) { 339 synchronized (stateLock) { 340 readerThread = 0; 341 if (state == ST_CLOSING) { 342 tryFinishClose(); 343 } 344 } 345 // remove hook for Thread.interrupt 346 end(completed); 347 } 348 } 349 350 private void throwConnectionReset() throws SocketException { 351 throw new SocketException("Connection reset"); 352 } 353 354 @Override 355 public int read(ByteBuffer buf) throws IOException { 356 Objects.requireNonNull(buf); 357 358 readLock.lock(); 359 try { 360 boolean blocking = isBlocking(); 361 int n = 0; 362 try { 363 beginRead(blocking); 364 365 // check if connection has been reset 366 if (connectionReset) 367 throwConnectionReset(); 368 369 // check if input is shutdown 370 if (isInputClosed) 371 return IOStatus.EOF; 372 373 n = IOUtil.read(fd, buf, -1, nd); 374 if (blocking) { 375 while (IOStatus.okayToRetry(n) && isOpen()) { 376 park(Net.POLLIN); 377 n = IOUtil.read(fd, buf, -1, nd); 378 } 379 } 380 } catch (ConnectionResetException e) { 381 connectionReset = true; 382 throwConnectionReset(); 383 } finally { 384 endRead(blocking, n > 0); 385 if (n <= 0 && isInputClosed) 386 return IOStatus.EOF; 387 } 388 return IOStatus.normalize(n); 389 } finally { 390 readLock.unlock(); 391 } 392 } 393 394 @Override 395 public long read(ByteBuffer[] dsts, int offset, int length) 396 throws IOException 397 { 398 Objects.checkFromIndexSize(offset, length, dsts.length); 399 400 readLock.lock(); 401 try { 402 boolean blocking = isBlocking(); 403 long n = 0; 404 try { 405 beginRead(blocking); 406 407 // check if connection has been reset 408 if (connectionReset) 409 throwConnectionReset(); 410 411 // check if input is shutdown 412 if (isInputClosed) 413 return IOStatus.EOF; 414 415 n = IOUtil.read(fd, dsts, offset, length, nd); 416 if (blocking) { 417 while (IOStatus.okayToRetry(n) && isOpen()) { 418 park(Net.POLLIN); 419 n = IOUtil.read(fd, dsts, offset, length, nd); 420 } 421 } 422 } catch (ConnectionResetException e) { 423 connectionReset = true; 424 throwConnectionReset(); 425 } finally { 426 endRead(blocking, n > 0); 427 if (n <= 0 && isInputClosed) 428 return IOStatus.EOF; 429 } 430 return IOStatus.normalize(n); 431 } finally { 432 readLock.unlock(); 433 } 434 } 435 436 /** 437 * Marks the beginning of a write operation that might block. 438 * 439 * @throws ClosedChannelException if the channel is closed or output shutdown 440 * @throws NotYetConnectedException if the channel is not yet connected 441 */ 442 private void beginWrite(boolean blocking) throws ClosedChannelException { 443 if (blocking) { 444 // set hook for Thread.interrupt 445 begin(); 446 447 synchronized (stateLock) { 448 ensureOpenAndConnected(); 449 if (isOutputClosed) 450 throw new ClosedChannelException(); 451 // record thread so it can be signalled if needed 452 writerThread = NativeThread.current(); 453 } 454 } else { 455 ensureOpenAndConnected(); 456 } 457 } 458 459 /** 460 * Marks the end of a write operation that may have blocked. 461 * 462 * @throws AsynchronousCloseException if the channel was closed due to this 463 * thread being interrupted on a blocking write operation. 464 */ 465 private void endWrite(boolean blocking, boolean completed) 466 throws AsynchronousCloseException 467 { 468 if (blocking) { 469 synchronized (stateLock) { 470 writerThread = 0; 471 if (state == ST_CLOSING) { 472 tryFinishClose(); 473 } 474 } 475 // remove hook for Thread.interrupt 476 end(completed); 477 } 478 } 479 480 @Override 481 public int write(ByteBuffer buf) throws IOException { 482 Objects.requireNonNull(buf); 483 484 writeLock.lock(); 485 try { 486 boolean blocking = isBlocking(); 487 int n = 0; 488 try { 489 beginWrite(blocking); 490 n = IOUtil.write(fd, buf, -1, nd); 491 if (blocking) { 492 while (IOStatus.okayToRetry(n) && isOpen()) { 493 park(Net.POLLOUT); 494 n = IOUtil.write(fd, buf, -1, nd); 495 } 496 } 497 } finally { 498 endWrite(blocking, n > 0); 499 if (n <= 0 && isOutputClosed) 500 throw new AsynchronousCloseException(); 501 } 502 return IOStatus.normalize(n); 503 } finally { 504 writeLock.unlock(); 505 } 506 } 507 508 @Override 509 public long write(ByteBuffer[] srcs, int offset, int length) 510 throws IOException 511 { 512 Objects.checkFromIndexSize(offset, length, srcs.length); 513 514 writeLock.lock(); 515 try { 516 boolean blocking = isBlocking(); 517 long n = 0; 518 try { 519 beginWrite(blocking); 520 n = IOUtil.write(fd, srcs, offset, length, nd); 521 if (blocking) { 522 while (IOStatus.okayToRetry(n) && isOpen()) { 523 park(Net.POLLOUT); 524 n = IOUtil.write(fd, srcs, offset, length, nd); 525 } 526 } 527 } finally { 528 endWrite(blocking, n > 0); 529 if (n <= 0 && isOutputClosed) 530 throw new AsynchronousCloseException(); 531 } 532 return IOStatus.normalize(n); 533 } finally { 534 writeLock.unlock(); 535 } 536 } 537 538 /** 539 * Writes a byte of out of band data. 540 */ 541 int sendOutOfBandData(byte b) throws IOException { 542 writeLock.lock(); 543 try { 544 boolean blocking = isBlocking(); 545 int n = 0; 546 try { 547 beginWrite(blocking); 548 if (blocking) { 549 do { 550 n = Net.sendOOB(fd, b); 551 } while (n == IOStatus.INTERRUPTED && isOpen()); 552 } else { 553 n = Net.sendOOB(fd, b); 554 } 555 } finally { 556 endWrite(blocking, n > 0); 557 if (n <= 0 && isOutputClosed) 558 throw new AsynchronousCloseException(); 559 } 560 return IOStatus.normalize(n); 561 } finally { 562 writeLock.unlock(); 563 } 564 } 565 566 @Override 567 protected void implConfigureBlocking(boolean block) throws IOException { 568 readLock.lock(); 569 try { 570 writeLock.lock(); 571 try { 572 lockedConfigureBlocking(block); 573 } finally { 574 writeLock.unlock(); 575 } 576 } finally { 577 readLock.unlock(); 578 } 579 } 580 581 /** 582 * Adjust the blocking mode while holding the readLock or writeLock. 583 */ 584 private void lockedConfigureBlocking(boolean block) throws IOException { 585 assert readLock.isHeldByCurrentThread() || writeLock.isHeldByCurrentThread(); 586 synchronized (stateLock) { 587 ensureOpen(); 588 IOUtil.configureBlocking(fd, block); 589 } 590 } 591 592 /** 593 * Returns the local address, or null if not bound 594 */ 595 InetSocketAddress localAddress() { 596 synchronized (stateLock) { 597 return localAddress instanceof InetSocketAddress ? (InetSocketAddress) localAddress : null; 598 } 599 } 600 601 /** 602 * Returns the remote address, or null if not connected 603 */ 604 InetSocketAddress remoteAddress() { 605 synchronized (stateLock) { 606 return remoteAddress instanceof InetSocketAddress ? (InetSocketAddress) remoteAddress : null; 607 } 608 } 609 610 @Override 611 public SocketChannel bind(SocketAddress local) throws IOException { 612 readLock.lock(); 613 try { 614 writeLock.lock(); 615 try { 616 synchronized (stateLock) { 617 ensureOpen(); 618 if (state == ST_CONNECTIONPENDING) 619 throw new ConnectionPendingException(); 620 if (localAddress != null) 621 throw new AlreadyBoundException(); 622 InetSocketAddress isa = (local == null) ? 623 new InetSocketAddress(0) : Net.checkAddress(local); 624 SecurityManager sm = System.getSecurityManager(); 625 if (sm != null) { 626 sm.checkListen(isa.getPort()); 627 } 628 NetHooks.beforeTcpBind(fd, isa.getAddress(), isa.getPort()); 629 Net.bind(fd, isa.getAddress(), isa.getPort()); 630 localAddress = Net.localAddress(fd); 631 } 632 } finally { 633 writeLock.unlock(); 634 } 635 } finally { 636 readLock.unlock(); 637 } 638 return this; 639 } 640 641 @Override 642 public boolean isConnected() { 643 return (state == ST_CONNECTED); 644 } 645 646 @Override 647 public boolean isConnectionPending() { 648 return (state == ST_CONNECTIONPENDING); 649 } 650 651 /** 652 * Marks the beginning of a connect operation that might block. 653 * @param blocking true if configured blocking 654 * @param isa the remote address 655 * @throws ClosedChannelException if the channel is closed 656 * @throws AlreadyConnectedException if already connected 657 * @throws ConnectionPendingException is a connection is pending 658 * @throws IOException if the pre-connect hook fails 659 */ 660 private void beginConnect(boolean blocking, InetSocketAddress isa) 661 throws IOException 662 { 663 if (blocking) { 664 // set hook for Thread.interrupt 665 begin(); 666 } 667 synchronized (stateLock) { 668 ensureOpen(); 669 int state = this.state; 670 if (state == ST_CONNECTED) 671 throw new AlreadyConnectedException(); 672 if (state == ST_CONNECTIONPENDING) 673 throw new ConnectionPendingException(); 674 assert state == ST_UNCONNECTED; 675 this.state = ST_CONNECTIONPENDING; 676 677 if (localAddress == null) 678 NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort()); 679 remoteAddress = isa; 680 681 if (blocking) { 682 // record thread so it can be signalled if needed 683 readerThread = NativeThread.current(); 684 } 685 } 686 } 687 688 /** 689 * TODO: refactoring, combine with InetSocketAddress method? 690 * 691 * @param blocking 692 * @param usa 693 * @throws IOException 694 */ 695 private void beginConnect(boolean blocking, UnixSocketAddress usa) 696 throws IOException 697 { 698 if (blocking) { 699 // set hook for Thread.interrupt 700 begin(); 701 } 702 synchronized (stateLock) { 703 ensureOpen(); 704 int state = this.state; 705 if (state == ST_CONNECTED) 706 throw new AlreadyConnectedException(); 707 if (state == ST_CONNECTIONPENDING) 708 throw new ConnectionPendingException(); 709 assert state == ST_UNCONNECTED; 710 this.state = ST_CONNECTIONPENDING; 711 712 remoteAddress = usa; 713 714 if (blocking) { 715 // record thread so it can be signalled if needed 716 readerThread = NativeThread.current(); 717 } 718 } 719 } 720 721 /** 722 * Marks the end of a connect operation that may have blocked. 723 * 724 * @throws AsynchronousCloseException if the channel was closed due to this 725 * thread being interrupted on a blocking connect operation. 726 * @throws IOException if completed and unable to obtain the local address 727 */ 728 private void endConnect(boolean blocking, boolean completed) 729 throws IOException 730 { 731 endRead(blocking, completed); 732 733 if (completed) { 734 synchronized (stateLock) { 735 if (state == ST_CONNECTIONPENDING) { 736 localAddress = Net.localAddress(fd); 737 state = ST_CONNECTED; 738 } 739 } 740 } 741 } 742 743 /** 744 * Checks the remote address to which this channel is to be connected. 745 */ 746 private InetSocketAddress checkRemote(SocketAddress sa) throws IOException { 747 if (sa instanceof UnixSocketAddress) { 748 // TODO: SecurityManager, refactor null 749 return null; 750 } 751 752 InetSocketAddress isa = Net.checkAddress(sa); 753 SecurityManager sm = System.getSecurityManager(); 754 if (sm != null) { 755 sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort()); 756 } 757 if (isa.getAddress().isAnyLocalAddress()) { 758 return new InetSocketAddress(InetAddress.getLocalHost(), isa.getPort()); 759 } else { 760 return isa; 761 } 762 } 763 764 @Override 765 public boolean connect(SocketAddress remote) throws IOException { 766 InetSocketAddress isa = checkRemote(remote); 767 try { 768 readLock.lock(); 769 try { 770 writeLock.lock(); 771 try { 772 boolean blocking = isBlocking(); 773 boolean connected = false; 774 try { 775 int n; 776 if (remote instanceof UnixSocketAddress) { 777 UnixSocketAddress usa = (UnixSocketAddress)remote; 778 beginConnect(blocking, usa); 779 n = Net.connect(fd, usa); 780 } else { 781 beginConnect(blocking, isa); 782 n = Net.connect(fd, isa.getAddress(), isa.getPort()); 783 } 784 if (n > 0) { 785 connected = true; 786 } else if (blocking) { 787 assert IOStatus.okayToRetry(n); 788 boolean polled = false; 789 while (!polled && isOpen()) { 790 park(Net.POLLOUT); 791 polled = Net.pollConnectNow(fd); 792 } 793 connected = polled && isOpen(); 794 } 795 } finally { 796 endConnect(blocking, connected); 797 } 798 return connected; 799 } finally { 800 writeLock.unlock(); 801 } 802 } finally { 803 readLock.unlock(); 804 } 805 } catch (IOException ioe) { 806 // connect failed, close the channel 807 close(); 808 throw SocketExceptions.of(ioe, remote); 809 } 810 } 811 812 /** 813 * Marks the beginning of a finishConnect operation that might block. 814 * 815 * @throws ClosedChannelException if the channel is closed 816 * @throws NoConnectionPendingException if no connection is pending 817 */ 818 private void beginFinishConnect(boolean blocking) throws ClosedChannelException { 819 if (blocking) { 820 // set hook for Thread.interrupt 821 begin(); 822 } 823 synchronized (stateLock) { 824 ensureOpen(); 825 if (state != ST_CONNECTIONPENDING) 826 throw new NoConnectionPendingException(); 827 if (blocking) { 828 // record thread so it can be signalled if needed 829 readerThread = NativeThread.current(); 830 } 831 } 832 } 833 834 /** 835 * Marks the end of a finishConnect operation that may have blocked. 836 * 837 * @throws AsynchronousCloseException if the channel was closed due to this 838 * thread being interrupted on a blocking connect operation. 839 * @throws IOException if completed and unable to obtain the local address 840 */ 841 private void endFinishConnect(boolean blocking, boolean completed) 842 throws IOException 843 { 844 endRead(blocking, completed); 845 846 if (completed) { 847 synchronized (stateLock) { 848 if (state == ST_CONNECTIONPENDING) { 849 localAddress = Net.localAddress(fd); 850 state = ST_CONNECTED; 851 } 852 } 853 } 854 } 855 856 @Override 857 public boolean finishConnect() throws IOException { 858 try { 859 readLock.lock(); 860 try { 861 writeLock.lock(); 862 try { 863 // no-op if already connected 864 if (isConnected()) 865 return true; 866 867 boolean blocking = isBlocking(); 868 boolean connected = false; 869 try { 870 beginFinishConnect(blocking); 871 boolean polled = Net.pollConnectNow(fd); 872 if (blocking) { 873 while (!polled && isOpen()) { 874 park(Net.POLLOUT); 875 polled = Net.pollConnectNow(fd); 876 } 877 } 878 connected = polled && isOpen(); 879 } finally { 880 endFinishConnect(blocking, connected); 881 } 882 assert (blocking && connected) ^ !blocking; 883 return connected; 884 } finally { 885 writeLock.unlock(); 886 } 887 } finally { 888 readLock.unlock(); 889 } 890 } catch (IOException ioe) { 891 // connect failed, close the channel 892 close(); 893 throw SocketExceptions.of(ioe, remoteAddress); 894 } 895 } 896 897 /** 898 * Closes the socket if there are no I/O operations in progress and the 899 * channel is not registered with a Selector. 900 */ 901 private boolean tryClose() throws IOException { 902 assert Thread.holdsLock(stateLock) && state == ST_CLOSING; 903 if ((readerThread == 0) && (writerThread == 0) && !isRegistered()) { 904 state = ST_CLOSED; 905 nd.close(fd); 906 return true; 907 } else { 908 return false; 909 } 910 } 911 912 /** 913 * Invokes tryClose to attempt to close the socket. 914 * 915 * This method is used for deferred closing by I/O and Selector operations. 916 */ 917 private void tryFinishClose() { 918 try { 919 tryClose(); 920 } catch (IOException ignore) { } 921 } 922 923 /** 924 * Closes this channel when configured in blocking mode. 925 * 926 * If there is an I/O operation in progress then the socket is pre-closed 927 * and the I/O threads signalled, in which case the final close is deferred 928 * until all I/O operations complete. 929 * 930 * Note that a channel configured blocking may be registered with a Selector 931 * This arises when a key is canceled and the channel configured to blocking 932 * mode before the key is flushed from the Selector. 933 */ 934 private void implCloseBlockingMode() throws IOException { 935 synchronized (stateLock) { 936 assert state < ST_CLOSING; 937 state = ST_CLOSING; 938 if (!tryClose()) { 939 long reader = readerThread; 940 long writer = writerThread; 941 if (reader != 0 || writer != 0) { 942 nd.preClose(fd); 943 if (reader != 0) 944 NativeThread.signal(reader); 945 if (writer != 0) 946 NativeThread.signal(writer); 947 } 948 } 949 } 950 } 951 952 /** 953 * Closes this channel when configured in non-blocking mode. 954 * 955 * If the channel is registered with a Selector then the close is deferred 956 * until the channel is flushed from all Selectors. 957 * 958 * If the socket is connected and the channel is registered with a Selector 959 * then the socket is shutdown for writing so that the peer reads EOF. In 960 * addition, if SO_LINGER is set to a non-zero value then it is disabled so 961 * that the deferred close does not wait. 962 */ 963 private void implCloseNonBlockingMode() throws IOException { 964 boolean connected; 965 synchronized (stateLock) { 966 assert state < ST_CLOSING; 967 connected = (state == ST_CONNECTED); 968 state = ST_CLOSING; 969 } 970 971 // wait for any read/write operations to complete 972 readLock.lock(); 973 readLock.unlock(); 974 writeLock.lock(); 975 writeLock.unlock(); 976 977 // if the socket cannot be closed because it's registered with a Selector 978 // then shutdown the socket for writing. 979 synchronized (stateLock) { 980 if (state == ST_CLOSING && !tryClose() && connected && isRegistered()) { 981 try { 982 SocketOption<Integer> opt = StandardSocketOptions.SO_LINGER; 983 int interval = (int) Net.getSocketOption(fd, Net.UNSPEC, opt); 984 if (interval != 0) { 985 if (interval > 0) { 986 // disable SO_LINGER 987 Net.setSocketOption(fd, Net.UNSPEC, opt, -1); 988 } 989 Net.shutdown(fd, Net.SHUT_WR); 990 } 991 } catch (IOException ignore) { } 992 } 993 } 994 } 995 996 /** 997 * Invoked by implCloseChannel to close the channel. 998 */ 999 @Override 1000 protected void implCloseSelectableChannel() throws IOException { 1001 assert !isOpen(); 1002 if (isBlocking()) { 1003 implCloseBlockingMode(); 1004 } else { 1005 implCloseNonBlockingMode(); 1006 } 1007 } 1008 1009 @Override 1010 public void kill() { 1011 synchronized (stateLock) { 1012 if (state == ST_CLOSING) { 1013 tryFinishClose(); 1014 } 1015 } 1016 } 1017 1018 @Override 1019 public SocketChannel shutdownInput() throws IOException { 1020 synchronized (stateLock) { 1021 ensureOpen(); 1022 if (!isConnected()) 1023 throw new NotYetConnectedException(); 1024 if (!isInputClosed) { 1025 Net.shutdown(fd, Net.SHUT_RD); 1026 long thread = readerThread; 1027 if (thread != 0) 1028 NativeThread.signal(thread); 1029 isInputClosed = true; 1030 } 1031 return this; 1032 } 1033 } 1034 1035 @Override 1036 public SocketChannel shutdownOutput() throws IOException { 1037 synchronized (stateLock) { 1038 ensureOpen(); 1039 if (!isConnected()) 1040 throw new NotYetConnectedException(); 1041 if (!isOutputClosed) { 1042 Net.shutdown(fd, Net.SHUT_WR); 1043 long thread = writerThread; 1044 if (thread != 0) 1045 NativeThread.signal(thread); 1046 isOutputClosed = true; 1047 } 1048 return this; 1049 } 1050 } 1051 1052 boolean isInputOpen() { 1053 return !isInputClosed; 1054 } 1055 1056 boolean isOutputOpen() { 1057 return !isOutputClosed; 1058 } 1059 1060 /** 1061 * Waits for a connection attempt to finish with a timeout 1062 * @throws SocketTimeoutException if the connect timeout elapses 1063 */ 1064 private boolean finishTimedConnect(long nanos) throws IOException { 1065 long startNanos = System.nanoTime(); 1066 boolean polled = Net.pollConnectNow(fd); 1067 while (!polled && isOpen()) { 1068 long remainingNanos = nanos - (System.nanoTime() - startNanos); 1069 if (remainingNanos <= 0) { 1070 throw new SocketTimeoutException("Connect timed out"); 1071 } 1072 park(Net.POLLOUT, remainingNanos); 1073 polled = Net.pollConnectNow(fd); 1074 } 1075 return polled && isOpen(); 1076 } 1077 1078 /** 1079 * Attempts to establish a connection to the given socket address with a 1080 * timeout. Closes the socket if connection cannot be established. 1081 * 1082 * @apiNote This method is for use by the socket adaptor. 1083 * 1084 * @throws IllegalBlockingModeException if the channel is non-blocking 1085 * @throws SocketTimeoutException if the read timeout elapses 1086 */ 1087 void blockingConnect(SocketAddress remote, long nanos) throws IOException { 1088 InetSocketAddress isa = checkRemote(remote); 1089 try { 1090 readLock.lock(); 1091 try { 1092 writeLock.lock(); 1093 try { 1094 if (!isBlocking()) 1095 throw new IllegalBlockingModeException(); 1096 boolean connected = false; 1097 try { 1098 if (remote instanceof UnixSocketAddress) { 1099 UnixSocketAddress usa = (UnixSocketAddress)remote; 1100 beginConnect(true, usa); 1101 } else { 1102 beginConnect(true, isa); 1103 } 1104 // change socket to non-blocking 1105 lockedConfigureBlocking(false); 1106 try { 1107 int n; 1108 if (remote instanceof UnixSocketAddress) { 1109 n = Net.connect(fd, (UnixSocketAddress) remote); // TODO: refactor null/remote/isa 1110 } else { 1111 n = Net.connect(fd, isa.getAddress(), isa.getPort()); 1112 } 1113 connected = (n > 0) ? true : finishTimedConnect(nanos); 1114 } finally { 1115 // restore socket to blocking mode 1116 lockedConfigureBlocking(true); 1117 } 1118 } finally { 1119 endConnect(true, connected); 1120 } 1121 } finally { 1122 writeLock.unlock(); 1123 } 1124 } finally { 1125 readLock.unlock(); 1126 } 1127 } catch (IOException ioe) { 1128 // connect failed, close the channel 1129 close(); 1130 throw SocketExceptions.of(ioe, isa); 1131 } 1132 } 1133 1134 /** 1135 * Attempts to read bytes from the socket into the given byte array. 1136 */ 1137 private int tryRead(byte[] b, int off, int len) throws IOException { 1138 ByteBuffer dst = Util.getTemporaryDirectBuffer(len); 1139 assert dst.position() == 0; 1140 try { 1141 int n = nd.read(fd, ((DirectBuffer)dst).address(), len); 1142 if (n > 0) { 1143 dst.get(b, off, n); 1144 } 1145 return n; 1146 } finally{ 1147 Util.offerFirstTemporaryDirectBuffer(dst); 1148 } 1149 } 1150 1151 /** 1152 * Reads bytes from the socket into the given byte array with a timeout. 1153 * @throws SocketTimeoutException if the read timeout elapses 1154 */ 1155 private int timedRead(byte[] b, int off, int len, long nanos) throws IOException { 1156 long startNanos = System.nanoTime(); 1157 int n = tryRead(b, off, len); 1158 while (n == IOStatus.UNAVAILABLE && isOpen()) { 1159 long remainingNanos = nanos - (System.nanoTime() - startNanos); 1160 if (remainingNanos <= 0) { 1161 throw new SocketTimeoutException("Read timed out"); 1162 } 1163 park(Net.POLLIN, remainingNanos); 1164 n = tryRead(b, off, len); 1165 } 1166 return n; 1167 } 1168 1169 /** 1170 * Reads bytes from the socket into the given byte array. 1171 * 1172 * @apiNote This method is for use by the socket adaptor. 1173 * 1174 * @throws IllegalBlockingModeException if the channel is non-blocking 1175 * @throws SocketTimeoutException if the read timeout elapses 1176 */ 1177 int blockingRead(byte[] b, int off, int len, long nanos) throws IOException { 1178 Objects.checkFromIndexSize(off, len, b.length); 1179 if (len == 0) { 1180 // nothing to do 1181 return 0; 1182 } 1183 1184 readLock.lock(); 1185 try { 1186 // check that channel is configured blocking 1187 if (!isBlocking()) 1188 throw new IllegalBlockingModeException(); 1189 1190 int n = 0; 1191 try { 1192 beginRead(true); 1193 1194 // check if connection has been reset 1195 if (connectionReset) 1196 throwConnectionReset(); 1197 1198 // check if input is shutdown 1199 if (isInputClosed) 1200 return IOStatus.EOF; 1201 1202 if (nanos > 0) { 1203 // change socket to non-blocking 1204 lockedConfigureBlocking(false); 1205 try { 1206 n = timedRead(b, off, len, nanos); 1207 } finally { 1208 // restore socket to blocking mode 1209 lockedConfigureBlocking(true); 1210 } 1211 } else { 1212 // read, no timeout 1213 n = tryRead(b, off, len); 1214 while (IOStatus.okayToRetry(n) && isOpen()) { 1215 park(Net.POLLIN); 1216 n = tryRead(b, off, len); 1217 } 1218 } 1219 } catch (ConnectionResetException e) { 1220 connectionReset = true; 1221 throwConnectionReset(); 1222 } finally { 1223 endRead(true, n > 0); 1224 if (n <= 0 && isInputClosed) 1225 return IOStatus.EOF; 1226 } 1227 assert n > 0 || n == -1; 1228 return n; 1229 } finally { 1230 readLock.unlock(); 1231 } 1232 } 1233 1234 /** 1235 * Attempts to write a sequence of bytes to the socket from the given 1236 * byte array. 1237 */ 1238 private int tryWrite(byte[] b, int off, int len) throws IOException { 1239 ByteBuffer src = Util.getTemporaryDirectBuffer(len); 1240 assert src.position() == 0; 1241 try { 1242 src.put(b, off, len); 1243 return nd.write(fd, ((DirectBuffer)src).address(), len); 1244 } finally { 1245 Util.offerFirstTemporaryDirectBuffer(src); 1246 } 1247 } 1248 1249 /** 1250 * Writes a sequence of bytes to the socket from the given byte array. 1251 * 1252 * @apiNote This method is for use by the socket adaptor. 1253 */ 1254 void blockingWriteFully(byte[] b, int off, int len) throws IOException { 1255 Objects.checkFromIndexSize(off, len, b.length); 1256 if (len == 0) { 1257 // nothing to do 1258 return; 1259 } 1260 1261 writeLock.lock(); 1262 try { 1263 // check that channel is configured blocking 1264 if (!isBlocking()) 1265 throw new IllegalBlockingModeException(); 1266 1267 // loop until all bytes have been written 1268 int pos = off; 1269 int end = off + len; 1270 beginWrite(true); 1271 try { 1272 while (pos < end && isOpen()) { 1273 int size = end - pos; 1274 int n = tryWrite(b, pos, size); 1275 while (IOStatus.okayToRetry(n) && isOpen()) { 1276 park(Net.POLLOUT); 1277 n = tryWrite(b, pos, size); 1278 } 1279 if (n > 0) { 1280 pos += n; 1281 } 1282 } 1283 } finally { 1284 endWrite(true, pos >= end); 1285 } 1286 } finally { 1287 writeLock.unlock(); 1288 } 1289 } 1290 1291 /** 1292 * Return the number of bytes in the socket input buffer. 1293 */ 1294 int available() throws IOException { 1295 synchronized (stateLock) { 1296 ensureOpenAndConnected(); 1297 if (isInputClosed) { 1298 return 0; 1299 } else { 1300 return Net.available(fd); 1301 } 1302 } 1303 } 1304 1305 /** 1306 * Translates native poll revent ops into a ready operation ops 1307 */ 1308 public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) { 1309 int intOps = ski.nioInterestOps(); 1310 int oldOps = ski.nioReadyOps(); 1311 int newOps = initialOps; 1312 1313 if ((ops & Net.POLLNVAL) != 0) { 1314 // This should only happen if this channel is pre-closed while a 1315 // selection operation is in progress 1316 // ## Throw an error if this channel has not been pre-closed 1317 return false; 1318 } 1319 1320 if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) { 1321 newOps = intOps; 1322 ski.nioReadyOps(newOps); 1323 return (newOps & ~oldOps) != 0; 1324 } 1325 1326 boolean connected = isConnected(); 1327 if (((ops & Net.POLLIN) != 0) && 1328 ((intOps & SelectionKey.OP_READ) != 0) && connected) 1329 newOps |= SelectionKey.OP_READ; 1330 1331 if (((ops & Net.POLLCONN) != 0) && 1332 ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending()) 1333 newOps |= SelectionKey.OP_CONNECT; 1334 1335 if (((ops & Net.POLLOUT) != 0) && 1336 ((intOps & SelectionKey.OP_WRITE) != 0) && connected) 1337 newOps |= SelectionKey.OP_WRITE; 1338 1339 ski.nioReadyOps(newOps); 1340 return (newOps & ~oldOps) != 0; 1341 } 1342 1343 public boolean translateAndUpdateReadyOps(int ops, SelectionKeyImpl ski) { 1344 return translateReadyOps(ops, ski.nioReadyOps(), ski); 1345 } 1346 1347 public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) { 1348 return translateReadyOps(ops, 0, ski); 1349 } 1350 1351 /** 1352 * Translates an interest operation set into a native poll event set 1353 */ 1354 public int translateInterestOps(int ops) { 1355 int newOps = 0; 1356 if ((ops & SelectionKey.OP_READ) != 0) 1357 newOps |= Net.POLLIN; 1358 if ((ops & SelectionKey.OP_WRITE) != 0) 1359 newOps |= Net.POLLOUT; 1360 if ((ops & SelectionKey.OP_CONNECT) != 0) 1361 newOps |= Net.POLLCONN; 1362 return newOps; 1363 } 1364 1365 public FileDescriptor getFD() { 1366 return fd; 1367 } 1368 1369 public int getFDVal() { 1370 return fdVal; 1371 } 1372 1373 @Override 1374 public String toString() { 1375 StringBuilder sb = new StringBuilder(); 1376 sb.append(this.getClass().getSuperclass().getName()); 1377 sb.append('['); 1378 if (!isOpen()) 1379 sb.append("closed"); 1380 else { 1381 synchronized (stateLock) { 1382 switch (state) { 1383 case ST_UNCONNECTED: 1384 sb.append("unconnected"); 1385 break; 1386 case ST_CONNECTIONPENDING: 1387 sb.append("connection-pending"); 1388 break; 1389 case ST_CONNECTED: 1390 sb.append("connected"); 1391 if (isInputClosed) 1392 sb.append(" ishut"); 1393 if (isOutputClosed) 1394 sb.append(" oshut"); 1395 break; 1396 } 1397 InetSocketAddress addr = localAddress(); 1398 if (addr != null) { 1399 sb.append(" local="); 1400 sb.append(Net.getRevealedLocalAddressAsString(addr)); 1401 } 1402 if (remoteAddress() != null) { 1403 sb.append(" remote="); 1404 sb.append(remoteAddress().toString()); 1405 } 1406 } 1407 } 1408 sb.append(']'); 1409 return sb.toString(); 1410 } 1411 }