Adds initial version of zeromq support for akka 2.0

This commit is contained in:
Ivan Porto Carrero 2012-01-14 03:16:39 +01:00
parent fd68752fbe
commit a5c55fd017
10 changed files with 602 additions and 0 deletions

View file

@ -146,6 +146,12 @@ akka {
} }
} }
zeromq-dispatcher {
# A zeromq socket needs to be pinned to the thread that created it.
# Changing this value results in weird errors and race conditions within zeromq
type = "PinnedDispatcher"
}
default-dispatcher { default-dispatcher {
# Must be one of the following # Must be one of the following
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of

View file

@ -0,0 +1,207 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.zeromq
import org.zeromq.ZMQ.{ Socket, Poller }
import org.zeromq.{ ZMQ JZMQ }
import akka.actor._
import akka.dispatch.{ Promise, Dispatchers, Future }
private[zeromq] sealed trait PollLifeCycle
private[zeromq] case object NoResults extends PollLifeCycle
private[zeromq] case object Results extends PollLifeCycle
private[zeromq] case object Closing extends PollLifeCycle
private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Actor {
private val noBytes = Array[Byte]()
private val socket: Socket = params.context.socket(params.socketType)
private val poller: Poller = params.context.poller
override def receive: Receive = {
case Send(frames)
sendFrames(frames)
pollAndReceiveFrames()
case ZMQMessage(frames)
sendFrames(frames)
pollAndReceiveFrames()
case Connect(endpoint)
socket.connect(endpoint)
notifyListener(Connecting)
pollAndReceiveFrames()
case Bind(endpoint)
socket.bind(endpoint)
pollAndReceiveFrames()
case Subscribe(topic)
socket.subscribe(topic.toArray)
pollAndReceiveFrames()
case Unsubscribe(topic)
socket.unsubscribe(topic.toArray)
pollAndReceiveFrames()
case Linger(value)
socket.setLinger(value)
case Linger
sender ! socket.getLinger
case ReconnectIVL
sender ! socket.getReconnectIVL
case ReconnectIVL(value)
socket.setReconnectIVL(value)
case Backlog
sender ! socket.getBacklog
case Backlog(value)
socket.setBacklog(value)
case ReconnectIVLMax
sender ! socket.getReconnectIVLMax
case ReconnectIVLMax(value)
socket.setReconnectIVLMax(value)
case MaxMsgSize
sender ! socket.getMaxMsgSize
case MaxMsgSize(value)
socket.setMaxMsgSize(value)
case SndHWM
sender ! socket.getSndHWM
case SndHWM(value)
socket.setSndHWM(value)
case RcvHWM
sender ! socket.getRcvHWM
case RcvHWM(value)
socket.setRcvHWM(value)
case HWM(value)
socket.setHWM(value)
case Swap
sender ! socket.getSwap
case Swap(value)
socket.setSwap(value)
case Affinity
sender ! socket.getAffinity
case Affinity(value)
socket.setAffinity(value)
case Identity
sender ! socket.getIdentity
case Identity(value)
socket.setIdentity(value)
case Rate
sender ! socket.getRate
case Rate(value)
socket.setRate(value)
case RecoveryInterval
sender ! socket.getRecoveryInterval
case RecoveryInterval(value)
socket.setRecoveryInterval(value)
case MulticastLoop
sender ! socket.hasMulticastLoop
case MulticastLoop(value)
socket.setMulticastLoop(value)
case MulticastHops
sender ! socket.getMulticastHops
case MulticastHops(value)
socket.setMulticastHops(value)
case ReceiveTimeOut
sender ! socket.getReceiveTimeOut
case ReceiveTimeOut(value)
socket.setReceiveTimeOut(value)
case SendTimeOut
sender ! socket.getSendTimeOut
case SendTimeOut(value)
socket.setSendTimeOut(value)
case SendBufferSize
sender ! socket.getSendBufferSize
case SendBufferSize(value)
socket.setSendBufferSize(value)
case ReceiveBufferSize
sender ! socket.getReceiveBufferSize
case ReceiveBufferSize(value)
socket.setReceiveBufferSize(value)
case ReceiveMore
sender ! socket.hasReceiveMore
case FileDescriptor
sender ! socket.getFD
case 'poll {
currentPoll = None
pollAndReceiveFrames()
}
case 'receiveFrames {
receiveFrames() match {
case Seq()
case frames notifyListener(params.deserializer(frames))
}
self ! 'poll
}
}
override def preStart {
poller.register(socket, Poller.POLLIN)
}
override def postStop {
currentPoll foreach { _ complete Right(Closing) }
poller.unregister(socket)
socket.close
notifyListener(Closed)
}
private def sendFrames(frames: Seq[Frame]) {
def sendBytes(bytes: Seq[Byte], flags: Int) {
socket.send(bytes.toArray, flags)
}
val iter = frames.iterator
while (iter.hasNext) {
val payload = iter.next.payload
val flags = if (iter.hasNext) JZMQ.SNDMORE else 0
sendBytes(payload, flags)
}
}
private var currentPoll: Option[Promise[PollLifeCycle]] = None
private def pollAndReceiveFrames() {
currentPoll = currentPoll orElse Some(newEventLoop)
}
private def newEventLoop: Promise[PollLifeCycle] = {
implicit val executor = context.system.dispatchers.defaultGlobalDispatcher
(Future {
if (poller.poll(params.pollTimeoutDuration.toMillis) > 0 && poller.pollin(0)) Results else NoResults
}).asInstanceOf[Promise[PollLifeCycle]] onSuccess {
case Results if (!self.isTerminated) self ! 'receiveFrames
case NoResults if (!self.isTerminated) self ! 'poll
case _ currentPoll = None
} onFailure {
case ex {
if (context.system != null) {
context.system.log.error(ex, "There was an error receiving messages on the zeromq socket")
}
if (!self.isTerminated) self ! 'poll
}
}
}
private def receiveFrames(): Seq[Frame] = {
@inline def receiveBytes(): Array[Byte] = socket.recv(0) match {
case null noBytes
case bytes: Array[Byte] if bytes.length > 0 bytes
case _ noBytes
}
receiveBytes() match {
case `noBytes` Vector.empty
case someBytes
var frames = Vector(Frame(someBytes))
while (socket.hasReceiveMore) receiveBytes() match {
case `noBytes`
case someBytes frames :+= Frame(someBytes)
}
frames
}
}
private def notifyListener(message: Any) {
params.listener.foreach { listener
if (listener.isTerminated)
context stop self
else
listener ! message
}
}
}

