Rewriting the polling for 0mq so that we can discern EAGAIN from empty poll
This commit is contained in:
parent
f8f0ec1746
commit
382a96a189
1 changed files with 15 additions and 21 deletions
|
|
@ -190,30 +190,24 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
|
|||
}
|
||||
|
||||
@tailrec private def doPoll(mode: PollMsg, togo: Int = 10): Unit =
|
||||
receiveMessage(mode) match {
|
||||
case null ⇒ // receiveMessage has already done something special here
|
||||
case Seq() ⇒ doPollTimeout(mode)
|
||||
case frames ⇒
|
||||
notifyListener(deserializer(frames))
|
||||
if (togo > 0) doPoll(mode, togo - 1)
|
||||
else self ! mode
|
||||
if (togo <= 0) self ! mode
|
||||
else receiveMessage(mode) match {
|
||||
case Seq() ⇒ doPollTimeout(mode)
|
||||
case frames ⇒ notifyListener(deserializer(frames)); doPoll(mode, togo - 1)
|
||||
}
|
||||
|
||||
@tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): Seq[Frame] = {
|
||||
val result = mode match {
|
||||
case Poll ⇒ socket.recv(JZMQ.NOBLOCK)
|
||||
case PollCareful ⇒ if (poller.poll(0) > 0) socket.recv(0) else null
|
||||
@tailrec private def receiveMessage(mode: PollMsg, currentFrames: Vector[Frame] = Vector.empty): Seq[Frame] =
|
||||
if (mode == PollCareful && (poller.poll(0) <= 0)) {
|
||||
if (currentFrames.isEmpty) currentFrames else throw new IllegalStateException("Received partial transmission!")
|
||||
} else {
|
||||
socket.recv(if (mode == Poll) JZMQ.NOBLOCK else 0) match {
|
||||
case null ⇒ /*EAGAIN*/
|
||||
if (currentFrames.isEmpty) currentFrames else receiveMessage(mode, currentFrames)
|
||||
case bytes ⇒
|
||||
val frames = currentFrames :+ Frame(if (bytes.length == 0) noBytes else bytes)
|
||||
if (socket.hasReceiveMore) receiveMessage(mode, frames) else frames
|
||||
}
|
||||
}
|
||||
result match {
|
||||
case null ⇒
|
||||
if (socket.hasReceiveMore) receiveMessage(mode, currentFrames)
|
||||
else if (currentFrames.isEmpty) currentFrames
|
||||
else throw new IllegalStateException("no more frames available while socket.hasReceivedMore==true")
|
||||
case bytes ⇒
|
||||
val frames = currentFrames :+ Frame(if (bytes.length == 0) noBytes else bytes)
|
||||
if (socket.hasReceiveMore) receiveMessage(mode, frames) else frames
|
||||
}
|
||||
}
|
||||
|
||||
private val listenerOpt = params collectFirst { case Listener(l) ⇒ l }
|
||||
private def watchListener(): Unit = listenerOpt foreach context.watch
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue