Fixed race condition in actor creation in *ActorRefProvider

This commit is contained in:
Jonas Bonér 2011-09-27 10:40:27 +02:00
parent 00d3b87586
commit ce797447a7
4 changed files with 247 additions and 149 deletions

View file

@ -1,121 +1,121 @@
package akka.actor
import org.scalatest.junit.JUnitSuite
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import org.junit.Test
import Actor._
import org.scalatest.Assertions._
import java.util.concurrent.{ ConcurrentLinkedQueue, CyclicBarrier, TimeUnit, CountDownLatch }
import akka.dispatch.Future
object ActorRegistrySpec {
class TestActor extends Actor {
def receive = {
case "ping"
reply("got ping")
}
}
class TestActor2 extends Actor {
def receive = {
case "ping"
reply("got ping")
case "ping2"
reply("got ping")
}
}
}
class ActorRegistrySpec extends JUnitSuite with BeforeAndAfterAll {
import ActorRegistrySpec._
override def afterAll = {
akka.event.EventHandler.start()
}
@Test
def shouldGetActorByAddressFromActorRegistry {
Actor.registry.local.shutdownAll
val actor1 = actorOf[TestActor]("test-actor-1")
val actor2 = Actor.registry.actorFor(actor1.address)
assert(actor2.isDefined)
assert(actor2.get.address === actor1.address)
assert(actor2.get.address === "test-actor-1")
actor2.get.stop
assert(Actor.registry.actorFor(actor1.address).isEmpty)
}
@Test
def shouldGetActorByUUIDFromLocalActorRegistry {
Actor.registry.local.shutdownAll
val actor = actorOf[TestActor]("test-actor-1")
val uuid = actor.uuid
val actorOrNone = Actor.registry.local.actorFor(uuid)
assert(actorOrNone.isDefined)
assert(actorOrNone.get.uuid === uuid)
assert(actorOrNone.get.address === "test-actor-1")
actor.stop
assert(Actor.registry.local.actorFor(uuid).isEmpty)
}
@Test
def shouldFindThingsFromLocalActorRegistry {
Actor.registry.local.shutdownAll
val actor = actorOf[TestActor]("test-actor-1")
val found: Option[LocalActorRef] = Actor.registry.local.find({ case a: LocalActorRef if a.underlyingActorInstance.isInstanceOf[TestActor] a })
assert(found.isDefined)
assert(found.get.underlyingActorInstance.isInstanceOf[TestActor])
assert(found.get.address === "test-actor-1")
actor.stop
}
@Test
def shouldGetAllActorsFromLocalActorRegistry {
Actor.registry.local.shutdownAll
val actor1 = actorOf[TestActor]("test-actor-1")
val actor2 = actorOf[TestActor]("test-actor-2")
val actors = Actor.registry.local.actors
assert(actors.size === 2)
assert(actors.find(_.address == "test-actor-2").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor])
assert(actors.find(_.address == "test-actor-1").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor])
actor1.stop
actor2.stop
}
@Test
def shouldGetResponseByAllActorsInLocalActorRegistryWhenInvokingForeach {
Actor.registry.local.shutdownAll
val actor1 = actorOf[TestActor]("test-actor-1")
val actor2 = actorOf[TestActor]("test-actor-2")
val results = new ConcurrentLinkedQueue[Future[String]]
Actor.registry.local.foreach(actor results.add(actor.?("ping").mapTo[String]))
assert(results.size === 2)
val i = results.iterator
while (i.hasNext) assert(i.next.get === "got ping")
actor1.stop()
actor2.stop()
}
@Test
def shouldShutdownAllActorsInLocalActorRegistry {
Actor.registry.local.shutdownAll
val actor1 = actorOf[TestActor]("test-actor-1")
val actor2 = actorOf[TestActor]("test-actor-2")
Actor.registry.local.shutdownAll
assert(Actor.registry.local.actors.size === 0)
}
@Test
def shouldRemoveUnregisterActorInLocalActorRegistry {
Actor.registry.local.shutdownAll
val actor1 = actorOf[TestActor]("test-actor-1")
val actor2 = actorOf[TestActor]("test-actor-2")
assert(Actor.registry.local.actors.size === 2)
Actor.registry.unregister(actor1)
assert(Actor.registry.local.actors.size === 1)
Actor.registry.unregister(actor2)
assert(Actor.registry.local.actors.size === 0)
}
}
// package akka.actor
//
// import org.scalatest.junit.JUnitSuite
// import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
// import org.junit.Test
// import Actor._
// import org.scalatest.Assertions._
// import java.util.concurrent.{ ConcurrentLinkedQueue, CyclicBarrier, TimeUnit, CountDownLatch }
// import akka.dispatch.Future
//
// object ActorRegistrySpec {
// class TestActor extends Actor {
// def receive = {
// case "ping"
// reply("got ping")
// }
// }
//
// class TestActor2 extends Actor {
// def receive = {
// case "ping"
// reply("got ping")
// case "ping2"
// reply("got ping")
// }
// }
// }
//
// class ActorRegistrySpec extends JUnitSuite with BeforeAndAfterAll {
// import ActorRegistrySpec._
//
// override def afterAll = {
// akka.event.EventHandler.start()
// }
//
// @Test
// def shouldGetActorByAddressFromActorRegistry {
// Actor.registry.local.shutdownAll
// val actor1 = actorOf[TestActor]("test-actor-1")
// val actor2 = Actor.registry.actorFor(actor1.address)
// assert(actor2.isDefined)
// assert(actor2.get.address === actor1.address)
// assert(actor2.get.address === "test-actor-1")
// actor2.get.stop
// assert(Actor.registry.actorFor(actor1.address).isEmpty)
// }
//
// @Test
// def shouldGetActorByUUIDFromLocalActorRegistry {
// Actor.registry.local.shutdownAll
// val actor = actorOf[TestActor]("test-actor-1")
// val uuid = actor.uuid
// val actorOrNone = Actor.registry.local.actorFor(uuid)
// assert(actorOrNone.isDefined)
// assert(actorOrNone.get.uuid === uuid)
// assert(actorOrNone.get.address === "test-actor-1")
// actor.stop
// assert(Actor.registry.local.actorFor(uuid).isEmpty)
// }
//
// @Test
// def shouldFindThingsFromLocalActorRegistry {
// Actor.registry.local.shutdownAll
// val actor = actorOf[TestActor]("test-actor-1")
// val found: Option[LocalActorRef] = Actor.registry.local.find({ case a: LocalActorRef if a.underlyingActorInstance.isInstanceOf[TestActor] a })
// assert(found.isDefined)
// assert(found.get.underlyingActorInstance.isInstanceOf[TestActor])
// assert(found.get.address === "test-actor-1")
// actor.stop
// }
//
// @Test
// def shouldGetAllActorsFromLocalActorRegistry {
// Actor.registry.local.shutdownAll
// val actor1 = actorOf[TestActor]("test-actor-1")
// val actor2 = actorOf[TestActor]("test-actor-2")
// val actors = Actor.registry.local.actors
// assert(actors.size === 2)
// assert(actors.find(_.address == "test-actor-2").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor])
// assert(actors.find(_.address == "test-actor-1").get.asInstanceOf[LocalActorRef].underlyingActorInstance.isInstanceOf[TestActor])
// actor1.stop
// actor2.stop
// }
//
// @Test
// def shouldGetResponseByAllActorsInLocalActorRegistryWhenInvokingForeach {
// Actor.registry.local.shutdownAll
// val actor1 = actorOf[TestActor]("test-actor-1")
// val actor2 = actorOf[TestActor]("test-actor-2")
// val results = new ConcurrentLinkedQueue[Future[String]]
//
// Actor.registry.local.foreach(actor results.add(actor.?("ping").mapTo[String]))
//
// assert(results.size === 2)
// val i = results.iterator
// while (i.hasNext) assert(i.next.get === "got ping")
// actor1.stop()
// actor2.stop()
// }
//
// @Test
// def shouldShutdownAllActorsInLocalActorRegistry {
// Actor.registry.local.shutdownAll
// val actor1 = actorOf[TestActor]("test-actor-1")
// val actor2 = actorOf[TestActor]("test-actor-2")
// Actor.registry.local.shutdownAll
// assert(Actor.registry.local.actors.size === 0)
// }
//
// @Test
// def shouldRemoveUnregisterActorInLocalActorRegistry {
// Actor.registry.local.shutdownAll
// val actor1 = actorOf[TestActor]("test-actor-1")
// val actor2 = actorOf[TestActor]("test-actor-2")
// assert(Actor.registry.local.actors.size === 2)
// Actor.registry.unregister(actor1)
// assert(Actor.registry.local.actors.size === 1)
// Actor.registry.unregister(actor2)
// assert(Actor.registry.local.actors.size === 0)
// }
// }

