Adds explicit return type for newSocket, removes protected receive and makes context a val

This commit is contained in:
Ivan Porto Carrero 2012-01-16 10:54:46 +01:00
parent 3fd10205b5
commit 1d27d26022
3 changed files with 49 additions and 51 deletions

View file

@ -25,7 +25,7 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac
private case object ReceiveFrames
private case object ClearPoll
private case class PollError(ex: Throwable)
private def handleConnectionMessages: Receive = {
case Send(frames) {
sendFrames(frames)
@ -51,51 +51,51 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac
case Unsubscribe(topic) {
socket.unsubscribe(topic.toArray)
pollAndReceiveFrames()
}
}
}
private def handleSocketOption: Receive = {
case Linger(value) socket.setLinger(value)
case Linger sender ! socket.getLinger
case ReconnectIVL sender ! socket.getReconnectIVL
case ReconnectIVL(value) socket.setReconnectIVL(value)
case Backlog sender ! socket.getBacklog
case Backlog(value) socket.setBacklog(value)
case ReconnectIVLMax sender ! socket.getReconnectIVLMax
case ReconnectIVLMax(value) socket.setReconnectIVLMax(value)
case MaxMsgSize sender ! socket.getMaxMsgSize
case MaxMsgSize(value) socket.setMaxMsgSize(value)
case SndHWM sender ! socket.getSndHWM
case SndHWM(value) socket.setSndHWM(value)
case RcvHWM sender ! socket.getRcvHWM
case RcvHWM(value) socket.setRcvHWM(value)
case HWM(value) socket.setHWM(value)
case Swap sender ! socket.getSwap
case Swap(value) socket.setSwap(value)
case Affinity sender ! socket.getAffinity
case Affinity(value) socket.setAffinity(value)
case Identity sender ! socket.getIdentity
case Identity(value) socket.setIdentity(value)
case Rate sender ! socket.getRate
case Rate(value) socket.setRate(value)
case RecoveryInterval sender ! socket.getRecoveryInterval
case RecoveryInterval(value) socket.setRecoveryInterval(value)
case MulticastLoop sender ! socket.hasMulticastLoop
case MulticastLoop(value) socket.setMulticastLoop(value)
case MulticastHops sender ! socket.getMulticastHops
case MulticastHops(value) socket.setMulticastHops(value)
case ReceiveTimeOut sender ! socket.getReceiveTimeOut
case ReceiveTimeOut(value) socket.setReceiveTimeOut(value)
case SendTimeOut sender ! socket.getSendTimeOut
case SendTimeOut(value) socket.setSendTimeOut(value)
case SendBufferSize sender ! socket.getSendBufferSize
case SendBufferSize(value) socket.setSendBufferSize(value)
case ReceiveBufferSize sender ! socket.getReceiveBufferSize
case Linger(value) socket.setLinger(value)
case Linger sender ! socket.getLinger
case ReconnectIVL sender ! socket.getReconnectIVL
case ReconnectIVL(value) socket.setReconnectIVL(value)
case Backlog sender ! socket.getBacklog
case Backlog(value) socket.setBacklog(value)
case ReconnectIVLMax sender ! socket.getReconnectIVLMax
case ReconnectIVLMax(value) socket.setReconnectIVLMax(value)
case MaxMsgSize sender ! socket.getMaxMsgSize
case MaxMsgSize(value) socket.setMaxMsgSize(value)
case SndHWM sender ! socket.getSndHWM
case SndHWM(value) socket.setSndHWM(value)
case RcvHWM sender ! socket.getRcvHWM
case RcvHWM(value) socket.setRcvHWM(value)
case HWM(value) socket.setHWM(value)
case Swap sender ! socket.getSwap
case Swap(value) socket.setSwap(value)
case Affinity sender ! socket.getAffinity
case Affinity(value) socket.setAffinity(value)
case Identity sender ! socket.getIdentity
case Identity(value) socket.setIdentity(value)
case Rate sender ! socket.getRate
case Rate(value) socket.setRate(value)
case RecoveryInterval sender ! socket.getRecoveryInterval
case RecoveryInterval(value) socket.setRecoveryInterval(value)
case MulticastLoop sender ! socket.hasMulticastLoop
case MulticastLoop(value) socket.setMulticastLoop(value)
case MulticastHops sender ! socket.getMulticastHops
case MulticastHops(value) socket.setMulticastHops(value)
case ReceiveTimeOut sender ! socket.getReceiveTimeOut
case ReceiveTimeOut(value) socket.setReceiveTimeOut(value)
case SendTimeOut sender ! socket.getSendTimeOut
case SendTimeOut(value) socket.setSendTimeOut(value)
case SendBufferSize sender ! socket.getSendBufferSize
case SendBufferSize(value) socket.setSendBufferSize(value)
case ReceiveBufferSize sender ! socket.getReceiveBufferSize
case ReceiveBufferSize(value) socket.setReceiveBufferSize(value)
case ReceiveMore sender ! socket.hasReceiveMore
case FileDescriptor sender ! socket.getFD
case ReceiveMore sender ! socket.hasReceiveMore
case FileDescriptor sender ! socket.getFD
}
private def internalMessage: Receive = {
case Poll {
currentPoll = None
@ -108,8 +108,8 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac
}
self ! Poll
}
case ClearPoll => currentPoll = None
case PollError(ex) => {
case ClearPoll currentPoll = None
case PollError(ex) {
log.error(ex, "There was a problem polling the zeromq socket")
self ! Poll
}

View file

@ -7,7 +7,7 @@ import org.zeromq.{ ZMQ ⇒ JZMQ }
import akka.zeromq.SocketType._
class Context(numIoThreads: Int) {
private var context = JZMQ.context(numIoThreads)
private val context = JZMQ.context(numIoThreads)
def socket(socketType: SocketType) = {
context.socket(socketType.id)
}

View file

@ -8,7 +8,7 @@ import akka.util.duration._
import akka.zeromq.SocketType._
import org.zeromq.{ ZMQ JZMQ }
import akka.actor._
import akka.dispatch.{Dispatcher, Await}
import akka.dispatch.{ Dispatcher, Await }
case class SocketParameters(
socketType: SocketType,
@ -25,7 +25,7 @@ case class ZeroMQVersion(major: Int, minor: Int, patch: Int) {
object ZeroMQExtension extends ExtensionId[ZeroMQExtension] with ExtensionIdProvider {
def lookup() = this
def createExtension(system: ActorSystemImpl) = new ZeroMQExtension(system)
private val minVersionString = "2.1.0"
private val minVersion = JZMQ.makeVersion(2, 1, 0)
}
@ -47,7 +47,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension {
context: Context = DefaultContext, // For most applications you want to use the default context
deserializer: Deserializer = new ZMQMessageDeserializer,
pollDispatcher: Option[Dispatcher] = None,
pollTimeoutDuration: Duration = 500 millis) = {
pollTimeoutDuration: Duration = 500 millis): ActorRef = {
verifyZeroMQVersion
val params = SocketParameters(socketType, context, listener, pollDispatcher, deserializer, pollTimeoutDuration)
implicit val timeout = system.settings.ActorTimeout
@ -58,11 +58,9 @@ class ZeroMQExtension(system: ActorSystem) extends Extension {
val zeromq: ActorRef = {
verifyZeroMQVersion
system.actorOf(Props(new Actor {
protected def receive = { case p: Props sender ! context.actorOf(p) }
def receive = { case p: Props sender ! context.actorOf(p) }
}), "zeromq")
}
private def verifyZeroMQVersion = {
require(