2013-01-15 18:08:45 +01:00
/* *
* Copyright ( C ) 2009 - 2012 Typesafe Inc . < http : //www.typesafe.com>
*/
package akka.io
import scala.annotation.tailrec
2013-01-21 17:05:44 +01:00
import java.nio.channels. { Selector , SelectionKey , SocketChannel , ServerSocketChannel }
2013-01-15 18:08:45 +01:00
import java.nio.ByteBuffer
import java.nio.channels.spi.SelectorProvider
import java.io.IOException
import java.net._
import scala.collection.immutable
import scala.concurrent.duration._
import scala.util.control.NonFatal
2013-01-21 14:45:19 +01:00
import akka.actor. { PoisonPill , ActorRef , Terminated }
2013-01-15 18:08:45 +01:00
import akka.testkit. { TestProbe , TestActorRef , AkkaSpec }
import akka.util.ByteString
2013-01-18 13:20:17 +01:00
import TestUtils._
2013-01-22 14:10:36 +01:00
import TcpSelector._
2013-01-15 18:08:45 +01:00
import Tcp._
class TcpConnectionSpec extends AkkaSpec ( "akka.io.tcp.register-timeout = 500ms" ) {
2013-01-22 13:48:36 +01:00
val serverAddress = temporaryServerAddress ( )
2013-01-15 18:08:45 +01:00
"An outgoing connection" must {
// common behavior
"set socket options before connecting" in withLocalServer ( ) { localServer ⇒
val userHandler = TestProbe ( )
val selector = TestProbe ( )
val connectionActor =
2013-01-16 16:59:55 +01:00
createConnectionActor ( options = Vector ( SO . ReuseAddress ( true ) ) ) ( selector . ref , userHandler . ref )
2013-01-15 18:08:45 +01:00
val clientChannel = connectionActor . underlyingActor . channel
clientChannel . socket . getReuseAddress must be ( true )
}
"set socket options after connecting" in withLocalServer ( ) { localServer ⇒
val userHandler = TestProbe ( )
val selector = TestProbe ( )
val connectionActor =
2013-01-16 16:59:55 +01:00
createConnectionActor ( options = Vector ( SO . KeepAlive ( true ) ) ) ( selector . ref , userHandler . ref )
2013-01-15 18:08:45 +01:00
val clientChannel = connectionActor . underlyingActor . channel
clientChannel . socket . getKeepAlive must be ( false ) // only set after connection is established
selector . send ( connectionActor , ChannelConnectable )
clientChannel . socket . getKeepAlive must be ( true )
}
2013-01-18 13:20:17 +01:00
"send incoming data to the connection handler" in withEstablishedConnection ( ) { setup ⇒
2013-01-15 18:08:45 +01:00
import setup._
serverSideChannel . write ( ByteBuffer . wrap ( "testdata" . getBytes ( "ASCII" ) ) )
2013-01-21 17:05:44 +01:00
2013-01-21 16:22:31 +01:00
expectReceivedString ( "testdata" )
2013-01-21 17:05:44 +01:00
2013-01-15 18:08:45 +01:00
// have two packets in flight before the selector notices
serverSideChannel . write ( ByteBuffer . wrap ( "testdata2" . getBytes ( "ASCII" ) ) )
serverSideChannel . write ( ByteBuffer . wrap ( "testdata3" . getBytes ( "ASCII" ) ) )
2013-01-21 16:22:31 +01:00
expectReceivedString ( "testdata2testdata3" )
2013-01-15 18:08:45 +01:00
}
"write data to network (and acknowledge)" in withEstablishedConnection ( ) { setup ⇒
import setup._
2013-01-17 14:45:50 +01:00
2013-01-15 18:08:45 +01:00
object Ack
2013-01-17 14:45:50 +01:00
val writer = TestProbe ( )
// directly acknowledge an empty write
writer . send ( connectionActor , Write ( ByteString . empty , Ack ) )
writer . expectMsg ( Ack )
2013-01-18 13:20:17 +01:00
// reply to write commander with Ack
val ackedWrite = Write ( ByteString ( "testdata" ) , Ack )
2013-01-15 18:08:45 +01:00
val buffer = ByteBuffer . allocate ( 100 )
serverSideChannel . read ( buffer ) must be ( 0 )
2013-01-18 13:20:17 +01:00
writer . send ( connectionActor , ackedWrite )
2013-01-17 14:45:50 +01:00
writer . expectMsg ( Ack )
2013-01-15 18:08:45 +01:00
serverSideChannel . read ( buffer ) must be ( 8 )
buffer . flip ( )
2013-01-18 13:20:17 +01:00
// not reply to write commander for writes without Ack
val unackedWrite = Write ( ByteString ( "morestuff!" ) )
buffer . clear ( )
serverSideChannel . read ( buffer ) must be ( 0 )
writer . send ( connectionActor , unackedWrite )
2013-01-21 14:45:19 +01:00
writer . expectNoMsg ( 500. millis )
2013-01-18 13:20:17 +01:00
serverSideChannel . read ( buffer ) must be ( 10 )
buffer . flip ( )
ByteString ( buffer ) . take ( 10 ) . decodeString ( "ASCII" ) must be ( "morestuff!" )
2013-01-15 18:08:45 +01:00
}
2013-01-22 17:32:46 +01:00
"write data after not acknowledged data" in withEstablishedConnection ( ) { setup ⇒
import setup._
object Ack
val writer = TestProbe ( )
writer . send ( connectionActor , Write ( ByteString ( 42. toByte ) ) )
writer . expectNoMsg ( 500. millis )
writer . send ( connectionActor , Write ( ByteString . empty , Ack ) )
writer . expectMsg ( Ack )
}
2013-01-15 18:08:45 +01:00
"stop writing in cases of backpressure and resume afterwards" in
withEstablishedConnection ( setSmallRcvBuffer ) { setup ⇒
import setup._
object Ack1
object Ack2
clientSideChannel . socket . setSendBufferSize ( 1024 )
2013-01-17 14:45:50 +01:00
val writer = TestProbe ( )
2013-01-15 18:08:45 +01:00
// producing backpressure by sending much more than currently fits into
// our send buffer
val firstWrite = writeCmd ( Ack1 )
// try to write the buffer but since the SO_SNDBUF is too small
// it will have to keep the rest of the piece and send it
// when possible
2013-01-17 14:45:50 +01:00
writer . send ( connectionActor , firstWrite )
2013-01-15 18:08:45 +01:00
selector . expectMsg ( WriteInterest )
// send another write which should fail immediately
// because we don't store more than one piece in flight
val secondWrite = writeCmd ( Ack2 )
2013-01-17 14:45:50 +01:00
writer . send ( connectionActor , secondWrite )
writer . expectMsg ( CommandFailed ( secondWrite ) )
// reject even empty writes
writer . send ( connectionActor , Write . Empty )
writer . expectMsg ( CommandFailed ( Write . Empty ) )
2013-01-15 18:08:45 +01:00
// there will be immediately more space in the send buffer because
// some data will have been sent by now, so we assume we can write
// again, but still it can't write everything
selector . send ( connectionActor , ChannelWritable )
// both buffers should now be filled so no more writing
// is possible
2013-01-17 14:45:50 +01:00
pullFromServerSide ( TestSize )
writer . expectMsg ( Ack1 )
2013-01-15 18:08:45 +01:00
}
"respect StopReading and ResumeReading" in withEstablishedConnection ( ) { setup ⇒
import setup._
connectionHandler . send ( connectionActor , StopReading )
// the selector interprets StopReading to deregister interest
// for reading
selector . expectMsg ( StopReading )
connectionHandler . send ( connectionActor , ResumeReading )
selector . expectMsg ( ReadInterest )
}
2013-01-18 13:20:17 +01:00
"close the connection and reply with `Closed` upon reception of a `Close` command" in withEstablishedConnection ( setSmallRcvBuffer ) { setup ⇒
2013-01-15 18:08:45 +01:00
import setup._
// we should test here that a pending write command is properly finished first
object Ack
// set an artificially small send buffer size so that the write is queued
// inside the connection actor
clientSideChannel . socket . setSendBufferSize ( 1024 )
// we send a write and a close command directly afterwards
connectionHandler . send ( connectionActor , writeCmd ( Ack ) )
2013-01-18 13:20:17 +01:00
val closeCommander = TestProbe ( )
closeCommander . send ( connectionActor , Close )
2013-01-15 18:08:45 +01:00
2013-01-17 14:45:50 +01:00
pullFromServerSide ( TestSize )
2013-01-15 18:08:45 +01:00
connectionHandler . expectMsg ( Ack )
connectionHandler . expectMsg ( Closed )
2013-01-18 13:20:17 +01:00
closeCommander . expectMsg ( Closed )
2013-01-17 14:45:50 +01:00
assertThisConnectionActorTerminated ( )
2013-01-15 18:08:45 +01:00
2013-01-22 11:35:15 +01:00
checkFor ( serverSelectionKey , SelectionKey . OP_READ , 2000 )
2013-01-21 18:26:56 +01:00
2013-01-15 18:08:45 +01:00
val buffer = ByteBuffer . allocate ( 1 )
serverSideChannel . read ( buffer ) must be ( - 1 )
}
2013-01-18 13:20:17 +01:00
"send only one `Closed` event to the handler, if the handler commanded the Close" in withEstablishedConnection ( ) { setup ⇒
import setup._
connectionHandler . send ( connectionActor , Close )
connectionHandler . expectMsg ( Closed )
2013-01-21 14:45:19 +01:00
connectionHandler . expectNoMsg ( 500. millis )
2013-01-18 13:20:17 +01:00
}
"abort the connection and reply with `Aborted` upong reception of an `Abort` command" in withEstablishedConnection ( ) { setup ⇒
2013-01-15 18:08:45 +01:00
import setup._
connectionHandler . send ( connectionActor , Abort )
connectionHandler . expectMsg ( Aborted )
assertThisConnectionActorTerminated ( )
val buffer = ByteBuffer . allocate ( 1 )
val thrown = evaluating { serverSideChannel . read ( buffer ) } must produce [ IOException ]
thrown . getMessage must be ( "Connection reset by peer" )
}
2013-01-18 13:20:17 +01:00
"close the connection and reply with `ConfirmedClosed` upong reception of an `ConfirmedClose` command" in withEstablishedConnection ( setSmallRcvBuffer ) { setup ⇒
2013-01-15 18:08:45 +01:00
import setup._
// we should test here that a pending write command is properly finished first
object Ack
// set an artificially small send buffer size so that the write is queued
// inside the connection actor
clientSideChannel . socket . setSendBufferSize ( 1024 )
// we send a write and a close command directly afterwards
connectionHandler . send ( connectionActor , writeCmd ( Ack ) )
connectionHandler . send ( connectionActor , ConfirmedClose )
connectionHandler . expectNoMsg ( 100. millis )
2013-01-17 14:45:50 +01:00
pullFromServerSide ( TestSize )
2013-01-15 18:08:45 +01:00
connectionHandler . expectMsg ( Ack )
selector . send ( connectionActor , ChannelReadable )
connectionHandler . expectNoMsg ( 100. millis ) // not yet
val buffer = ByteBuffer . allocate ( 1 )
2013-01-22 11:35:15 +01:00
checkFor ( serverSelectionKey , SelectionKey . OP_READ , 2000 )
2013-01-15 18:08:45 +01:00
serverSideChannel . read ( buffer ) must be ( - 1 )
serverSideChannel . close ( )
selector . send ( connectionActor , ChannelReadable )
connectionHandler . expectMsg ( ConfirmedClosed )
assertThisConnectionActorTerminated ( )
}
"report when peer closed the connection" in withEstablishedConnection ( ) { setup ⇒
import setup._
serverSideChannel . close ( )
selector . send ( connectionActor , ChannelReadable )
connectionHandler . expectMsg ( PeerClosed )
assertThisConnectionActorTerminated ( )
}
"report when peer aborted the connection" in withEstablishedConnection ( ) { setup ⇒
import setup._
abortClose ( serverSideChannel )
selector . send ( connectionActor , ChannelReadable )
2013-01-17 14:45:50 +01:00
connectionHandler . expectMsgType [ ErrorClose ] . cause must be ( "Connection reset by peer" )
2013-01-15 18:08:45 +01:00
// wait a while
connectionHandler . expectNoMsg ( 200. millis )
assertThisConnectionActorTerminated ( )
}
"report when peer closed the connection when trying to write" in withEstablishedConnection ( ) { setup ⇒
import setup._
2013-01-17 14:45:50 +01:00
val writer = TestProbe ( )
2013-01-15 18:08:45 +01:00
abortClose ( serverSideChannel )
2013-01-17 14:45:50 +01:00
writer . send ( connectionActor , Write ( ByteString ( "testdata" ) ) )
// bother writer and handler should get the message
writer . expectMsgType [ ErrorClose ]
connectionHandler . expectMsgType [ ErrorClose ]
2013-01-15 18:08:45 +01:00
assertThisConnectionActorTerminated ( )
}
// error conditions
2013-01-16 16:59:55 +01:00
"report failed connection attempt while not accepted" in withUnacceptedConnection ( ) { setup ⇒
import setup._
2013-01-15 18:08:45 +01:00
// close instead of accept
localServer . close ( )
2013-01-16 16:59:55 +01:00
2013-01-15 18:08:45 +01:00
selector . send ( connectionActor , ChannelConnectable )
2013-01-17 14:45:50 +01:00
userHandler . expectMsgType [ ErrorClose ] . cause must be ( "Connection reset by peer" )
2013-01-15 18:08:45 +01:00
2013-01-18 13:20:17 +01:00
verifyActorTermination ( connectionActor )
2013-01-15 18:08:45 +01:00
}
2013-01-22 13:48:36 +01:00
val UnboundAddress = temporaryServerAddress ( )
2013-01-16 16:59:55 +01:00
"report failed connection attempt when target is unreachable" in
2013-01-17 15:07:00 +01:00
withUnacceptedConnection ( connectionActorCons = createConnectionActor ( serverAddress = UnboundAddress ) ) { setup ⇒
2013-01-16 16:59:55 +01:00
import setup._
2013-01-15 18:08:45 +01:00
2013-01-16 16:59:55 +01:00
val sel = SelectorProvider . provider ( ) . openSelector ( )
val key = clientSideChannel . register ( sel , SelectionKey . OP_CONNECT | SelectionKey . OP_READ )
sel . select ( 200 )
key . isConnectable must be ( true )
selector . send ( connectionActor , ChannelConnectable )
2013-01-17 14:45:50 +01:00
userHandler . expectMsgType [ ErrorClose ] . cause must be ( "Connection refused" )
2013-01-16 16:59:55 +01:00
2013-01-18 13:20:17 +01:00
verifyActorTermination ( connectionActor )
2013-01-15 18:08:45 +01:00
}
2013-01-16 16:59:55 +01:00
"time out when Connected isn't answered with Register" in withUnacceptedConnection ( ) { setup ⇒
import setup._
2013-01-15 18:08:45 +01:00
localServer . accept ( )
selector . send ( connectionActor , ChannelConnectable )
userHandler . expectMsg ( Connected ( serverAddress , clientSideChannel . socket . getLocalSocketAddress . asInstanceOf [ InetSocketAddress ] ) )
2013-01-18 13:20:17 +01:00
verifyActorTermination ( connectionActor )
2013-01-15 18:08:45 +01:00
}
2013-01-16 16:59:55 +01:00
"close the connection when user handler dies while connecting" in withUnacceptedConnection ( ) { setup ⇒
import setup._
2013-01-21 14:45:19 +01:00
userHandler . ref ! PoisonPill
2013-01-16 16:59:55 +01:00
2013-01-18 13:20:17 +01:00
verifyActorTermination ( connectionActor )
2013-01-15 18:08:45 +01:00
}
"close the connection when connection handler dies while connected" in withEstablishedConnection ( ) { setup ⇒
import setup._
watch ( connectionHandler . ref )
watch ( connectionActor )
system . stop ( connectionHandler . ref )
expectMsgType [ Terminated ] . actor must be ( connectionHandler . ref )
expectMsgType [ Terminated ] . actor must be ( connectionActor )
}
}
def withLocalServer ( setServerSocketOptions : ServerSocketChannel ⇒ Unit = _ ⇒ ( ) ) ( body : ServerSocketChannel ⇒ Any ) : Unit = {
val localServer = ServerSocketChannel . open ( )
try {
setServerSocketOptions ( localServer )
localServer . socket . bind ( serverAddress )
localServer . configureBlocking ( false )
body ( localServer )
} finally localServer . close ( )
}
2013-01-16 16:59:55 +01:00
case class UnacceptedSetup (
localServer : ServerSocketChannel ,
2013-01-15 18:08:45 +01:00
userHandler : TestProbe ,
selector : TestProbe ,
connectionActor : TestActorRef [ TcpOutgoingConnection ] ,
2013-01-16 16:59:55 +01:00
clientSideChannel : SocketChannel )
case class RegisteredSetup (
unregisteredSetup : UnacceptedSetup ,
connectionHandler : TestProbe ,
2013-01-15 18:08:45 +01:00
serverSideChannel : SocketChannel ) {
2013-01-16 16:59:55 +01:00
def userHandler : TestProbe = unregisteredSetup . userHandler
def selector : TestProbe = unregisteredSetup . selector
def connectionActor : TestActorRef [ TcpOutgoingConnection ] = unregisteredSetup . connectionActor
def clientSideChannel : SocketChannel = unregisteredSetup . clientSideChannel
2013-01-22 11:35:15 +01:00
val nioSelector = SelectorProvider . provider ( ) . openSelector ( )
val clientSelectionKey = registerChannel ( clientSideChannel , "client" )
val serverSelectionKey = registerChannel ( serverSideChannel , "server" )
def registerChannel ( channel : SocketChannel , name : String ) : SelectionKey = {
val res = channel . register ( nioSelector , 0 )
res . attach ( name )
res
}
def checkFor ( key : SelectionKey , interest : Int , millis : Int = 100 ) : Boolean =
if ( key . isValid ) {
if ( ( key . readyOps ( ) & interest ) != 0 ) true
else {
key . interestOps ( interest )
val ret = nioSelector . select ( millis )
key . interestOps ( 0 )
ret > 0 && nioSelector . selectedKeys ( ) . contains ( key ) && key . isValid &&
( key . readyOps ( ) & interest ) != 0
}
} else false
def openSelectorFor ( channel : SocketChannel , interests : Int ) : ( Selector , SelectionKey ) = {
2013-01-21 17:05:44 +01:00
val sel = SelectorProvider . provider ( ) . openSelector ( )
2013-01-22 11:35:15 +01:00
val key = channel . register ( sel , interests )
2013-01-21 17:05:44 +01:00
( sel , key )
}
2013-01-15 18:08:45 +01:00
val buffer = ByteBuffer . allocate ( TestSize )
2013-01-21 17:05:44 +01:00
2013-01-21 17:26:52 +01:00
/* *
* Tries to simultaneously act on client and server side to read from the server
* all pending data from the client .
*/
2013-01-22 11:35:15 +01:00
@tailrec final def pullFromServerSide ( remaining : Int , remainingTries : Int = 1000 ) : Unit =
if ( remainingTries <= 0 )
throw new AssertionError ( "Pulling took too many loops" )
else if ( remaining > 0 ) {
2013-01-21 17:26:52 +01:00
if ( selector . msgAvailable ) {
selector . expectMsg ( WriteInterest )
2013-01-22 11:35:15 +01:00
clientSelectionKey . interestOps ( SelectionKey . OP_WRITE )
}
2013-01-21 17:26:52 +01:00
2013-01-22 11:35:15 +01:00
serverSelectionKey . interestOps ( SelectionKey . OP_READ )
nioSelector . select ( 10 )
if ( nioSelector . selectedKeys ( ) . contains ( clientSelectionKey ) ) {
clientSelectionKey . interestOps ( 0 )
selector . send ( connectionActor , ChannelWritable )
}
2013-01-21 17:26:52 +01:00
2013-01-22 11:35:15 +01:00
val read =
if ( nioSelector . selectedKeys ( ) . contains ( serverSelectionKey ) ) tryReading ( )
else 0
2013-01-22 11:41:40 +01:00
nioSelector . selectedKeys ( ) . clear ( )
2013-01-22 11:35:15 +01:00
pullFromServerSide ( remaining - read , remainingTries - 1 )
}
private def tryReading ( ) : Int = {
buffer . clear ( )
val read = serverSideChannel . read ( buffer )
2013-01-22 11:41:40 +01:00
if ( read == 0 )
throw new IllegalStateException ( "Made no progress" )
else if ( read == - 1 )
2013-01-22 11:35:15 +01:00
throw new IllegalStateException ( "Connection was closed unexpectedly with remaining bytes " + remaining )
else read
2013-01-21 17:26:52 +01:00
}
2013-01-15 18:08:45 +01:00
2013-01-21 16:22:31 +01:00
@tailrec final def expectReceivedString ( data : String ) : Unit = {
data . length must be > 0
selector . send ( connectionActor , ChannelReadable )
val gotReceived = connectionHandler . expectMsgType [ Received ]
val receivedString = gotReceived . data . decodeString ( "ASCII" )
data . startsWith ( receivedString ) must be ( true )
if ( receivedString . length < data . length )
expectReceivedString ( data . drop ( receivedString . length ) )
}
2013-01-15 18:08:45 +01:00
def assertThisConnectionActorTerminated ( ) : Unit = {
2013-01-18 13:20:17 +01:00
verifyActorTermination ( connectionActor )
2013-01-15 18:08:45 +01:00
clientSideChannel must not be ( 'open)
}
}
2013-01-16 16:59:55 +01:00
def withUnacceptedConnection (
setServerSocketOptions : ServerSocketChannel ⇒ Unit = _ ⇒ ( ) ,
connectionActorCons : ( ActorRef , ActorRef ) ⇒ TestActorRef [ TcpOutgoingConnection ] = createConnectionActor ( ) ) ( body : UnacceptedSetup ⇒ Any ) : Unit =
withLocalServer ( setServerSocketOptions ) { localServer ⇒
val userHandler = TestProbe ( )
val selector = TestProbe ( )
val connectionActor = connectionActorCons ( selector . ref , userHandler . ref )
val clientSideChannel = connectionActor . underlyingActor . channel
2013-01-15 18:08:45 +01:00
2013-01-16 16:59:55 +01:00
selector . expectMsg ( RegisterOutgoingConnection ( clientSideChannel ) )
body {
UnacceptedSetup (
localServer ,
userHandler ,
selector ,
connectionActor ,
clientSideChannel )
}
}
def withEstablishedConnection ( setServerSocketOptions : ServerSocketChannel ⇒ Unit = _ ⇒ ( ) ) ( body : RegisteredSetup ⇒ Any ) : Unit = withUnacceptedConnection ( setServerSocketOptions ) { unregisteredSetup ⇒
import unregisteredSetup._
2013-01-15 18:08:45 +01:00
localServer . configureBlocking ( true )
val serverSideChannel = localServer . accept ( )
2013-01-21 17:05:44 +01:00
serverSideChannel . configureBlocking ( false )
2013-01-15 18:08:45 +01:00
serverSideChannel must not be ( null )
selector . send ( connectionActor , ChannelConnectable )
userHandler . expectMsg ( Connected ( serverAddress , clientSideChannel . socket . getLocalSocketAddress . asInstanceOf [ InetSocketAddress ] ) )
2013-01-16 16:59:55 +01:00
val connectionHandler = TestProbe ( )
2013-01-15 18:08:45 +01:00
userHandler . send ( connectionActor , Register ( connectionHandler . ref ) )
selector . expectMsg ( ReadInterest )
body {
2013-01-16 16:59:55 +01:00
RegisteredSetup (
unregisteredSetup ,
2013-01-15 18:08:45 +01:00
connectionHandler ,
serverSideChannel )
}
}
val TestSize = 10000
def writeCmd ( ack : AnyRef ) =
Write ( ByteString ( Array . fill [ Byte ] ( TestSize ) ( 0 ) ) , ack )
def setSmallRcvBuffer ( channel : ServerSocketChannel ) : Unit =
channel . socket . setReceiveBufferSize ( 1024 )
def createConnectionActor (
serverAddress : InetSocketAddress = serverAddress ,
localAddress : Option [ InetSocketAddress ] = None ,
2013-01-16 16:59:55 +01:00
options : immutable.Seq [ Tcp . SocketOption ] = Nil ) (
2013-01-22 15:51:21 +01:00
_selector : ActorRef ,
2013-01-16 16:59:55 +01:00
commander : ActorRef ) : TestActorRef [ TcpOutgoingConnection ] = {
2013-01-15 18:08:45 +01:00
TestActorRef (
2013-01-22 15:51:21 +01:00
new TcpOutgoingConnection ( Tcp ( system ) , commander , serverAddress , localAddress , options ) {
2013-01-15 18:08:45 +01:00
override def postRestart ( reason : Throwable ) {
// ensure we never restart
context . stop ( self )
}
2013-01-22 15:51:21 +01:00
override def selector = _selector
2013-01-15 18:08:45 +01:00
} )
}
def abortClose ( channel : SocketChannel ) : Unit = {
try channel . socket . setSoLinger ( true , 0 ) // causes the following close() to send TCP RST
catch {
case NonFatal ( e ) ⇒
// setSoLinger can fail due to http://bugs.sun.com/view_bug.do?bug_id=6799574
// (also affected: OS/X Java 1.6.0_37)
log . debug ( "setSoLinger(true, 0) failed with {}" , e )
}
channel . close ( )
}
def abort ( channel : SocketChannel ) {
channel . socket . setSoLinger ( true , 0 )
channel . close ( )
}
}