From 1d27d26022be04458e5eed13804fb1742e5aa4ef Mon Sep 17 00:00:00 2001 From: Ivan Porto Carrero Date: Mon, 16 Jan 2012 10:54:46 +0100 Subject: [PATCH] Adds explicit return type for newSocket, removes protected receive and makes context a val --- .../akka/zeromq/ConcurrentSocketActor.scala | 88 +++++++++---------- .../src/main/scala/akka/zeromq/Context.scala | 2 +- .../scala/akka/zeromq/ZeroMQExtension.scala | 10 +-- 3 files changed, 49 insertions(+), 51 deletions(-) diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index fc71400fdd..723e7ef72f 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -25,7 +25,7 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac private case object ReceiveFrames private case object ClearPoll private case class PollError(ex: Throwable) - + private def handleConnectionMessages: Receive = { case Send(frames) ⇒ { sendFrames(frames) @@ -51,51 +51,51 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac case Unsubscribe(topic) ⇒ { socket.unsubscribe(topic.toArray) pollAndReceiveFrames() - } + } } - + private def handleSocketOption: Receive = { - case Linger(value) ⇒ socket.setLinger(value) - case Linger ⇒ sender ! socket.getLinger - case ReconnectIVL ⇒ sender ! socket.getReconnectIVL - case ReconnectIVL(value) ⇒ socket.setReconnectIVL(value) - case Backlog ⇒ sender ! socket.getBacklog - case Backlog(value) ⇒ socket.setBacklog(value) - case ReconnectIVLMax ⇒ sender ! socket.getReconnectIVLMax - case ReconnectIVLMax(value) ⇒ socket.setReconnectIVLMax(value) - case MaxMsgSize ⇒ sender ! socket.getMaxMsgSize - case MaxMsgSize(value) ⇒ socket.setMaxMsgSize(value) - case SndHWM ⇒ sender ! socket.getSndHWM - case SndHWM(value) ⇒ socket.setSndHWM(value) - case RcvHWM ⇒ sender ! socket.getRcvHWM - case RcvHWM(value) ⇒ socket.setRcvHWM(value) - case HWM(value) ⇒ socket.setHWM(value) - case Swap ⇒ sender ! socket.getSwap - case Swap(value) ⇒ socket.setSwap(value) - case Affinity ⇒ sender ! socket.getAffinity - case Affinity(value) ⇒ socket.setAffinity(value) - case Identity ⇒ sender ! socket.getIdentity - case Identity(value) ⇒ socket.setIdentity(value) - case Rate ⇒ sender ! socket.getRate - case Rate(value) ⇒ socket.setRate(value) - case RecoveryInterval ⇒ sender ! socket.getRecoveryInterval - case RecoveryInterval(value) ⇒ socket.setRecoveryInterval(value) - case MulticastLoop ⇒ sender ! socket.hasMulticastLoop - case MulticastLoop(value) ⇒ socket.setMulticastLoop(value) - case MulticastHops ⇒ sender ! socket.getMulticastHops - case MulticastHops(value) ⇒ socket.setMulticastHops(value) - case ReceiveTimeOut ⇒ sender ! socket.getReceiveTimeOut - case ReceiveTimeOut(value) ⇒ socket.setReceiveTimeOut(value) - case SendTimeOut ⇒ sender ! socket.getSendTimeOut - case SendTimeOut(value) ⇒ socket.setSendTimeOut(value) - case SendBufferSize ⇒ sender ! socket.getSendBufferSize - case SendBufferSize(value) ⇒ socket.setSendBufferSize(value) - case ReceiveBufferSize ⇒ sender ! socket.getReceiveBufferSize + case Linger(value) ⇒ socket.setLinger(value) + case Linger ⇒ sender ! socket.getLinger + case ReconnectIVL ⇒ sender ! socket.getReconnectIVL + case ReconnectIVL(value) ⇒ socket.setReconnectIVL(value) + case Backlog ⇒ sender ! socket.getBacklog + case Backlog(value) ⇒ socket.setBacklog(value) + case ReconnectIVLMax ⇒ sender ! socket.getReconnectIVLMax + case ReconnectIVLMax(value) ⇒ socket.setReconnectIVLMax(value) + case MaxMsgSize ⇒ sender ! socket.getMaxMsgSize + case MaxMsgSize(value) ⇒ socket.setMaxMsgSize(value) + case SndHWM ⇒ sender ! socket.getSndHWM + case SndHWM(value) ⇒ socket.setSndHWM(value) + case RcvHWM ⇒ sender ! socket.getRcvHWM + case RcvHWM(value) ⇒ socket.setRcvHWM(value) + case HWM(value) ⇒ socket.setHWM(value) + case Swap ⇒ sender ! socket.getSwap + case Swap(value) ⇒ socket.setSwap(value) + case Affinity ⇒ sender ! socket.getAffinity + case Affinity(value) ⇒ socket.setAffinity(value) + case Identity ⇒ sender ! socket.getIdentity + case Identity(value) ⇒ socket.setIdentity(value) + case Rate ⇒ sender ! socket.getRate + case Rate(value) ⇒ socket.setRate(value) + case RecoveryInterval ⇒ sender ! socket.getRecoveryInterval + case RecoveryInterval(value) ⇒ socket.setRecoveryInterval(value) + case MulticastLoop ⇒ sender ! socket.hasMulticastLoop + case MulticastLoop(value) ⇒ socket.setMulticastLoop(value) + case MulticastHops ⇒ sender ! socket.getMulticastHops + case MulticastHops(value) ⇒ socket.setMulticastHops(value) + case ReceiveTimeOut ⇒ sender ! socket.getReceiveTimeOut + case ReceiveTimeOut(value) ⇒ socket.setReceiveTimeOut(value) + case SendTimeOut ⇒ sender ! socket.getSendTimeOut + case SendTimeOut(value) ⇒ socket.setSendTimeOut(value) + case SendBufferSize ⇒ sender ! socket.getSendBufferSize + case SendBufferSize(value) ⇒ socket.setSendBufferSize(value) + case ReceiveBufferSize ⇒ sender ! socket.getReceiveBufferSize case ReceiveBufferSize(value) ⇒ socket.setReceiveBufferSize(value) - case ReceiveMore ⇒ sender ! socket.hasReceiveMore - case FileDescriptor ⇒ sender ! socket.getFD + case ReceiveMore ⇒ sender ! socket.hasReceiveMore + case FileDescriptor ⇒ sender ! socket.getFD } - + private def internalMessage: Receive = { case Poll ⇒ { currentPoll = None @@ -108,8 +108,8 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac } self ! Poll } - case ClearPoll => currentPoll = None - case PollError(ex) => { + case ClearPoll ⇒ currentPoll = None + case PollError(ex) ⇒ { log.error(ex, "There was a problem polling the zeromq socket") self ! Poll } diff --git a/akka-zeromq/src/main/scala/akka/zeromq/Context.scala b/akka-zeromq/src/main/scala/akka/zeromq/Context.scala index 073270436f..7d768e1492 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/Context.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/Context.scala @@ -7,7 +7,7 @@ import org.zeromq.{ ZMQ ⇒ JZMQ } import akka.zeromq.SocketType._ class Context(numIoThreads: Int) { - private var context = JZMQ.context(numIoThreads) + private val context = JZMQ.context(numIoThreads) def socket(socketType: SocketType) = { context.socket(socketType.id) } diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala index 06b929e48a..af01428ebc 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala @@ -8,7 +8,7 @@ import akka.util.duration._ import akka.zeromq.SocketType._ import org.zeromq.{ ZMQ ⇒ JZMQ } import akka.actor._ -import akka.dispatch.{Dispatcher, Await} +import akka.dispatch.{ Dispatcher, Await } case class SocketParameters( socketType: SocketType, @@ -25,7 +25,7 @@ case class ZeroMQVersion(major: Int, minor: Int, patch: Int) { object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProvider { def lookup() = this def createExtension(system: ActorSystemImpl) = new ZeroMQExtension(system) - + private val minVersionString = "2.1.0" private val minVersion = JZMQ.makeVersion(2, 1, 0) } @@ -47,7 +47,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { context: Context = DefaultContext, // For most applications you want to use the default context deserializer: Deserializer = new ZMQMessageDeserializer, pollDispatcher: Option[Dispatcher] = None, - pollTimeoutDuration: Duration = 500 millis) = { + pollTimeoutDuration: Duration = 500 millis): ActorRef = { verifyZeroMQVersion val params = SocketParameters(socketType, context, listener, pollDispatcher, deserializer, pollTimeoutDuration) implicit val timeout = system.settings.ActorTimeout @@ -58,11 +58,9 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { val zeromq: ActorRef = { verifyZeroMQVersion system.actorOf(Props(new Actor { - protected def receive = { case p: Props ⇒ sender ! context.actorOf(p) } + def receive = { case p: Props ⇒ sender ! context.actorOf(p) } }), "zeromq") } - - private def verifyZeroMQVersion = { require(