diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala index 735b7f6f83..c1b74b664d 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ConcurrentSocketActor.scala @@ -6,8 +6,7 @@ package akka.zeromq import org.zeromq.ZMQ.{ Socket, Poller } import org.zeromq.{ ZMQ ⇒ JZMQ } import akka.actor._ -import akka.dispatch.{ Await, Promise, Dispatchers, Future } -import akka.util.duration._ +import akka.dispatch.{ Promise, Future } private[zeromq] sealed trait PollLifeCycle private[zeromq] case object NoResults extends PollLifeCycle @@ -136,8 +135,8 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac } override def postStop { - currentPoll foreach { _ complete Right(Closing) } poller.unregister(socket) + currentPoll foreach { _ complete Right(Closing) } if (socket != null) socket.close notifyListener(Closed) } @@ -159,7 +158,7 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac currentPoll = currentPoll orElse newEventLoop } - private def newEventLoop: Option[Promise[PollLifeCycle]] = if (poller.getSize > 0) { + private def newEventLoop: Option[Promise[PollLifeCycle]] = { implicit val executor = context.system.dispatchers.defaultGlobalDispatcher Some((Future { if (poller.poll(params.pollTimeoutDuration.toMicros) > 0 && poller.pollin(0)) Results else NoResults @@ -175,7 +174,7 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac if (!self.isTerminated) self ! 'poll } }) - } else None + } private def receiveFrames(): Seq[Frame] = { diff --git a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala index 9552554a19..ddf0508ecb 100644 --- a/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala +++ b/akka-zeromq/src/test/scala/akka/zeromq/ConcurrentSocketActorSpec.scala @@ -24,7 +24,10 @@ akka { """ } -class ConcurrentSocketActorSpec extends AkkaSpec(ConcurrentSocketActorSpec.config) with MustMatchers with DefaultTimeout { +class ConcurrentSocketActorSpec + extends AkkaSpec(ConcurrentSocketActorSpec.config) + with MustMatchers + with DefaultTimeout { val endpoint = "tcp://127.0.0.1:%s" format FreePort.randomFreePort() @@ -38,6 +41,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec(ConcurrentSocketActorSpec.confi val publisher = newPublisher(context, publisherProbe.ref) val subscriber = newSubscriber(context, subscriberProbe.ref) val msgGenerator = newMessageGenerator(publisher) + try { subscriberProbe.expectMsg(Connecting) val msgNumbers = subscriberProbe.receiveWhile(2 seconds) { @@ -85,6 +89,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec(ConcurrentSocketActorSpec.confi } def newMessageGenerator(actorRef: ActorRef) = { system.actorOf(Props(new MessageGeneratorActor(actorRef)).withTimeout(Timeout(10 millis))) + } def checkZeroMQInstallation = try { zmq.version match { @@ -109,7 +114,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec(ConcurrentSocketActorSpec.confi private var genMessages: Cancellable = null override def preStart() = { - genMessages = system.scheduler.schedule(10 millis, 10 millis, self, 'm) + genMessages = system.scheduler.schedule(100 millis, 10 millis, self, 'm) } override def postStop() = {