uses collect first where appropriate, removes redundant config from the spec and uses a deathwatch for the listener

This commit is contained in:
Ivan Porto Carrero 2012-01-19 12:38:36 +01:00
parent 6f94c57ba6
commit 656a98fcaf
2 changed files with 18 additions and 24 deletions

View file

@ -9,6 +9,7 @@ import akka.actor._
import akka.dispatch.{ Promise, Future }
import akka.event.Logging
import akka.util.duration._
import annotation.tailrec
private[zeromq] sealed trait PollLifeCycle
private[zeromq] case object NoResults extends PollLifeCycle
@ -27,13 +28,13 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
import ConcurrentSocketActor._
private val noBytes = Array[Byte]()
private val zmqContext = {
params find (_.isInstanceOf[Context]) map (_.asInstanceOf[Context]) getOrElse new Context(1)
params collectFirst { case c: Context c } getOrElse Context()
}
private lazy val deserializer = deserializerFromParams
private lazy val socket: Socket = socketFromParams
private lazy val poller: Poller = zmqContext.poller
private val log = Logging(context.system, this)
private val deserializer = deserializerFromParams
private val socket: Socket = socketFromParams
private val poller: Poller = zmqContext.poller
private val log = Logging(context.system, this)
private def handleConnectionMessages: Receive = {
case Send(frames) {
@ -61,6 +62,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
socket.unsubscribe(topic.toArray)
pollAndReceiveFrames()
}
case Terminated(_) context stop self
}
private def handleSocketOption: Receive = {
@ -127,6 +129,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
override def receive: Receive = handleConnectionMessages orElse handleSocketOption orElse internalMessage
override def preStart {
watchListener()
setupSocket()
poller.register(socket, Poller.POLLIN)
setupConnection()
@ -139,13 +142,11 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
private def socketFromParams() = {
require(ZeroMQExtension.check[SocketType.ZMQSocketType](params), "A socket type is required")
(params
find (_.isInstanceOf[SocketType.ZMQSocketType])
map (t zmqContext.socket(t.asInstanceOf[SocketType.ZMQSocketType])) get)
(params collectFirst { case t: SocketType.ZMQSocketType zmqContext.socket(t) } get)
}
private def deserializerFromParams = {
params find (_.isInstanceOf[Deserializer]) map (_.asInstanceOf[Deserializer]) getOrElse new ZMQMessageDeserializer
params collectFirst { case d: Deserializer d } getOrElse new ZMQMessageDeserializer
}
private def setupSocket() = {
@ -181,14 +182,12 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
}
private lazy val eventLoopDispatcher = {
val fromConfig = params.find(_.isInstanceOf[PollDispatcher]) map {
option context.system.dispatchers.lookup(option.asInstanceOf[PollDispatcher].name)
}
val fromConfig = params collectFirst { case PollDispatcher(name) context.system.dispatchers.lookup(name) }
fromConfig getOrElse context.system.dispatcher
}
private lazy val pollTimeout = {
val fromConfig = params find (_.isInstanceOf[PollTimeoutDuration]) map (_.asInstanceOf[PollTimeoutDuration].duration)
val fromConfig = params collectFirst { case PollTimeoutDuration(duration) duration }
fromConfig getOrElse 100.millis
}
@ -225,12 +224,12 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
}
}
private def listenerOpt = params collectFirst { case Listener(l) l }
private def watchListener() {
listenerOpt foreach context.watch
}
private def notifyListener(message: Any) {
params find (_.isInstanceOf[Listener]) map (_.asInstanceOf[Listener].listener) foreach { listener
if (listener.isTerminated)
context stop self
else
listener ! message
}
listenerOpt foreach { _ ! message }
}
}

View file

@ -15,11 +15,6 @@ object ConcurrentSocketActorSpec {
val config = """
akka {
extensions = ["akka.zeromq.ZeroMQExtension$"]
zeromq {
socket-dispatcher {
type = "PinnedDispatcher"
}
}
}
"""
}