2012-01-14 03:16:39 +01:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
package akka.zeromq
|
|
|
|
|
|
|
|
|
|
import org.zeromq.ZMQ.{ Socket, Poller }
|
|
|
|
|
import org.zeromq.{ ZMQ ⇒ JZMQ }
|
|
|
|
|
import akka.actor._
|
2012-01-14 12:13:46 +01:00
|
|
|
import akka.dispatch.{ Promise, Future }
|
2012-01-16 10:48:23 +01:00
|
|
|
import akka.event.Logging
|
2012-01-14 03:16:39 +01:00
|
|
|
|
|
|
|
|
private[zeromq] sealed trait PollLifeCycle
|
|
|
|
|
private[zeromq] case object NoResults extends PollLifeCycle
|
|
|
|
|
private[zeromq] case object Results extends PollLifeCycle
|
|
|
|
|
private[zeromq] case object Closing extends PollLifeCycle
|
|
|
|
|
|
|
|
|
|
private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Actor {
|
|
|
|
|
|
|
|
|
|
private val noBytes = Array[Byte]()
|
|
|
|
|
private val socket: Socket = params.context.socket(params.socketType)
|
|
|
|
|
private val poller: Poller = params.context.poller
|
2012-01-16 10:48:23 +01:00
|
|
|
private val log = Logging(context.system, this)
|
|
|
|
|
|
2012-01-16 00:47:30 +01:00
|
|
|
private case object Poll
|
|
|
|
|
private case object ReceiveFrames
|
|
|
|
|
private case object ClearPoll
|
2012-01-16 00:44:09 +01:00
|
|
|
private case class PollError(ex: Throwable)
|
|
|
|
|
|
|
|
|
|
private def handleConnectionMessages: Receive = {
|
|
|
|
|
case Send(frames) ⇒ {
|
2012-01-14 03:16:39 +01:00
|
|
|
sendFrames(frames)
|
|
|
|
|
pollAndReceiveFrames()
|
2012-01-16 00:44:09 +01:00
|
|
|
}
|
|
|
|
|
case ZMQMessage(frames) ⇒ {
|
2012-01-14 03:16:39 +01:00
|
|
|
sendFrames(frames)
|
|
|
|
|
pollAndReceiveFrames()
|
2012-01-16 00:44:09 +01:00
|
|
|
}
|
|
|
|
|
case Connect(endpoint) ⇒ {
|
2012-01-14 03:16:39 +01:00
|
|
|
socket.connect(endpoint)
|
|
|
|
|
notifyListener(Connecting)
|
|
|
|
|
pollAndReceiveFrames()
|
2012-01-16 00:44:09 +01:00
|
|
|
}
|
|
|
|
|
case Bind(endpoint) ⇒ {
|
2012-01-14 03:16:39 +01:00
|
|
|
socket.bind(endpoint)
|
|
|
|
|
pollAndReceiveFrames()
|
2012-01-16 00:44:09 +01:00
|
|
|
}
|
|
|
|
|
case Subscribe(topic) ⇒ {
|
2012-01-14 03:16:39 +01:00
|
|
|
socket.subscribe(topic.toArray)
|
|
|
|
|
pollAndReceiveFrames()
|
2012-01-16 00:44:09 +01:00
|
|
|
}
|
|
|
|
|
case Unsubscribe(topic) ⇒ {
|
2012-01-14 03:16:39 +01:00
|
|
|
socket.unsubscribe(topic.toArray)
|
|
|
|
|
pollAndReceiveFrames()
|
2012-01-16 00:44:09 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def handleSocketOption: Receive = {
|
|
|
|
|
case Linger(value) ⇒ socket.setLinger(value)
|
|
|
|
|
case Linger ⇒ sender ! socket.getLinger
|
|
|
|
|
case ReconnectIVL ⇒ sender ! socket.getReconnectIVL
|
|
|
|
|
case ReconnectIVL(value) ⇒ socket.setReconnectIVL(value)
|
|
|
|
|
case Backlog ⇒ sender ! socket.getBacklog
|
|
|
|
|
case Backlog(value) ⇒ socket.setBacklog(value)
|
|
|
|
|
case ReconnectIVLMax ⇒ sender ! socket.getReconnectIVLMax
|
|
|
|
|
case ReconnectIVLMax(value) ⇒ socket.setReconnectIVLMax(value)
|
|
|
|
|
case MaxMsgSize ⇒ sender ! socket.getMaxMsgSize
|
|
|
|
|
case MaxMsgSize(value) ⇒ socket.setMaxMsgSize(value)
|
|
|
|
|
case SndHWM ⇒ sender ! socket.getSndHWM
|
|
|
|
|
case SndHWM(value) ⇒ socket.setSndHWM(value)
|
|
|
|
|
case RcvHWM ⇒ sender ! socket.getRcvHWM
|
|
|
|
|
case RcvHWM(value) ⇒ socket.setRcvHWM(value)
|
|
|
|
|
case HWM(value) ⇒ socket.setHWM(value)
|
|
|
|
|
case Swap ⇒ sender ! socket.getSwap
|
|
|
|
|
case Swap(value) ⇒ socket.setSwap(value)
|
|
|
|
|
case Affinity ⇒ sender ! socket.getAffinity
|
|
|
|
|
case Affinity(value) ⇒ socket.setAffinity(value)
|
|
|
|
|
case Identity ⇒ sender ! socket.getIdentity
|
|
|
|
|
case Identity(value) ⇒ socket.setIdentity(value)
|
|
|
|
|
case Rate ⇒ sender ! socket.getRate
|
|
|
|
|
case Rate(value) ⇒ socket.setRate(value)
|
|
|
|
|
case RecoveryInterval ⇒ sender ! socket.getRecoveryInterval
|
|
|
|
|
case RecoveryInterval(value) ⇒ socket.setRecoveryInterval(value)
|
|
|
|
|
case MulticastLoop ⇒ sender ! socket.hasMulticastLoop
|
|
|
|
|
case MulticastLoop(value) ⇒ socket.setMulticastLoop(value)
|
|
|
|
|
case MulticastHops ⇒ sender ! socket.getMulticastHops
|
|
|
|
|
case MulticastHops(value) ⇒ socket.setMulticastHops(value)
|
|
|
|
|
case ReceiveTimeOut ⇒ sender ! socket.getReceiveTimeOut
|
|
|
|
|
case ReceiveTimeOut(value) ⇒ socket.setReceiveTimeOut(value)
|
|
|
|
|
case SendTimeOut ⇒ sender ! socket.getSendTimeOut
|
|
|
|
|
case SendTimeOut(value) ⇒ socket.setSendTimeOut(value)
|
|
|
|
|
case SendBufferSize ⇒ sender ! socket.getSendBufferSize
|
|
|
|
|
case SendBufferSize(value) ⇒ socket.setSendBufferSize(value)
|
|
|
|
|
case ReceiveBufferSize ⇒ sender ! socket.getReceiveBufferSize
|
|
|
|
|
case ReceiveBufferSize(value) ⇒ socket.setReceiveBufferSize(value)
|
|
|
|
|
case ReceiveMore ⇒ sender ! socket.hasReceiveMore
|
|
|
|
|
case FileDescriptor ⇒ sender ! socket.getFD
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def internalMessage: Receive = {
|
|
|
|
|
case Poll ⇒ {
|
2012-01-14 03:16:39 +01:00
|
|
|
currentPoll = None
|
|
|
|
|
pollAndReceiveFrames()
|
|
|
|
|
}
|
2012-01-16 00:44:09 +01:00
|
|
|
case ReceiveFrames ⇒ {
|
2012-01-14 03:16:39 +01:00
|
|
|
receiveFrames() match {
|
|
|
|
|
case Seq() ⇒
|
|
|
|
|
case frames ⇒ notifyListener(params.deserializer(frames))
|
|
|
|
|
}
|
2012-01-16 00:44:09 +01:00
|
|
|
self ! Poll
|
|
|
|
|
}
|
|
|
|
|
case ClearPoll => currentPoll = None
|
|
|
|
|
case PollError(ex) => {
|
2012-01-16 10:48:23 +01:00
|
|
|
log.error(ex, "There was a problem polling the zeromq socket")
|
2012-01-16 00:44:09 +01:00
|
|
|
self ! Poll
|
2012-01-14 03:16:39 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2012-01-16 00:44:09 +01:00
|
|
|
override def receive: Receive = handleConnectionMessages orElse handleSocketOption orElse internalMessage
|
|
|
|
|
|
2012-01-14 03:16:39 +01:00
|
|
|
override def preStart {
|
|
|
|
|
poller.register(socket, Poller.POLLIN)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def postStop {
|
2012-01-16 00:44:09 +01:00
|
|
|
try {
|
|
|
|
|
poller.unregister(socket)
|
|
|
|
|
currentPoll foreach { _ complete Right(Closing) }
|
|
|
|
|
if (socket != null) socket.close
|
|
|
|
|
} finally {
|
|
|
|
|
notifyListener(Closed)
|
|
|
|
|
}
|
2012-01-14 03:16:39 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def sendFrames(frames: Seq[Frame]) {
|
2012-01-16 00:44:09 +01:00
|
|
|
def sendBytes(bytes: Seq[Byte], flags: Int) = socket.send(bytes.toArray, flags)
|
2012-01-14 03:16:39 +01:00
|
|
|
val iter = frames.iterator
|
|
|
|
|
while (iter.hasNext) {
|
|
|
|
|
val payload = iter.next.payload
|
|
|
|
|
val flags = if (iter.hasNext) JZMQ.SNDMORE else 0
|
|
|
|
|
sendBytes(payload, flags)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private var currentPoll: Option[Promise[PollLifeCycle]] = None
|
|
|
|
|
private def pollAndReceiveFrames() {
|
2012-01-16 00:44:09 +01:00
|
|
|
if (currentPoll.isEmpty) currentPoll = newEventLoop
|
2012-01-14 03:16:39 +01:00
|
|
|
}
|
|
|
|
|
|
2012-01-14 12:13:46 +01:00
|
|
|
private def newEventLoop: Option[Promise[PollLifeCycle]] = {
|
2012-01-16 10:48:23 +01:00
|
|
|
implicit val executor = params.pollDispatcher getOrElse context.system.dispatcher
|
2012-01-14 11:46:51 +01:00
|
|
|
Some((Future {
|
|
|
|
|
if (poller.poll(params.pollTimeoutDuration.toMicros) > 0 && poller.pollin(0)) Results else NoResults
|
2012-01-14 03:16:39 +01:00
|
|
|
}).asInstanceOf[Promise[PollLifeCycle]] onSuccess {
|
2012-01-16 00:44:09 +01:00
|
|
|
case Results ⇒ self ! ReceiveFrames
|
|
|
|
|
case NoResults ⇒ self ! Poll
|
|
|
|
|
case _ ⇒ self ! ClearPoll
|
2012-01-14 03:16:39 +01:00
|
|
|
} onFailure {
|
2012-01-16 00:44:09 +01:00
|
|
|
case ex ⇒ self ! PollError(ex)
|
2012-01-14 11:46:51 +01:00
|
|
|
})
|
2012-01-14 12:13:46 +01:00
|
|
|
}
|
2012-01-14 03:16:39 +01:00
|
|
|
|
|
|
|
|
private def receiveFrames(): Seq[Frame] = {
|
|
|
|
|
|
|
|
|
|
@inline def receiveBytes(): Array[Byte] = socket.recv(0) match {
|
2012-01-14 11:46:51 +01:00
|
|
|
case null ⇒ noBytes
|
|
|
|
|
case bytes: Array[_] if bytes.length > 0 ⇒ bytes
|
|
|
|
|
case _ ⇒ noBytes
|
2012-01-14 03:16:39 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
receiveBytes() match {
|
|
|
|
|
case `noBytes` ⇒ Vector.empty
|
|
|
|
|
case someBytes ⇒
|
|
|
|
|
var frames = Vector(Frame(someBytes))
|
|
|
|
|
while (socket.hasReceiveMore) receiveBytes() match {
|
|
|
|
|
case `noBytes` ⇒
|
|
|
|
|
case someBytes ⇒ frames :+= Frame(someBytes)
|
|
|
|
|
}
|
|
|
|
|
frames
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private def notifyListener(message: Any) {
|
|
|
|
|
params.listener.foreach { listener ⇒
|
|
|
|
|
if (listener.isTerminated)
|
|
|
|
|
context stop self
|
|
|
|
|
else
|
|
|
|
|
listener ! message
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|