Merge pull request #17147 from akka/wip-17080-move-socketutils-master-patriknw
=tes #17080 Move TestUtils to testkit.SocketUtil (for validation)
This commit is contained in:
commit
ea2219950d
15 changed files with 56 additions and 50 deletions
|
|
@ -7,7 +7,6 @@ import scala.language.postfixOps
|
|||
|
||||
import akka.testkit.{ PerformanceTest, ImplicitSender, AkkaSpec }
|
||||
import scala.concurrent.duration._
|
||||
import akka.TestUtils
|
||||
import akka.testkit.metrics._
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import akka.testkit.metrics.HeapMemoryUsage
|
||||
|
|
@ -128,8 +127,8 @@ class ActorCreationPerfSpec extends AkkaSpec("akka.actor.serialize-messages = of
|
|||
expectMsgPF(15 seconds, s"$scenarioName waiting for Waited") { case Waited ⇒ }
|
||||
|
||||
driver ! PoisonPill
|
||||
TestUtils.verifyActorTermination(driver, 15 seconds)
|
||||
|
||||
watch(driver)
|
||||
expectTerminated(driver, 15.seconds)
|
||||
gc()
|
||||
}
|
||||
|
||||
|
|
@ -153,7 +152,8 @@ class ActorCreationPerfSpec extends AkkaSpec("akka.actor.serialize-messages = of
|
|||
val after = mem.getHeapSnapshot
|
||||
|
||||
driver ! PoisonPill
|
||||
TestUtils.verifyActorTermination(driver, 15 seconds)
|
||||
watch(driver)
|
||||
expectTerminated(driver, 15.seconds)
|
||||
|
||||
after diff before
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,7 +7,6 @@ package akka.actor
|
|||
import com.typesafe.config.ConfigFactory
|
||||
import akka.testkit._
|
||||
import akka.dispatch._
|
||||
import akka.TestUtils.verifyActorTermination
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import akka.ConfigurationException
|
||||
import com.typesafe.config.Config
|
||||
|
|
|
|||
|
|
@ -14,7 +14,6 @@ import java.lang.IllegalStateException
|
|||
import scala.concurrent.Promise
|
||||
import akka.pattern.ask
|
||||
import akka.serialization.JavaSerializer
|
||||
import akka.TestUtils.verifyActorTermination
|
||||
|
||||
object ActorRefSpec {
|
||||
|
||||
|
|
@ -324,7 +323,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
|
||||
ref ! PoisonPill
|
||||
|
||||
verifyActorTermination(ref)
|
||||
watch(ref)
|
||||
expectTerminated(ref)
|
||||
|
||||
JavaSerializer.currentSystem.withValue(sysImpl) {
|
||||
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
|
||||
|
|
@ -403,7 +403,8 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
|
|||
Await.result(ffive, timeout.duration) should ===("five")
|
||||
Await.result(fnull, timeout.duration) should ===("null")
|
||||
|
||||
verifyActorTermination(ref)
|
||||
watch(ref)
|
||||
expectTerminated(ref)
|
||||
}
|
||||
|
||||
"restart when Kill:ed" in {
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@
|
|||
package akka.io
|
||||
|
||||
import akka.testkit.{ TestProbe, AkkaSpec }
|
||||
import akka.TestUtils._
|
||||
import akka.testkit.SocketUtil._
|
||||
import Tcp._
|
||||
|
||||
class CapacityLimitSpec extends AkkaSpec("""
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ import akka.io.Inet.SocketOption
|
|||
import akka.actor._
|
||||
import akka.testkit.{ AkkaSpec, EventFilter, TestActorRef, TestProbe }
|
||||
import akka.util.{ Helpers, ByteString }
|
||||
import akka.TestUtils._
|
||||
import akka.testkit.SocketUtil._
|
||||
import java.util.Random
|
||||
|
||||
object TcpConnectionSpec {
|
||||
|
|
@ -629,7 +629,8 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
selector.send(connectionActor, ChannelConnectable)
|
||||
userHandler.expectMsg(CommandFailed(Connect(UnboundAddress)))
|
||||
|
||||
verifyActorTermination(connectionActor)
|
||||
watch(connectionActor)
|
||||
expectTerminated(connectionActor)
|
||||
} finally sel.close()
|
||||
}
|
||||
}
|
||||
|
|
@ -650,7 +651,8 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
run {
|
||||
connectionActor.toString should not be ("")
|
||||
userHandler.expectMsg(CommandFailed(Connect(UnboundAddress, timeout = Option(100.millis))))
|
||||
verifyActorTermination(connectionActor)
|
||||
watch(connectionActor)
|
||||
expectTerminated(connectionActor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -661,7 +663,8 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
selector.send(connectionActor, ChannelConnectable)
|
||||
userHandler.expectMsg(Connected(serverAddress, clientSideChannel.socket.getLocalSocketAddress.asInstanceOf[InetSocketAddress]))
|
||||
|
||||
verifyActorTermination(connectionActor)
|
||||
watch(connectionActor)
|
||||
expectTerminated(connectionActor)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -670,7 +673,8 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
EventFilter[DeathPactException](occurrences = 1) intercept {
|
||||
userHandler.ref ! PoisonPill
|
||||
|
||||
verifyActorTermination(connectionActor)
|
||||
watch(connectionActor)
|
||||
expectTerminated(connectionActor)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -835,7 +839,8 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
connectionProbe.expectMsgType[Tcp.Connected]
|
||||
val connectionActor = connectionProbe.sender()
|
||||
connectionActor ! PoisonPill
|
||||
verifyActorTermination(connectionActor)
|
||||
watch(connectionActor)
|
||||
expectTerminated(connectionActor)
|
||||
an[IOException] should be thrownBy { socket.getInputStream.read() }
|
||||
}
|
||||
}
|
||||
|
|
@ -1055,7 +1060,8 @@ class TcpConnectionSpec extends AkkaSpec("""
|
|||
}
|
||||
|
||||
def assertThisConnectionActorTerminated(): Unit = {
|
||||
verifyActorTermination(connectionActor)
|
||||
watch(connectionActor)
|
||||
expectTerminated(connectionActor)
|
||||
clientSideChannel should not be ('open)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ package akka.io
|
|||
import akka.actor.{ ActorRef, PoisonPill }
|
||||
import akka.io.Tcp._
|
||||
import akka.testkit.{ TestProbe, AkkaSpec }
|
||||
import akka.TestUtils._
|
||||
import akka.testkit.SocketUtil._
|
||||
import akka.util.ByteString
|
||||
import java.io.IOException
|
||||
import java.net.{ ServerSocket, InetSocketAddress }
|
||||
|
|
@ -21,6 +21,11 @@ class TcpIntegrationSpec extends AkkaSpec("""
|
|||
akka.actor.serialize-creators = on
|
||||
""") with TcpIntegrationSpecSupport with Timeouts {
|
||||
|
||||
def verifyActorTermination(actor: ActorRef): Unit = {
|
||||
watch(actor)
|
||||
expectTerminated(actor)
|
||||
}
|
||||
|
||||
"The TCP transport implementation" should {
|
||||
|
||||
"properly bind a test server" in new TestSetup
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import scala.collection.immutable
|
|||
import akka.testkit.{ AkkaSpec, TestProbe }
|
||||
import akka.actor.ActorRef
|
||||
import akka.io.Inet.SocketOption
|
||||
import akka.TestUtils._
|
||||
import akka.testkit.SocketUtil._
|
||||
import Tcp._
|
||||
|
||||
trait TcpIntegrationSpecSupport { _: AkkaSpec ⇒
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import akka.actor._
|
|||
import akka.testkit.{ TestProbe, TestActorRef, AkkaSpec, EventFilter }
|
||||
import akka.io.TcpListener.{ RegisterIncoming, FailedRegisterIncoming }
|
||||
import akka.io.SelectionHandler._
|
||||
import akka.TestUtils
|
||||
import akka.testkit.SocketUtil
|
||||
import Tcp._
|
||||
|
||||
class TcpListenerSpec extends AkkaSpec("""
|
||||
|
|
@ -135,7 +135,7 @@ class TcpListenerSpec extends AkkaSpec("""
|
|||
val bindCommander = TestProbe()
|
||||
val parent = TestProbe()
|
||||
val selectorRouter = TestProbe()
|
||||
val endpoint = TestUtils.temporaryServerAddress()
|
||||
val endpoint = SocketUtil.temporaryServerAddress()
|
||||
|
||||
var registerCallReceiver = TestProbe()
|
||||
var interestCallReceiver = TestProbe()
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ import java.net.InetSocketAddress
|
|||
import akka.testkit.{ TestProbe, ImplicitSender, AkkaSpec }
|
||||
import akka.util.ByteString
|
||||
import akka.actor.ActorRef
|
||||
import akka.TestUtils._
|
||||
import akka.testkit.SocketUtil._
|
||||
|
||||
class UdpConnectedIntegrationSpec extends AkkaSpec("""
|
||||
akka.loglevel = INFO
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import akka.util.ByteString
|
|||
import akka.actor.ActorRef
|
||||
import akka.io.Udp._
|
||||
import akka.io.Inet._
|
||||
import akka.TestUtils._
|
||||
import akka.testkit.SocketUtil._
|
||||
|
||||
class UdpIntegrationSpec extends AkkaSpec("""
|
||||
akka.loglevel = INFO
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ import akka.actor._
|
|||
import com.typesafe.config.ConfigFactory
|
||||
import akka.actor.RootActorPath
|
||||
import scala.concurrent.duration._
|
||||
import akka.TestUtils
|
||||
import akka.testkit.SocketUtil
|
||||
import akka.event.Logging.Warning
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
|
|
@ -48,7 +48,7 @@ akka {
|
|||
system.eventStream.subscribe(probe.ref, classOf[QuarantinedEvent])
|
||||
val rarp = RARP(system).provider
|
||||
// pick an unused port
|
||||
val port = TestUtils.temporaryServerAddress().getPort
|
||||
val port = SocketUtil.temporaryServerAddress().getPort
|
||||
// simulate de-serialized ActorRef
|
||||
val ref = rarp.resolveActorRef(s"akka.tcp://OtherSystem@localhost:$port/user/foo/bar#1752527294")
|
||||
system.actorOf(Props(new Actor {
|
||||
|
|
@ -88,7 +88,7 @@ akka {
|
|||
"quarantine systems after unsuccessful system message delivery if have not communicated before" in {
|
||||
// Synthesize an ActorRef to a remote system this one has never talked to before.
|
||||
// This forces ReliableDeliverySupervisor to start with unknown remote system UID.
|
||||
val extinctPath = RootActorPath(Address("akka.tcp", "extinct-system", "localhost", TestUtils.temporaryServerAddress().getPort)) / "user" / "noone"
|
||||
val extinctPath = RootActorPath(Address("akka.tcp", "extinct-system", "localhost", SocketUtil.temporaryServerAddress().getPort)) / "user" / "noone"
|
||||
val transport = RARP(system).provider.transport
|
||||
val extinctRef = new RemoteActorRef(transport, transport.localAddressForRemote(extinctPath.address),
|
||||
extinctPath, Nobody, props = None, deploy = None)
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ import scala.concurrent.Await
|
|||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
import akka.TestUtils.temporaryServerAddress
|
||||
import akka.testkit.SocketUtil.temporaryServerAddress
|
||||
|
||||
object RemotingSpec {
|
||||
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ import scala.concurrent.{ Await, Future }
|
|||
import TypedActorRemoteDeploySpec._
|
||||
import akka.actor.{ Deploy, ActorSystem, TypedProps, TypedActor }
|
||||
import scala.concurrent.duration._
|
||||
import akka.TestUtils.verifyActorTermination
|
||||
|
||||
object TypedActorRemoteDeploySpec {
|
||||
val conf = ConfigFactory.parseString("""
|
||||
|
|
@ -41,7 +40,8 @@ class TypedActorRemoteDeploySpec extends AkkaSpec(conf) {
|
|||
Await.result(f(echoService), 3.seconds) should ===(expected)
|
||||
val actor = ts.getActorRefFor(echoService)
|
||||
system.stop(actor)
|
||||
verifyActorTermination(actor)
|
||||
watch(actor)
|
||||
expectTerminated(actor)
|
||||
}
|
||||
|
||||
"Typed actors" must {
|
||||
|
|
|
|||
|
|
@ -2,7 +2,7 @@ package akka.remote.transport.netty
|
|||
|
||||
import java.net.{ InetAddress, InetSocketAddress }
|
||||
|
||||
import akka.TestUtils
|
||||
import akka.testkit.SocketUtil
|
||||
import akka.actor.{ ActorSystem, Address, ExtendedActorSystem }
|
||||
import akka.remote.BoundAddressesExtension
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
|
@ -54,7 +54,7 @@ class NettyTransportSpec extends WordSpec with Matchers with BindBehaviour {
|
|||
}
|
||||
|
||||
"bind to a random port but remoting accepts from a specified port" in {
|
||||
val address = TestUtils.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = false)
|
||||
val address = SocketUtil.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = false)
|
||||
|
||||
val bindConfig = ConfigFactory.parseString(s"""
|
||||
akka.remote.netty.tcp {
|
||||
|
|
@ -71,7 +71,7 @@ class NettyTransportSpec extends WordSpec with Matchers with BindBehaviour {
|
|||
}
|
||||
|
||||
"bind to a specified port and remoting accepts from a bound port" in {
|
||||
val address = TestUtils.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = false)
|
||||
val address = SocketUtil.temporaryServerAddress(InetAddress.getLocalHost.getHostAddress, udp = false)
|
||||
|
||||
val bindConfig = ConfigFactory.parseString(s"""
|
||||
akka.remote.netty.tcp {
|
||||
|
|
@ -124,7 +124,7 @@ trait BindBehaviour { this: WordSpec with Matchers ⇒
|
|||
|
||||
def theOneWhoKnowsTheDifferenceBetweenBoundAndRemotingAddress(proto: String) = {
|
||||
s"bind to default $proto address" in {
|
||||
val address = TestUtils.temporaryServerAddress(udp = proto == "udp")
|
||||
val address = SocketUtil.temporaryServerAddress(udp = proto == "udp")
|
||||
|
||||
val bindConfig = ConfigFactory.parseString(s"""
|
||||
akka.remote {
|
||||
|
|
@ -144,8 +144,8 @@ trait BindBehaviour { this: WordSpec with Matchers ⇒
|
|||
}
|
||||
|
||||
s"bind to specified $proto address" in {
|
||||
val address = TestUtils.temporaryServerAddress(address = "127.0.0.1", udp = proto == "udp")
|
||||
val bindAddress = TestUtils.temporaryServerAddress(address = "127.0.1.1", udp = proto == "udp")
|
||||
val address = SocketUtil.temporaryServerAddress(address = "127.0.0.1", udp = proto == "udp")
|
||||
val bindAddress = SocketUtil.temporaryServerAddress(address = "127.0.1.1", udp = proto == "udp")
|
||||
|
||||
val bindConfig = ConfigFactory.parseString(s"""
|
||||
akka.remote {
|
||||
|
|
|
|||
|
|
@ -1,22 +1,23 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka
|
||||
package akka.testkit
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration.Duration
|
||||
import java.net.{ SocketAddress, InetSocketAddress }
|
||||
import java.nio.channels.{ DatagramChannel, ServerSocketChannel }
|
||||
import akka.actor.{ ActorSystem, ActorRef }
|
||||
import akka.testkit.TestProbe
|
||||
import java.net.InetSocketAddress
|
||||
import java.net.SocketAddress
|
||||
import java.nio.channels.DatagramChannel
|
||||
import java.nio.channels.ServerSocketChannel
|
||||
|
||||
import language.reflectiveCalls
|
||||
/**
|
||||
* Utilities to get free socket address.
|
||||
*/
|
||||
object SocketUtil {
|
||||
|
||||
object TestUtils {
|
||||
import scala.language.reflectiveCalls
|
||||
|
||||
// Structural type needed since DatagramSocket and ServerSocket has no common ancestor apart from Object
|
||||
type GeneralSocket = {
|
||||
private type GeneralSocket = {
|
||||
def bind(sa: SocketAddress): Unit
|
||||
def close(): Unit
|
||||
def getLocalPort(): Int
|
||||
|
|
@ -36,10 +37,4 @@ object TestUtils {
|
|||
} collect { case (socket, address) ⇒ socket.close(); address }
|
||||
}
|
||||
|
||||
def verifyActorTermination(actor: ActorRef, max: Duration = Duration.Undefined)(implicit system: ActorSystem): Unit = {
|
||||
val watcher = TestProbe()
|
||||
watcher.watch(actor)
|
||||
watcher.expectTerminated(actor, max)
|
||||
}
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue