Update polling for compatibility with ZMQ
ZMQ 2.0 poll() accepts its duration in microseconds, but 3.0+ accepts milliseconds, causing poll to block for 1000 times as long as it should.
This commit is contained in:
parent
4e60362356
commit
535df04dc2
2 changed files with 6 additions and 1 deletions
|
|
@ -176,7 +176,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A
|
|||
val duration = fromConfig getOrElse ZeroMQExtension(context.system).DefaultPollTimeout
|
||||
if (duration > Duration.Zero) { (msg: PollMsg) ⇒
|
||||
// for positive timeout values, do poll (i.e. block this thread)
|
||||
poller.poll(duration.toMicros)
|
||||
ZeroMQExtension(context.system).poll(poller, duration)
|
||||
self ! msg
|
||||
} else {
|
||||
val d = -duration
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package akka.zeromq
|
||||
|
||||
import org.zeromq.{ ZMQ ⇒ JZMQ }
|
||||
import org.zeromq.ZMQ.Poller
|
||||
import akka.actor._
|
||||
import akka.dispatch.{ Await }
|
||||
import akka.pattern.ask
|
||||
|
|
@ -47,6 +48,10 @@ class ZeroMQExtension(system: ActorSystem) extends Extension {
|
|||
val DefaultPollTimeout: Duration = Duration(system.settings.config.getMilliseconds("akka.zeromq.poll-timeout"), TimeUnit.MILLISECONDS)
|
||||
val NewSocketTimeout: Timeout = Timeout(Duration(system.settings.config.getMilliseconds("akka.zeromq.new-socket-timeout"), TimeUnit.MILLISECONDS))
|
||||
|
||||
val poll =
|
||||
if (version.major >= 3) (poller: Poller, duration: Duration) ⇒ poller.poll(duration.toMillis)
|
||||
else (poller: Poller, duration: Duration) ⇒ poller.poll(duration.toMicros)
|
||||
|
||||
/**
|
||||
* The version of the ZeroMQ library
|
||||
* @return a [[akka.zeromq.ZeroMQVersion]]
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue