Attempt to avoid some race condition in zeromq initialization

This commit is contained in:
Ivan Porto Carrero 2012-01-14 12:13:46 +01:00
parent 448ccadb7c
commit d2fa14abc3
2 changed files with 11 additions and 7 deletions

View file

@ -6,8 +6,7 @@ package akka.zeromq
import org.zeromq.ZMQ.{ Socket, Poller }
import org.zeromq.{ ZMQ JZMQ }
import akka.actor._
import akka.dispatch.{ Await, Promise, Dispatchers, Future }
import akka.util.duration._
import akka.dispatch.{ Promise, Future }
private[zeromq] sealed trait PollLifeCycle
private[zeromq] case object NoResults extends PollLifeCycle
@ -136,8 +135,8 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac
}
override def postStop {
currentPoll foreach { _ complete Right(Closing) }
poller.unregister(socket)
currentPoll foreach { _ complete Right(Closing) }
if (socket != null) socket.close
notifyListener(Closed)
}
@ -159,7 +158,7 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac
currentPoll = currentPoll orElse newEventLoop
}
private def newEventLoop: Option[Promise[PollLifeCycle]] = if (poller.getSize > 0) {
private def newEventLoop: Option[Promise[PollLifeCycle]] = {
implicit val executor = context.system.dispatchers.defaultGlobalDispatcher
Some((Future {
if (poller.poll(params.pollTimeoutDuration.toMicros) > 0 && poller.pollin(0)) Results else NoResults
@ -175,7 +174,7 @@ private[zeromq] class ConcurrentSocketActor(params: SocketParameters) extends Ac
if (!self.isTerminated) self ! 'poll
}
})
} else None
}
private def receiveFrames(): Seq[Frame] = {

View file

@ -24,7 +24,10 @@ akka {
"""
}
class ConcurrentSocketActorSpec extends AkkaSpec(ConcurrentSocketActorSpec.config) with MustMatchers with DefaultTimeout {
class ConcurrentSocketActorSpec
extends AkkaSpec(ConcurrentSocketActorSpec.config)
with MustMatchers
with DefaultTimeout {
val endpoint = "tcp://127.0.0.1:%s" format FreePort.randomFreePort()
@ -38,6 +41,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec(ConcurrentSocketActorSpec.confi
val publisher = newPublisher(context, publisherProbe.ref)
val subscriber = newSubscriber(context, subscriberProbe.ref)
val msgGenerator = newMessageGenerator(publisher)
try {
subscriberProbe.expectMsg(Connecting)
val msgNumbers = subscriberProbe.receiveWhile(2 seconds) {
@ -85,6 +89,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec(ConcurrentSocketActorSpec.confi
}
def newMessageGenerator(actorRef: ActorRef) = {
system.actorOf(Props(new MessageGeneratorActor(actorRef)).withTimeout(Timeout(10 millis)))
}
def checkZeroMQInstallation = try {
zmq.version match {
@ -109,7 +114,7 @@ class ConcurrentSocketActorSpec extends AkkaSpec(ConcurrentSocketActorSpec.confi
private var genMessages: Cancellable = null
override def preStart() = {
genMessages = system.scheduler.schedule(10 millis, 10 millis, self, 'm)
genMessages = system.scheduler.schedule(100 millis, 10 millis, self, 'm)
}
override def postStop() = {