View file

@ -0,0 +1,20 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.zeromq
import org.zeromq.{ ZMQ JZMQ }
import akka.zeromq.SocketType._
class Context(numIoThreads: Int) {
private var context = JZMQ.context(numIoThreads)
def socket(socketType: SocketType) = {
context.socket(socketType.id)
}
def poller = {
context.poller
}
def term = {
context.term
}
}

View file

@ -0,0 +1,15 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.zeromq
case class Frame(payload: Seq[Byte])
object Frame { def apply(s: String): Frame = Frame(s.getBytes) }
trait Deserializer {
def apply(frames: Seq[Frame]): Any
}
class ZMQMessageDeserializer extends Deserializer {
def apply(frames: Seq[Frame]) = ZMQMessage(frames)
}

View file

@ -0,0 +1,104 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.zeromq
import com.google.protobuf.Message
sealed trait Request
sealed trait SocketOption extends Request
sealed trait SocketOptionQuery extends Request
case class Connect(endpoint: String) extends Request
case class Bind(endpoint: String) extends Request
private[zeromq] case object Close extends Request
case class Subscribe(payload: Seq[Byte]) extends Request
object Subscribe {
def apply(topic: String): Subscribe = {
Subscribe(topic.getBytes)
}
}
case class Unsubscribe(payload: Seq[Byte]) extends Request
object Unsubscribe {
def apply(topic: String): Unsubscribe = {
Unsubscribe(topic.getBytes)
}
}
case class Send(frames: Seq[Frame]) extends Request
case class ZMQMessage(frames: Seq[Frame]) {
def firstFrameAsString = {
new String(frames.head.payload.toArray)
}
}
object ZMQMessage {
def apply(bytes: Array[Byte]): ZMQMessage = {
ZMQMessage(Seq(Frame(bytes)))
}
def apply(message: Message): ZMQMessage = {
ZMQMessage(message.toByteArray)
}
}
case class Linger(value: Long) extends SocketOption
object Linger extends SocketOptionQuery
case class ReconnectIVL(value: Long) extends SocketOption
object ReconnectIVL extends SocketOptionQuery
case class Backlog(value: Long) extends SocketOption
object Backlog extends SocketOptionQuery
case class ReconnectIVLMax(value: Long) extends SocketOption
object ReconnectIVLMax extends SocketOptionQuery
case class MaxMsgSize(value: Long) extends SocketOption
object MaxMsgSize extends SocketOptionQuery
case class SndHWM(value: Long) extends SocketOption
object SndHWM extends SocketOptionQuery
case class RcvHWM(value: Long) extends SocketOption
object RcvHWM extends SocketOptionQuery
case class HWM(value: Long) extends SocketOption
/* object HWM extends SocketOptionQuery */
case class Swap(value: Long) extends SocketOption
object Swap extends SocketOptionQuery
case class Affinity(value: Long) extends SocketOption
object Affinity extends SocketOptionQuery
case class Identity(value: Array[Byte]) extends SocketOption
object Identity extends SocketOptionQuery
case class Rate(value: Long) extends SocketOption
object Rate extends SocketOptionQuery
case class RecoveryInterval(value: Long) extends SocketOption
object RecoveryInterval extends SocketOptionQuery
case class MulticastLoop(value: Boolean) extends SocketOption
object MulticastLoop extends SocketOptionQuery
case class MulticastHops(value: Long) extends SocketOption
object MulticastHops extends SocketOptionQuery
case class ReceiveTimeOut(value: Long) extends SocketOption
object ReceiveTimeOut extends SocketOptionQuery
case class SendTimeOut(value: Long) extends SocketOption
object SendTimeOut extends SocketOptionQuery
case class SendBufferSize(value: Long) extends SocketOption
object SendBufferSize extends SocketOptionQuery
case class ReceiveBufferSize(value: Long) extends SocketOption
object ReceiveBufferSize extends SocketOptionQuery
object ReceiveMore extends SocketOptionQuery
object FileDescriptor extends SocketOptionQuery

