diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala index 43111cc659..f21f49b023 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/TypedActorSpec.scala @@ -11,7 +11,7 @@ import org.scalatest.{ BeforeAndAfterAll, WordSpec, BeforeAndAfterEach } import akka.actor.TypedActor._ import akka.japi.{ Option ⇒ JOption } import akka.util.Duration -import akka.dispatch.{ Dispatchers, Future, AlreadyCompletedFuture } +import akka.dispatch.{ Dispatchers, Future, KeptPromise } import akka.routing.CyclicIterator object TypedActorSpec { @@ -43,7 +43,7 @@ object TypedActorSpec { def pigdog = "Pigdog" - def futurePigdog(): Future[String] = new AlreadyCompletedFuture(Right(pigdog)) + def futurePigdog(): Future[String] = new KeptPromise(Right(pigdog)) def futurePigdog(delay: Long): Future[String] = { Thread.sleep(delay) futurePigdog @@ -51,7 +51,7 @@ object TypedActorSpec { def futurePigdog(delay: Long, numbered: Int): Future[String] = { Thread.sleep(delay) - new AlreadyCompletedFuture(Right(pigdog + numbered)) + new KeptPromise(Right(pigdog + numbered)) } def futureComposePigdogFrom(foo: Foo): Future[String] = @@ -264,7 +264,7 @@ class TypedActorSpec extends WordSpec with MustMatchers with BeforeAndAfterEach "be able to use work-stealing dispatcher" in { val config = Configuration( Duration(6600, "ms"), - Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher") + Dispatchers.newBalancingDispatcher("pooled-dispatcher") .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .setCorePoolSize(60) .setMaxPoolSize(60) diff --git a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala index c81455b9e8..6b1017e79d 100644 --- 
a/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/supervisor/SupervisorMiscSpec.scala @@ -16,7 +16,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers { val countDownLatch = new CountDownLatch(4) val actor1 = Actor.actorOf(new Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) + self.dispatcher = Dispatchers.newPinnedDispatcher(self) override def postRestart(cause: Throwable) { countDownLatch.countDown() } protected def receive = { @@ -26,7 +26,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers { }).start() val actor2 = Actor.actorOf(new Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) + self.dispatcher = Dispatchers.newPinnedDispatcher(self) override def postRestart(cause: Throwable) { countDownLatch.countDown() } protected def receive = { @@ -36,7 +36,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers { }).start() val actor3 = Actor.actorOf(new Actor { - self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("test").build + self.dispatcher = Dispatchers.newDispatcher("test").build override def postRestart(cause: Throwable) { countDownLatch.countDown() } protected def receive = { @@ -46,7 +46,7 @@ class SupervisorMiscSpec extends WordSpec with MustMatchers { }).start() val actor4 = Actor.actorOf(new Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) + self.dispatcher = Dispatchers.newPinnedDispatcher(self) override def postRestart(cause: Throwable) { countDownLatch.countDown() } protected def receive = { diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 916852a7b4..66d21435f4 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -20,7 +20,7 @@ class ConfigSpec extends WordSpec with 
MustMatchers { getString("akka.time-unit") must equal(Some("seconds")) getString("akka.version") must equal(Some("2.0-SNAPSHOT")) - getString("akka.actor.default-dispatcher.type") must equal(Some("GlobalExecutorBasedEventDriven")) + getString("akka.actor.default-dispatcher.type") must equal(Some("GlobalDispatcher")) getInt("akka.actor.default-dispatcher.keep-alive-time") must equal(Some(60)) getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(Some(1.0)) getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(Some(4.0)) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala index c6af345d68..97dc67da22 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -344,12 +344,12 @@ abstract class ActorModelSpec extends JUnitSuite { } } -class ExecutorBasedEventDrivenDispatcherModelTest extends ActorModelSpec { +class DispatcherModelTest extends ActorModelSpec { def newInterceptedDispatcher = - new ExecutorBasedEventDrivenDispatcher("foo") with MessageDispatcherInterceptor + new Dispatcher("foo") with MessageDispatcherInterceptor } -class ExecutorBasedEventDrivenWorkStealingDispatcherModelTest extends ActorModelSpec { +class BalancingDispatcherModelTest extends ActorModelSpec { def newInterceptedDispatcher = - new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor + new BalancingDispatcher("foo") with MessageDispatcherInterceptor } diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/DispatchersSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/DispatchersSpec.scala index cc75c5d43a..f7df14e195 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/DispatchersSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/DispatchersSpec.scala @@ -21,15 +21,15 @@ object DispatchersSpec { 
val executorbounds = "executor-bounds" val allowcoretimeout = "allow-core-timeout" val rejectionpolicy = "rejection-policy" // abort, caller-runs, discard-oldest, discard - val throughput = "throughput" // Throughput for ExecutorBasedEventDrivenDispatcher + val throughput = "throughput" // Throughput for Dispatcher def instance(dispatcher: MessageDispatcher): (MessageDispatcher) ⇒ Boolean = _ == dispatcher def ofType[T <: MessageDispatcher: Manifest]: (MessageDispatcher) ⇒ Boolean = _.getClass == manifest[T].erasure def typesAndValidators: Map[String, (MessageDispatcher) ⇒ Boolean] = Map( - "ExecutorBasedEventDrivenWorkStealing" -> ofType[ExecutorBasedEventDrivenWorkStealingDispatcher], - "ExecutorBasedEventDriven" -> ofType[ExecutorBasedEventDrivenDispatcher], - "GlobalExecutorBasedEventDriven" -> instance(globalExecutorBasedEventDrivenDispatcher)) + "BalancingDispatcher" -> ofType[BalancingDispatcher], + "Dispatcher" -> ofType[Dispatcher], + "GlobalDispatcher" -> instance(globalDispatcher)) def validTypes = typesAndValidators.keys.toList diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala index c330d948d3..69fc9ea635 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorSpec.scala @@ -3,14 +3,14 @@ package akka.actor.dispatch import java.util.concurrent.{ CountDownLatch, TimeUnit } import org.scalatest.junit.JUnitSuite import org.junit.Test -import akka.dispatch.{ Dispatchers, ExecutorBasedEventDrivenDispatcher } +import akka.dispatch.{ Dispatchers, Dispatcher } import akka.actor.Actor import Actor._ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } -object ExecutorBasedEventDrivenDispatcherActorSpec { +object DispatcherActorSpec { class TestActor 
extends Actor { - self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString).build + self.dispatcher = Dispatchers.newDispatcher(self.uuid.toString).build def receive = { case "Hello" ⇒ self.reply("World") @@ -23,14 +23,14 @@ object ExecutorBasedEventDrivenDispatcherActorSpec { val oneWay = new CountDownLatch(1) } class OneWayTestActor extends Actor { - self.dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher(self.uuid.toString).build + self.dispatcher = Dispatchers.newDispatcher(self.uuid.toString).build def receive = { case "OneWay" ⇒ OneWayTestActor.oneWay.countDown() } } } -class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite { - import ExecutorBasedEventDrivenDispatcherActorSpec._ +class DispatcherActorSpec extends JUnitSuite { + import DispatcherActorSpec._ private val unit = TimeUnit.MILLISECONDS @@ -74,7 +74,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite { @Test def shouldRespectThroughput { val throughputDispatcher = Dispatchers. - newExecutorBasedEventDrivenDispatcher("THROUGHPUT", 101, 0, Dispatchers.MAILBOX_TYPE). + newDispatcher("THROUGHPUT", 101, 0, Dispatchers.MAILBOX_TYPE). setCorePoolSize(1). build @@ -110,7 +110,7 @@ class ExecutorBasedEventDrivenDispatcherActorSpec extends JUnitSuite { def shouldRespectThroughputDeadline { val deadlineMs = 100 val throughputDispatcher = Dispatchers. - newExecutorBasedEventDrivenDispatcher("THROUGHPUT", 2, deadlineMs, Dispatchers.MAILBOX_TYPE). + newDispatcher("THROUGHPUT", 2, deadlineMs, Dispatchers.MAILBOX_TYPE). setCorePoolSize(1). 
build val works = new AtomicBoolean(true) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorsSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorsSpec.scala index d636ecc932..f7cc4a0956 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorsSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcherActorsSpec.scala @@ -12,7 +12,7 @@ import Actor._ * * @author Jan Van Besien */ -class ExecutorBasedEventDrivenDispatcherActorsSpec extends JUnitSuite with MustMatchers { +class DispatcherActorsSpec extends JUnitSuite with MustMatchers { class SlowActor(finishedCounter: CountDownLatch) extends Actor { def receive = { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala index 064433c6ca..9ce2c9dccd 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcherSpec.scala @@ -10,9 +10,9 @@ import akka.actor.{ IllegalActorStateException, Actor } import Actor._ import akka.dispatch.{ MessageQueue, Dispatchers } -object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { +object BalancingDispatcherSpec { - def newWorkStealer() = Dispatchers.newExecutorBasedEventDrivenWorkStealingDispatcher("pooled-dispatcher", 1).build + def newWorkStealer() = Dispatchers.newBalancingDispatcher("pooled-dispatcher", 1).build val delayableActorDispatcher, sharedActorDispatcher, parentActorDispatcher = newWorkStealer() @@ -52,8 +52,8 @@ object ExecutorBasedEventDrivenWorkStealingDispatcherSpec { /** * @author Jan Van Besien */ -class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with 
MustMatchers { - import ExecutorBasedEventDrivenWorkStealingDispatcherSpec._ +class BalancingDispatcherSpec extends JUnitSuite with MustMatchers { + import BalancingDispatcherSpec._ @Test def fastActorShouldStealWorkFromSlowActor { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 773a68ef89..e71ca14721 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -68,7 +68,7 @@ abstract class MailboxSpec extends WordSpec with MustMatchers with BeforeAndAfte //CANDIDATE FOR TESTKIT def spawn[T <: AnyRef](fun: ⇒ T)(implicit within: Duration): Future[T] = { - val result = new DefaultCompletableFuture[T](within.length, within.unit) + val result = new DefaultPromise[T](within.length, within.unit) val t = new Thread(new Runnable { def run = try { result.completeWithResult(fun) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index ff2e85fb05..5740edb9b8 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -8,7 +8,7 @@ import java.util.concurrent.CountDownLatch class PriorityDispatcherSpec extends WordSpec with MustMatchers { - "A PriorityExecutorBasedEventDrivenDispatcher" must { + "A PriorityDispatcher" must { "Order it's messages according to the specified comparator using an unbounded mailbox" in { testOrdering(UnboundedMailbox()) } @@ -19,7 +19,7 @@ class PriorityDispatcherSpec extends WordSpec with MustMatchers { } def testOrdering(mboxType: MailboxType) { - val dispatcher = new PriorityExecutorBasedEventDrivenDispatcher("Test", + val dispatcher = new PriorityDispatcher("Test", PriorityGenerator({ case i: Int ⇒ i //Reverse 
order case 'Result ⇒ Int.MaxValue diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ThreadBasedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ThreadBasedActorSpec.scala index fd127c1a14..722c48fbdc 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ThreadBasedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ThreadBasedActorSpec.scala @@ -10,7 +10,7 @@ import Actor._ object ThreadBasedActorSpec { class TestActor extends Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) + self.dispatcher = Dispatchers.newPinnedDispatcher(self) def receive = { case "Hello" ⇒ @@ -30,7 +30,7 @@ class ThreadBasedActorSpec extends JUnitSuite { def shouldSendOneWay { var oneWay = new CountDownLatch(1) val actor = actorOf(new Actor { - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) + self.dispatcher = Dispatchers.newPinnedDispatcher(self) def receive = { case "OneWay" ⇒ oneWay.countDown() } diff --git a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala index 6214c84da3..df6184c292 100644 --- a/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/misc/ActorRegistrySpec.scala @@ -3,15 +3,14 @@ package akka.actor import org.scalatest.junit.JUnitSuite import org.junit.Test import Actor._ -import java.util.concurrent.{ CyclicBarrier, TimeUnit, CountDownLatch } import org.scalatest.Assertions._ +import java.util.concurrent.{ ConcurrentLinkedQueue, CyclicBarrier, TimeUnit, CountDownLatch } +import akka.dispatch.Future object ActorRegistrySpec { - var record = "" class TestActor extends Actor { def receive = { case "ping" ⇒ - record = "pong" + record self.reply("got ping") } } @@ -19,10 +18,8 @@ object ActorRegistrySpec { class TestActor2 extends Actor { def receive = { case "ping" ⇒ - record = "pong" + record self.reply("got ping") case "ping2" ⇒ - record = "pong" + record 
self.reply("got ping") } } @@ -41,6 +38,7 @@ class ActorRegistrySpec extends JUnitSuite { assert(actor2.get.address === actor1.address) assert(actor2.get.address === "test-actor-1") actor2.get.stop + assert(Actor.registry.actorFor(actor1.address).isEmpty) } @Test @@ -54,6 +52,7 @@ class ActorRegistrySpec extends JUnitSuite { assert(actorOrNone.get.uuid === uuid) assert(actorOrNone.get.address === "test-actor-1") actor.stop + assert(Actor.registry.local.actorFor(uuid).isEmpty) } @Test @@ -71,10 +70,8 @@ class ActorRegistrySpec extends JUnitSuite { @Test def shouldGetAllActorsFromLocalActorRegistry { Actor.registry.local.shutdownAll - val actor1 = actorOf[TestActor]("test-actor-1") - actor1.start - val actor2 = actorOf[TestActor]("test-actor-2") - actor2.start + val actor1 = actorOf[TestActor]("test-actor-1").start + val actor2 = actorOf[TestActor]("test-actor-2").start val actors = Actor.registry.local.actors assert(actors.size === 2) assert(actors.head.actor.isInstanceOf[TestActor]) @@ -88,13 +85,15 @@ class ActorRegistrySpec extends JUnitSuite { @Test def shouldGetResponseByAllActorsInLocalActorRegistryWhenInvokingForeach { Actor.registry.local.shutdownAll - val actor1 = actorOf[TestActor]("test-actor-1") - actor1.start - val actor2 = actorOf[TestActor]("test-actor-2") - actor2.start - record = "" - Actor.registry.local.foreach(actor ⇒ actor !! 
"ping") - assert(record === "pongpong") + val actor1 = actorOf[TestActor]("test-actor-1").start + val actor2 = actorOf[TestActor]("test-actor-2").start + val results = new ConcurrentLinkedQueue[Future[String]] + + Actor.registry.local.foreach(actor ⇒ results.add(actor.!!)) + + assert(results.size === 2) + val i = results.iterator + while (i.hasNext) assert(i.next.get === "got ping") actor1.stop() actor2.stop() } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 79a768f85e..f41f2b46ae 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -160,7 +160,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal /** * Akka Java API.
- * The default dispatcher is the Dispatchers.globalExecutorBasedEventDrivenDispatcher. + * The default dispatcher is the Dispatchers.globalDispatcher. * This means that all actors will share the same event-driven executor based dispatcher. * * You can override it so it fits the specific use-case that the actor is used for. @@ -208,7 +208,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal * The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ - def getSenderFuture: Option[CompletableFuture[Any]] = senderFuture + def getSenderFuture: Option[Promise[Any]] = senderFuture /** * Is the actor being restarted? @@ -482,7 +482,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] + senderFuture: Option[Promise[T]]): Promise[T] protected[akka] def actorInstance: AtomicReference[Actor] @@ -698,10 +698,10 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { - val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultCompletableFuture[T](timeout)) + senderFuture: Option[Promise[T]]): Promise[T] = { + val future = if (senderFuture.isDefined) senderFuture else Some(new DefaultPromise[T](timeout)) dispatcher dispatchMessage new MessageInvocation( - this, message, senderOption, future.asInstanceOf[Some[CompletableFuture[Any]]]) + this, message, senderOption, future.asInstanceOf[Some[Promise[Any]]]) future.get } @@ -1020,7 +1020,7 @@ private[akka] case class RemoteActorRef private[akka] ( message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = { + 
senderFuture: Option[Promise[T]]): Promise[T] = { val future = Actor.remote.send[T]( message, senderOption, senderFuture, remoteAddress, timeout, false, this, loader) @@ -1155,7 +1155,7 @@ trait ScalaActorRef extends ActorRefShared { ref: ActorRef ⇒ * The reference sender future of the last received message. * Is defined if the message was sent with sent with '!!' or '!!!', else None. */ - def senderFuture(): Option[CompletableFuture[Any]] = { + def senderFuture(): Option[Promise[Any]] = { val msg = currentMessage if (msg eq null) None else msg.senderFuture diff --git a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala index c6710d60ff..f3c38887fb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRegistry.scala @@ -34,7 +34,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag //private val isClusterEnabled = ReflectiveAccess.isClusterEnabled private val actorsByAddress = new ConcurrentHashMap[String, ActorRef] - private val actorsByUuid = new ConcurrentHashMap[String, ActorRef] + private val actorsByUuid = new ConcurrentHashMap[Uuid, ActorRef] private val typedActorsByUuid = new ConcurrentHashMap[Uuid, AnyRef] private val guard = new ReadWriteGuard @@ -66,7 +66,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag // throw new IllegalStateException("Actor 'address' [" + address + "] is already in use, can't register actor [" + actor + "]") actorsByAddress.put(address, actor) - actorsByUuid.put(actor.uuid.toString, actor) + actorsByUuid.put(actor.uuid, actor) notifyListeners(ActorRegistered(address, actor)) } @@ -121,7 +121,7 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag */ class LocalActorRegistry( private val actorsByAddress: ConcurrentHashMap[String, ActorRef], - private val actorsByUuid: ConcurrentHashMap[String, ActorRef], + 
private val actorsByUuid: ConcurrentHashMap[Uuid, ActorRef], private val typedActorsByUuid: ConcurrentHashMap[Uuid, AnyRef]) { /** @@ -153,11 +153,8 @@ class LocalActorRegistry( /** * Finds the actor that have a specific uuid. */ - private[akka] def actorFor(uuid: Uuid): Option[ActorRef] = { - val uuidAsString = uuid.toString - if (actorsByUuid.containsKey(uuidAsString)) Some(actorsByUuid.get(uuidAsString)) - else None - } + private[akka] def actorFor(uuid: Uuid): Option[ActorRef] = + Option(actorsByUuid.get(uuid)) /** * Finds the typed actor that have a specific address. diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index d9e937487a..0fb15e7071 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -358,19 +358,11 @@ object LocalDeployer { } } - private[akka] def undeploy(deployment: Deploy) { - deployments.remove(deployment.address) - } + private[akka] def undeploy(deployment: Deploy): Unit = deployments.remove(deployment.address) - private[akka] def undeployAll() { - deployments.clear() - } + private[akka] def undeployAll(): Unit = deployments.clear() - private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = { - val deployment = deployments.get(address) - if (deployment eq null) None - else Some(deployment) - } + private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = Option(deployments.get(address)) } /** diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 839474a3ba..52a6cf7622 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -15,20 +15,23 @@ object TypedActor { private val selfReference = new ThreadLocal[AnyRef] def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] - class TypedActor[TI <: AnyRef](proxyRef: 
AtomicReference[AnyRef], createInstance: ⇒ TI) extends Actor { - val me = createInstance - def receive = { + trait TypedActor[Iface <: AnyRef, Impl <: Iface] { self: Actor ⇒ + val proxyRef: AtomicReference[Iface] + def callMethod(methodCall: MethodCall): Unit + def receive: Receive = { case m: MethodCall ⇒ selfReference set proxyRef.get - try { - m match { - case m if m.isOneWay ⇒ m(me) - case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] - case m ⇒ self reply m(me) - } - } finally { - selfReference set null - } + try { callMethod(m) } finally { selfReference set null } + } + } + + class DefaultTypedActor[Iface <: AnyRef, Impl <: Iface]( + val proxyRef: AtomicReference[Iface], createInstance: ⇒ Impl) extends TypedActor[Iface, Impl] with Actor { + val me = createInstance + def callMethod(methodCall: MethodCall): Unit = methodCall match { + case m if m.isOneWay ⇒ m(me) + case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] + case m ⇒ self reply m(me) } } @@ -42,18 +45,13 @@ object TypedActor { case m if m.isOneWay ⇒ actor ! m null - case m if m.returnsJOption_? ⇒ - (actor !!! m).as[JOption[Any]] match { - case Some(null) | None ⇒ JOption.none[Any] - case Some(joption) ⇒ joption - } - case m if m.returnsOption_? ⇒ - (actor !!! m).as[AnyRef] match { - case Some(null) | None ⇒ None - case Some(option) ⇒ option - } case m if m.returnsFuture_? ⇒ actor !!! m + case m if m.returnsJOption_? || m.returnsOption_? ⇒ + (actor !!! m).as[AnyRef] match { + case Some(null) | None ⇒ if (m.returnsJOption_?) JOption.none[Any] else None + case Some(joption) ⇒ joption + } case m ⇒ (actor !!! 
m).get } @@ -103,12 +101,15 @@ object TypedActor { newTypedActor(clazz.getInterfaces, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader) } - protected def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = { - val proxyRef = new AtomicReference[AnyRef](null) - configureAndProxyLocalActorRef[T](interfaces, proxyRef, actorOf(new TypedActor[T](proxyRef, constructor)), config, loader) + private[akka] def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = + newTypedActor[R, T](interfaces, (ref: AtomicReference[R]) ⇒ new DefaultTypedActor[R, T](ref, constructor), config, loader) + + private[akka] def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) ⇒ TypedActor[R, T], config: Configuration, loader: ClassLoader): R = { + val proxyRef = new AtomicReference[R] + configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef).asInstanceOf[Actor]), config, loader) } - protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[AnyRef], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { + protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { actor.timeout = config.timeout.toMillis actor.dispatcher = config.dispatcher diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala similarity index 96% rename from akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala rename to akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index eaf4472901..0460720a73 100644 
--- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -19,18 +19,18 @@ import util.DynamicVariable * The preferred way of creating dispatchers is to use * the {@link akka.dispatch.Dispatchers} factory object. * - * @see akka.dispatch.ExecutorBasedEventDrivenWorkStealingDispatcher + * @see akka.dispatch.BalancingDispatcher * @see akka.dispatch.Dispatchers * * @author Viktor Klang */ -class ExecutorBasedEventDrivenWorkStealingDispatcher( +class BalancingDispatcher( _name: String, throughput: Int = Dispatchers.THROUGHPUT, throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, config: ThreadPoolConfig = ThreadPoolConfig()) - extends ExecutorBasedEventDrivenDispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) { + extends Dispatcher(_name, throughput, throughputDeadlineTime, mailboxType, config) { def this(_name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = this(_name, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala similarity index 90% rename from akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala rename to akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 3c985e6409..ab2287a589 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -28,7 +28,7 @@ import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionExcept * * Example usage: * - * val dispatcher = new ExecutorBasedEventDrivenDispatcher("name") + * val dispatcher = new Dispatcher("name") * dispatcher * 
.withNewThreadPoolWithBoundedBlockingQueue(100) * .setCorePoolSize(16) @@ -43,7 +43,7 @@ import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionExcept * * Example usage: * - * ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher("name"); + * Dispatcher dispatcher = new Dispatcher("name"); * dispatcher * .withNewThreadPoolWithBoundedBlockingQueue(100) * .setCorePoolSize(16) @@ -63,7 +63,7 @@ import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionExcept * always continues until the mailbox is empty. * Larger values (or zero or negative) increase throughput, smaller values increase fairness */ -class ExecutorBasedEventDrivenDispatcher( +class Dispatcher( _name: String, val throughput: Int = Dispatchers.THROUGHPUT, val throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, @@ -117,7 +117,7 @@ class ExecutorBasedEventDrivenDispatcher( case b: UnboundedMailbox ⇒ new ConcurrentLinkedQueue[MessageInvocation] with MessageQueue with ExecutableMailbox { @inline - final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + final def dispatcher = Dispatcher.this @inline final def enqueue(m: MessageInvocation) = this.add(m) @inline @@ -126,7 +126,7 @@ class ExecutorBasedEventDrivenDispatcher( case b: BoundedMailbox ⇒ new DefaultBoundedMessageQueue(b.capacity, b.pushTimeOut) with ExecutableMailbox { @inline - final def dispatcher = ExecutorBasedEventDrivenDispatcher.this + final def dispatcher = Dispatcher.this } } @@ -173,11 +173,11 @@ class ExecutorBasedEventDrivenDispatcher( } /** - * This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox. + * This is the behavior of a Dispatcher's mailbox. 
*/ trait ExecutableMailbox extends Runnable { self: MessageQueue ⇒ - def dispatcher: ExecutorBasedEventDrivenDispatcher + def dispatcher: Dispatcher final def run = { try { @@ -237,7 +237,7 @@ object PriorityGenerator { /** * A PriorityGenerator is a convenience API to create a Comparator that orders the messages of a - * PriorityExecutorBasedEventDrivenDispatcher + * PriorityDispatcher */ abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] { def gen(message: Any): Int @@ -247,18 +247,18 @@ abstract class PriorityGenerator extends java.util.Comparator[MessageInvocation] } /** - * A version of ExecutorBasedEventDrivenDispatcher that gives all actors registered to it a priority mailbox, + * A version of Dispatcher that gives all actors registered to it a priority mailbox, * prioritized according to the supplied comparator. * * The dispatcher will process the messages with the _lowest_ priority first. */ -class PriorityExecutorBasedEventDrivenDispatcher( +class PriorityDispatcher( name: String, val comparator: java.util.Comparator[MessageInvocation], throughput: Int = Dispatchers.THROUGHPUT, throughputDeadlineTime: Int = Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType: MailboxType = Dispatchers.MAILBOX_TYPE, - config: ThreadPoolConfig = ThreadPoolConfig()) extends ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox { + config: ThreadPoolConfig = ThreadPoolConfig()) extends Dispatcher(name, throughput, throughputDeadlineTime, mailboxType, config) with PriorityMailbox { def this(name: String, comparator: java.util.Comparator[MessageInvocation], throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType) = this(name, comparator, throughput, throughputDeadlineTime, mailboxType, ThreadPoolConfig()) // Needed for Java API usage @@ -277,14 +277,14 @@ class PriorityExecutorBasedEventDrivenDispatcher( } /** - * Can be used to give an 
ExecutorBasedEventDrivenDispatcher's actors priority-enabled mailboxes + * Can be used to give an Dispatcher's actors priority-enabled mailboxes * * Usage: - * new ExecutorBasedEventDrivenDispatcher(...) with PriorityMailbox { + * new Dispatcher(...) with PriorityMailbox { * val comparator = ...comparator that determines mailbox priority ordering... * } */ -trait PriorityMailbox { self: ExecutorBasedEventDrivenDispatcher ⇒ +trait PriorityMailbox { self: Dispatcher ⇒ def comparator: java.util.Comparator[MessageInvocation] override def createMailbox(actorRef: ActorRef): AnyRef = self.mailboxType match { diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 07daf658d3..963927582f 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -18,7 +18,7 @@ import java.util.concurrent.TimeUnit * * Example usage: * - * val dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name") + * val dispatcher = Dispatchers.newDispatcher("name") * dispatcher * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) * .setCorePoolSize(16) @@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit * * Example usage: * - * MessageDispatcher dispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("name"); + * MessageDispatcher dispatcher = Dispatchers.newDispatcher("name"); * dispatcher * .withNewThreadPoolWithLinkedBlockingQueueWithCapacity(100) * .setCorePoolSize(16) @@ -57,10 +57,10 @@ object Dispatchers { val MAILBOX_TYPE: MailboxType = if (MAILBOX_CAPACITY < 1) UnboundedMailbox() else BoundedMailbox() lazy val defaultGlobalDispatcher = { - config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalExecutorBasedEventDrivenDispatcher) + config.getSection("akka.actor.default-dispatcher").flatMap(from).getOrElse(globalDispatcher) } - object globalExecutorBasedEventDrivenDispatcher extends 
ExecutorBasedEventDrivenDispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) + object globalDispatcher extends Dispatcher("global", THROUGHPUT, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE) /** * Creates an thread based dispatcher serving a single actor through the same single thread. @@ -68,16 +68,10 @@ object Dispatchers { * * E.g. each actor consumes its own thread. */ - def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor) - - /** - * Creates an thread based dispatcher serving a single actor through the same single thread. - * Uses the default timeout - * If capacity is negative, it's Integer.MAX_VALUE - * - * E.g. each actor consumes its own thread. - */ - def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int) = new ThreadBasedDispatcher(actor, mailboxCapacity) + def newPinnedDispatcher(actor: ActorRef) = actor match { + case null ⇒ new PinnedDispatcher() + case some ⇒ new PinnedDispatcher(some) + } /** * Creates an thread based dispatcher serving a single actor through the same single thread. @@ -85,69 +79,87 @@ object Dispatchers { * * E.g. each actor consumes its own thread. */ - def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = - new ThreadBasedDispatcher(actor, mailboxCapacity, pushTimeOut) + def newPinnedDispatcher(actor: ActorRef, mailboxType: MailboxType) = actor match { + case null ⇒ new PinnedDispatcher(mailboxType) + case some ⇒ new PinnedDispatcher(some, mailboxType) + } + + /** + * Creates an thread based dispatcher serving a single actor through the same single thread. + * + * E.g. each actor consumes its own thread. + */ + def newPinnedDispatcher(name: String, mailboxType: MailboxType) = + new PinnedDispatcher(name, mailboxType) + + /** + * Creates an thread based dispatcher serving a single actor through the same single thread. + * + * E.g. each actor consumes its own thread. 
+ */ + def newPinnedDispatcher(name: String) = + new PinnedDispatcher(name) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ - def newExecutorBasedEventDrivenDispatcher(name: String) = - ThreadPoolConfigDispatcherBuilder(config ⇒ new ExecutorBasedEventDrivenDispatcher(name, config), ThreadPoolConfig()) + def newDispatcher(name: String) = + ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(name, config), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ - def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = + def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new ExecutorBasedEventDrivenDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig()) + new Dispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. 
*/ - def newExecutorBasedEventDrivenDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = + def newDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new ExecutorBasedEventDrivenDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig()) + new Dispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String) = - ThreadPoolConfigDispatcherBuilder(config ⇒ new ExecutorBasedEventDrivenWorkStealingDispatcher(name, config), ThreadPoolConfig()) + def newBalancingDispatcher(name: String) = + ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(name, config), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int) = + def newBalancingDispatcher(name: String, throughput: Int) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config), ThreadPoolConfig()) + new BalancingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, MAILBOX_TYPE, config), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. 
*/ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = + def newBalancingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig()) + new BalancingDispatcher(name, throughput, THROUGHPUT_DEADLINE_TIME_MILLIS, mailboxType, config), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. * * Has a fluent builder interface for configuring its semantics. */ - def newExecutorBasedEventDrivenWorkStealingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = + def newBalancingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new ExecutorBasedEventDrivenWorkStealingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig()) + new BalancingDispatcher(name, throughput, throughputDeadlineMs, mailboxType, config), ThreadPoolConfig()) /** * Utility function that tries to load the specified dispatcher config from the akka.conf * or else use the supplied default dispatcher @@ -169,7 +181,7 @@ object Dispatchers { * executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded * allow-core-timeout = on # Allow core threads to time out * rejection-policy = "caller-runs" # abort, caller-runs, discard-oldest, discard - * throughput = 5 # Throughput for ExecutorBasedEventDrivenDispatcher + * throughput = 5 # Throughput for Dispatcher * } * ex: from(config.getConfigMap(identifier).get) * @@ -180,9 +192,9 @@ object Dispatchers { */ def from(cfg: Configuration): Option[MessageDispatcher] = { cfg.getString("type") map { - case "ExecutorBasedEventDriven" ⇒ new 
ExecutorBasedEventDrivenDispatcherConfigurator() - case "ExecutorBasedEventDrivenWorkStealing" ⇒ new ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator() - case "GlobalExecutorBasedEventDriven" ⇒ GlobalExecutorBasedEventDrivenDispatcherConfigurator + case "Dispatcher" ⇒ new DispatcherConfigurator() + case "BalancingDispatcher" ⇒ new BalancingDispatcherConfigurator() + case "GlobalDispatcher" ⇒ GlobalDispatcherConfigurator case fqn ⇒ ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match { case r: Right[_, Class[MessageDispatcherConfigurator]] ⇒ @@ -200,13 +212,13 @@ object Dispatchers { } } -object GlobalExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator { - def configure(config: Configuration): MessageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher +object GlobalDispatcherConfigurator extends MessageDispatcherConfigurator { + def configure(config: Configuration): MessageDispatcher = Dispatchers.globalDispatcher } -class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherConfigurator { +class DispatcherConfigurator extends MessageDispatcherConfigurator { def configure(config: Configuration): MessageDispatcher = { - configureThreadPool(config, threadPoolConfig ⇒ new ExecutorBasedEventDrivenDispatcher( + configureThreadPool(config, threadPoolConfig ⇒ new Dispatcher( config.getString("name", newUuid.toString), config.getInt("throughput", Dispatchers.THROUGHPUT), config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS), @@ -215,9 +227,9 @@ class ExecutorBasedEventDrivenDispatcherConfigurator extends MessageDispatcherCo } } -class ExecutorBasedEventDrivenWorkStealingDispatcherConfigurator extends MessageDispatcherConfigurator { +class BalancingDispatcherConfigurator extends MessageDispatcherConfigurator { def configure(config: Configuration): MessageDispatcher = { - configureThreadPool(config, threadPoolConfig ⇒ new 
ExecutorBasedEventDrivenWorkStealingDispatcher( + configureThreadPool(config, threadPoolConfig ⇒ new BalancingDispatcher( config.getString("name", newUuid.toString), config.getInt("throughput", Dispatchers.THROUGHPUT), config.getInt("throughput-deadline-time", Dispatchers.THROUGHPUT_DEADLINE_TIME_MILLIS), diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 3ffdc0e1a6..d235d7f46e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -20,8 +20,6 @@ import java.lang.{ Iterable ⇒ JIterable } import java.util.{ LinkedList ⇒ JLinkedList } import scala.annotation.tailrec -import scala.collection.generic.CanBuildFrom -import scala.collection.mutable.Builder import scala.collection.mutable.Stack class FutureTimeoutException(message: String, cause: Throwable = null) extends AkkaException(message, cause) @@ -56,7 +54,7 @@ object Futures { * Returns a Future to the result of the first future in the list that is completed */ def firstCompletedOf[T](futures: Iterable[Future[T]], timeout: Long = Long.MaxValue): Future[T] = { - val futureResult = new DefaultCompletableFuture[T](timeout) + val futureResult = new DefaultPromise[T](timeout) val completeFirst: Future[T] ⇒ Unit = _.value.foreach(futureResult complete _) for (f ← futures) f onComplete completeFirst @@ -83,9 +81,9 @@ object Futures { */ def fold[T, R](zero: R, timeout: Long = Actor.TIMEOUT)(futures: Iterable[Future[T]])(foldFun: (R, T) ⇒ R): Future[R] = { if (futures.isEmpty) { - new AlreadyCompletedFuture[R](Right(zero)) + new KeptPromise[R](Right(zero)) } else { - val result = new DefaultCompletableFuture[R](timeout) + val result = new DefaultPromise[R](timeout) val results = new ConcurrentLinkedQueue[T]() val allDone = futures.size @@ -135,9 +133,9 @@ object Futures { */ def reduce[T, R >: T](futures: Iterable[Future[T]], timeout: Long = Actor.TIMEOUT)(op: (R, T) ⇒ T): 
Future[R] = { if (futures.isEmpty) - new AlreadyCompletedFuture[R](Left(new UnsupportedOperationException("empty reduce left"))) + new KeptPromise[R](Left(new UnsupportedOperationException("empty reduce left"))) else { - val result = new DefaultCompletableFuture[R](timeout) + val result = new DefaultPromise[R](timeout) val seedFound = new AtomicBoolean(false) val seedFold: Future[T] ⇒ Unit = f ⇒ { if (seedFound.compareAndSet(false, true)) { //Only the first completed should trigger the fold @@ -202,7 +200,7 @@ object Futures { * in parallel. * * def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A => Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - * in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => + * in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) => * val fb = fn(a.asInstanceOf[A]) * for (r <- fr; b <-fb) yield (r += b) * }.map(_.result) @@ -230,7 +228,7 @@ object Future { /** * Create an empty Future with default timeout */ - def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultCompletableFuture[T](timeout) + def empty[T](timeout: Long = Actor.TIMEOUT) = new DefaultPromise[T](timeout) import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom @@ -240,7 +238,7 @@ object Future { * Useful for reducing many Futures into a single Future. 
*/ def sequence[A, M[_] <: Traversable[_]](in: M[Future[A]], timeout: Long = Actor.TIMEOUT)(implicit cbf: CanBuildFrom[M[Future[A]], A, M[A]]): Future[M[A]] = - in.foldLeft(new DefaultCompletableFuture[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) + in.foldLeft(new DefaultPromise[Builder[A, M[A]]](timeout).completeWithResult(cbf(in)): Future[Builder[A, M[A]]])((fr, fa) ⇒ for (r ← fr; a ← fa.asInstanceOf[Future[A]]) yield (r += a)).map(_.result) /** * Transforms a Traversable[A] into a Future[Traversable[B]] using the provided Function A => Future[B]. @@ -251,7 +249,7 @@ object Future { * */ def traverse[A, B, M[_] <: Traversable[_]](in: M[A], timeout: Long = Actor.TIMEOUT)(fn: A ⇒ Future[B])(implicit cbf: CanBuildFrom[M[A], B, M[B]]): Future[M[B]] = - in.foldLeft(new DefaultCompletableFuture[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ + in.foldLeft(new DefaultPromise[Builder[B, M[B]]](timeout).completeWithResult(cbf(in)): Future[Builder[B, M[B]]]) { (fr, a) ⇒ val fb = fn(a.asInstanceOf[A]) for (r ← fr; b ← fb) yield (r += b) }.map(_.result) @@ -267,23 +265,19 @@ object Future { * * This allows working with Futures in an imperative style without blocking for each result. * - * Completing a Future using 'CompletableFuture << Future' will also suspend execution until the + * Completing a Future using 'Promise << Future' will also suspend execution until the * value of the other Future is available. * * The Delimited Continuations compiler plugin must be enabled in order to use this method. 
*/ def flow[A](body: ⇒ A @cps[Future[Any]], timeout: Long = Actor.TIMEOUT): Future[A] = { val future = Promise[A](timeout) - (reset(future.asInstanceOf[CompletableFuture[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒ + (reset(future.asInstanceOf[Promise[Any]].completeWithResult(body)): Future[Any]) onComplete { f ⇒ val opte = f.exception if (opte.isDefined) future completeWithException (opte.get) } future } - - private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { - override def initialValue = None - } } sealed trait Future[+T] { @@ -417,7 +411,7 @@ sealed trait Future[+T] { * */ final def collect[A](pf: PartialFunction[Any, A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val v = ft.value.get fa complete { @@ -450,7 +444,7 @@ sealed trait Future[+T] { * */ final def failure[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val opte = ft.exception fa complete { @@ -482,7 +476,7 @@ sealed trait Future[+T] { * */ final def map[A](f: T ⇒ A): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value if (optv.isDefined) { @@ -518,7 +512,7 @@ sealed trait Future[+T] { * */ final def flatMap[A](f: T ⇒ Future[A]): Future[A] = { - val fa = new DefaultCompletableFuture[A](timeoutInNanos, NANOS) + val fa = new DefaultPromise[A](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = ft.value if (optv.isDefined) { @@ -546,7 +540,7 @@ sealed trait Future[+T] { } final def filter(p: Any ⇒ Boolean): Future[Any] = { - val f = new DefaultCompletableFuture[T](timeoutInNanos, NANOS) + val f = new DefaultPromise[T](timeoutInNanos, NANOS) onComplete { ft ⇒ val optv = 
ft.value if (optv.isDefined) { @@ -596,16 +590,19 @@ sealed trait Future[+T] { object Promise { - def apply[A](timeout: Long): CompletableFuture[A] = new DefaultCompletableFuture[A](timeout) + def apply[A](timeout: Long): Promise[A] = new DefaultPromise[A](timeout) - def apply[A](): CompletableFuture[A] = apply(Actor.TIMEOUT) + def apply[A](): Promise[A] = apply(Actor.TIMEOUT) + private[akka] val callbacksPendingExecution = new ThreadLocal[Option[Stack[() ⇒ Unit]]]() { + override def initialValue = None + } } /** * Essentially this is the Promise (or write-side) of a Future (read-side). */ -trait CompletableFuture[T] extends Future[T] { +trait Promise[T] extends Future[T] { /** * Completes this Future with the specified result, if not already completed. * @return this @@ -637,7 +634,7 @@ trait CompletableFuture[T] extends Future[T] { final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ cont(complete(Right(value))) } final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] ⇒ Future[Any]) ⇒ - val fr = new DefaultCompletableFuture[Any](Actor.TIMEOUT) + val fr = new DefaultPromise[Any](Actor.TIMEOUT) this completeWith other onComplete { f ⇒ try { fr completeWith cont(f) @@ -655,7 +652,7 @@ trait CompletableFuture[T] extends Future[T] { /** * The default concrete Future implementation. 
*/ -class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends CompletableFuture[T] { +class DefaultPromise[T](timeout: Long, timeunit: TimeUnit) extends Promise[T] { def this() = this(0, MILLIS) @@ -722,7 +719,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - def complete(value: Either[Throwable, T]): DefaultCompletableFuture[T] = { + def complete(value: Either[Throwable, T]): DefaultPromise[T] = { _lock.lock val notifyTheseListeners = try { if (_value.isEmpty && !isExpired) { //Only complete if we aren't expired @@ -746,7 +743,7 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } } - val pending = Future.callbacksPendingExecution.get + val pending = Promise.callbacksPendingExecution.get if (pending.isDefined) { //Instead of nesting the calls to the callbacks (leading to stack overflow) pending.get.push(() ⇒ { // Linearize/aggregate callbacks at top level and then execute val doNotify = notifyCompleted _ //Hoist closure to avoid garbage @@ -755,16 +752,16 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com } else { try { val callbacks = Stack[() ⇒ Unit]() // Allocate new aggregator for pending callbacks - Future.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator + Promise.callbacksPendingExecution.set(Some(callbacks)) // Specify the callback aggregator runCallbacks(notifyTheseListeners, callbacks) // Execute callbacks, if they trigger new callbacks, they are aggregated - } finally { Future.callbacksPendingExecution.set(None) } // Ensure cleanup + } finally { Promise.callbacksPendingExecution.set(None) } // Ensure cleanup } } this } - def onComplete(func: Future[T] ⇒ Unit): CompletableFuture[T] = { + def onComplete(func: Future[T] ⇒ Unit): Promise[T] = { _lock.lock val notifyNow = try { if (_value.isEmpty) { @@ -800,10 +797,10 @@ class DefaultCompletableFuture[T](timeout: Long, timeunit: TimeUnit) extends Com * 
An already completed Future is seeded with it's result at creation, is useful for when you are participating in * a Future-composition but you already have a value to contribute. */ -sealed class AlreadyCompletedFuture[T](suppliedValue: Either[Throwable, T]) extends CompletableFuture[T] { +sealed class KeptPromise[T](suppliedValue: Either[Throwable, T]) extends Promise[T] { val value = Some(suppliedValue) - def complete(value: Either[Throwable, T]): CompletableFuture[T] = this + def complete(value: Either[Throwable, T]): Promise[T] = this def onComplete(func: Future[T] ⇒ Unit): Future[T] = { func(this); this } def await(atMost: Duration): Future[T] = this def await: Future[T] = this diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index f64c8109cb..f5cda388c2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -19,7 +19,7 @@ import akka.actor._ final case class MessageInvocation(receiver: ActorRef, message: Any, sender: Option[ActorRef], - senderFuture: Option[CompletableFuture[Any]]) { + senderFuture: Option[Promise[Any]]) { if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null") def invoke() { @@ -32,7 +32,7 @@ final case class MessageInvocation(receiver: ActorRef, } } -final case class FutureInvocation[T](future: CompletableFuture[T], function: () ⇒ T, cleanup: () ⇒ Unit) extends Runnable { +final case class FutureInvocation[T](future: Promise[T], function: () ⇒ T, cleanup: () ⇒ Unit) extends Runnable { def run() { future complete (try { Right(function()) @@ -99,7 +99,7 @@ trait MessageDispatcher { private[akka] final def dispatchFuture[T](block: () ⇒ T, timeout: Long): Future[T] = { futures.getAndIncrement() try { - val future = new DefaultCompletableFuture[T](timeout) + val future = new DefaultPromise[T](timeout) if (active.isOff) guard withGuard { diff 
--git a/akka-actor/src/main/scala/akka/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala similarity index 55% rename from akka-actor/src/main/scala/akka/dispatch/ThreadBasedDispatcher.scala rename to akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index c365329834..e03e6af9e2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -4,31 +4,36 @@ package akka.dispatch -import akka.actor.{ ActorRef } import akka.util.Duration import java.util.concurrent.atomic.AtomicReference +import akka.actor.{ Actor, ActorRef } /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * * @author Jonas Bonér */ -class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType) - extends ExecutorBasedEventDrivenDispatcher( - _actor.uuid.toString, Dispatchers.THROUGHPUT, -1, _mailboxType, ThreadBasedDispatcher.oneThread) { +class PinnedDispatcher(_actor: ActorRef, _name: String, _mailboxType: MailboxType) + extends Dispatcher( + _name, Dispatchers.THROUGHPUT, -1, _mailboxType, PinnedDispatcher.oneThread) { + + def this(_name: String, _mailboxType: MailboxType) = this(null, _name, _mailboxType) + + def this(_actor: ActorRef, _name: String) = this(_actor, _name, Dispatchers.MAILBOX_TYPE) + + def this(_name: String) = this(null, _name, Dispatchers.MAILBOX_TYPE) + + def this(_mailboxType: MailboxType) = this(null, "anon", _mailboxType) + + def this(_actor: ActorRef, _mailboxType: MailboxType) = this(_actor, _actor.uuid.toString, _mailboxType) + + def this(_actor: ActorRef) = this(_actor, _actor.uuid.toString, Dispatchers.MAILBOX_TYPE) + + def this() = this(Dispatchers.MAILBOX_TYPE) private[akka] val owner = new AtomicReference[ActorRef](_actor) - def this(actor: ActorRef) = - this(actor, UnboundedMailbox()) // For Java API - - def this(actor: ActorRef, capacity: Int) = - 
this(actor, BoundedMailbox(capacity)) //For Java API - - def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = //For Java API - this(actor, BoundedMailbox(capacity, pushTimeOut)) - override def register(actorRef: ActorRef) = { val actor = owner.get() if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) @@ -42,7 +47,7 @@ class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType) } } -object ThreadBasedDispatcher { +object PinnedDispatcher { val oneThread: ThreadPoolConfig = ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1) } diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 5c2d7b89cc..28adf4946d 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -89,7 +89,7 @@ object EventHandler extends ListenerManagement { class EventHandlerException extends AkkaException - lazy val EventHandlerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("event:handler").build + lazy val EventHandlerDispatcher = Dispatchers.newDispatcher("event:handler").build implicit object defaultListenerFormat extends StatelessActorFormat[DefaultListener] diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 13311138a9..91478c8eb2 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -7,7 +7,7 @@ package akka.remoteinterface import akka.japi.Creator import akka.actor._ import akka.util._ -import akka.dispatch.CompletableFuture +import akka.dispatch.Promise import akka.serialization._ import akka.AkkaException @@ -300,10 +300,10 @@ trait RemoteClientModule extends RemoteModule { self: RemoteModule ⇒ 
protected[akka] def send[T](message: Any, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]], + senderFuture: Option[Promise[T]], remoteAddress: InetSocketAddress, timeout: Long, isOneWay: Boolean, actorRef: ActorRef, - loader: Option[ClassLoader]): Option[CompletableFuture[T]] + loader: Option[ClassLoader]): Option[Promise[T]] } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 0ea431d88a..99b985053b 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -10,6 +10,8 @@ import akka.actor.Actor._ import akka.actor.ActorRef import scala.collection.JavaConversions._ import scala.collection.immutable.Seq +import java.util.concurrent.atomic.AtomicReference +import annotation.tailrec object Routing { @@ -80,18 +82,27 @@ trait InfiniteIterator[T] extends Iterator[T] { case class CyclicIterator[T](val items: Seq[T]) extends InfiniteIterator[T] { def this(items: java.util.List[T]) = this(items.toList) - @volatile - private[this] var current: Seq[T] = items + private[this] val current: AtomicReference[Seq[T]] = new AtomicReference(items) def hasNext = items != Nil - def next = { - val nc = if (current == Nil) items else current - current = nc.tail - nc.head + def next: T = { + @tailrec + def findNext: T = { + val currentItems = current.get + val newItems = currentItems match { + case Nil ⇒ items + case xs ⇒ xs + } + + if (current.compareAndSet(currentItems, newItems.tail)) newItems.head + else findNext + } + + findNext } - override def exists(f: T ⇒ Boolean): Boolean = items.exists(f) + override def exists(f: T ⇒ Boolean): Boolean = items exists f } /** diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 3efdb67a63..e47973984c 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ 
b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -4,7 +4,7 @@ package akka.util -import akka.dispatch.{ Future, CompletableFuture, MessageInvocation } +import akka.dispatch.{ Future, Promise, MessageInvocation } import akka.config.{ Config, ModuleNotAvailableException } import akka.remoteinterface.RemoteSupport import akka.actor._ diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 1eb0f624b3..a32c462441 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -227,22 +227,19 @@ object Cluster { properties = properties + property } - private def nodename: String = { - val overridden = properties.get("akka.cluster.nodename") - if (overridden.isDefined) overridden.get - else Config.nodename + private def nodename: String = properties.get("akka.cluster.nodename") match { + case Some(uberride) ⇒ uberride + case None ⇒ Config.nodename } - private def hostname: String = { - val overridden = properties.get("akka.cluster.hostname") - if (overridden.isDefined) overridden.get - else Config.hostname + private def hostname: String = properties.get("akka.cluster.hostname") match { + case Some(uberride) ⇒ uberride + case None ⇒ Config.hostname } - private def port: Int = { - val overridden = properties.get("akka.cluster.port") - if (overridden.isDefined) overridden.get.toInt - else Config.remoteServerPort + private def port: Int = properties.get("akka.cluster.port") match { + case Some(uberride) ⇒ uberride.toInt + case None ⇒ Config.remoteServerPort } val defaultSerializer = new SerializableSerializer @@ -958,7 +955,9 @@ class ClusterNode private[akka] ( */ def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) { try { - zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]] + 
zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toArray map { + case c: CharSequence ⇒ new UUID(c) + } } catch { case e: ZkNoNodeException ⇒ Array[UUID]() } @@ -969,7 +968,7 @@ class ClusterNode private[akka] ( */ def nodesForActorsInUseWithUuid(uuid: UUID): Array[String] = if (isConnected.isOn) { try { - zkClient.getChildren(actorLocationsPathFor(uuid)).toList.toArray.asInstanceOf[Array[String]] + zkClient.getChildren(actorLocationsPathFor(uuid)).toArray.asInstanceOf[Array[String]] } catch { case e: ZkNoNodeException ⇒ Array[String]() } @@ -982,8 +981,7 @@ class ClusterNode private[akka] ( flatten { actorUuidsForActorAddress(address) map { uuid ⇒ try { - val list = zkClient.getChildren(actorLocationsPathFor(uuid)) - list.toList.toArray.asInstanceOf[Array[String]] + zkClient.getChildren(actorLocationsPathFor(uuid)).toArray.asInstanceOf[Array[String]] } catch { case e: ZkNoNodeException ⇒ Array[String]() } @@ -996,7 +994,9 @@ class ClusterNode private[akka] ( */ def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) { try { - zkClient.getChildren(actorsAtNodePathFor(nodeName)).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]] + zkClient.getChildren(actorsAtNodePathFor(nodeName)).toArray map { + case c: CharSequence ⇒ new UUID(c) + } } catch { case e: ZkNoNodeException ⇒ Array[UUID]() } @@ -1008,7 +1008,9 @@ class ClusterNode private[akka] ( def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.isOn) { val uuids = try { - zkClient.getChildren(actorsAtNodePathFor(nodeName)).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]] + zkClient.getChildren(actorsAtNodePathFor(nodeName)).toArray map { + case c: CharSequence ⇒ new UUID(c) + } } catch { case e: ZkNoNodeException ⇒ Array[UUID]() } @@ -1024,11 +1026,10 @@ class ClusterNode private[akka] ( zkClient.readData(actorRegistryFormatPathFor(uuid), new Stat).asInstanceOf[Serializer] } - val format = formats.head if (formats.isEmpty) 
throw new IllegalStateException("No Serializer found for [%s]".format(actorAddress)) - if (formats map (_ == format) exists (_ == false)) throw new IllegalStateException( - "Multiple Serializer classes found for [%s]".format(actorAddress)) - format + if (formats.forall(_ == formats.head) == false) throw new IllegalStateException("Multiple Serializer classes found for [%s]".format(actorAddress)) + + formats.head } /** @@ -1126,9 +1127,7 @@ class ClusterNode private[akka] ( } } }) match { - case Left(_) ⇒ { - /* do nothing */ - } + case Left(_) ⇒ /* do nothing */ case Right(exception) ⇒ throw exception } } @@ -1429,23 +1428,15 @@ class ClusterNode private[akka] ( import Cluster._ - override def start() { - self.start() - } + override def start(): Unit = self.start() - override def stop() { - self.shutdown() - } + override def stop(): Unit = self.shutdown() override def disconnect() = self.disconnect() - override def reconnect() { - self.reconnect() - } + override def reconnect(): Unit = self.reconnect() - override def resign() { - self.resign() - } + override def resign(): Unit = self.resign() override def isConnected = self.isConnected.isOn @@ -1479,15 +1470,11 @@ class ClusterNode private[akka] ( override def getAddressesForActorsInUseOnNode(nodeName: String) = self.addressesForActorsInUseOnNode(nodeName).map(_.toString).toArray - override def setConfigElement(key: String, value: String) { - self.setConfigElement(key, value.getBytes("UTF-8")) - } + override def setConfigElement(key: String, value: String): Unit = self.setConfigElement(key, value.getBytes("UTF-8")) override def getConfigElement(key: String) = new String(self.getConfigElement(key), "UTF-8") - override def removeConfigElement(key: String) { - self.removeConfigElement(key) - } + override def removeConfigElement(key: String): Unit = self.removeConfigElement(key) override def getConfigElementKeys = self.getConfigElementKeys.toArray } @@ -1580,7 +1567,7 @@ object RemoteClusterDaemon { val ADDRESS = 
"akka-cluster-daemon".intern // FIXME configure functionServerDispatcher to what? - val functionServerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("akka:cloud:cluster:function:server").build + val functionServerDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:function:server").build } /** @@ -1591,7 +1578,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { import RemoteClusterDaemon._ import Cluster._ - self.dispatcher = Dispatchers.newThreadBasedDispatcher(self) + self.dispatcher = Dispatchers.newPinnedDispatcher(self) def receive: Receive = { case message: RemoteDaemonMessageProtocol ⇒ @@ -1664,8 +1651,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = functionServerDispatcher def receive = { - case t: Tuple2[Function1[Any, Unit], Any] ⇒ try { - t._1(t._2) + case (fun: Function[Any, Unit], param: Any) ⇒ try { + fun(param) } finally { self.stop() } @@ -1677,8 +1664,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { self.dispatcher = functionServerDispatcher def receive = { - case t: Tuple2[Function1[Any, Any], Any] ⇒ try { - self.reply(t._1(t._2)) + case (fun: Function[Any, Any], param: Any) ⇒ try { + self.reply(fun(param)) } finally { self.stop() } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index ff6b89506d..7776e8f1d5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -8,7 +8,7 @@ import Cluster._ import akka.actor._ import akka.actor.Actor._ import akka.event.EventHandler -import akka.dispatch.CompletableFuture +import akka.dispatch.Promise import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicReference @@ -29,43 +29,32 @@ class ClusterActorRef private[akka] ( EventHandler.debug(this, "Creating a ClusterActorRef for actor with address
[%s]".format(address)) private[akka] val addresses = new AtomicReference[Map[InetSocketAddress, ActorRef]]( - createConnections(actorAddresses)) + (Map[InetSocketAddress, ActorRef]() /: actorAddresses) { + case (map, (uuid, address)) ⇒ map + (address -> createRemoteActorRef(uuid, address)) + }) - def connections: Map[InetSocketAddress, ActorRef] = addresses.get.toMap + def connections: Map[InetSocketAddress, ActorRef] = addresses.get - override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) { + override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = route(message)(senderOption) - } override def postMessageToMailboxAndCreateFutureResultWithTimeout[T]( message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = - route[T](message, timeout)(senderOption).asInstanceOf[CompletableFuture[T]] + senderFuture: Option[Promise[T]]): Promise[T] = + route[T](message, timeout)(senderOption).asInstanceOf[Promise[T]] private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress) { addresses set (addresses.get map { - case (address, actorRef) ⇒ - if (address == from) { - actorRef.stop() - (to, createRemoteActorRef(actorRef.uuid, to)) - } else (address, actorRef) + case (`from`, actorRef) ⇒ + actorRef.stop() + (to, createRemoteActorRef(actorRef.uuid, to)) + case other ⇒ other }) } - private def createConnections(addresses: Array[Tuple2[UUID, InetSocketAddress]]): Map[InetSocketAddress, ActorRef] = { - var connections = Map.empty[InetSocketAddress, ActorRef] - addresses foreach { - case (uuid, address) ⇒ - connections = connections + (address -> createRemoteActorRef(uuid, address)) - } - connections - } - - private def createRemoteActorRef(uuid: UUID, address: InetSocketAddress) = { - RemoteActorRef( - UUID_PREFIX + uuidToString(uuid), // clustered refs are always registered and looked up by UUID - Actor.TIMEOUT, None) - } + // clustered refs are 
always registered and looked up by UUID + private def createRemoteActorRef(uuid: UUID, address: InetSocketAddress) = + RemoteActorRef(UUID_PREFIX + uuidToString(uuid), Actor.TIMEOUT, None) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala index cf4bff9859..4b075c7f91 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala @@ -86,7 +86,7 @@ class ReplicatedActorRef private[akka] (actorRef: ActorRef, val address: String) message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture) + senderFuture: Option[Promise[T]]): Promise[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture) protected[akka] def actorInstance: AtomicReference[Actor] = actorRef.actorInstance protected[akka] def supervisor_=(sup: Option[ActorRef]) { actorRef.supervisor_=(sup) diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala index 68a0a29d3e..d863299777 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala @@ -14,6 +14,8 @@ import akka.AkkaException import java.net.InetSocketAddress import com.eaio.uuid.UUID +import annotation.tailrec +import java.util.concurrent.atomic.AtomicReference class RoutingException(message: String) extends AkkaException(message) @@ -53,76 +55,67 @@ object Router { trait Router { def connections: Map[InetSocketAddress, ActorRef] - def route(message: Any)(implicit sender: Option[ActorRef]) + def route(message: Any)(implicit sender: Option[ActorRef]): Unit def route[T](message: Any, timeout: 
Long)(implicit sender: Option[ActorRef]): Future[T] } - /** - * @author Jonas Bonér - */ - trait Direct extends Router { - lazy val connection: Option[ActorRef] = { - if (connections.size == 0) throw new IllegalStateException("DirectRouter need a single replica connection found [0]") - connections.toList.map({ case (address, actor) ⇒ actor }).headOption + trait BasicRouter extends Router { + def route(message: Any)(implicit sender: Option[ActorRef]): Unit = next match { + case Some(actor) ⇒ actor.!(message)(sender) + case _ ⇒ throw new RoutingException("No node connections for router") } - def route(message: Any)(implicit sender: Option[ActorRef]) { - if (connection.isDefined) connection.get.!(message)(sender) - else throw new RoutingException("No node connections for router") + def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = next match { + case Some(actor) ⇒ actor.!!!(message, timeout)(sender) + case _ ⇒ throw new RoutingException("No node connections for router") } - def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = - if (connection.isDefined) connection.get.!!!(message, timeout)(sender) - else throw new RoutingException("No node connections for router") + protected def next: Option[ActorRef] } /** * @author Jonas Bonér */ - trait Random extends Router { + trait Direct extends BasicRouter { + lazy val next: Option[ActorRef] = connections.values.headOption + } + + /** + * @author Jonas Bonér + */ + trait Random extends BasicRouter { private val random = new java.util.Random(System.currentTimeMillis) - def route(message: Any)(implicit sender: Option[ActorRef]) { - if (next.isDefined) next.get.!(message)(sender) - else throw new RoutingException("No node connections for router") - } - - def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = - if (next.isDefined) next.get.!!!(message, timeout)(sender) - else throw new RoutingException("No node 
connections for router") - - private def next: Option[ActorRef] = { - val nrOfConnections = connections.size - if (nrOfConnections == 0) None - else Some(connections.toArray.apply(random.nextInt(nrOfConnections))._2) - } + def next: Option[ActorRef] = + if (connections.isEmpty) None + else Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next) } /** * @author Jonas Bonér */ - trait RoundRobin extends Router { - private def items: List[ActorRef] = connections.toList.map({ case (address, actor) ⇒ actor }) + trait RoundRobin extends BasicRouter { + private def items: List[ActorRef] = connections.values.toList - @volatile - private var current = items + private val current = new AtomicReference[List[ActorRef]](items) - def route(message: Any)(implicit sender: Option[ActorRef]) { - if (next.isDefined) next.get.!(message)(sender) - else throw new RoutingException("No node connections for router") - } + private def hasNext = connections.nonEmpty - def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = - if (next.isDefined) next.get.!!!(message, timeout)(sender) - else throw new RoutingException("No node connections for router") + def next: Option[ActorRef] = { + @tailrec + def findNext: Option[ActorRef] = { + val currentItems = current.get + val newItems = currentItems match { + case Nil ⇒ items + case xs ⇒ xs + } - private def hasNext = items != Nil + if (current.compareAndSet(currentItems, newItems.tail)) newItems.headOption + else findNext + } - private def next: Option[ActorRef] = { - val rest = if (current == Nil) items else current - current = rest.tail - rest.headOption + findNext } } } diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala index b6e30fca1c..f5c96250b4 100644 --- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala +++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala @@ -13,7 +13,7 @@ 
import akka.config._ import Config._ import akka.util._ import akka.event.EventHandler -import akka.dispatch.{ DefaultCompletableFuture, CompletableFuture } +import akka.dispatch.{ DefaultPromise, Promise } import akka.AkkaException import akka.cluster.zookeeper._ @@ -140,7 +140,7 @@ class TransactionLog private (ledger: LedgerHandle, val id: String, val isAsync: "Reading entries [%s -> %s] for log [%s]".format(from, to, logId)) if (isAsync) { - val future = new DefaultCompletableFuture[Vector[Array[Byte]]](timeout) + val future = new DefaultPromise[Vector[Array[Byte]]](timeout) ledger.asyncReadEntries( from, to, new AsyncCallback.ReadCallback { @@ -149,7 +149,7 @@ class TransactionLog private (ledger: LedgerHandle, val id: String, val isAsync: ledgerHandle: LedgerHandle, enumeration: Enumeration[LedgerEntry], ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[Vector[Array[Byte]]]] + val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]] var entries = Vector[Array[Byte]]() while (enumeration.hasMoreElements) { entries = entries :+ enumeration.nextElement.getEntry @@ -362,7 +362,7 @@ object TransactionLog { if (zkClient.exists(txLogPath)) throw new ReplicationException( "Transaction log for UUID [" + id + "] already exists") - val future = new DefaultCompletableFuture[LedgerHandle](timeout) + val future = new DefaultPromise[LedgerHandle](timeout) if (isAsync) { bookieClient.asyncCreateLedger( ensembleSize, quorumSize, digestType, password, @@ -371,7 +371,7 @@ object TransactionLog { returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]] + val future = ctx.asInstanceOf[Promise[LedgerHandle]] if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle) else future.completeWithException(BKException.create(returnCode)) } @@ -422,7 +422,7 @@ object TransactionLog { val ledger = try { if (isAsync) { - val future = new DefaultCompletableFuture[LedgerHandle](timeout) + 
val future = new DefaultPromise[LedgerHandle](timeout) bookieClient.asyncOpenLedger( logId, digestType, password, new AsyncCallback.OpenCallback { @@ -430,7 +430,7 @@ object TransactionLog { returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) { - val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]] + val future = ctx.asInstanceOf[Promise[LedgerHandle]] if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle) else future.completeWithException(BKException.create(returnCode)) } @@ -447,7 +447,7 @@ object TransactionLog { TransactionLog(ledger, id, isAsync) } - private[akka] def await[T](future: CompletableFuture[T]): T = { + private[akka] def await[T](future: Promise[T]): T = { future.await if (future.result.isDefined) future.result.get else if (future.exception.isDefined) handleError(future.exception.get) diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterMultiJvmSpec.scala deleted file mode 100644 index 36408dd76a..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterMultiJvmSpec.scala +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB