From 656a98fcafbf679feff0702bffd545f54420b792 Mon Sep 17 00:00:00 2001 From: Ivan Porto Carrero Date: Thu, 19 Jan 2012 12:38:36 +0100 Subject: [PATCH] uses collect first where appropriate, removes redundant config from the spec and uses a deathwatch for the listener --- .../akka/zeromq/ConcurrentSocketActor.scala | 37 +++++++++---------- .../zeromq/ConcurrentSocketActorSpec.scala | 5 --- 2 files changed, 18 insertions(+), 24 deletions(-) diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index 50d1ee4c8a..2a0d376393 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -9,6 +9,7 @@ import akka.actor._ import akka.dispatch.{ Promise, Future } import akka.event.Logging import akka.util.duration._ +import annotation.tailrec private[zeromq] sealed trait PollLifeCycle private[zeromq] case object NoResults extends PollLifeCycle @@ -27,13 +28,13 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A import ConcurrentSocketActor._ private val noBytes = Array[Byte]() private val zmqContext = { - params find (_.isInstanceOf[Context]) map (_.asInstanceOf[Context]) getOrElse new Context(1) + params collectFirst { case c: Context ⇒ c } getOrElse Context() } - private lazy val deserializer = deserializerFromParams - private lazy val socket: Socket = socketFromParams - private lazy val poller: Poller = zmqContext.poller - private val log = Logging(context.system, this) + private val deserializer = deserializerFromParams + private val socket: Socket = socketFromParams + private val poller: Poller = zmqContext.poller + private val log = Logging(context.system, this) private def handleConnectionMessages: Receive = { case Send(frames) ⇒ { @@ -61,6 +62,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A socket.unsubscribe(topic.toArray) pollAndReceiveFrames() } + case Terminated(_) ⇒ context stop self } private def handleSocketOption: Receive = { @@ -127,6 +129,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A override def receive: Receive = handleConnectionMessages orElse handleSocketOption orElse internalMessage override def preStart { + watchListener() setupSocket() poller.register(socket, Poller.POLLIN) setupConnection() @@ -139,13 +142,11 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A private def socketFromParams() = { require(ZeroMQExtension.check[SocketType.ZMQSocketType](params), "A socket type is required") - (params - find (_.isInstanceOf[SocketType.ZMQSocketType]) - map (t ⇒ zmqContext.socket(t.asInstanceOf[SocketType.ZMQSocketType])) get) + (params collectFirst { case t: SocketType.ZMQSocketType ⇒ zmqContext.socket(t) } get) } private def deserializerFromParams = { - params find (_.isInstanceOf[Deserializer]) map (_.asInstanceOf[Deserializer]) getOrElse new ZMQMessageDeserializer + params collectFirst { case d: Deserializer ⇒ d } getOrElse new ZMQMessageDeserializer } private def setupSocket() = { @@ -181,14 +182,12 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A } private lazy val eventLoopDispatcher = { - val fromConfig = params.find(_.isInstanceOf[PollDispatcher]) map { - option ⇒ context.system.dispatchers.lookup(option.asInstanceOf[PollDispatcher].name) - } + val fromConfig = params collectFirst { case PollDispatcher(name) ⇒ context.system.dispatchers.lookup(name) } fromConfig getOrElse context.system.dispatcher } private lazy val pollTimeout = { - val fromConfig = params find (_.isInstanceOf[PollTimeoutDuration]) map (_.asInstanceOf[PollTimeoutDuration].duration) + val fromConfig = params collectFirst { case PollTimeoutDuration(duration) ⇒ duration } fromConfig getOrElse 100.millis } @@ -225,12 +224,12 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A } } + private def listenerOpt = params collectFirst { case Listener(l) ⇒ l } + private def watchListener() { + listenerOpt foreach context.watch + } + private def notifyListener(message: Any) { - params find (_.isInstanceOf[Listener]) map (_.asInstanceOf[Listener].listener) foreach { listener ⇒ - if (listener.isTerminated) - context stop self - else - listener ! message - } + listenerOpt foreach { _ ! message } } } diff --git a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala index 844df2c139..4b3c5b46b6 100644 --- a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala +++ b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala @@ -15,11 +15,6 @@ object ConcurrentSocketActorSpec { val config = """ akka { extensions = ["akka.zeromq.ZeroMQExtension$"] - zeromq { - socket-dispatcher { - type = "PinnedDispatcher" - } - } } """ }