#2986 - Changing ActorContext and ActorRefFactory's dispatcher to return ExecutionContext
This commit is contained in:
parent
46a6e34103
commit
e519e3dc3b
20 changed files with 33 additions and 35 deletions
|
|
@ -10,7 +10,7 @@ import scala.concurrent.duration._
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import org.scalatest.BeforeAndAfter
|
import org.scalatest.BeforeAndAfter
|
||||||
import akka.actor.{ ActorSystem, Scheduler }
|
import akka.actor.{ ActorSystem, Scheduler }
|
||||||
import concurrent.{ ExecutionContext, Future, Await }
|
import scala.concurrent.{ ExecutionContext, Future, Await }
|
||||||
|
|
||||||
object CircuitBreakerSpec {
|
object CircuitBreakerSpec {
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,6 +15,7 @@ import akka.dispatch.{ Watch, Unwatch, Terminate, SystemMessage, Suspend, Superv
|
||||||
import akka.event.Logging.{ LogEvent, Debug, Error }
|
import akka.event.Logging.{ LogEvent, Debug, Error }
|
||||||
import akka.japi.Procedure
|
import akka.japi.Procedure
|
||||||
import akka.dispatch.NullMessage
|
import akka.dispatch.NullMessage
|
||||||
|
import scala.concurrent.ExecutionContext
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The actor context - the view of the actor cell from the actor.
|
* The actor context - the view of the actor cell from the actor.
|
||||||
|
|
@ -119,7 +120,7 @@ trait ActorContext extends ActorRefFactory {
|
||||||
* Returns the dispatcher (MessageDispatcher) that is used for this Actor.
|
* Returns the dispatcher (MessageDispatcher) that is used for this Actor.
|
||||||
* Importing this member will place a implicit MessageDispatcher in scope.
|
* Importing this member will place a implicit MessageDispatcher in scope.
|
||||||
*/
|
*/
|
||||||
implicit def dispatcher: MessageDispatcher
|
implicit def dispatcher: ExecutionContext
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The system that the actor belongs to.
|
* The system that the actor belongs to.
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ import java.util.concurrent.TimeUnit
|
||||||
*
|
*
|
||||||
* {{{
|
* {{{
|
||||||
* import ActorDSL._
|
* import ActorDSL._
|
||||||
* import concurrent.util.duration._
|
* import scala.concurrent.util.duration._
|
||||||
*
|
*
|
||||||
* implicit val system: ActorSystem = ...
|
* implicit val system: ActorSystem = ...
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -11,8 +11,8 @@ import akka.util.{ Switch, Helpers }
|
||||||
import akka.japi.Util.immutableSeq
|
import akka.japi.Util.immutableSeq
|
||||||
import akka.util.Collections.EmptyImmutableSeq
|
import akka.util.Collections.EmptyImmutableSeq
|
||||||
import scala.util.{ Success, Failure }
|
import scala.util.{ Success, Failure }
|
||||||
import scala.concurrent.{ Future, Promise }
|
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
import scala.concurrent.{ ExecutionContext, Future, Promise }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Interface for all ActorRef providers to implement.
|
* Interface for all ActorRef providers to implement.
|
||||||
|
|
@ -51,8 +51,8 @@ trait ActorRefProvider {
|
||||||
*/
|
*/
|
||||||
def settings: ActorSystem.Settings
|
def settings: ActorSystem.Settings
|
||||||
|
|
||||||
//FIXME WHY IS THIS HERE?
|
//FIXME Only here because of AskSupport, should be dealt with
|
||||||
def dispatcher: MessageDispatcher
|
def dispatcher: ExecutionContext
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialization of an ActorRefProvider happens in two steps: first
|
* Initialization of an ActorRefProvider happens in two steps: first
|
||||||
|
|
@ -169,7 +169,7 @@ trait ActorRefFactory {
|
||||||
/**
|
/**
|
||||||
* Returns the default MessageDispatcher associated with this ActorRefFactory
|
* Returns the default MessageDispatcher associated with this ActorRefFactory
|
||||||
*/
|
*/
|
||||||
implicit def dispatcher: MessageDispatcher
|
implicit def dispatcher: ExecutionContext
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Father of all children created by this interface.
|
* Father of all children created by this interface.
|
||||||
|
|
@ -469,7 +469,7 @@ class LocalActorRefProvider(
|
||||||
@volatile
|
@volatile
|
||||||
private var system: ActorSystemImpl = _
|
private var system: ActorSystemImpl = _
|
||||||
|
|
||||||
def dispatcher: MessageDispatcher = system.dispatcher
|
def dispatcher: ExecutionContext = system.dispatcher
|
||||||
|
|
||||||
lazy val terminationPromise: Promise[Unit] = Promise[Unit]()
|
lazy val terminationPromise: Promise[Unit] = Promise[Unit]()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -318,7 +318,7 @@ abstract class ActorSystem extends ActorRefFactory {
|
||||||
* explicitly.
|
* explicitly.
|
||||||
* Importing this member will place the default MessageDispatcher in scope.
|
* Importing this member will place the default MessageDispatcher in scope.
|
||||||
*/
|
*/
|
||||||
implicit def dispatcher: MessageDispatcher
|
implicit def dispatcher: ExecutionContext
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register a block of code (callback) to run after ActorSystem.shutdown has been issued and
|
* Register a block of code (callback) to run after ActorSystem.shutdown has been issued and
|
||||||
|
|
@ -564,7 +564,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
|
||||||
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
|
val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(
|
||||||
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings))
|
threadFactory, eventStream, deadLetterMailbox, scheduler, dynamicAccess, settings))
|
||||||
|
|
||||||
val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher
|
val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher
|
||||||
|
|
||||||
def terminationFuture: Future[Unit] = provider.terminationFuture
|
def terminationFuture: Future[Unit] = provider.terminationFuture
|
||||||
def lookupRoot: InternalActorRef = provider.rootGuardian
|
def lookupRoot: InternalActorRef = provider.rootGuardian
|
||||||
|
|
|
||||||
|
|
@ -8,11 +8,12 @@ import scala.annotation.tailrec
|
||||||
import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create }
|
import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create }
|
||||||
import akka.event.Logging.Error
|
import akka.event.Logging.Error
|
||||||
import akka.util.Unsafe
|
import akka.util.Unsafe
|
||||||
import scala.util.control.NonFatal
|
|
||||||
import akka.dispatch.NullMessage
|
import akka.dispatch.NullMessage
|
||||||
import akka.actor.{ NoSerializationVerificationNeeded, InvalidMessageException, ActorRef, ActorCell }
|
import akka.actor.{ NoSerializationVerificationNeeded, InvalidMessageException, ActorRef, ActorCell }
|
||||||
import akka.serialization.SerializationExtension
|
import akka.serialization.SerializationExtension
|
||||||
|
import scala.util.control.NonFatal
|
||||||
import scala.util.control.Exception.Catcher
|
import scala.util.control.Exception.Catcher
|
||||||
|
import scala.concurrent.ExecutionContext
|
||||||
|
|
||||||
private[akka] trait Dispatch { this: ActorCell ⇒
|
private[akka] trait Dispatch { this: ActorCell ⇒
|
||||||
|
|
||||||
|
|
@ -32,11 +33,6 @@ private[akka] trait Dispatch { this: ActorCell ⇒
|
||||||
|
|
||||||
val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
|
val dispatcher: MessageDispatcher = system.dispatchers.lookup(props.dispatcher)
|
||||||
|
|
||||||
/**
|
|
||||||
* UntypedActorContext impl
|
|
||||||
*/
|
|
||||||
final def getDispatcher(): MessageDispatcher = dispatcher
|
|
||||||
|
|
||||||
final def isTerminated: Boolean = mailbox.isClosed
|
final def isTerminated: Boolean = mailbox.isClosed
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -178,8 +178,6 @@ private[akka] object MessageDispatcher {
|
||||||
println(" -> " + a + status + messages + parent)
|
println(" -> " + a + status + messages + parent)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher
|
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContext {
|
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContext {
|
||||||
|
|
|
||||||
|
|
@ -11,8 +11,8 @@ private[akka] object SerializedSuspendableExecutionContext {
|
||||||
final val On = 1
|
final val On = 1
|
||||||
final val Suspended = 2
|
final val Suspended = 2
|
||||||
|
|
||||||
def apply(batchSize: Int)(implicit context: ExecutionContext): SerializedSuspendableExecutionContext =
|
def apply(throughput: Int)(implicit context: ExecutionContext): SerializedSuspendableExecutionContext =
|
||||||
new SerializedSuspendableExecutionContext(batchSize)(context match {
|
new SerializedSuspendableExecutionContext(throughput)(context match {
|
||||||
case s: SerializedSuspendableExecutionContext ⇒ s.context
|
case s: SerializedSuspendableExecutionContext ⇒ s.context
|
||||||
case other ⇒ other
|
case other ⇒ other
|
||||||
})
|
})
|
||||||
|
|
|
||||||
|
|
@ -8,9 +8,8 @@ import akka.camel.internal._
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.actor.{ ActorSystem, Props, ActorRef }
|
import akka.actor.{ ActorSystem, Props, ActorRef }
|
||||||
import akka.pattern._
|
import akka.pattern._
|
||||||
import concurrent.{ ExecutionContext, Future }
|
import scala.concurrent.{ ExecutionContext, Future }
|
||||||
import scala.concurrent.duration.Duration
|
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||||
import scala.concurrent.duration.FiniteDuration
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Activation trait that can be used to wait on activation or de-activation of Camel endpoints.
|
* Activation trait that can be used to wait on activation or de-activation of Camel endpoints.
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@ import akka.camel.internal.ActivationProtocol._
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import org.apache.camel.ProducerTemplate
|
import org.apache.camel.ProducerTemplate
|
||||||
import concurrent.{ Future, ExecutionContext }
|
import scala.concurrent.{ Future, ExecutionContext }
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
import java.io.InputStream
|
import java.io.InputStream
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ import akka.actor._
|
||||||
import TestSupport._
|
import TestSupport._
|
||||||
import org.scalatest.WordSpec
|
import org.scalatest.WordSpec
|
||||||
import akka.testkit.TestLatch
|
import akka.testkit.TestLatch
|
||||||
import concurrent.Await
|
import scala.concurrent.Await
|
||||||
import java.util.concurrent.TimeoutException
|
import java.util.concurrent.TimeoutException
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,7 +18,7 @@ import org.apache.camel.{ FailedToCreateRouteException, CamelExecutionException
|
||||||
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
|
import java.util.concurrent.{ ExecutionException, TimeUnit, TimeoutException }
|
||||||
import akka.actor.Status.Failure
|
import akka.actor.Status.Failure
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import concurrent.{ ExecutionContext, Await }
|
import scala.concurrent.{ ExecutionContext, Await }
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -13,7 +13,7 @@ import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, Suite }
|
||||||
import org.scalatest.matchers.{ BePropertyMatcher, BePropertyMatchResult }
|
import org.scalatest.matchers.{ BePropertyMatcher, BePropertyMatchResult }
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
import akka.actor.{ ActorRef, Props, ActorSystem, Actor }
|
import akka.actor.{ ActorRef, Props, ActorSystem, Actor }
|
||||||
import concurrent.Await
|
import scala.concurrent.Await
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -25,7 +25,7 @@ import akka.actor.ActorSystem.Settings
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
import akka.testkit.{ TestLatch, TimingTest, TestKit, TestProbe }
|
import akka.testkit.{ TestLatch, TimingTest, TestKit, TestProbe }
|
||||||
import org.apache.camel.impl.DefaultCamelContext
|
import org.apache.camel.impl.DefaultCamelContext
|
||||||
import concurrent.{ Await, Promise, Future }
|
import scala.concurrent.{ Await, Promise, Future }
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.testkit._
|
import akka.testkit._
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ import java.io.Closeable
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import akka.util.internal.HashedWheelTimer
|
import akka.util.internal.HashedWheelTimer
|
||||||
import concurrent.{ ExecutionContext, Await }
|
import scala.concurrent.{ ExecutionContext, Await }
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,7 @@ import akka.event.Logging._
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
import java.util.logging
|
import java.util.logging
|
||||||
import concurrent.{ ExecutionContext, Future }
|
import scala.concurrent.{ ExecutionContext, Future }
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Makes the Akka `Logging` API available as the `log`
|
* Makes the Akka `Logging` API available as the `log`
|
||||||
|
|
|
||||||
|
|
@ -25,6 +25,12 @@ Search Replace with
|
||||||
If you need to convert from Java to ``scala.collection.immutable.Seq`` or ``scala.collection.immutable.Iterable`` you should use ``akka.japi.Util.immutableSeq(…)``,
|
If you need to convert from Java to ``scala.collection.immutable.Seq`` or ``scala.collection.immutable.Iterable`` you should use ``akka.japi.Util.immutableSeq(…)``,
|
||||||
and if you need to convert from Scala you can simply switch to using immutable collections yourself or use the ``to[immutable.<collection-type>]`` method.
|
and if you need to convert from Scala you can simply switch to using immutable collections yourself or use the ``to[immutable.<collection-type>]`` method.
|
||||||
|
|
||||||
|
ActorContext & ActorRefFactory dispatcher
|
||||||
|
=========================================
|
||||||
|
|
||||||
|
The return type of ``ActorContext``'s and ``ActorRefFactory``'s ``dispatcher``-method now returns ``ExecutionContext`` instead of ``MessageDispatcher``.
|
||||||
|
|
||||||
|
|
||||||
API changes to FSM and TestFSMRef
|
API changes to FSM and TestFSMRef
|
||||||
=================================
|
=================================
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,10 @@ import akka.event.{ Logging, LoggingAdapter, EventStream }
|
||||||
import akka.event.Logging.Error
|
import akka.event.Logging.Error
|
||||||
import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension }
|
import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension }
|
||||||
import akka.pattern.pipe
|
import akka.pattern.pipe
|
||||||
import scala.concurrent.Future
|
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook }
|
import akka.actor.SystemGuardian.{ TerminationHookDone, TerminationHook, RegisterTerminationHook }
|
||||||
import scala.util.control.Exception.Catcher
|
import scala.util.control.Exception.Catcher
|
||||||
|
import scala.concurrent.{ ExecutionContext, Future }
|
||||||
|
|
||||||
object RemoteActorRefProvider {
|
object RemoteActorRefProvider {
|
||||||
private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef)
|
private case class Internals(transport: RemoteTransport, serialization: Serialization, remoteDaemon: InternalActorRef)
|
||||||
|
|
@ -125,7 +125,7 @@ class RemoteActorRefProvider(
|
||||||
override def guardian: LocalActorRef = local.guardian
|
override def guardian: LocalActorRef = local.guardian
|
||||||
override def systemGuardian: LocalActorRef = local.systemGuardian
|
override def systemGuardian: LocalActorRef = local.systemGuardian
|
||||||
override def terminationFuture: Future[Unit] = local.terminationFuture
|
override def terminationFuture: Future[Unit] = local.terminationFuture
|
||||||
override def dispatcher: MessageDispatcher = local.dispatcher
|
override def dispatcher: ExecutionContext = local.dispatcher
|
||||||
override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path)
|
override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path)
|
||||||
override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path)
|
override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path)
|
||||||
override def tempPath(): ActorPath = local.tempPath()
|
override def tempPath(): ActorPath = local.tempPath()
|
||||||
|
|
|
||||||
|
|
@ -39,8 +39,6 @@ private[remote] object FailureInjectorTransportAdapter {
|
||||||
private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transport, val extendedSystem: ExtendedActorSystem)
|
private[remote] class FailureInjectorTransportAdapter(wrappedTransport: Transport, val extendedSystem: ExtendedActorSystem)
|
||||||
extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatcher) with AssociationEventListener {
|
extends AbstractTransportAdapter(wrappedTransport)(extendedSystem.dispatcher) with AssociationEventListener {
|
||||||
|
|
||||||
import extendedSystem.dispatcher
|
|
||||||
|
|
||||||
private def rng = ThreadLocalRandom.current()
|
private def rng = ThreadLocalRandom.current()
|
||||||
private val log = Logging(extendedSystem, "FailureInjector (gremlin)")
|
private val log = Logging(extendedSystem, "FailureInjector (gremlin)")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ import org.scalatest.WordSpec
|
||||||
import org.scalatest.matchers.MustMatchers
|
import org.scalatest.matchers.MustMatchers
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.actor.DeadLetter
|
import akka.actor.DeadLetter
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue