From 535df04dc2aca548f117e5f7c16f240ca20894e8 Mon Sep 17 00:00:00 2001 From: Joshua Gao Date: Wed, 27 Jun 2012 16:16:20 -0700 Subject: [PATCH] Update polling for compatibility with ZMQ ZMQ 2.0 poll() accepts its duration in microseconds, but 3.0+ accepts milliseconds, causing poll to block for 1000 times as long as it should. --- .../src/main/scala/akka/zeromq/ConcurrentSocketActor.scala | 2 +- akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala | 5 +++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index e1b1ba4ddf..bc3a9c27df 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -176,7 +176,7 @@ private[zeromq] class ConcurrentSocketActor(params: Seq[SocketOption]) extends A val duration = fromConfig getOrElse ZeroMQExtension(context.system).DefaultPollTimeout if (duration > Duration.Zero) { (msg: PollMsg) ⇒ // for positive timeout values, do poll (i.e. block this thread) - poller.poll(duration.toMicros) + ZeroMQExtension(context.system).poll(poller, duration) self ! msg } else { val d = -duration diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala index 4bf52a41e3..70ef399f53 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala @@ -4,6 +4,7 @@ package akka.zeromq import org.zeromq.{ ZMQ ⇒ JZMQ } +import org.zeromq.ZMQ.Poller import akka.actor._ import akka.dispatch.{ Await } import akka.pattern.ask @@ -47,6 +48,10 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { val DefaultPollTimeout: Duration = Duration(system.settings.config.getMilliseconds("akka.zeromq.poll-timeout"), TimeUnit.MILLISECONDS) val NewSocketTimeout: Timeout = Timeout(Duration(system.settings.config.getMilliseconds("akka.zeromq.new-socket-timeout"), TimeUnit.MILLISECONDS)) + val poll = + if (version.major >= 3) (poller: Poller, duration: Duration) ⇒ poller.poll(duration.toMillis) + else (poller: Poller, duration: Duration) ⇒ poller.poll(duration.toMicros) + /** * The version of the ZeroMQ library * @return a [[akka.zeromq.ZeroMQVersion]]