diff --git a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala index 8ca5d7cc70..3232a68caf 100644 --- a/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-io/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -6,6 +6,8 @@ package akka.io import scala.annotation.tailrec +import org.scalatest.matchers.{ MatchResult, BeMatcher } + import java.nio.channels.{ Selector, SelectionKey, SocketChannel, ServerSocketChannel } import java.nio.ByteBuffer import java.nio.channels.spi.SelectorProvider @@ -191,7 +193,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") closeCommander.expectMsg(Closed) assertThisConnectionActorTerminated() - checkFor(serverSelectionKey, SelectionKey.OP_READ, 2000) + serverSelectionKey must be(selectedAs(SelectionKey.OP_READ, 2.seconds)) val buffer = ByteBuffer.allocate(1) serverSideChannel.read(buffer) must be(-1) @@ -239,7 +241,7 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") connectionHandler.expectNoMsg(100.millis) // not yet val buffer = ByteBuffer.allocate(1) - checkFor(serverSelectionKey, SelectionKey.OP_READ, 2000) + serverSelectionKey must be(selectedAs(SelectionKey.OP_READ, 2.seconds)) serverSideChannel.read(buffer) must be(-1) serverSideChannel.close() @@ -377,15 +379,13 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") def checkFor(key: SelectionKey, interest: Int, millis: Int = 100): Boolean = if (key.isValid) { - if ((key.readyOps() & interest) != 0) true - else { - key.interestOps(interest) - val ret = nioSelector.select(millis) - key.interestOps(0) + key.interestOps(interest) + nioSelector.selectedKeys().clear() + val ret = nioSelector.select(millis) + key.interestOps(0) - ret > 0 && nioSelector.selectedKeys().contains(key) && key.isValid && - (key.readyOps() & interest) != 0 - } + ret > 0 && nioSelector.selectedKeys().contains(key) && key.isValid && + (key.readyOps() & interest) != 0 } else false def openSelectorFor(channel: SocketChannel, interests: Int): (Selector, SelectionKey) = { @@ -452,6 +452,23 @@ class TcpConnectionSpec extends AkkaSpec("akka.io.tcp.register-timeout = 500ms") verifyActorTermination(connectionActor) clientSideChannel must not be ('open) } + + def selectedAs(interest: Int, duration: Duration): BeMatcher[SelectionKey] = + new BeMatcher[SelectionKey] { + def apply(key: SelectionKey) = + MatchResult( + checkFor(key, interest, duration.toMillis.toInt), + "%s key was not selected for %s after %s" format (key.attachment(), interestsDesc(interest), duration), + "%s key was selected for %s after %s" format (key.attachment(), interestsDesc(interest), duration)) + } + + val interestsNames = Seq( + SelectionKey.OP_ACCEPT -> "accepting", + SelectionKey.OP_CONNECT -> "connecting", + SelectionKey.OP_READ -> "reading", + SelectionKey.OP_WRITE -> "writing") + def interestsDesc(interests: Int): String = + interestsNames.filter(i ⇒ (i._1 & interests) != 0).map(_._2).mkString(", ") } def withUnacceptedConnection( setServerSocketOptions: ServerSocketChannel ⇒ Unit = _ ⇒ (),