fix checking for connection close in TcpConnectionSpec
The fix was to only rely on actually selecting not also on checking key.readyOps since that isn't necessarily reliable without selecting.
This commit is contained in:
parent
93fc9f1d6f
commit
e994267bf6
1 changed files with 27 additions and 10 deletions
|
|
@ -6,6 +6,8 @@ package akka.io
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
|
|
||||||
|
import org.scalatest.matchers.{ MatchResult, BeMatcher }
|
||||||
|
|
||||||
import java.nio.channels.{ Selector, SelectionKey, SocketChannel, ServerSocketChannel }
|
import java.nio.channels.{ Selector, SelectionKey, SocketChannel, ServerSocketChannel }
|
||||||
import java.nio.ByteBuffer
|
import java.nio.ByteBuffer
|
||||||
import java.nio.channels.spi.SelectorProvider
|
import java.nio.channels.spi.SelectorProvider
|
||||||
|
|
@ -191,7 +193,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
closeCommander.expectMsg(Closed)
|
closeCommander.expectMsg(Closed)
|
||||||
assertThisConnectionActorTerminated()
|
assertThisConnectionActorTerminated()
|
||||||
|
|
||||||
checkFor(serverSelectionKey, SelectionKey.OP_READ, 2000)
|
serverSelectionKey must be(selectedAs(SelectionKey.OP_READ, 2.seconds))
|
||||||
|
|
||||||
val buffer = ByteBuffer.allocate(1)
|
val buffer = ByteBuffer.allocate(1)
|
||||||
serverSideChannel.read(buffer) must be(-1)
|
serverSideChannel.read(buffer) must be(-1)
|
||||||
|
|
@ -239,7 +241,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
connectionHandler.expectNoMsg(100.millis) // not yet
|
connectionHandler.expectNoMsg(100.millis) // not yet
|
||||||
|
|
||||||
val buffer = ByteBuffer.allocate(1)
|
val buffer = ByteBuffer.allocate(1)
|
||||||
checkFor(serverSelectionKey, SelectionKey.OP_READ, 2000)
|
serverSelectionKey must be(selectedAs(SelectionKey.OP_READ, 2.seconds))
|
||||||
serverSideChannel.read(buffer) must be(-1)
|
serverSideChannel.read(buffer) must be(-1)
|
||||||
serverSideChannel.close()
|
serverSideChannel.close()
|
||||||
|
|
||||||
|
|
@ -377,15 +379,13 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
|
|
||||||
def checkFor(key: SelectionKey, interest: Int, millis: Int = 100): Boolean =
|
def checkFor(key: SelectionKey, interest: Int, millis: Int = 100): Boolean =
|
||||||
if (key.isValid) {
|
if (key.isValid) {
|
||||||
if ((key.readyOps() & interest) != 0) true
|
|
||||||
else {
|
|
||||||
key.interestOps(interest)
|
key.interestOps(interest)
|
||||||
|
nioSelector.selectedKeys().clear()
|
||||||
val ret = nioSelector.select(millis)
|
val ret = nioSelector.select(millis)
|
||||||
key.interestOps(0)
|
key.interestOps(0)
|
||||||
|
|
||||||
ret > 0 && nioSelector.selectedKeys().contains(key) && key.isValid &&
|
ret > 0 && nioSelector.selectedKeys().contains(key) && key.isValid &&
|
||||||
(key.readyOps() & interest) != 0
|
(key.readyOps() & interest) != 0
|
||||||
}
|
|
||||||
} else false
|
} else false
|
||||||
|
|
||||||
def openSelectorFor(channel: SocketChannel, interests: Int): (Selector, SelectionKey) = {
|
def openSelectorFor(channel: SocketChannel, interests: Int): (Selector, SelectionKey) = {
|
||||||
|
|
@ -452,6 +452,23 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms")
|
||||||
verifyActorTermination(connectionActor)
|
verifyActorTermination(connectionActor)
|
||||||
clientSideChannel must not be ('open)
|
clientSideChannel must not be ('open)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def selectedAs(interest: Int, duration: Duration): BeMatcher[SelectionKey] =
|
||||||
|
new BeMatcher[SelectionKey] {
|
||||||
|
def apply(key: SelectionKey) =
|
||||||
|
MatchResult(
|
||||||
|
checkFor(key, interest, duration.toMillis.toInt),
|
||||||
|
"%s key was not selected for %s after %s" format (key.attachment(), interestsDesc(interest), duration),
|
||||||
|
"%s key was selected for %s after %s" format (key.attachment(), interestsDesc(interest), duration))
|
||||||
|
}
|
||||||
|
|
||||||
|
val interestsNames = Seq(
|
||||||
|
SelectionKey.OP_ACCEPT -> "accepting",
|
||||||
|
SelectionKey.OP_CONNECT -> "connecting",
|
||||||
|
SelectionKey.OP_READ -> "reading",
|
||||||
|
SelectionKey.OP_WRITE -> "writing")
|
||||||
|
def interestsDesc(interests: Int): String =
|
||||||
|
interestsNames.filter(i ⇒ (i._1 & interests) != 0).map(_._2).mkString(", ")
|
||||||
}
|
}
|
||||||
def withUnacceptedConnection(
|
def withUnacceptedConnection(
|
||||||
setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (),
|
setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue