diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorCreationPerfSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorCreationPerfSpec.scala index 89e4205f29..15f78b9752 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorCreationPerfSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorCreationPerfSpec.scala @@ -7,7 +7,6 @@ import scala.language.postfixOps import akka.testkit.{ PerformanceTest, ImplicitSender, AkkaSpec } import scala.concurrent.duration._ -import akka.TestUtils import akka.testkit.metrics._ import org.scalatest.BeforeAndAfterAll import akka.testkit.metrics.HeapMemoryUsage @@ -128,8 +127,8 @@ class ActorCreationPerfSpec extends AkkaSpec("akka.actor.serialize-messages = of expectMsgPF(15 seconds, s"$scenarioName waiting for Waited") { case Waited ⇒ } driver ! PoisonPill - TestUtils.verifyActorTermination(driver, 15 seconds) - + watch(driver) + expectTerminated(driver, 15.seconds) gc() } @@ -153,7 +152,8 @@ class ActorCreationPerfSpec extends AkkaSpec("akka.actor.serialize-messages = of val after = mem.getHeapSnapshot driver ! PoisonPill - TestUtils.verifyActorTermination(driver, 15 seconds) + watch(driver) + expectTerminated(driver, 15.seconds) after diff before } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala index 441856dbec..424041bb66 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala @@ -7,7 +7,6 @@ package akka.actor import com.typesafe.config.ConfigFactory import akka.testkit._ import akka.dispatch._ -import akka.TestUtils.verifyActorTermination import scala.concurrent.duration.{ Duration, FiniteDuration } import akka.ConfigurationException import com.typesafe.config.Config diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index f9b89f8137..174b877e97 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -14,7 +14,6 @@ import java.lang.IllegalStateException import scala.concurrent.Promise import akka.pattern.ask import akka.serialization.JavaSerializer -import akka.TestUtils.verifyActorTermination object ActorRefSpec { @@ -324,7 +323,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { ref ! PoisonPill - verifyActorTermination(ref) + watch(ref) + expectTerminated(ref) JavaSerializer.currentSystem.withValue(sysImpl) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) @@ -403,7 +403,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { Await.result(ffive, timeout.duration) should ===("five") Await.result(fnull, timeout.duration) should ===("null") - verifyActorTermination(ref) + watch(ref) + expectTerminated(ref) } "restart when Kill:ed" in { diff --git a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala index cc662b83c7..1609b928b6 100644 --- a/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/CapacityLimitSpec.scala @@ -5,7 +5,7 @@ package akka.io import akka.testkit.{ TestProbe, AkkaSpec } -import akka.TestUtils._ +import akka.testkit.SocketUtil._ import Tcp._ class CapacityLimitSpec extends AkkaSpec(""" diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala index 54b2f8ec96..dabf890aeb 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpConnectionSpec.scala @@ -23,7 +23,7 @@ import akka.io.Inet.SocketOption import akka.actor._ import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe } import akka.util.{ Helpers, ByteString } -import akka.TestUtils._ +import akka.testkit.SocketUtil._ import java.util.Random object TcpConnectionSpec { @@ -629,7 +629,8 @@ class TcpConnectionSpec extends AkkaSpec(""" selector.send(connectionActor, ChannelConnectable) userHandler.expectMsg(CommandFailed(Connect(UnboundAddress))) - verifyActorTermination(connectionActor) + watch(connectionActor) + expectTerminated(connectionActor) } finally sel.close() } } @@ -650,7 +651,8 @@ class TcpConnectionSpec extends AkkaSpec(""" run { connectionActor.toString should not be ("") userHandler.expectMsg(CommandFailed(Connect(UnboundAddress, timeout = Option(100.millis)))) - verifyActorTermination(connectionActor) + watch(connectionActor) + expectTerminated(connectionActor) } } @@ -661,7 +663,8 @@ class TcpConnectionSpec extends AkkaSpec(""" selector.send(connectionActor, ChannelConnectable) userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress])) - verifyActorTermination(connectionActor) + watch(connectionActor) + expectTerminated(connectionActor) } } @@ -670,7 +673,8 @@ class TcpConnectionSpec extends AkkaSpec(""" EventFilter[DeathPactException](occurrences = 1) intercept { userHandler.ref ! PoisonPill - verifyActorTermination(connectionActor) + watch(connectionActor) + expectTerminated(connectionActor) } } } @@ -835,7 +839,8 @@ class TcpConnectionSpec extends AkkaSpec(""" connectionProbe.expectMsgType[Tcp.Connected] val connectionActor = connectionProbe.sender() connectionActor ! PoisonPill - verifyActorTermination(connectionActor) + watch(connectionActor) + expectTerminated(connectionActor) an[IOException] should be thrownBy { socket.getInputStream.read() } } } @@ -1055,7 +1060,8 @@ class TcpConnectionSpec extends AkkaSpec(""" } def assertThisConnectionActorTerminated(): Unit = { - verifyActorTermination(connectionActor) + watch(connectionActor) + expectTerminated(connectionActor) clientSideChannel should not be ('open) } diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala index 0826dbbfd4..00a11dd749 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpec.scala @@ -7,7 +7,7 @@ package akka.io import akka.actor.{ ActorRef, PoisonPill } import akka.io.Tcp._ import akka.testkit.{ TestProbe, AkkaSpec } -import akka.TestUtils._ +import akka.testkit.SocketUtil._ import akka.util.ByteString import java.io.IOException import java.net.{ ServerSocket, InetSocketAddress } @@ -21,6 +21,11 @@ class TcpIntegrationSpec extends AkkaSpec(""" akka.actor.serialize-creators = on """) with TcpIntegrationSpecSupport with Timeouts { + def verifyActorTermination(actor: ActorRef): Unit = { + watch(actor) + expectTerminated(actor) + } + "The TCP transport implementation" should { "properly bind a test server" in new TestSetup diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala index c10c9dab42..684c8fc3cb 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala @@ -9,7 +9,7 @@ import scala.collection.immutable import akka.testkit.{ AkkaSpec, TestProbe } import akka.actor.ActorRef import akka.io.Inet.SocketOption -import akka.TestUtils._ +import akka.testkit.SocketUtil._ import Tcp._ trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒ diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala index 62fcb4aee5..71bbc6af57 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpListenerSpec.scala @@ -12,7 +12,7 @@ import akka.actor._ import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec, EventFilter } import akka.io.TcpListener.{ RegisterIncoming, FailedRegisterIncoming } import akka.io.SelectionHandler._ -import akka.TestUtils +import akka.testkit.SocketUtil import Tcp._ class TcpListenerSpec extends AkkaSpec(""" @@ -135,7 +135,7 @@ class TcpListenerSpec extends AkkaSpec(""" val bindCommander = TestProbe() val parent = TestProbe() val selectorRouter = TestProbe() - val endpoint = TestUtils.temporaryServerAddress() + val endpoint = SocketUtil.temporaryServerAddress() var registerCallReceiver = TestProbe() var interestCallReceiver = TestProbe() diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala index 32597c79b6..ef00d60992 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala @@ -7,7 +7,7 @@ import java.net.InetSocketAddress import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec } import akka.util.ByteString import akka.actor.ActorRef -import akka.TestUtils._ +import akka.testkit.SocketUtil._ class UdpConnectedIntegrationSpec extends AkkaSpec(""" akka.loglevel = INFO diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala index b16ba600f8..1ab0d036bd 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpIntegrationSpec.scala @@ -10,7 +10,7 @@ import akka.util.ByteString import akka.actor.ActorRef import akka.io.Udp._ import akka.io.Inet._ -import akka.TestUtils._ +import akka.testkit.SocketUtil._ class UdpIntegrationSpec extends AkkaSpec(""" akka.loglevel = INFO diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala index 45fa70f454..47912880d3 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -8,7 +8,7 @@ import akka.actor._ import com.typesafe.config.ConfigFactory import akka.actor.RootActorPath import scala.concurrent.duration._ -import akka.TestUtils +import akka.testkit.SocketUtil import akka.event.Logging.Warning @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -48,7 +48,7 @@ akka { system.eventStream.subscribe(probe.ref, classOf[QuarantinedEvent]) val rarp = RARP(system).provider // pick an unused port - val port = TestUtils.temporaryServerAddress().getPort + val port = SocketUtil.temporaryServerAddress().getPort // simulate de-serialized ActorRef val ref = rarp.resolveActorRef(s"akka.tcp://OtherSystem@localhost:$port/user/foo/bar#1752527294") system.actorOf(Props(new Actor { @@ -88,7 +88,7 @@ akka { "quarantine systems after unsuccessful system message delivery if have not communicated before" in { // Synthesize an ActorRef to a remote system this one has never talked to before. // This forces ReliableDeliverySupervisor to start with unknown remote system UID. - val extinctPath = RootActorPath(Address("akka.tcp", "extinct-system", "localhost", TestUtils.temporaryServerAddress().getPort)) / "user" / "noone" + val extinctPath = RootActorPath(Address("akka.tcp", "extinct-system", "localhost", SocketUtil.temporaryServerAddress().getPort)) / "user" / "noone" val transport = RARP(system).provider.transport val extinctRef = new RemoteActorRef(transport, transport.localAddressForRemote(extinctPath.address), extinctPath, Nobody, props = None, deploy = None) diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 229bf2a120..1a732eb704 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -17,7 +17,7 @@ import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.TestUtils.temporaryServerAddress +import akka.testkit.SocketUtil.temporaryServerAddress object RemotingSpec { diff --git a/akka-remote/src/test/scala/akka/remote/TypedActorRemoteDeploySpec.scala b/akka-remote/src/test/scala/akka/remote/TypedActorRemoteDeploySpec.scala index 14a6e537e7..057933cf45 100644 --- a/akka-remote/src/test/scala/akka/remote/TypedActorRemoteDeploySpec.scala +++ b/akka-remote/src/test/scala/akka/remote/TypedActorRemoteDeploySpec.scala @@ -9,7 +9,6 @@ import scala.concurrent.{ Await, Future } import TypedActorRemoteDeploySpec._ import akka.actor.{ Deploy, ActorSystem, TypedProps, TypedActor } import scala.concurrent.duration._ -import akka.TestUtils.verifyActorTermination object TypedActorRemoteDeploySpec { val conf = ConfigFactory.parseString(""" @@ -41,7 +40,8 @@ class TypedActorRemoteDeploySpec extends AkkaSpec(conf) { Await.result(f(echoService), 3.seconds) should ===(expected) val actor = ts.getActorRefFor(echoService) system.stop(actor) - verifyActorTermination(actor) + watch(actor) + expectTerminated(actor) } "Typed actors" must { diff --git a/akka-remote/src/test/scala/akka/remote/transport/netty/NettyTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/netty/NettyTransportSpec.scala index ce3665afb8..d9a833d74c 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/netty/NettyTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/netty/NettyTransportSpec.scala @@ -2,7 +2,7 @@ package akka.remote.transport.netty import java.net.{ InetAddress, InetSocketAddress } -import akka.TestUtils +import akka.testkit.SocketUtil import akka.actor.{ ActorSystem, Address, ExtendedActorSystem } import akka.remote.BoundAddressesExtension import com.typesafe.config.ConfigFactory @@ -54,7 +54,7 @@ class NettyTransportSpec extends WordSpec with Matchers with BindBehaviour { } "bind to a random port but remoting accepts from a specified port" in { - val address = TestUtils.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = false) + val address = SocketUtil.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = false) val bindConfig = ConfigFactory.parseString(s""" akka.remote.netty.tcp { @@ -71,7 +71,7 @@ class NettyTransportSpec extends WordSpec with Matchers with BindBehaviour { } "bind to a specified port and remoting accepts from a bound port" in { - val address = TestUtils.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = false) + val address = SocketUtil.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = false) val bindConfig = ConfigFactory.parseString(s""" akka.remote.netty.tcp { @@ -124,7 +124,7 @@ trait BindBehaviour { this: WordSpec with Matchers ⇒ def theOneWhoKnowsTheDifferenceBetweenBoundAndRemotingAddress(proto: String) = { s"bind to default $proto address" in { - val address = TestUtils.temporaryServerAddress(udp = proto == "udp") + val address = SocketUtil.temporaryServerAddress(udp = proto == "udp") val bindConfig = ConfigFactory.parseString(s""" akka.remote { @@ -144,8 +144,8 @@ trait BindBehaviour { this: WordSpec with Matchers ⇒ } s"bind to specified $proto address" in { - val address = TestUtils.temporaryServerAddress(address = "127.0.0.1", udp = proto == "udp") - val bindAddress = TestUtils.temporaryServerAddress(address = "127.0.1.1", udp = proto == "udp") + val address = SocketUtil.temporaryServerAddress(address = "127.0.0.1", udp = proto == "udp") + val bindAddress = SocketUtil.temporaryServerAddress(address = "127.0.1.1", udp = proto == "udp") val bindConfig = ConfigFactory.parseString(s""" akka.remote { diff --git a/akka-actor-tests/src/test/scala/akka/TestUtils.scala b/akka-testkit/src/main/scala/akka/testkit/SocketUtil.scala similarity index 64% rename from akka-actor-tests/src/test/scala/akka/TestUtils.scala rename to akka-testkit/src/main/scala/akka/testkit/SocketUtil.scala index 264e383978..115565f0a7 100644 --- a/akka-actor-tests/src/test/scala/akka/TestUtils.scala +++ b/akka-testkit/src/main/scala/akka/testkit/SocketUtil.scala @@ -1,22 +1,23 @@ /** * Copyright (C) 2009-2015 Typesafe Inc. */ - -package akka +package akka.testkit import scala.collection.immutable -import scala.concurrent.duration.Duration -import java.net.{ SocketAddress, InetSocketAddress } -import java.nio.channels.{ DatagramChannel, ServerSocketChannel } -import akka.actor.{ ActorSystem, ActorRef } -import akka.testkit.TestProbe +import java.net.InetSocketAddress +import java.net.SocketAddress +import java.nio.channels.DatagramChannel +import java.nio.channels.ServerSocketChannel -import language.reflectiveCalls +/** + * Utilities to get free socket address. + */ +object SocketUtil { -object TestUtils { + import scala.language.reflectiveCalls // Structural type needed since DatagramSocket and ServerSocket has no common ancestor apart from Object - type GeneralSocket = { + private type GeneralSocket = { def bind(sa: SocketAddress): Unit def close(): Unit def getLocalPort(): Int @@ -36,10 +37,4 @@ object TestUtils { } collect { case (socket, address) ⇒ socket.close(); address } } - def verifyActorTermination(actor: ActorRef, max: Duration = Duration.Undefined)(implicit system: ActorSystem): Unit = { - val watcher = TestProbe() - watcher.watch(actor) - watcher.expectTerminated(actor, max) - } - }