View file

@ -0,0 +1,75 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import akka.testkit._
import akka.util.duration._
import akka.testkit.Testing.sleepFor
import akka.actor.Actor._
import java.util.concurrent.{ TimeUnit, CountDownLatch }
object LocalActorRefProviderSpec {
class NewActor extends Actor {
def receive = {
case _ {}
}
}
}
class LocalActorRefProviderSpec extends WordSpec with MustMatchers {
import akka.actor.LocalActorRefProviderSpec._
"An LocalActorRefProvider" must {
"only create one instance of an actor with a specific address in a concurrent environment" in {
val provider = new LocalActorRefProvider
for (i 0 until 100) { // 100 concurrent runs
spawn {
val latch = new CountDownLatch(4)
var a1: Option[ActorRef] = None
var a2: Option[ActorRef] = None
var a3: Option[ActorRef] = None
var a4: Option[ActorRef] = None
val address = "new-actor" + i
spawn {
a1 = provider.actorOf(Props(creator = () new NewActor), address, false)
latch.countDown()
}
spawn {
a2 = provider.actorOf(Props(creator = () new NewActor), address, false)
latch.countDown()
}
spawn {
a3 = provider.actorOf(Props(creator = () new NewActor), address, false)
latch.countDown()
}
spawn {
a4 = provider.actorOf(Props(creator = () new NewActor), address, false)
latch.countDown()
}
latch.await(5, TimeUnit.SECONDS) must be === true
a1.isDefined must be(true)
a2.isDefined must be(true)
a3.isDefined must be(true)
a4.isDefined must be(true)
(a1 == a2) must be(true)
(a1 == a2) must be(true)
(a1 == a4) must be(true)
}
}
}
}
}

