2012-01-14 03:16:39 +01:00
|
|
|
/**
 * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
 */
|
|
|
|
|
package akka.zeromq
|
|
|
|
|
|
|
|
|
|
import org.scalatest.matchers.MustMatchers
|
|
|
|
|
import akka.testkit.{ TestProbe, DefaultTimeout, AkkaSpec }
|
|
|
|
|
import akka.util.duration._
|
2012-01-14 11:46:51 +01:00
|
|
|
import akka.actor.{ Cancellable, Actor, Props, ActorRef }
|
2012-01-14 03:16:39 +01:00
|
|
|
|
|
|
|
|
/**
 * Companion holding the configuration handed to `AkkaSpec` by the
 * `ConcurrentSocketActorSpec` test class.
 */
object ConcurrentSocketActorSpec {
  /** Configuration text that lists `akka.zeromq.ZeroMQExtension` under `akka.extensions`. */
  val config: String = """
akka {
extensions = ["akka.zeromq.ZeroMQExtension"]
}
"""
}
|
|
|
|
|
|
2012-01-14 12:13:46 +01:00
|
|
|
/**
 * Exercises the socket actors created through `ZeroMQExtension`: a publisher/subscriber
 * round trip and sending a message that carries no frames.
 *
 * Every test starts with `checkZeroMQInstallation`, which reports the test as pending
 * (instead of failing) when the native library cannot be linked or when the reported
 * version is not 2.1.x.
 */
class ConcurrentSocketActorSpec
  extends AkkaSpec(ConcurrentSocketActorSpec.config)
  with MustMatchers
  with DefaultTimeout {

  // Ask the OS for a free port by binding a throw-away server socket and closing it again.
  // NOTE(review): another process could take the port between close() and the ZeroMQ bind.
  val endpoint = "tcp://127.0.0.1:%s" format { val s = new java.net.ServerSocket(0); try s.getLocalPort finally s.close() }

  /** The ZeroMQ extension registered on the test actor system. */
  def zmq = system.extension(ZeroMQExtension)

  "ConcurrentSocketActor" should {

    "support pub-sub connections" in {
      checkZeroMQInstallation
      val (publisherProbe, subscriberProbe) = (TestProbe(), TestProbe())
      val context = Context()
      val publisher = newPublisher(context, publisherProbe.ref)
      val subscriber = newSubscriber(context, subscriberProbe.ref)
      val msgGenerator = newMessageGenerator(publisher)

      try {
        subscriberProbe.expectMsg(Connecting)
        val msgNumbers = subscriberProbe.receiveWhile(2 seconds) {
          case msg: ZMQMessage ⇒ msg
        }.map(_.firstFrameAsString.toInt)
        msgNumbers.length must be > 0
        // The generator emits consecutive integers, so whatever window was received must be gap-free.
        msgNumbers must equal(for (i ← msgNumbers.head to msgNumbers.last) yield i)
      } finally {
        try {
          system stop msgGenerator
          within(2 seconds) { awaitCond(msgGenerator.isTerminated) }
        } finally {
          // Stop the socket actors even when the generator did not terminate in time,
          // so that a failure here does not leak them into later tests.
          system stop publisher
          system stop subscriber
        }
        // lastOption turns "nothing received" into a matcher failure instead of a
        // NoSuchElementException thrown by `last`.
        subscriberProbe.receiveWhile(1 seconds) {
          case msg ⇒ msg
        }.lastOption must equal(Some(Closed))
        // Only reached after Closed was observed.
        // NOTE(review): presumably terminating a context with open sockets can block — confirm.
        context.term
      }
    }

    "support zero-length message frames" in {
      checkZeroMQInstallation
      val publisherProbe = TestProbe()
      val context = Context()
      val publisher = newPublisher(context, publisherProbe.ref)

      try {
        publisher ! ZMQMessage(Seq[Frame]())
      } finally {
        system stop publisher
        publisherProbe.within(5 seconds) {
          publisherProbe.expectMsg(Closed)
        }
        // Only reached after the publisher reported Closed.
        context.term
      }
    }

    /** Creates a Pub socket actor bound to `endpoint` that reports to `listener`. */
    def newPublisher(context: Context, listener: ActorRef) = {
      zmq.newSocket(SocketType.Pub, context, Listener(listener), Bind(endpoint))
    }

    /** Creates a Sub socket actor connected to `endpoint`, subscribed with an empty payload. */
    def newSubscriber(context: Context, listener: ActorRef) = {
      zmq.newSocket(SocketType.Sub, context, Listener(listener), Connect(endpoint), Subscribe(Seq.empty))
    }

    /** Starts a `MessageGeneratorActor` that sends numbered messages to `actorRef`. */
    def newMessageGenerator(actorRef: ActorRef) = {
      system.actorOf(Props(new MessageGeneratorActor(actorRef)))
    }

    /**
     * Lets the test continue only for ZeroMQ 2.1.x; any other version, or a
     * `LinkageError` while querying the version, marks the test as pending.
     */
    def checkZeroMQInstallation: Unit = try {
      zmq.version match {
        case ZeroMQVersion(2, 1, _) ⇒ () // the unit value; `Unit` would be the companion object
        case version                ⇒ invalidZeroMQVersion(version)
      }
    } catch {
      case e: LinkageError ⇒ zeroMQNotInstalled
    }

    /** Logs the unsupported version and marks the running test as pending. */
    def invalidZeroMQVersion(version: ZeroMQVersion) {
      info("WARNING: The tests are not run because invalid ZeroMQ version: %s. Version >= 2.1.x required.".format(version))
      pending
    }

    /** Logs that ZeroMQ is missing and marks the running test as pending. */
    def zeroMQNotInstalled {
      info("WARNING: The tests are not run because ZeroMQ is not installed. Version >= 2.1.x required.")
      pending
    }
  }

  /**
   * Sends an increasing counter (0, 1, 2, ...) to `actorRef`, one `ZMQMessage` per
   * message this actor receives. A scheduler task created in `preStart` delivers
   * "genMessage" to this actor repeatedly and is cancelled again in `postStop`.
   *
   * @param actorRef recipient of the generated messages
   */
  class MessageGeneratorActor(actorRef: ActorRef) extends Actor {
    var messageNumber: Int = 0

    // Handle of the scheduled task; None before preStart and after postStop.
    private var genMessages: Option[Cancellable] = None

    override def preStart() = {
      genMessages = Some(system.scheduler.schedule(100 millis, 10 millis, self, "genMessage"))
    }

    override def postStop() = {
      genMessages foreach { task ⇒
        if (!task.isCancelled) task.cancel()
      }
      genMessages = None
    }

    protected def receive = {
      // Any incoming message triggers the next number, not only the scheduled "genMessage".
      case _ ⇒
        val payload = messageNumber.toString
        messageNumber += 1
        actorRef ! ZMQMessage(payload.getBytes)
    }
  }
}
|