#2463 - Making it possible to configure the Deserializer for 0MQ and fixing an exhaustiveness check in patmat for SocketOptions
This commit is contained in:
parent
05ac275f12
commit
f066f2d043
2 changed files with 3 additions and 6 deletions
|
|
@ -6,7 +6,7 @@ package docs.routing
|
|||
import RouterDocSpec.MyActor
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.routing.RoundRobinRouter
|
||||
import akka.actor.{ActorRef, Props, Actor}
|
||||
import akka.actor.{ ActorRef, Props, Actor }
|
||||
|
||||
object RouterDocSpec {
|
||||
class MyActor extends Actor {
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
|
|||
private val noBytes = Array[Byte]()
|
||||
private val zmqContext = params collectFirst { case c: Context ⇒ c } getOrElse DefaultContext
|
||||
|
||||
private val deserializer = deserializerFromParams
|
||||
private var deserializer = params collectFirst { case d: Deserializer ⇒ d } getOrElse new ZMQMessageDeserializer
|
||||
private val socketType = {
|
||||
import SocketType.{ ZMQSocketType ⇒ ST }
|
||||
params.collectFirst { case t: ST ⇒ t }.getOrElse(throw new IllegalArgumentException("A socket type is required"))
|
||||
|
|
@ -39,7 +39,6 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
|
|||
|
||||
private val socket: Socket = zmqContext.socket(socketType)
|
||||
private val poller: Poller = zmqContext.poller
|
||||
private val log = Logging(context.system, this)
|
||||
|
||||
private val pendingSends = new ListBuffer[Seq[Frame]]
|
||||
|
||||
|
|
@ -93,6 +92,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
|
|||
case MulticastHops(value) ⇒ socket.setMulticastHops(value)
|
||||
case SendBufferSize(value) ⇒ socket.setSendBufferSize(value)
|
||||
case ReceiveBufferSize(value) ⇒ socket.setReceiveBufferSize(value)
|
||||
case d: Deserializer ⇒ deserializer = d
|
||||
}
|
||||
|
||||
private def handleSocketOptionQuery(msg: SocketOptionQuery): Unit =
|
||||
|
|
@ -135,9 +135,6 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
|
|||
params filter (_.isInstanceOf[PubSubOption]) foreach { self ! _ }
|
||||
}
|
||||
|
||||
private def deserializerFromParams: Deserializer =
|
||||
params collectFirst { case d: Deserializer ⇒ d } getOrElse new ZMQMessageDeserializer
|
||||
|
||||
private def setupSocket() = params foreach {
|
||||
case _: SocketConnectOption | _: PubSubOption | _: SocketMeta ⇒ // ignore, handled differently
|
||||
case m ⇒ self ! m
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue