2012-01-14 03:16:39 +01:00
|
|
|
/**
|
2012-01-18 21:01:14 +01:00
|
|
|
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
|
2012-01-14 03:16:39 +01:00
|
|
|
*/
|
|
|
|
|
package akka.zeromq
|
|
|
|
|
|
|
|
|
|
import org.zeromq.ZMQ.{ Socket, Poller }
|
|
|
|
|
import org.zeromq.{ ZMQ ⇒ JZMQ }
|
|
|
|
|
import akka.actor._
|
2012-01-14 12:13:46 +01:00
|
|
|
import akka.dispatch.{ Promise, Future }
|
2012-01-16 10:48:23 +01:00
|
|
|
import akka.event.Logging
|
2012-01-19 00:26:52 +01:00
|
|
|
import akka.util.duration._
|
2012-01-19 12:38:36 +01:00
|
|
|
import annotation.tailrec
|
2012-01-19 13:06:22 +01:00
|
|
|
import akka.util.Duration
|
|
|
|
|
import java.util.concurrent.TimeUnit
|
2012-01-14 03:16:39 +01:00
|
|
|
|
|
|
|
|
/**
 * Outcome of one poll of the ZeroMQ socket. These are the values that a
 * `Promise[PollLifeCycle]` created by the socket actor is completed with.
 */
private[zeromq] sealed trait PollLifeCycle

/** The poll reported readable input on the socket. */
private[zeromq] case object Results extends PollLifeCycle

/** The poll finished without reporting readable input. */
private[zeromq] case object NoResults extends PollLifeCycle

/** Value used to complete an outstanding poll while the actor is stopping. */
private[zeromq] case object Closing extends PollLifeCycle
|
|
|
|
|
|
2012-01-19 12:04:35 +01:00
|
|
|
/**
 * Messages the socket actor sends to itself to drive its poll loop.
 */
private[zeromq] object ConcurrentSocketActor {

  /** Drop the finished poll and start the next one. */
  private case object Poll

  /** A poll reported input; read the pending frames from the socket. */
  private case object ReceiveFrames

  /** Forget the current poll without starting a new one. */
  private case object ClearPoll

  /** The poll failed with `ex`. */
  private case class PollError(ex: Throwable)

}
|
2012-01-19 00:26:52 +01:00
|
|
|
private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends Actor {
|
2012-01-14 03:16:39 +01:00
|
|
|
|
2012-01-19 12:04:35 +01:00
|
|
|
import ConcurrentSocketActor._
|
2012-01-14 03:16:39 +01:00
|
|
|
// Payload substituted for received frames whose bytes are null or empty.
private val noBytes = Array.empty[Byte]

// Context taken from the socket parameters; a fresh one is created when none was given.
private val zmqContext =
  params.collectFirst({ case suppliedContext: Context ⇒ suppliedContext }).getOrElse(Context())

// NOTE: the order of the following fields matters, `socket` and `poller` both need `zmqContext`.
private val deserializer = deserializerFromParams

private val socket: Socket = socketFromParams()

private val poller: Poller = zmqContext.poller

private val log = Logging(context.system, this)
|
2012-01-16 10:54:46 +01:00
|
|
|
|
2012-01-16 00:44:09 +01:00
|
|
|
/**
 * Handles the messages that act on the socket itself: sending frames,
 * connecting, binding and (un)subscribing. Each of those is followed by
 * `pollAndReceiveFrames()`. A `Terminated` message stops this actor.
 */
private def handleConnectionMessages: Receive = {
  // Runs the socket operation first and then makes sure a poll is in flight.
  // If the operation throws, no poll is started.
  def thenPoll(operation: ⇒ Unit) {
    operation
    pollAndReceiveFrames()
  }

  val behavior: Receive = {
    case Send(frames)       ⇒ thenPoll(sendFrames(frames))
    case ZMQMessage(frames) ⇒ thenPoll(sendFrames(frames))
    case Connect(endpoint) ⇒
      thenPoll {
        socket.connect(endpoint)
        notifyListener(Connecting)
      }
    case Bind(endpoint)     ⇒ thenPoll(socket.bind(endpoint))
    case Subscribe(topic)   ⇒ thenPoll(socket.subscribe(topic.toArray))
    case Unsubscribe(topic) ⇒ thenPoll(socket.unsubscribe(topic.toArray))
    case Terminated(_)      ⇒ context stop self
  }
  behavior
}
|
2012-01-16 10:54:46 +01:00
|
|
|
|
2012-01-16 00:44:09 +01:00
|
|
|
/**
 * Handles socket option messages. An option instance carrying a value is
 * applied to the socket through the matching setter; the bare option object
 * is treated as a query and the current value is sent back to `sender`.
 */
private def handleSocketOption: Receive = {
  // Option instances: forward the carried value to the socket.
  val applyOption: Receive = {
    case Linger(value)            ⇒ socket.setLinger(value)
    case ReconnectIVL(value)      ⇒ socket.setReconnectIVL(value)
    case Backlog(value)           ⇒ socket.setBacklog(value)
    case ReconnectIVLMax(value)   ⇒ socket.setReconnectIVLMax(value)
    case MaxMsgSize(value)        ⇒ socket.setMaxMsgSize(value)
    case SndHWM(value)            ⇒ socket.setSndHWM(value)
    case RcvHWM(value)            ⇒ socket.setRcvHWM(value)
    case HWM(value)               ⇒ socket.setHWM(value)
    case Swap(value)              ⇒ socket.setSwap(value)
    case Affinity(value)          ⇒ socket.setAffinity(value)
    case Identity(value)          ⇒ socket.setIdentity(value)
    case Rate(value)              ⇒ socket.setRate(value)
    case RecoveryInterval(value)  ⇒ socket.setRecoveryInterval(value)
    case MulticastLoop(value)     ⇒ socket.setMulticastLoop(value)
    case MulticastHops(value)     ⇒ socket.setMulticastHops(value)
    case ReceiveTimeOut(value)    ⇒ socket.setReceiveTimeOut(value)
    case SendTimeOut(value)       ⇒ socket.setSendTimeOut(value)
    case SendBufferSize(value)    ⇒ socket.setSendBufferSize(value)
    case ReceiveBufferSize(value) ⇒ socket.setReceiveBufferSize(value)
  }

  // Option objects: reply to the sender with the socket's current setting.
  val queryOption: Receive = {
    case Linger            ⇒ sender ! socket.getLinger
    case ReconnectIVL      ⇒ sender ! socket.getReconnectIVL
    case Backlog           ⇒ sender ! socket.getBacklog
    case ReconnectIVLMax   ⇒ sender ! socket.getReconnectIVLMax
    case MaxMsgSize        ⇒ sender ! socket.getMaxMsgSize
    case SndHWM            ⇒ sender ! socket.getSndHWM
    case RcvHWM            ⇒ sender ! socket.getRcvHWM
    case Swap              ⇒ sender ! socket.getSwap
    case Affinity          ⇒ sender ! socket.getAffinity
    case Identity          ⇒ sender ! socket.getIdentity
    case Rate              ⇒ sender ! socket.getRate
    case RecoveryInterval  ⇒ sender ! socket.getRecoveryInterval
    case MulticastLoop     ⇒ sender ! socket.hasMulticastLoop
    case MulticastHops     ⇒ sender ! socket.getMulticastHops
    case ReceiveTimeOut    ⇒ sender ! socket.getReceiveTimeOut
    case SendTimeOut       ⇒ sender ! socket.getSendTimeOut
    case SendBufferSize    ⇒ sender ! socket.getSendBufferSize
    case ReceiveBufferSize ⇒ sender ! socket.getReceiveBufferSize
    case ReceiveMore       ⇒ sender ! socket.hasReceiveMore
    case FileDescriptor    ⇒ sender ! socket.getFD
  }

  applyOption orElse queryOption
}
|
2012-01-16 10:54:46 +01:00
|
|
|
|
2012-01-16 00:44:09 +01:00
|
|
|
/**
 * Handles the messages this actor sends to itself to keep the poll loop going.
 */
private def internalMessage: Receive = {
  case Poll ⇒
    // the previous poll is done; forget it so that a new one can be started
    currentPoll = None
    pollAndReceiveFrames()

  case ReceiveFrames ⇒
    val received = receiveFrames()
    if (received.nonEmpty) notifyListener(deserializer(received))
    self ! Poll

  case ClearPoll ⇒
    currentPoll = None

  case PollError(ex) ⇒
    log.error(ex, "There was a problem polling the zeromq socket")
    // keep polling despite the failure
    self ! Poll
}
|
|
|
|
|
|
2012-01-16 00:44:09 +01:00
|
|
|
/**
 * Actor behaviour: connection messages are tried first, then socket options,
 * then the internal poll-loop messages.
 */
override def receive: Receive =
  handleConnectionMessages.orElse(handleSocketOption).orElse(internalMessage)
|
|
|
|
|
|
2012-01-14 03:16:39 +01:00
|
|
|
/**
 * Start-up sequence: watch the listener, queue the plain socket options,
 * register the socket with the poller for `POLLIN`, then queue the
 * connect/bind and pub-sub options.
 */
override def preStart: Unit = {
  watchListener()
  setupSocket()
  poller.register(socket, Poller.POLLIN)
  setupConnection()
}
|
|
|
|
|
|
|
|
|
|
/**
 * Sends every `SocketConnectOption` from the parameters to this actor,
 * followed by every `PubSubOption`, so they are handled as ordinary messages.
 */
private def setupConnection() {
  val connectOptions = params collect { case option: SocketConnectOption ⇒ option }
  connectOptions foreach { option ⇒ self ! option }

  val pubSubOptions = params collect { case option: PubSubOption ⇒ option }
  pubSubOptions foreach { option ⇒ self ! option }
}
|
|
|
|
|
|
2012-01-19 00:26:52 +01:00
|
|
|
/**
 * Creates the socket for the first socket type found in the parameters.
 * Fails the `require` check with "A socket type is required" when
 * `ZeroMQExtension.check` does not accept the parameters.
 */
private def socketFromParams() = {
  require(ZeroMQExtension.check[SocketType.ZMQSocketType](params), "A socket type is required")
  val created = params.collectFirst({ case socketType: SocketType.ZMQSocketType ⇒ zmqContext.socket(socketType) })
  created.get
}
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the first `Deserializer` found in the parameters, or a new
 * `ZMQMessageDeserializer` when none was supplied.
 */
private def deserializerFromParams =
  params.collectFirst({ case supplied: Deserializer ⇒ supplied }).getOrElse(new ZMQMessageDeserializer)
|
|
|
|
|
|
|
|
|
|
/**
 * Sends every parameter to this actor as a message, except connect options,
 * pub-sub options and socket meta parameters, which are handled differently.
 */
private def setupSocket() = {
  val handledDifferently: SocketOption ⇒ Boolean = {
    case _: SocketConnectOption | _: PubSubOption | _: SocketMeta ⇒ true
    case _ ⇒ false
  }
  params filterNot handledDifferently foreach { option ⇒ self ! option }
}
|
|
|
|
|
|
2012-01-14 03:16:39 +01:00
|
|
|
/**
 * Releases the socket when the actor stops: completes an outstanding poll
 * with `Closing`, unregisters the socket from the poller, closes it and
 * finally tells the listener that the socket is `Closed`.
 *
 * Every step sits in its own `finally`, so a failure in an earlier step
 * (for instance if completing the poll promise throws) can no longer skip
 * unregistering and closing the socket. The first exception still propagates,
 * as before.
 */
override def postStop {
  try {
    try {
      currentPoll foreach { _ complete Right(Closing) }
    } finally {
      // guard both uses of the socket, not only the close
      if (socket != null) {
        try {
          poller.unregister(socket)
        } finally {
          socket.close
        }
      }
    }
  } finally {
    notifyListener(Closed)
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Sends the frames over the socket as one multi-part message: every frame
 * except the last is sent with `SNDMORE`, the last one with flags `0`.
 * An empty sequence sends nothing.
 */
private def sendFrames(frames: Seq[Frame]) {
  val lastIndex = frames.size - 1
  for ((frame, index) ← frames.zipWithIndex) {
    val payload: Seq[Byte] = frame.payload
    val flags = if (index < lastIndex) JZMQ.SNDMORE else 0
    socket.send(payload.toArray, flags)
  }
}
|
|
|
|
|
|
|
|
|
|
// The poll that is currently in flight, if any.
private var currentPoll: Option[Promise[PollLifeCycle]] = None

/**
 * Starts a new poll unless one is already in flight.
 */
private def pollAndReceiveFrames() {
  currentPoll match {
    case None    ⇒ currentPoll = newEventLoop
    case Some(_) ⇒ // a poll is already running, leave it alone
  }
}
|
|
|
|
|
|
2012-01-19 12:50:51 +01:00
|
|
|
// Dispatcher the poll futures run on: the one named by a `PollDispatcher`
// parameter if present, otherwise the system's default dispatcher.
private val eventLoopDispatcher =
  params
    .collectFirst({ case PollDispatcher(dispatcherName) ⇒ context.system.dispatchers.lookup(dispatcherName) })
    .getOrElse(context.system.dispatcher)
|
|
|
|
|
|
2012-01-19 13:06:22 +01:00
|
|
|
// Poll timeout read from the `akka.zeromq.poll-timeout` configuration entry.
private val defaultPollTimeout = {
  val configuredMillis = context.system.settings.config.getMilliseconds("akka.zeromq.poll-timeout")
  Duration(configuredMillis, TimeUnit.MILLISECONDS)
}
|
|
|
|
|
|
2012-01-19 12:50:51 +01:00
|
|
|
// Effective poll timeout: a `PollTimeoutDuration` parameter takes precedence
// over the configured default.
private val pollTimeout =
  params.collectFirst({ case PollTimeoutDuration(explicitTimeout) ⇒ explicitTimeout }).getOrElse(defaultPollTimeout)
|
|
|
|
|
|
2012-01-14 12:13:46 +01:00
|
|
|
/**
 * Starts one poll of the socket on `eventLoopDispatcher` and returns the
 * promise behind it.
 *
 * When the poll completes, the outcome is reported back to this actor as a
 * message: `ReceiveFrames` when input is ready, `Poll` when nothing arrived,
 * `ClearPoll` for any other value, and `PollError` when the poll failed.
 *
 * @return the promise of the running poll, always wrapped in `Some`
 */
private def newEventLoop: Option[Promise[PollLifeCycle]] = {
  implicit val executor = eventLoopDispatcher

  // jzmq's Poller.poll takes its timeout in microseconds on ZeroMQ 2.x but in
  // milliseconds from ZeroMQ 3.0 on; always passing microseconds made the
  // poll block a thousand times longer than configured on 3.x.
  val timeout = if (JZMQ.getMajorVersion >= 3) pollTimeout.toMillis else pollTimeout.toMicros

  Some((Future {
    if (poller.poll(timeout) > 0 && poller.pollin(0)) Results else NoResults
  }).asInstanceOf[Promise[PollLifeCycle]] onSuccess {
    case Results   ⇒ self ! ReceiveFrames
    case NoResults ⇒ self ! Poll
    case _         ⇒ self ! ClearPoll
  } onFailure {
    case ex ⇒ self ! PollError(ex)
  })
}
|
2012-01-14 03:16:39 +01:00
|
|
|
|
|
|
|
|
/**
 * Reads one complete multi-part message from the socket.
 *
 * At least one frame is always read; further frames are read for as long as
 * the socket reports `hasReceiveMore`. A null or empty payload is stored as
 * the shared empty byte array.
 *
 * @return the frames of the message, in the order they were received
 */
private def receiveFrames(): Seq[Frame] = {
  def nextFrame(): Frame = {
    val bytes = socket.recv(0)
    Frame(if (bytes != null && bytes.nonEmpty) bytes else noBytes)
  }

  val frames = Vector.newBuilder[Frame]
  do {
    frames += nextFrame()
  } while (socket.hasReceiveMore)
  frames.result()
}
|
|
|
|
|
|
2012-01-19 12:50:51 +01:00
|
|
|
// The listener taken from the first `Listener` parameter, if one was given.
private val listenerOpt = params.collectFirst({ case Listener(listener) ⇒ listener })

/**
 * Puts the listener, if there is one, under death watch.
 */
private def watchListener() {
  listenerOpt foreach { listener ⇒ context.watch(listener) }
}
|
|
|
|
|
|
2012-01-14 03:16:39 +01:00
|
|
|
/**
 * Sends `message` to the listener; does nothing when no listener was configured.
 */
private def notifyListener(message: Any) {
  for (listener ← listenerOpt) listener ! message
}
|
|
|
|
|
}
|