Merge pull request #1836 from akka/wip-3512-zeromq-test-patriknw

=zer #3512 Harden zeromq ConcurrentSocketActorSpec
This commit is contained in:
Patrik Nordwall 2013-11-14 05:24:58 -08:00
commit f35203ef1b

View file

@ -35,7 +35,12 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
pending
}
val endpoint = "tcp://127.0.0.1:%s" format { val s = new java.net.ServerSocket(0); try s.getLocalPort finally s.close() }
lazy val endpoints: Vector[String] = {
val sockets = Vector.fill(3)(new java.net.ServerSocket(0))
val endpoints = sockets.map(s s"tcp://127.0.0.1:${s.getLocalPort}")
sockets.foreach(_.close())
endpoints
}
// this must stay a def for checkZeroMQInstallation() to work correctly
def zmq = ZeroMQExtension(system)
@ -45,6 +50,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
checkZeroMQInstallation()
val subscriberProbe = TestProbe()
val context = Context()
val endpoint = endpoints(0)
val publisher = zmq.newSocket(SocketType.Pub, context, Bind(endpoint))
val subscriber = zmq.newSocket(SocketType.Sub, context, Listener(subscriberProbe.ref), Connect(endpoint), SubscribeAll)
import system.dispatcher
@ -67,14 +73,16 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
msgNumbers must equal(for (i msgNumbers.head to msgNumbers.last) yield i)
} finally {
msgGenerator.cancel()
system stop publisher
watch(subscriber)
system stop subscriber
subscriberProbe.receiveWhile(3 seconds) {
case msg msg
}.last must equal(Closed)
awaitCond(publisher.isTerminated)
awaitCond(subscriber.isTerminated)
context.term
expectTerminated(subscriber, 5.seconds)
watch(publisher)
system stop publisher
expectTerminated(publisher, 5.seconds)
context.term()
}
}
@ -83,6 +91,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
val requesterProbe = TestProbe()
val replierProbe = TestProbe()
val context = Context()
val endpoint = endpoints(1)
val requester = zmq.newSocket(SocketType.Req, context, Listener(requesterProbe.ref), Bind(endpoint))
val replier = zmq.newSocket(SocketType.Rep, context, Listener(replierProbe.ref), Connect(endpoint))
@ -96,12 +105,14 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
replier ! reply
requesterProbe.expectMsg(reply)
} finally {
system stop requester
watch(replier)
system stop replier
replierProbe.expectMsg(Closed)
awaitCond(requester.isTerminated)
awaitCond(replier.isTerminated)
context.term
expectTerminated(replier, 5.seconds)
watch(requester)
system stop requester
expectTerminated(requester, 5.seconds)
context.term()
}
}
@ -109,6 +120,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
checkZeroMQInstallation()
val pullerProbe = TestProbe()
val context = Context()
val endpoint = endpoints(2)
val pusher = zmq.newSocket(SocketType.Push, context, Bind(endpoint))
val puller = zmq.newSocket(SocketType.Pull, context, Listener(pullerProbe.ref), Connect(endpoint))
@ -119,12 +131,15 @@ class ConcurrentSocketActorSpec extends AkkaSpec {
pusher ! message
pullerProbe.expectMsg(message)
} finally {
system stop pusher
watch(puller)
system stop puller
pullerProbe.expectMsg(Closed)
awaitCond(pusher.isTerminated)
awaitCond(puller.isTerminated)
context.term
expectTerminated(puller, 5.seconds)
watch(pusher)
system stop pusher
expectTerminated(pusher, 5.seconds)
context.term()
}
}