View file

@ -0,0 +1,8 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.zeromq
sealed trait Response
case object Connecting extends Response
case object Closed extends Response

View file

@ -0,0 +1,14 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.zeromq
import org.zeromq.{ ZMQ JZMQ }
object SocketType extends Enumeration {
type SocketType = Value
val Pub = Value(JZMQ.PUB)
val Sub = Value(JZMQ.SUB)
val Dealer = Value(JZMQ.DEALER)
val Router = Value(JZMQ.ROUTER)
}

View file

@ -0,0 +1,65 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.zeromq
import akka.util.Duration
import akka.util.duration._
import akka.zeromq.SocketType._
import org.zeromq.{ ZMQ JZMQ }
import akka.actor._
import akka.dispatch.Await
case class SocketParameters(
socketType: SocketType,
context: Context,
listener: Option[ActorRef] = None,
deserializer: Deserializer = new ZMQMessageDeserializer,
pollTimeoutDuration: Duration = 100 millis)
case class ZeroMQVersion(major: Int, minor: Int, patch: Int) {
override def toString = "%d.%d.%d".format(major, minor, patch)
}
object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProvider {
def lookup() = this
def createExtension(system: ActorSystemImpl) = new ZeroMQExtension(system)
}
class ZeroMQExtension(system: ActorSystem) extends Extension {
def version = {
ZeroMQVersion(JZMQ.getMajorVersion, JZMQ.getMinorVersion, JZMQ.getPatchVersion)
}
lazy val DefaultContext = newContext()
def newContext(numIoThreads: Int = 1) = {
verifyZeroMQVersion
new Context(numIoThreads)
}
def newSocket(socketType: SocketType,
listener: Option[ActorRef] = None,
context: Context = DefaultContext, // For most applications you want to use the default context
deserializer: Deserializer = new ZMQMessageDeserializer,
pollTimeoutDuration: Duration = 100 millis) = {
verifyZeroMQVersion
val params = SocketParameters(socketType, context, listener, deserializer, pollTimeoutDuration)
implicit val timeout = system.settings.ActorTimeout
val req = (zeromq ? Props(new ConcurrentSocketActor(params)).withDispatcher("zmqdispatcher")).mapTo[ActorRef]
Await.result(req, timeout.duration)
}
val zeromq: ActorRef = {
verifyZeroMQVersion
system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor {
protected def receive = { case p: Props sender ! context.actorOf(p) }
}), "zeromq")
}
private def verifyZeroMQVersion = {
require(
JZMQ.getFullVersion > JZMQ.makeVersion(2, 1, 0),
"Unsupported ZeroMQ version: %s".format(JZMQ.getVersionString))
}
}

View file

@ -0,0 +1,149 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.zeromq
import org.scalatest.matchers.MustMatchers
import akka.testkit.{ TestProbe, DefaultTimeout, AkkaSpec }
import akka.actor.{ Actor, Props, ActorRef }
import akka.util.Timeout
import akka.util.duration._
import java.net.{ SocketException, ConnectException, Socket }
import util.Random
object ConcurrentSocketActorSpec {
val config = """
akka {
extensions = ["akka.zeromq.ZeroMQExtension$"]
actor {
zmqdispatcher {
type = "PinnedDispatcher"
}
}
}
"""
}
class ConcurrentSocketActorSpec extends AkkaSpec(ConcurrentSocketActorSpec.config) with MustMatchers with DefaultTimeout {
val endpoint = "tcp://127.0.0.1:%s" format FreePort.randomFreePort()
def zmq = system.extension(ZeroMQExtension)
"ConcurrentSocketActor" should {
"support pub-sub connections" in {
checkZeroMQInstallation
val (publisherProbe, subscriberProbe) = (TestProbe(), TestProbe())
val context = zmq.newContext()
val publisher = newPublisher(context, publisherProbe.ref)
val subscriber = newSubscriber(context, subscriberProbe.ref)
val msgGenerator = newMessageGenerator(publisher)
try {
subscriberProbe.expectMsg(Connecting)
val msgNumbers = subscriberProbe.receiveWhile(2 seconds) {
case msg: ZMQMessage {
println("RECV: " + msg.firstFrameAsString)
msg
}
}.map(_.firstFrameAsString.toInt)
msgNumbers.length must be > 0
msgNumbers must equal(for (i msgNumbers.head to msgNumbers.last) yield i)
} finally {
system stop msgGenerator
within(2 seconds) { awaitCond(msgGenerator.isTerminated) }
system stop subscriber
system stop publisher
subscriberProbe.receiveWhile(1 seconds) {
case msg msg
}.last must equal(Closed)
context.term
}
}
"support zero-length message frames" in {
checkZeroMQInstallation
val publisherProbe = TestProbe()
val context = zmq.newContext()
val publisher = newPublisher(context, publisherProbe.ref)
try {
publisher ! ZMQMessage(Seq[Frame]())
} finally {
system stop publisher
publisherProbe.within(5 seconds) {
publisherProbe.expectMsg(Closed)
}
context.term
}
}
def newPublisher(context: Context, listener: ActorRef) = {
val publisher = zmq.newSocket(SocketType.Pub, context = context, listener = Some(listener))
publisher ! Bind(endpoint)
publisher
}
def newSubscriber(context: Context, listener: ActorRef) = {
val subscriber = zmq.newSocket(SocketType.Sub, context = context, listener = Some(listener))
subscriber ! Connect(endpoint)
subscriber ! Subscribe(Seq())
subscriber
}
def newMessageGenerator(actorRef: ActorRef) = {
system.actorOf(Props(new MessageGeneratorActor(actorRef)).withTimeout(Timeout(10 millis)))
}
def checkZeroMQInstallation = try {
zmq.version match {
case ZeroMQVersion(2, 1, _) Unit
case version invalidZeroMQVersion(version)
}
} catch {
case e: LinkageError zeroMQNotInstalled
}
def invalidZeroMQVersion(version: ZeroMQVersion) {
info("WARNING: The tests are not run because invalid ZeroMQ version: %s. Version >= 2.1.x required.".format(version))
pending
}
def zeroMQNotInstalled {
info("WARNING: The tests are not run because ZeroMQ is not installed. Version >= 2.1.x required.")
pending
}
}
class MessageGeneratorActor(actorRef: ActorRef) extends Actor {
var messageNumber: Int = 0
protected def receive = {
case _
val payload = "%s".format(messageNumber)
messageNumber = messageNumber + 1
actorRef ! ZMQMessage(payload.getBytes)
}
}
object FreePort {
def isPortFree(port: Int) = {
try {
val socket = new Socket("127.0.0.1", port)
socket.close()
false
} catch {
case e: ConnectException true
case e: SocketException if e.getMessage == "Connection reset by peer" true
}
}
private def newPort = Random.nextInt(55365) + 10000
def randomFreePort(maxRetries: Int = 50) = {
var count = 0
var freePort = newPort
while (!isPortFree(freePort)) {
freePort = newPort
count += 1
if (count >= maxRetries) {
throw new RuntimeException("Couldn't determine a free port")
}
}
freePort
}
}
}