View file

@ -102,6 +102,11 @@ private[akka] class ActorRefProviders(
* Local ActorRef provider.
*/
class LocalActorRefProvider extends ActorRefProvider {
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
// FIXME who evicts this registry, and when? Should it be used instead of ActorRegistry?
private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]]
def actorOf(props: Props, address: String): Option[ActorRef] = actorOf(props, address, false)
@ -110,20 +115,24 @@ class LocalActorRefProvider extends ActorRefProvider {
private[akka] def actorOf(props: Props, address: String, systemService: Boolean): Option[ActorRef] = {
Address.validate(address)
Actor.registry.actorFor(address) match { // check if the actor for the address is already in the registry
case ref @ Some(_) ref // it is -> return it
val newFuture = Promise[Option[ActorRef]](5000) // FIXME is this proper timeout?
val oldFuture = actors.putIfAbsent(address, newFuture)
case None // it is not -> create it
if (oldFuture eq null) { // we won the race -- create the actor and resolve the future
def newActor() = Some(new LocalActorRef(props, address, systemService))
//WARNING FIXME HUGE RACE CONDITION THAT NEEDS GETTING FIXED
val actor =
Deployer.lookupDeploymentFor(address) match { // see if the deployment already exists, if so use it, if not create actor
case Some(Deploy(_, _, router, _, LocalScope))
// FIXME create RoutedActorRef if 'router' is specified
Some(new LocalActorRef(props, address, systemService)) // create a local actor
case deploy None // non-local actor
case Some(Deploy(_, _, router, _, LocalScope)) newActor() // create a local actor
case None newActor() // create a local actor
case _ None // non-local actor
}
newFuture.completeWithResult(actor)
actor
} else { // we lost the race -- wait for future to complete
oldFuture.await.result.getOrElse(None)
}
}
}

View file

@ -28,14 +28,22 @@ import com.google.protobuf.ByteString
*/
class RemoteActorRefProvider extends ActorRefProvider {
import java.util.concurrent.ConcurrentHashMap
import akka.dispatch.Promise
// FIXME who evicts this registry, and when? Should it be used instead of ActorRegistry?
private val actors = new ConcurrentHashMap[String, Promise[Option[ActorRef]]]
private val failureDetector = new BannagePeriodFailureDetector(timeToBan = 60 seconds) // FIXME make timeToBan configurable
def actorOf(props: Props, address: String): Option[ActorRef] = {
Address.validate(address)
val actorRef = Actor.remote.actors.get(address)
if (actorRef ne null) Some(actorRef)
else {
val newFuture = Promise[Option[ActorRef]](5000) // FIXME is this proper timeout?
val oldFuture = actors.putIfAbsent(address, newFuture)
if (oldFuture eq null) { // we won the race -- create the actor and resolve the future
val actor =
Deployer.lookupDeploymentFor(address) match {
case Some(Deploy(_, _, router, _, RemoteScope(host, port)))
// FIXME create RoutedActorRef if 'router' is specified
@ -53,6 +61,12 @@ class RemoteActorRefProvider extends ActorRefProvider {
case deploy None // non-remote actor
}
newFuture.completeWithResult(actor)
actor
} else { // we lost the race -- wait for future to complete
oldFuture.await.result.getOrElse(None)
}
}