From e519e3dc3bbf3bfae642697c07db21aecb6ba8ed Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 31 Jan 2013 02:16:39 +0100 Subject: [PATCH] #2986 - Changing ActorContext and ActorRefFactory's dispatcher to return ExecutionContext --- .../test/scala/akka/pattern/CircuitBreakerSpec.scala | 2 +- akka-actor/src/main/scala/akka/actor/ActorCell.scala | 3 ++- akka-actor/src/main/scala/akka/actor/ActorDSL.scala | 2 +- .../src/main/scala/akka/actor/ActorRefProvider.scala | 10 +++++----- akka-actor/src/main/scala/akka/actor/ActorSystem.scala | 4 ++-- .../src/main/scala/akka/actor/dungeon/Dispatch.scala | 8 ++------ .../main/scala/akka/dispatch/AbstractDispatcher.scala | 2 -- .../util/SerializedSuspendableExecutionContext.scala | 4 ++-- akka-camel/src/main/scala/akka/camel/Activation.scala | 5 ++--- .../main/scala/akka/camel/internal/DefaultCamel.scala | 2 +- .../scala/akka/camel/ActivationIntegrationTest.scala | 2 +- .../scala/akka/camel/ConsumerIntegrationTest.scala | 2 +- akka-camel/src/test/scala/akka/camel/TestSupport.scala | 2 +- .../camel/internal/component/ActorProducerTest.scala | 2 +- akka-cluster/src/main/scala/akka/cluster/Cluster.scala | 2 +- .../main/scala/akka/contrib/jul/JulEventHandler.scala | 2 +- akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst | 6 ++++++ .../scala/akka/remote/RemoteActorRefProvider.scala | 4 ++-- .../transport/FailureInjectorTransportAdapter.scala | 2 -- .../src/test/scala/akka/testkit/AkkaSpecSpec.scala | 2 +- 20 files changed, 33 insertions(+), 35 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala index 8cfcb559a9..05a5fcb5c6 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerSpec.scala @@ -10,7 +10,7 @@ import scala.concurrent.duration._ import akka.testkit._ import org.scalatest.BeforeAndAfter import akka.actor.{ ActorSystem, Scheduler } -import concurrent.{ ExecutionContext, Future, Await } +import scala.concurrent.{ ExecutionContext, Future, Await } object CircuitBreakerSpec { diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index fa0fdb4e52..d1aec217be 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -15,6 +15,7 @@ import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Superv import akka.event.Logging.{ LogEvent, Debug, Error } import akka.japi.Procedure import akka.dispatch.NullMessage +import scala.concurrent.ExecutionContext /** * The actor context - the view of the actor cell from the actor. @@ -119,7 +120,7 @@ trait ActorContext extends ActorRefFactory { * Returns the dispatcher (MessageDispatcher) that is used for this Actor. * Importing this member will place a implicit MessageDispatcher in scope. */ - implicit def dispatcher: MessageDispatcher + implicit def dispatcher: ExecutionContext /** * The system that the actor belongs to. diff --git a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala index 2b43fb8196..b229795736 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorDSL.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorDSL.scala @@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit * * {{{ * import ActorDSL._ - * import concurrent.util.duration._ + * import scala.concurrent.util.duration._ * * implicit val system: ActorSystem = ... * diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index a593c6dde8..82d919ec96 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -11,8 +11,8 @@ import akka.util.{ Switch, Helpers } import akka.japi.Util.immutableSeq import akka.util.Collections.EmptyImmutableSeq import scala.util.{ Success, Failure } -import scala.concurrent.{ Future, Promise } import java.util.concurrent.atomic.AtomicLong +import scala.concurrent.{ ExecutionContext, Future, Promise } /** * Interface for all ActorRef providers to implement. @@ -51,8 +51,8 @@ trait ActorRefProvider { */ def settings: ActorSystem.Settings - //FIXME WHY IS THIS HERE? - def dispatcher: MessageDispatcher + //FIXME Only here because of AskSupport, should be dealt with + def dispatcher: ExecutionContext /** * Initialization of an ActorRefProvider happens in two steps: first @@ -169,7 +169,7 @@ trait ActorRefFactory { /** * Returns the default MessageDispatcher associated with this ActorRefFactory */ - implicit def dispatcher: MessageDispatcher + implicit def dispatcher: ExecutionContext /** * Father of all children created by this interface. @@ -469,7 +469,7 @@ class LocalActorRefProvider( @volatile private var system: ActorSystemImpl = _ - def dispatcher: MessageDispatcher = system.dispatcher + def dispatcher: ExecutionContext = system.dispatcher lazy val terminationPromise: Promise[Unit] = Promise[Unit]() diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index a08a3d0311..d0139af91d 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -318,7 +318,7 @@ abstract class ActorSystem extends ActorRefFactory { * explicitly. * Importing this member will place the default MessageDispatcher in scope. */ - implicit def dispatcher: MessageDispatcher + implicit def dispatcher: ExecutionContext /** * Register a block of code (callback) to run after ActorSystem.shutdown has been issued and @@ -564,7 +564,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings)) - val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher + val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher def terminationFuture: Future[Unit] = provider.terminationFuture def lookupRoot: InternalActorRef = provider.rootGuardian diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 910ca9bf4d..8d4a62d073 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -8,11 +8,12 @@ import scala.annotation.tailrec import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create } import akka.event.Logging.Error import akka.util.Unsafe -import scala.util.control.NonFatal import akka.dispatch.NullMessage import akka.actor.{ NoSerializationVerificationNeeded, InvalidMessageException, ActorRef, ActorCell } import akka.serialization.SerializationExtension +import scala.util.control.NonFatal import scala.util.control.Exception.Catcher +import scala.concurrent.ExecutionContext private[akka] trait Dispatch { this: ActorCell ⇒ @@ -32,11 +33,6 @@ private[akka] trait Dispatch { this: ActorCell ⇒ val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher) - /** - * UntypedActorContext impl - */ - final def getDispatcher(): MessageDispatcher = dispatcher - final def isTerminated: Boolean = mailbox.isClosed /** diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 4f59b9abe6..fff250c547 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -178,8 +178,6 @@ private[akka] object MessageDispatcher { println(" -> " + a + status + messages + parent) } } - - implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher } abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContext { diff --git a/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala b/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala index 841cd865fd..fa52d20c5c 100644 --- a/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala +++ b/akka-actor/src/main/scala/akka/util/SerializedSuspendableExecutionContext.scala @@ -11,8 +11,8 @@ private[akka] object SerializedSuspendableExecutionContext { final val On = 1 final val Suspended = 2 - def apply(batchSize: Int)(implicit context: ExecutionContext): SerializedSuspendableExecutionContext = - new SerializedSuspendableExecutionContext(batchSize)(context match { + def apply(throughput: Int)(implicit context: ExecutionContext): SerializedSuspendableExecutionContext = + new SerializedSuspendableExecutionContext(throughput)(context match { case s: SerializedSuspendableExecutionContext ⇒ s.context case other ⇒ other }) diff --git a/akka-camel/src/main/scala/akka/camel/Activation.scala b/akka-camel/src/main/scala/akka/camel/Activation.scala index 983dba0b2d..c4f1097a18 100644 --- a/akka-camel/src/main/scala/akka/camel/Activation.scala +++ b/akka-camel/src/main/scala/akka/camel/Activation.scala @@ -8,9 +8,8 @@ import akka.camel.internal._ import akka.util.Timeout import akka.actor.{ ActorSystem, Props, ActorRef } import akka.pattern._ -import concurrent.{ ExecutionContext, Future } -import scala.concurrent.duration.Duration -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.duration.{ Duration, FiniteDuration } /** * Activation trait that can be used to wait on activation or de-activation of Camel endpoints. diff --git a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala index e876a36e2a..f79da6b568 100644 --- a/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala +++ b/akka-camel/src/main/scala/akka/camel/internal/DefaultCamel.scala @@ -9,7 +9,7 @@ import akka.camel.internal.ActivationProtocol._ import scala.util.control.NonFatal import scala.concurrent.duration._ import org.apache.camel.ProducerTemplate -import concurrent.{ Future, ExecutionContext } +import scala.concurrent.{ Future, ExecutionContext } import akka.util.Timeout import akka.pattern.ask import java.io.InputStream diff --git a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala index a8edab40ce..408139fc18 100644 --- a/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ActivationIntegrationTest.scala @@ -13,7 +13,7 @@ import akka.actor._ import TestSupport._ import org.scalatest.WordSpec import akka.testkit.TestLatch -import concurrent.Await +import scala.concurrent.Await import java.util.concurrent.TimeoutException import akka.util.Timeout diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala index e903d2ce19..4d2c3e7b48 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerIntegrationTest.scala @@ -18,7 +18,7 @@ import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException } import akka.actor.Status.Failure import scala.concurrent.duration._ -import concurrent.{ ExecutionContext, Await } +import scala.concurrent.{ ExecutionContext, Await } import akka.testkit._ import akka.util.Timeout diff --git a/akka-camel/src/test/scala/akka/camel/TestSupport.scala b/akka-camel/src/test/scala/akka/camel/TestSupport.scala index 0102d06f9f..55e6b13d53 100644 --- a/akka-camel/src/test/scala/akka/camel/TestSupport.scala +++ b/akka-camel/src/test/scala/akka/camel/TestSupport.scala @@ -13,7 +13,7 @@ import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, Suite } import org.scalatest.matchers.{ BePropertyMatcher, BePropertyMatchResult } import scala.reflect.ClassTag import akka.actor.{ ActorRef, Props, ActorSystem, Actor } -import concurrent.Await +import scala.concurrent.Await import akka.util.Timeout import akka.testkit.AkkaSpec diff --git a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala index 35dfb53b18..f353669bd3 100644 --- a/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala +++ b/akka-camel/src/test/scala/akka/camel/internal/component/ActorProducerTest.scala @@ -25,7 +25,7 @@ import akka.actor.ActorSystem.Settings import akka.event.LoggingAdapter import akka.testkit.{ TestLatch, TimingTest, TestKit, TestProbe } import org.apache.camel.impl.DefaultCamelContext -import concurrent.{ Await, Promise, Future } +import scala.concurrent.{ Await, Promise, Future } import akka.util.Timeout import akka.actor._ import akka.testkit._ diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 11d0a0e2ec..e8bd2e21fc 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -22,7 +22,7 @@ import java.io.Closeable import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicReference import akka.util.internal.HashedWheelTimer -import concurrent.{ ExecutionContext, Await } +import scala.concurrent.{ ExecutionContext, Await } import com.typesafe.config.ConfigFactory /** diff --git a/akka-contrib/src/main/scala/akka/contrib/jul/JulEventHandler.scala b/akka-contrib/src/main/scala/akka/contrib/jul/JulEventHandler.scala index 68ce0ed973..6e971ddfcb 100644 --- a/akka-contrib/src/main/scala/akka/contrib/jul/JulEventHandler.scala +++ b/akka-contrib/src/main/scala/akka/contrib/jul/JulEventHandler.scala @@ -4,7 +4,7 @@ import akka.event.Logging._ import akka.actor._ import akka.event.LoggingAdapter import java.util.logging -import concurrent.{ ExecutionContext, Future } +import scala.concurrent.{ ExecutionContext, Future } /** * Makes the Akka `Logging` API available as the `log` diff --git a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst index 0a7d3da4df..61411c3675 100644 --- a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst +++ b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst @@ -25,6 +25,12 @@ Search Replace with If you need to convert from Java to ``scala.collection.immutable.Seq`` or ``scala.collection.immutable.Iterable`` you should use ``akka.japi.Util.immutableSeq(…)``, and if you need to convert from Scala you can simply switch to using immutable collections yourself or use the ``to[immutable.]`` method. +ActorContext & ActorRefFactory dispatcher +========================================= + +The return type of ``ActorContext``'s and ``ActorRefFactory``'s ``dispatcher``-method now returns ``ExecutionContext`` instead of ``MessageDispatcher``. + + API changes to FSM and TestFSMRef ================================= diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index a64b9b3a71..254fcc30b0 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -10,10 +10,10 @@ import akka.event.{ Logging, LoggingAdapter, EventStream } import akka.event.Logging.Error import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension } import akka.pattern.pipe -import scala.concurrent.Future import scala.util.control.NonFatal import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook } import scala.util.control.Exception.Catcher +import scala.concurrent.{ ExecutionContext, Future } object RemoteActorRefProvider { private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef) @@ -125,7 +125,7 @@ class RemoteActorRefProvider( override def guardian: LocalActorRef = local.guardian override def systemGuardian: LocalActorRef = local.systemGuardian override def terminationFuture: Future[Unit] = local.terminationFuture - override def dispatcher: MessageDispatcher = local.dispatcher + override def dispatcher: ExecutionContext = local.dispatcher override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path) override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path) override def tempPath(): ActorPath = local.tempPath() diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index ab32cbefe4..1efd89c2fc 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -39,8 +39,6 @@ private[remote] object FailureInjectorTransportAdapter { private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transport, val extendedSystem: ExtendedActorSystem) extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatcher) with AssociationEventListener { - import extendedSystem.dispatcher - private def rng = ThreadLocalRandom.current() private val log = Logging(extendedSystem, "FailureInjector (gremlin)") diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpecSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpecSpec.scala index 35dd83e7f3..b21ef8dd03 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpecSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpecSpec.scala @@ -10,7 +10,7 @@ import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers import akka.actor._ import com.typesafe.config.ConfigFactory -import concurrent.Await +import scala.concurrent.Await import scala.concurrent.duration._ import akka.actor.DeadLetter import akka.pattern.ask