View file

@ -195,6 +195,17 @@ object AkkaBuild extends Build {
) )
) )
lazy val zeroMQ = Project(
id = "akka-zeromq",
base = file("akka-zeromq"),
dependencies = Seq(actor, testkit % "test;test->test"),
settings = defaultSettings ++ Seq(
libraryDependencies ++= Dependencies.zeroMQ
)
)
// lazy val spring = Project( // lazy val spring = Project(
// id = "akka-spring", // id = "akka-spring",
// base = file("akka-spring"), // base = file("akka-spring"),
@ -434,6 +445,8 @@ object Dependencies {
val tutorials = Seq(Test.scalatest, Test.junit) val tutorials = Seq(Test.scalatest, Test.junit)
val docs = Seq(Test.scalatest, Test.junit) val docs = Seq(Test.scalatest, Test.junit)
val zeroMQ = Seq(Test.scalatest, Test.junit, protobuf, Dependency.zeroMQ)
} }
object Dependency { object Dependency {
@ -489,6 +502,7 @@ object Dependency {
val zkClient = "zkclient" % "zkclient" % "0.3" // ApacheV2 val zkClient = "zkclient" % "zkclient" % "0.3" // ApacheV2
val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2 val zookeeper = "org.apache.hadoop.zookeeper" % "zookeeper" % V.Zookeeper // ApacheV2
val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2 val zookeeperLock = "org.apache.hadoop.zookeeper" % "zookeeper-recipes-lock" % V.Zookeeper // ApacheV2
val zeroMQ = "org.zeromq" %% "zeromq-scala-binding" % "0.0.3" // ApacheV2
// Provided // Provided