diff --git a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala index eab6b3503a..2dcc9a623a 100644 --- a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala +++ b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala @@ -35,7 +35,12 @@ class ConcurrentSocketActorSpec extends AkkaSpec { pending } - val endpoint = "tcp://127.0.0.1:%s" format { val s = new java.net.ServerSocket(0); try s.getLocalPort finally s.close() } + lazy val endpoints: Vector[String] = { + val sockets = Vector.fill(3)(new java.net.ServerSocket(0)) + val endpoints = sockets.map(s ⇒ s"tcp://127.0.0.1:${s.getLocalPort}") + sockets.foreach(_.close()) + endpoints + } // this must stay a def for checkZeroMQInstallation() to work correctly def zmq = ZeroMQExtension(system) @@ -45,6 +50,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { checkZeroMQInstallation() val subscriberProbe = TestProbe() val context = Context() + val endpoint = endpoints(0) val publisher = zmq.newSocket(SocketType.Pub, context, Bind(endpoint)) val subscriber = zmq.newSocket(SocketType.Sub, context, Listener(subscriberProbe.ref), Connect(endpoint), SubscribeAll) import system.dispatcher @@ -67,14 +73,16 @@ class ConcurrentSocketActorSpec extends AkkaSpec { msgNumbers must equal(for (i ← msgNumbers.head to msgNumbers.last) yield i) } finally { msgGenerator.cancel() - system stop publisher + watch(subscriber) system stop subscriber subscriberProbe.receiveWhile(3 seconds) { case msg ⇒ msg }.last must equal(Closed) - awaitCond(publisher.isTerminated) - awaitCond(subscriber.isTerminated) - context.term + expectTerminated(subscriber, 5.seconds) + watch(publisher) + system stop publisher + expectTerminated(publisher, 5.seconds) + context.term() } } @@ -83,6 +91,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { val requesterProbe = TestProbe() val replierProbe = TestProbe() val context = Context() + val endpoint = endpoints(1) val requester = zmq.newSocket(SocketType.Req, context, Listener(requesterProbe.ref), Bind(endpoint)) val replier = zmq.newSocket(SocketType.Rep, context, Listener(replierProbe.ref), Connect(endpoint)) @@ -96,12 +105,14 @@ class ConcurrentSocketActorSpec extends AkkaSpec { replier ! reply requesterProbe.expectMsg(reply) } finally { - system stop requester + watch(replier) system stop replier replierProbe.expectMsg(Closed) - awaitCond(requester.isTerminated) - awaitCond(replier.isTerminated) - context.term + expectTerminated(replier, 5.seconds) + watch(requester) + system stop requester + expectTerminated(requester, 5.seconds) + context.term() } } @@ -109,6 +120,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec { checkZeroMQInstallation() val pullerProbe = TestProbe() val context = Context() + val endpoint = endpoints(2) val pusher = zmq.newSocket(SocketType.Push, context, Bind(endpoint)) val puller = zmq.newSocket(SocketType.Pull, context, Listener(pullerProbe.ref), Connect(endpoint)) @@ -119,12 +131,15 @@ class ConcurrentSocketActorSpec extends AkkaSpec { pusher ! message pullerProbe.expectMsg(message) } finally { - system stop pusher + + watch(puller) system stop puller pullerProbe.expectMsg(Closed) - awaitCond(pusher.isTerminated) - awaitCond(puller.isTerminated) - context.term + expectTerminated(puller, 5.seconds) + watch(pusher) + system stop pusher + expectTerminated(pusher, 5.seconds) + context.term() } }