diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 486761279a..6ff9cccda8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -125,6 +125,10 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend "An ActorSystem" must { + "use scala.concurrent.Future's InternalCallbackEC" in { + system.asInstanceOf[ActorSystemImpl].internalCallingThreadExecutionContext.getClass.getName must be === "scala.concurrent.Future$InternalCallbackExecutor$" + } + "reject invalid names" in { for ( n ← Seq( diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 11f36cfc22..228cb33252 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -53,12 +53,9 @@ trait ActorRefProvider { */ def settings: ActorSystem.Settings - //FIXME Only here because of AskSupport, should be dealt with - def dispatcher: ExecutionContext - /** * Initialization of an ActorRefProvider happens in two steps: first - * construction of the object with settings, eventStream, scheduler, etc. + * construction of the object with settings, eventStream, etc. * and then—when the ActorSystem is constructed—the second phase during * which actors may be created (e.g. the guardians). */ @@ -69,9 +66,6 @@ trait ActorRefProvider { */ def deployer: Deployer - //FIXME WHY IS THIS HERE? - def scheduler: Scheduler - /** * Generates and returns a unique actor path below “/temp”. */ @@ -333,7 +327,6 @@ class LocalActorRefProvider private[akka] ( _systemName: String, override val settings: ActorSystem.Settings, val eventStream: EventStream, - override val scheduler: Scheduler, val dynamicAccess: DynamicAccess, override val deployer: Deployer, _deadLetters: Option[ActorPath ⇒ InternalActorRef]) @@ -343,12 +336,10 @@ class LocalActorRefProvider private[akka] ( def this(_systemName: String, settings: ActorSystem.Settings, eventStream: EventStream, - scheduler: Scheduler, dynamicAccess: DynamicAccess) = this(_systemName, settings, eventStream, - scheduler, dynamicAccess, new Deployer(settings, dynamicAccess), None) @@ -477,8 +468,6 @@ class LocalActorRefProvider private[akka] ( @volatile private var system: ActorSystemImpl = _ - def dispatcher: ExecutionContext = system.dispatcher - lazy val terminationPromise: Promise[Unit] = Promise[Unit]() def terminationFuture: Future[Unit] = terminationPromise.future diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 3b5408b5f9..ef134dae2a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -538,7 +538,6 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, classOf[String] -> name, classOf[Settings] -> settings, classOf[EventStream] -> eventStream, - classOf[Scheduler] -> scheduler, classOf[DynamicAccess] -> dynamicAccess) dynamicAccess.createInstanceFor[ActorRefProvider](ProviderClass, arguments).get @@ -547,16 +546,14 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, def deadLetters: ActorRef = provider.deadLetters //FIXME Why do we need this at all? - val deadLetterQueue: MessageQueue = new MessageQueue { + val deadLetterMailbox: Mailbox = new Mailbox(new MessageQueue { def enqueue(receiver: ActorRef, envelope: Envelope): Unit = deadLetters.tell(DeadLetter(envelope.message, envelope.sender, receiver), envelope.sender) def dequeue() = null def hasMessages = false def numberOfMessages = 0 def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = () - } - //FIXME Why do we need this at all? - val deadLetterMailbox: Mailbox = new Mailbox(deadLetterQueue) { + }) { becomeClosed() def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = deadLetters ! DeadLetter(handle, receiver, receiver) @@ -569,6 +566,13 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, val dispatcher: ExecutionContext = dispatchers.defaultGlobalDispatcher + val internalCallingThreadExecutionContext: ExecutionContext = + dynamicAccess.getObjectFor[ExecutionContext]("scala.concurrent.Future$InternalCallbackExecutor$").getOrElse( + new ExecutionContext with BatchingExecutor { + override protected def unbatchedExecute(r: Runnable): Unit = r.run() + override def reportFailure(t: Throwable): Unit = dispatcher reportFailure t + }) + def terminationFuture: Future[Unit] = provider.terminationFuture def lookupRoot: InternalActorRef = provider.rootGuardian def guardian: LocalActorRef = provider.guardian diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 2d4efe6464..2dc293bf3c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -311,7 +311,7 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) } if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run() - messageQueue.cleanUp(actor.self, actor.systemImpl.deadLetterQueue) + messageQueue.cleanUp(actor.self, actor.systemImpl.deadLetterMailbox.messageQueue) } } diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 7905acbea3..daaa9fb6b0 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -88,8 +88,7 @@ final class AskableActorRef(val actorRef: ActorRef) extends AnyVal { case ref: InternalActorRef ⇒ if (timeout.duration.length <= 0) Future.failed[Any](new IllegalArgumentException("Timeout length for an `ask` must be greater or equal to 1. Question not sent to [%s]" format actorRef)) else { - val provider = ref.provider - val a = PromiseActorRef(provider, timeout) + val a = PromiseActorRef(ref.provider, timeout) actorRef.tell(message, a) a.result.future } @@ -166,6 +165,9 @@ private[akka] final class PromiseActorRef private (val provider: ActorRefProvide override def getParent: InternalActorRef = provider.tempContainer + def internalCallingThreadExecutionContext: ExecutionContext = + provider.guardian.underlying.systemImpl.internalCallingThreadExecutionContext + /** * Contract of this method: * Must always return the same ActorPath, which must have @@ -250,10 +252,11 @@ private[akka] object PromiseActorRef { private case class StoppedWithPath(path: ActorPath) def apply(provider: ActorRefProvider, timeout: Timeout): PromiseActorRef = { - implicit val ec = provider.dispatcher // TODO should we take an ExecutionContext in the method signature? val result = Promise[Any]() + val scheduler = provider.guardian.underlying.system.scheduler val a = new PromiseActorRef(provider, result) - val f = provider.scheduler.scheduleOnce(timeout.duration) { result tryComplete Failure(new AskTimeoutException("Timed out")) } + implicit val ec = a.internalCallingThreadExecutionContext + val f = scheduler.scheduleOnce(timeout.duration) { result tryComplete Failure(new AskTimeoutException("Timed out")) } result.future onComplete { _ ⇒ try a.stop() finally f.cancel() } a } diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index b8f60ed00d..2fa9f0876e 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -45,25 +45,19 @@ trait GracefulStopSupport { * } * }}} */ - def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any = PoisonPill)(implicit system: ActorSystem): Future[Boolean] = { + def gracefulStop(target: ActorRef, timeout: FiniteDuration, stopMessage: Any = PoisonPill): Future[Boolean] = { if (target.isTerminated) Future successful true - else system match { - case e: ExtendedActorSystem ⇒ - import e.dispatcher // TODO take implicit ExecutionContext/MessageDispatcher in method signature? - val internalTarget = target.asInstanceOf[InternalActorRef] - val ref = PromiseActorRef(e.provider, Timeout(timeout)) - internalTarget.sendSystemMessage(Watch(target, ref)) - val f = ref.result.future - f onComplete { // Just making sure we're not leaking here - case Success(Terminated(a)) if a.path == target.path ⇒ () - case _ ⇒ internalTarget.sendSystemMessage(Unwatch(target, ref)) - } - target ! stopMessage - f map { - case Terminated(a) if a.path == target.path ⇒ true - case _ ⇒ false - } - case s ⇒ throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'") + else { + val internalTarget = target.asInstanceOf[InternalActorRef] + val ref = PromiseActorRef(internalTarget.provider, Timeout(timeout)) + internalTarget.sendSystemMessage(Watch(target, ref)) + target.tell(stopMessage, Actor.noSender) + ref.result.future.transform( + { + case Terminated(t) if t.path == target.path ⇒ true + case _ ⇒ { internalTarget.sendSystemMessage(Unwatch(target, ref)); false } + }, + t ⇒ { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ref.internalCallingThreadExecutionContext) } } } diff --git a/akka-actor/src/main/scala/akka/pattern/Patterns.scala b/akka-actor/src/main/scala/akka/pattern/Patterns.scala index 484181a71e..26489b3869 100644 --- a/akka-actor/src/main/scala/akka/pattern/Patterns.scala +++ b/akka-actor/src/main/scala/akka/pattern/Patterns.scala @@ -104,8 +104,8 @@ object Patterns { * If the target actor isn't terminated within the timeout the [[scala.concurrent.Future]] * is completed with failure [[akka.pattern.AskTimeoutException]]. */ - def gracefulStop(target: ActorRef, timeout: FiniteDuration, system: ActorSystem): Future[java.lang.Boolean] = - scalaGracefulStop(target, timeout)(system).asInstanceOf[Future[java.lang.Boolean]] + def gracefulStop(target: ActorRef, timeout: FiniteDuration): Future[java.lang.Boolean] = + scalaGracefulStop(target, timeout).asInstanceOf[Future[java.lang.Boolean]] /** * Returns a [[scala.concurrent.Future]] that will be completed with the success or failure of the provided Callable diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index c880496077..7e78feeded 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -1231,9 +1231,9 @@ trait ScatterGatherFirstCompletedLike { this: RouterConfig ⇒ { case (sender, message) ⇒ - val provider: ActorRefProvider = routeeProvider.context.asInstanceOf[ActorCell].systemImpl.provider + val refProvider = routeeProvider.context.system.asInstanceOf[ExtendedActorSystem].provider + val asker = akka.pattern.PromiseActorRef(refProvider, within) import routeeProvider.context.dispatcher - val asker = akka.pattern.PromiseActorRef(provider, within) asker.result.future.pipeTo(sender) toAll(asker, routeeProvider.routees) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index bde47e3c34..a5c8696a43 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -46,9 +46,8 @@ private[akka] class ClusterActorRefProvider( _systemName: String, _settings: ActorSystem.Settings, _eventStream: EventStream, - _scheduler: Scheduler, _dynamicAccess: DynamicAccess) extends RemoteActorRefProvider( - _systemName, _settings, _eventStream, _scheduler, _dynamicAccess) { + _systemName, _settings, _eventStream, _dynamicAccess) { @volatile private var remoteDeploymentWatcher: ActorRef = _ diff --git a/akka-docs/rst/java/code/docs/actor/UntypedActorDocTestBase.java b/akka-docs/rst/java/code/docs/actor/UntypedActorDocTestBase.java index 1c696577eb..40b4f469df 100644 --- a/akka-docs/rst/java/code/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/rst/java/code/docs/actor/UntypedActorDocTestBase.java @@ -203,7 +203,7 @@ public class UntypedActorDocTestBase { //#gracefulStop try { Future stopped = - gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS), system); + gracefulStop(actorRef, Duration.create(5, TimeUnit.SECONDS)); Await.result(stopped, Duration.create(6, TimeUnit.SECONDS)); // the actor has been stopped } catch (AskTimeoutException e) { diff --git a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala index f002f73d63..935c6a7def 100644 --- a/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/actor/ActorDocSpec.scala @@ -334,7 +334,7 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { import scala.concurrent.Await try { - val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds)(system) + val stopped: Future[Boolean] = gracefulStop(actorRef, 5 seconds) Await.result(stopped, 6 seconds) // the actor has been stopped } catch { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 4ac2b95229..c02c09f4bd 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -101,7 +101,6 @@ private[akka] class RemoteActorRefProvider( val systemName: String, val settings: ActorSystem.Settings, val eventStream: EventStream, - val scheduler: Scheduler, val dynamicAccess: DynamicAccess) extends ActorRefProvider { import RemoteActorRefProvider._ @@ -115,7 +114,7 @@ private[akka] class RemoteActorRefProvider( */ protected def createDeployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess) - private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, dynamicAccess, deployer, + private val local = new LocalActorRefProvider(systemName, settings, eventStream, dynamicAccess, deployer, Some(deadLettersPath ⇒ new RemoteDeadLetterActorRef(this, deadLettersPath, eventStream))) @volatile @@ -130,7 +129,6 @@ private[akka] class RemoteActorRefProvider( override def guardian: LocalActorRef = local.guardian override def systemGuardian: LocalActorRef = local.systemGuardian override def terminationFuture: Future[Unit] = local.terminationFuture - override def dispatcher: ExecutionContext = local.dispatcher override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path) override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path) override def tempPath(): ActorPath = local.tempPath() diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index 1fe6fc21a7..12a32a590b 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -464,7 +464,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends // Shutdown all endpoints and signal to sender when ready (and whether all endpoints were shut down gracefully) val sys = context.system // Avoid closing over context Future sequence endpoints.allEndpoints.map { - gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop)(sys) + gracefulStop(_, settings.FlushWait, EndpointWriter.FlushAndStop) } map { _.foldLeft(true) { _ && _ } } pipeTo sender // Ignore all other writes context.become(flushing) diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 19b6b0b1d9..9f087c26e8 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -16,12 +16,11 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference import scala.annotation.tailrec import scala.collection.immutable.Queue -import scala.concurrent.Future -import scala.concurrent.Promise +import scala.concurrent.{ Future, Promise } +import scala.concurrent.duration._ import scala.math.min import scala.util.{ Success, Failure } import scala.util.control.NonFatal -import scala.concurrent.duration._ import akka.dispatch.sysmsg.{ Unwatch, Watch } class ThrottlerProvider extends TransportAdapterProvider { @@ -158,13 +157,12 @@ object ThrottlerTransportAdapter { } } -class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedActorSystem) - extends ActorTransportAdapter(_wrappedTransport, _system) { +class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedActorSystem) extends ActorTransportAdapter(_wrappedTransport, _system) { override protected def addedSchemeIdentifier = SchemeIdentifier override protected def maximumOverhead = 0 - protected def managerName = s"throttlermanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}" - protected def managerProps = { + protected def managerName: String = s"throttlermanager.${wrappedTransport.schemeIdentifier}${UniqueId.getAndIncrement}" + protected def managerProps: Props = { val wt = wrappedTransport Props(new ThrottlerManager(wt)) } @@ -172,11 +170,9 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA override def managementCommand(cmd: Any): Future[Boolean] = { import ActorTransportAdapter.AskTimeout cmd match { - case s: SetThrottle ⇒ - manager ? s map { case SetThrottleAck ⇒ true } - case f: ForceDisassociate ⇒ - manager ? f map { case ForceDisassociateAck ⇒ true } - case _ ⇒ wrappedTransport.managementCommand(cmd) + case s: SetThrottle ⇒ manager ? s map { case SetThrottleAck ⇒ true } + case f: ForceDisassociate ⇒ manager ? f map { case ForceDisassociateAck ⇒ true } + case _ ⇒ wrappedTransport.managementCommand(cmd) } } } @@ -268,40 +264,34 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A } private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Future[SetThrottleAck.type] = { - import ActorTransportAdapter.AskTimeout if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode) if (direction.includes(Direction.Receive)) - askWithDeathCompletion(handle.throttlerActor, mode, SetThrottleAck).mapTo[SetThrottleAck.type] + askModeWithDeathCompletion(handle.throttlerActor, mode)(ActorTransportAdapter.AskTimeout) else Future.successful(SetThrottleAck) } - private def wrapHandle(originalHandle: AssociationHandle, listener: AssociationEventListener, inbound: Boolean): ThrottlerHandle = { - val managerRef = self - val throttlerActor = context.actorOf(Props(new ThrottledAssociation(managerRef, listener, originalHandle, inbound)), - "throttler" + nextId()) - ThrottlerHandle(originalHandle, throttlerActor) - } - - private def askWithDeathCompletion(target: ActorRef, question: Any, answer: Any)(implicit timeout: Timeout): Future[Any] = { - if (target.isTerminated) Future successful answer + private def askModeWithDeathCompletion(target: ActorRef, mode: ThrottleMode)(implicit timeout: Timeout): Future[SetThrottleAck.type] = { + if (target.isTerminated) Future successful SetThrottleAck else { val internalTarget = target.asInstanceOf[InternalActorRef] - val promiseActorRef = PromiseActorRef(context.system.asInstanceOf[ExtendedActorSystem].provider, timeout) - internalTarget.sendSystemMessage(Watch(target, promiseActorRef)) - val future = promiseActorRef.result.future - future onComplete { // remember to unwatch if termination didn't complete - case Success(Terminated(`target`)) ⇒ () - case _ ⇒ internalTarget.sendSystemMessage(Unwatch(target, promiseActorRef)) - } - target.tell(question, promiseActorRef) - future map { - case Terminated(`target`) ⇒ answer - case x ⇒ x - } + val ref = PromiseActorRef(internalTarget.provider, timeout) + internalTarget.sendSystemMessage(Watch(target, ref)) + target.tell(mode, ref) + ref.result.future.transform({ + case Terminated(t) if t.path == target.path ⇒ SetThrottleAck + case SetThrottleAck ⇒ { internalTarget.sendSystemMessage(Unwatch(target, ref)); SetThrottleAck } + }, t ⇒ { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ref.internalCallingThreadExecutionContext) } } + + private def wrapHandle(originalHandle: AssociationHandle, listener: AssociationEventListener, inbound: Boolean): ThrottlerHandle = { + val managerRef = self + ThrottlerHandle( + originalHandle, + context.actorOf(Props(new ThrottledAssociation(managerRef, listener, originalHandle, inbound)), "throttler" + nextId())) + } } /** diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 361c6d4c29..4830a8fc49 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -343,7 +343,7 @@ class CallingThreadMailbox(_receiver: akka.actor.Cell, val mailboxType: MailboxT val qq = queue CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, qq) super.cleanUp() - qq.cleanUp(actor.self, actor.systemImpl.deadLetterQueue) + qq.cleanUp(actor.self, actor.systemImpl.deadLetterMailbox.messageQueue) q.remove() } }