From d26453b5e8cc0a6728d070c3f26f2eab2f3991c0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 10 Mar 2020 15:39:30 +0100 Subject: [PATCH] Clean up same thread execution contexts #26690 * deprecate internal sameThread ec and use a new one for all internal use sites * Use the respective Scala version standard library "same thread" ec * fallback to the old inline impl on 2.12 when reflection isn't possible --- .../scala/akka/actor/ActorSystemSpec.scala | 10 ---- .../akka/actor/CoordinatedShutdownSpec.scala | 4 +- .../akka/dispatch/ExecutionContextSpec.scala | 2 +- .../SameThreadExecutionContextSpec.scala | 59 +++++++++++++++++++ .../akka/io/TcpIntegrationSpecSupport.scala | 2 +- .../typed/internal/ActorContextImpl.scala | 3 +- .../typed/internal/TimerSchedulerImpl.scala | 7 +-- .../internal/adapter/ActorSystemAdapter.scala | 4 +- ...690-same-thread-execution-context.excludes | 3 + .../internal/SameThreadExecutionContext.scala | 19 ++++++ .../internal/SameThreadExecutionContext.scala | 37 ++++++++++++ .../scala/akka/actor/ActorSelection.scala | 2 +- .../main/scala/akka/actor/ActorSystem.scala | 8 --- .../akka/actor/CoordinatedShutdown.scala | 4 +- .../src/main/scala/akka/dispatch/Future.scala | 18 +++++- .../main/scala/akka/pattern/AskSupport.scala | 8 +-- .../scala/akka/pattern/CircuitBreaker.scala | 12 ++-- .../akka/pattern/GracefulStopSupport.scala | 6 +- .../routing/ScatterGatherFirstCompleted.scala | 6 +- .../cluster/sharding/ShardCoordinator.scala | 2 +- .../scala/akka/remote/artery/Handshake.scala | 2 +- .../remote/artery/SystemMessageDelivery.scala | 4 +- .../artery/tcp/ArteryTcpTransport.scala | 2 +- .../transport/ThrottlerTransportAdapter.scala | 4 +- .../akka/remote/artery/TestContext.scala | 2 +- .../scala/akka/stream/io/FileSinkSpec.scala | 4 +- .../main/scala/akka/stream/KillSwitch.scala | 2 +- .../scala/akka/stream/impl/MaybeSource.scala | 3 +- .../scala/akka/stream/impl/QueueSource.scala | 2 +- .../main/scala/akka/stream/impl/Sinks.scala | 4 +- .../main/scala/akka/stream/impl/Unfold.scala | 2 +- .../impl/UnfoldResourceSourceAsync.scala | 4 +- .../akka/stream/impl/fusing/GraphStages.scala | 4 +- .../scala/akka/stream/impl/fusing/Ops.scala | 14 ++--- .../scala/akka/stream/impl/io/TcpStages.scala | 7 +-- .../main/scala/akka/stream/javadsl/Flow.scala | 16 ++--- .../scala/akka/stream/javadsl/Queue.scala | 4 +- .../main/scala/akka/stream/javadsl/Sink.scala | 32 ++++------ .../scala/akka/stream/javadsl/Source.scala | 8 +-- .../main/scala/akka/stream/javadsl/Tcp.scala | 26 ++++---- .../scala/akka/stream/scaladsl/Flow.scala | 2 +- .../stream/scaladsl/FlowWithContextOps.scala | 2 +- .../scala/akka/stream/scaladsl/Queue.scala | 7 +-- .../scala/akka/stream/scaladsl/Sink.scala | 12 ++-- .../test/scala/akka/testkit/AkkaSpec.scala | 28 +++++---- 45 files changed, 248 insertions(+), 165 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/dispatch/SameThreadExecutionContextSpec.scala create mode 100644 akka-actor/src/main/mima-filters/2.6.3.backwards.excludes/issue-26690-same-thread-execution-context.excludes create mode 100644 akka-actor/src/main/scala-2.13+/akka/dispatch/internal/SameThreadExecutionContext.scala create mode 100644 akka-actor/src/main/scala-2.13-/akka/dispatch/internal/SameThreadExecutionContext.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala index 9e7df54f74..9d7fb67878 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorSystemSpec.scala @@ -20,7 +20,6 @@ import com.typesafe.config.{ Config, ConfigFactory } import scala.concurrent.duration._ import scala.concurrent.{ Await, Future } import scala.language.postfixOps -import scala.util.Properties object ActorSystemSpec { @@ -120,15 +119,6 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend "An ActorSystem" must { - "use scala.concurrent InternalCallbackExecutor/parasitic" in { - val ec = system.asInstanceOf[ActorSystemImpl].internalCallingThreadExecutionContext - val scalaVersion = Properties.versionNumberString - if (scalaVersion.startsWith("2.13") && scalaVersion != "2.13.0-M5") - ec.getClass.getName should ===("scala.concurrent.ExecutionContext$parasitic$") - else - ec.getClass.getName should ===("scala.concurrent.Future$InternalCallbackExecutor$") - } - "reject invalid names" in { for (n <- Seq( "-hallowelt", diff --git a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala index 0f4c22aef5..65c9897d6e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/CoordinatedShutdownSpec.scala @@ -308,10 +308,10 @@ class CoordinatedShutdownSpec Future { testProbe.ref ! BMessage("concurrentB") Done - }(ExecutionContexts.sameThreadExecutionContext) + }(ExecutionContexts.parasitic) } Done - }(ExecutionContexts.sameThreadExecutionContext) + }(ExecutionContexts.parasitic) val cancellationFut: Future[Done] = { val cancellables = (0 until 20).map { _ => diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala index ba52cad34e..9d9a85a374 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ExecutionContextSpec.scala @@ -139,7 +139,7 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout { } "work with same-thread executor plus blocking" in { - val ec = akka.dispatch.ExecutionContexts.sameThreadExecutionContext + val ec = akka.dispatch.ExecutionContexts.parasitic var x = 0 ec.execute(new Runnable { override def run = { diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/SameThreadExecutionContextSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/SameThreadExecutionContextSpec.scala new file mode 100644 index 0000000000..1da2d58228 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/dispatch/SameThreadExecutionContextSpec.scala @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.dispatch + +import akka.Done +import akka.dispatch.internal.SameThreadExecutionContext +import akka.testkit.AkkaSpec +import org.scalatest.matchers.should.Matchers + +import scala.concurrent.ExecutionContext +import scala.concurrent.Promise + +class SameThreadExecutionContextSpec extends AkkaSpec with Matchers { + + "The SameThreadExecutionContext" should { + + "return a Scala specific version" in { + val ec = SameThreadExecutionContext() + if (util.Properties.versionNumberString.startsWith("2.12")) { + ec.getClass.getName should ===("scala.concurrent.Future$InternalCallbackExecutor$") + } else { + // in 2.13 and higher parasitic is available + ec.getClass.getName should ===("scala.concurrent.ExecutionContext$parasitic$") + } + } + + "should run follow up future operations in the same dispatcher" in { + // covered by the respective impl test suites for sure but just in case + val promise = Promise[Done]() + val futureThreadNames = promise.future + .map { _ => + Thread.currentThread().getName + }(system.dispatcher) + .map(firstName => firstName -> Thread.currentThread().getName)(SameThreadExecutionContext()) + + promise.success(Done) + val (threadName1, threadName2) = futureThreadNames.futureValue + threadName1 should ===(threadName2) + } + + "should run follow up future operations in the same execution context" in { + // covered by the respective impl test suites for sure but just in case + val promise = Promise[Done]() + val futureThreadNames = promise.future + .map { _ => + Thread.currentThread().getName + }(ExecutionContext.global) + .map(firstName => firstName -> Thread.currentThread().getName)(SameThreadExecutionContext()) + + promise.success(Done) + val (threadName1, threadName2) = futureThreadNames.futureValue + threadName1 should ===(threadName2) + } + + } + +} diff --git a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala index b70c7ddd7a..67b7e6a9b2 100644 --- a/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala +++ b/akka-actor-tests/src/test/scala/akka/io/TcpIntegrationSpecSupport.scala @@ -23,7 +23,7 @@ trait TcpIntegrationSpecSupport { _: AkkaSpec => // terminate clientSystem after server system system.whenTerminated.onComplete { _ => res.terminate() - }(ExecutionContexts.sameThreadExecutionContext) + }(ExecutionContexts.parasitic) res } else system val bindHandler = TestProbe() diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala index 14286f8267..ce71d32ab8 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/ActorContextImpl.scala @@ -211,8 +211,7 @@ import org.slf4j.LoggerFactory // Scala API impl def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit = { - future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))( - ExecutionContexts.sameThreadExecutionContext) + future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))(ExecutionContexts.parasitic) } // Java API impl diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala index b27a1e01cb..7b8639d359 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/TimerSchedulerImpl.scala @@ -103,14 +103,13 @@ import org.slf4j.Logger val task = mode match { case SingleMode => - ctx.system.scheduler - .scheduleOnce(delay, () => ctx.self.unsafeUpcast ! timerMsg)(ExecutionContexts.sameThreadExecutionContext) + ctx.system.scheduler.scheduleOnce(delay, () => ctx.self.unsafeUpcast ! timerMsg)(ExecutionContexts.parasitic) case FixedDelayMode => ctx.system.scheduler.scheduleWithFixedDelay(delay, delay)(() => ctx.self.unsafeUpcast ! timerMsg)( - ExecutionContexts.sameThreadExecutionContext) + ExecutionContexts.parasitic) case FixedRateMode => ctx.system.scheduler.scheduleAtFixedRate(delay, delay)(() => ctx.self.unsafeUpcast ! timerMsg)( - ExecutionContexts.sameThreadExecutionContext) + ExecutionContexts.parasitic) } val nextTimer = Timer(key, msg, mode.repeat, nextGen, task) diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala index a6738a23a1..e9ed35ac97 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/adapter/ActorSystemAdapter.scala @@ -97,11 +97,11 @@ import org.slf4j.{ Logger, LoggerFactory } override def uptime: Long = classicSystem.uptime override def printTree: String = system.printTree - import akka.dispatch.ExecutionContexts.sameThreadExecutionContext + import akka.dispatch.ExecutionContexts.parasitic override def terminate(): Unit = system.terminate() override lazy val whenTerminated: scala.concurrent.Future[akka.Done] = - system.whenTerminated.map(_ => Done)(sameThreadExecutionContext) + system.whenTerminated.map(_ => Done)(parasitic) override lazy val getWhenTerminated: CompletionStage[akka.Done] = FutureConverters.toJava(whenTerminated) diff --git a/akka-actor/src/main/mima-filters/2.6.3.backwards.excludes/issue-26690-same-thread-execution-context.excludes b/akka-actor/src/main/mima-filters/2.6.3.backwards.excludes/issue-26690-same-thread-execution-context.excludes new file mode 100644 index 0000000000..1331a6b3c2 --- /dev/null +++ b/akka-actor/src/main/mima-filters/2.6.3.backwards.excludes/issue-26690-same-thread-execution-context.excludes @@ -0,0 +1,3 @@ +# Internals changed +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorSystemImpl.internalCallingThreadExecutionContext") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.internalCallingThreadExecutionContext") \ No newline at end of file diff --git a/akka-actor/src/main/scala-2.13+/akka/dispatch/internal/SameThreadExecutionContext.scala b/akka-actor/src/main/scala-2.13+/akka/dispatch/internal/SameThreadExecutionContext.scala new file mode 100644 index 0000000000..005b0ee59d --- /dev/null +++ b/akka-actor/src/main/scala-2.13+/akka/dispatch/internal/SameThreadExecutionContext.scala @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.dispatch.internal + +import akka.annotation.InternalApi + +import scala.concurrent.ExecutionContext + +/** + * Factory to create same thread ec. Not intended to be called from any other site than to create [[akka.dispatch.ExecutionContexts#parasitic]] + * + * INTERNAL API + */ +@InternalApi +private[dispatch] object SameThreadExecutionContext { + def apply(): ExecutionContext = ExecutionContext.parasitic +} diff --git a/akka-actor/src/main/scala-2.13-/akka/dispatch/internal/SameThreadExecutionContext.scala b/akka-actor/src/main/scala-2.13-/akka/dispatch/internal/SameThreadExecutionContext.scala new file mode 100644 index 0000000000..e9be19ee60 --- /dev/null +++ b/akka-actor/src/main/scala-2.13-/akka/dispatch/internal/SameThreadExecutionContext.scala @@ -0,0 +1,37 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.dispatch.internal + +import akka.actor.ReflectiveDynamicAccess +import akka.annotation.InternalApi +import akka.dispatch.BatchingExecutor + +import scala.concurrent.ExecutionContext +import scala.util.control.NonFatal + +/** + * Factory to create same thread ec. Not intended to be called from any other site than to create [[akka.dispatch.ExecutionContexts#parasitic]] + * + * INTERNAL API + */ +@InternalApi +private[dispatch] object SameThreadExecutionContext { + def apply(): ExecutionContext = { + try { + // we don't want to introduce a dependency on the actor system to use the same thread execution context + val dynamicAccess = new ReflectiveDynamicAccess(getClass.getClassLoader) + dynamicAccess.getObjectFor[ExecutionContext]("scala.concurrent.Future$InternalCallbackExecutor$").get + } catch { + case NonFatal(_) => + // fallback to custom impl in case reflection is not available/possible + new ExecutionContext with BatchingExecutor { + override protected def unbatchedExecute(runnable: Runnable): Unit = runnable.run() + override protected def resubmitOnBlock: Boolean = false // No point since we execute on same thread + override def reportFailure(t: Throwable): Unit = + throw new IllegalStateException("exception in sameThreadExecutionContext", t) + } + } + } +} diff --git a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala index 704eae10eb..63c371ac3c 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSelection.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSelection.scala @@ -68,7 +68,7 @@ abstract class ActorSelection extends Serializable { * [[ActorRef]]. */ def resolveOne()(implicit timeout: Timeout): Future[ActorRef] = { - implicit val ec = ExecutionContexts.sameThreadExecutionContext + implicit val ec = ExecutionContexts.parasitic val p = Promise[ActorRef]() this.ask(Identify(None)).onComplete { case Success(ActorIdentity(_, Some(ref))) => p.success(ref) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 342a3d1c39..a8e1988299 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -965,14 +965,6 @@ private[akka] class ActorSystemImpl( val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher - val internalCallingThreadExecutionContext: ExecutionContext = - dynamicAccess - .getObjectFor[ExecutionContext]("scala.concurrent.Future$InternalCallbackExecutor$") - .getOrElse( - dynamicAccess - .getObjectFor[ExecutionContext]("scala.concurrent.ExecutionContext$parasitic$") - .getOrElse(ExecutionContexts.sameThreadExecutionContext)) - private[this] final val terminationCallbacks = new TerminationCallbacks(provider.terminationFuture)(dispatcher) override def whenTerminated: Future[Terminated] = terminationCallbacks.terminationFuture diff --git a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala index ac6a9f8254..e7f9731a58 100644 --- a/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala +++ b/akka-actor/src/main/scala/akka/actor/CoordinatedShutdown.scala @@ -251,7 +251,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi system.whenTerminated.map { _ => if (exitJvm && !runningJvmHook) System.exit(exitCode) Done - }(ExecutionContexts.sameThreadExecutionContext) + }(ExecutionContexts.parasitic) } else if (exitJvm) { System.exit(exitCode) Future.successful(Done) @@ -458,7 +458,7 @@ final class CoordinatedShutdown private[akka] ( override val size: Int = tasks.size override def run(recoverEnabled: Boolean)(implicit ec: ExecutionContext): Future[Done] = { - Future.sequence(tasks.map(_.run(recoverEnabled))).map(_ => Done)(ExecutionContexts.sameThreadExecutionContext) + Future.sequence(tasks.map(_.run(recoverEnabled))).map(_ => Done)(ExecutionContexts.parasitic) } // This method may be run multiple times during the compare-and-set loop of ConcurrentHashMap, so it must be side-effect-free diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 737eab92d9..712ccf61f0 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -16,7 +16,9 @@ import scala.util.{ Failure, Success, Try } import java.util.concurrent.CompletionStage import java.util.concurrent.CompletableFuture +import akka.annotation.InternalApi import akka.compat +import akka.dispatch.internal.SameThreadExecutionContext import akka.util.unused import com.github.ghik.silencer.silent @@ -80,13 +82,25 @@ object ExecutionContexts { * This is an execution context which runs everything on the calling thread. * It is very useful for actions which are known to be non-blocking and * non-throwing in order to save a round-trip to the thread pool. + * + * INTERNAL API */ + // Once Scala 2.12 is no longer supported this can be dropped in favour of directly using [[ExecutionContext.parasitic]] + @InternalApi + private[akka] val parasitic: ExecutionContext = SameThreadExecutionContext() + + /** + * INTERNAL API + */ + @InternalApi + @deprecated("Use ExecutionContexts.parasitic instead", "2.6.4") private[akka] object sameThreadExecutionContext extends ExecutionContext with BatchingExecutor { - override protected def unbatchedExecute(runnable: Runnable): Unit = runnable.run() + override protected def unbatchedExecute(runnable: Runnable): Unit = parasitic.execute(runnable) override protected def resubmitOnBlock: Boolean = false // No point since we execute on same thread override def reportFailure(t: Throwable): Unit = - throw new IllegalStateException("exception in sameThreadExecutionContext", t) + parasitic.reportFailure(t) } + } /** diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 1502bd2a9c..f546ab12c0 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -8,12 +8,13 @@ import java.util.concurrent.TimeoutException import akka.actor._ import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts import akka.dispatch.sysmsg._ import akka.util.{ Timeout, Unsafe } import com.github.ghik.silencer.silent import scala.annotation.tailrec -import scala.concurrent.{ ExecutionContext, Future, Promise } +import scala.concurrent.{ Future, Promise } import scala.language.implicitConversions import scala.util.{ Failure, Success } @@ -543,9 +544,6 @@ private[akka] final class PromiseActorRef private ( override def getParent: InternalActorRef = provider.tempContainer - def internalCallingThreadExecutionContext: ExecutionContext = - provider.guardian.underlying.systemImpl.internalCallingThreadExecutionContext - /** * Contract of this method: * Must always return the same ActorPath, which must have @@ -657,7 +655,7 @@ private[akka] object PromiseActorRef { val result = Promise[Any]() val scheduler = provider.guardian.underlying.system.scheduler val a = new PromiseActorRef(provider, result, messageClassName) - implicit val ec = a.internalCallingThreadExecutionContext + implicit val ec = ExecutionContexts.parasitic val f = scheduler.scheduleOnce(timeout.duration) { result.tryComplete { val wasSentBy = if (sender == ActorRef.noSender) "" else s" was sent by [$sender]" diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 005f45f594..e19e5372a0 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -22,7 +22,7 @@ import scala.concurrent.duration._ import scala.concurrent.TimeoutException import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } -import akka.dispatch.ExecutionContexts.sameThreadExecutionContext +import akka.dispatch.ExecutionContexts.parasitic import com.github.ghik.silencer.silent import scala.compat.java8.FutureConverters @@ -49,7 +49,7 @@ object CircuitBreaker { maxFailures: Int, callTimeout: FiniteDuration, resetTimeout: FiniteDuration): CircuitBreaker = - new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(sameThreadExecutionContext) + new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(parasitic) /** * Java API: Create a new CircuitBreaker. @@ -751,8 +751,6 @@ class CircuitBreaker( val start = System.nanoTime() val p = Promise[T]() - implicit val ec = sameThreadExecutionContext - p.future.onComplete { fResult => if (defineFailureFn(fResult)) { callFails() @@ -760,13 +758,13 @@ class CircuitBreaker( notifyCallSuccessListeners(start) callSucceeds() } - } + }(parasitic) val timeout = scheduler.scheduleOnce(callTimeout) { if (p.tryFailure(timeoutEx)) { notifyCallTimeoutListeners(start) } - } + }(parasitic) materialize(body).onComplete { case Success(result) => @@ -777,7 +775,7 @@ class CircuitBreaker( notifyCallFailureListeners(start) } timeout.cancel - } + }(parasitic) p.future } } diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index f851576204..7d5ddf2402 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -5,8 +5,10 @@ package akka.pattern import akka.actor._ -import akka.util.{ Timeout } +import akka.dispatch.ExecutionContexts +import akka.util.Timeout import akka.dispatch.sysmsg.{ Unwatch, Watch } + import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration @@ -52,6 +54,6 @@ trait GracefulStopSupport { ref.result.future.transform({ case Terminated(t) if t.path == target.path => true case _ => { internalTarget.sendSystemMessage(Unwatch(target, ref)); false } - }, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ref.internalCallingThreadExecutionContext) + }, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContexts.parasitic) } } diff --git a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala index 502dca30ce..ec27af01bd 100644 --- a/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala +++ b/akka-actor/src/main/scala/akka/routing/ScatterGatherFirstCompleted.scala @@ -43,13 +43,12 @@ private[akka] final case class ScatterGatherFirstCompletedRoutees( within: FiniteDuration) extends Routee { - override def send(message: Any, sender: ActorRef): Unit = + override def send(message: Any, sender: ActorRef): Unit = { + implicit val ec = ExecutionContexts.parasitic if (routees.isEmpty) { - implicit val ec = ExecutionContexts.sameThreadExecutionContext val reply = Future.failed(new TimeoutException("Timeout due to no routees")) reply.pipeTo(sender) } else { - implicit val ec = ExecutionContexts.sameThreadExecutionContext implicit val timeout = Timeout(within) val promise = Promise[Any]() routees.foreach { @@ -62,6 +61,7 @@ private[akka] final case class ScatterGatherFirstCompletedRoutees( promise.future.pipeTo(sender) } + } } /** diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 2f369fe40c..8f9fc8b13f 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -138,7 +138,7 @@ object ShardCoordinator { currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]], rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = { import akka.util.ccompat.JavaConverters._ - implicit val ec = ExecutionContexts.sameThreadExecutionContext + implicit val ec = ExecutionContexts.parasitic rebalance(currentShardAllocations.asJava, rebalanceInProgress.asJava).map(_.asScala.toSet) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala index 32bce9a7f0..6914c62a9a 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Handshake.scala @@ -295,7 +295,7 @@ private[remote] class InboundHandshake(inboundContext: InboundContext, inControl // periodically. thenInside() case None => - first.onComplete(_ => runInStage.invoke(thenInside))(ExecutionContexts.sameThreadExecutionContext) + first.onComplete(_ => runInStage.invoke(thenInside))(ExecutionContexts.parasitic) } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala index 248f6f1b51..91681e3467 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/SystemMessageDelivery.scala @@ -109,9 +109,7 @@ import akka.util.PrettyDuration.PrettyPrintableDuration if (isAvailable(out)) pull(in) // onPull from downstream already called } - outboundContext.controlSubject - .attach(this) - .foreach(callback.invoke)(ExecutionContexts.sameThreadExecutionContext) + outboundContext.controlSubject.attach(this).foreach(callback.invoke)(ExecutionContexts.parasitic) } override def postStop(): Unit = { diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index 473576036b..487c262da3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -273,7 +273,7 @@ private[remote] class ArteryTcpTransport( s"Failed to bind TCP to [$bindHost:$bindPort] due to: " + e.getMessage, e)) - }(ExecutionContexts.sameThreadExecutionContext) + }(ExecutionContexts.parasitic) // only on initial startup, when ActorSystem is starting val b = Await.result(binding, settings.Bind.BindTimeout) diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index a69a8eedae..05cdeb879a 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -22,6 +22,8 @@ import akka.util.{ ByteString, Timeout } import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicReference +import akka.dispatch.ExecutionContexts + import scala.annotation.tailrec import scala.collection.immutable.Queue import scala.concurrent.{ Future, Promise } @@ -361,7 +363,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) ref.result.future.transform({ case Terminated(t) if t.path == target.path => SetThrottleAck case SetThrottleAck => { internalTarget.sendSystemMessage(Unwatch(target, ref)); SetThrottleAck } - }, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ref.internalCallingThreadExecutionContext) + }, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContexts.parasitic) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala index 0b2612b9ad..b180368cee 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/TestContext.scala @@ -54,7 +54,7 @@ private[remote] class TestInboundContext( val done = a.completeHandshake(peer) done.foreach { _ => associationsByUid.put(peer.uid, a) - }(ExecutionContexts.sameThreadExecutionContext) + }(ExecutionContexts.parasitic) done } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala index e41de761d2..1fb034e72a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/FileSinkSpec.scala @@ -204,9 +204,9 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures .lazyInitAsync(() => Future.successful(FileIO.toPath(f))) // map a Future[Option[Future[IOResult]]] into a Future[Option[IOResult]] .mapMaterializedValue(_.flatMap { - case Some(future) => future.map(Some(_))(ExecutionContexts.sameThreadExecutionContext) + case Some(future) => future.map(Some(_))(ExecutionContexts.parasitic) case None => Future.successful(None) - }(ExecutionContexts.sameThreadExecutionContext))) + }(ExecutionContexts.parasitic))) Await.result(completion, 3.seconds) checkFileContents(f, TestLines.head) diff --git a/akka-stream/src/main/scala/akka/stream/KillSwitch.scala b/akka-stream/src/main/scala/akka/stream/KillSwitch.scala index 9d09505de9..9ae737576f 100644 --- a/akka-stream/src/main/scala/akka/stream/KillSwitch.scala +++ b/akka-stream/src/main/scala/akka/stream/KillSwitch.scala @@ -61,7 +61,7 @@ object KillSwitches { case _ => // callback.invoke is a simple actor send, so it is fine to run on the invoking thread terminationSignal.onComplete(getAsyncCallback[Try[Done]](onSwitch).invoke)( - akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + akka.dispatch.ExecutionContexts.parasitic) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala b/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala index 63f2ca10b7..b2d532b94e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/MaybeSource.scala @@ -39,8 +39,7 @@ import scala.util.Try handleCompletion(value) case None => // callback on future completion - promise.future.onComplete(getAsyncCallback(handleCompletion).invoke)( - ExecutionContexts.sameThreadExecutionContext) + promise.future.onComplete(getAsyncCallback(handleCompletion).invoke)(ExecutionContexts.parasitic) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala index 3896180804..5c7ef660dd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/QueueSource.scala @@ -223,7 +223,7 @@ import scala.concurrent.{ Future, Promise } .onComplete { case scala.util.Success(_) => case scala.util.Failure(e) => p.tryFailure(e) - }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + }(akka.dispatch.ExecutionContexts.parasitic) p.future } override def complete(): Unit = callback.invoke(Completion) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 45cdb1bba5..158d6ac40a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -398,7 +398,7 @@ import scala.util.control.NonFatal .foreach { case NonFatal(e) => p.tryFailure(e) case _ => () - }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + }(akka.dispatch.ExecutionContexts.parasitic) p.future } override def cancel(): Unit = { @@ -572,7 +572,7 @@ import scala.util.control.NonFatal failStage(e) } try { - sinkFactory(element).onComplete(cb.invoke)(ExecutionContexts.sameThreadExecutionContext) + sinkFactory(element).onComplete(cb.invoke)(ExecutionContexts.parasitic) } catch { case NonFatal(e) => promise.failure(e) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala index 540ab72ae3..896be90077 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala @@ -59,7 +59,7 @@ import scala.util.{ Failure, Success, Try } asyncHandler = ac.invoke } - def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.parasitic) setHandler(out, this) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala index 9864a33450..3feb338de6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/UnfoldResourceSourceAsync.scala @@ -6,7 +6,7 @@ package akka.stream.impl import akka.Done import akka.annotation.InternalApi -import akka.dispatch.ExecutionContexts.sameThreadExecutionContext +import akka.dispatch.ExecutionContexts.parasitic import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream._ import akka.stream.impl.Stages.DefaultAttributes @@ -83,7 +83,7 @@ import scala.util.control.NonFatal state match { case Some(resource) => try { - readData(resource).onComplete(readCallback)(sameThreadExecutionContext) + readData(resource).onComplete(readCallback)(parasitic) } catch errorHandler case None => // we got a pull but there is no open resource, we are either diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala index 20e443d95c..47ceb941a2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphStages.scala @@ -303,7 +303,7 @@ import scala.concurrent.{ Future, Promise } onFutureSourceCompleted(it) case _ => val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]](onFutureSourceCompleted).invoke _ - futureSource.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) // could be optimised FastFuture-like + futureSource.onComplete(cb)(ExecutionContexts.parasitic) // could be optimised FastFuture-like } // initial handler (until future completes) @@ -387,7 +387,7 @@ import scala.concurrent.{ Future, Promise } onFutureCompleted(completed) case None => val cb = getAsyncCallback[Try[T]](onFutureCompleted).invoke _ - future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) + future.onComplete(cb)(ExecutionContexts.parasitic) } def onFutureCompleted(result: Try[T]): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index ebb3d3dc8d..934ed08262 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -469,8 +469,6 @@ private[stream] object Collect { private var current: Out = zero private var elementHandled: Boolean = false - private def ec = ExecutionContexts.sameThreadExecutionContext - private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider private val ZeroHandler: OutHandler with InHandler = new OutHandler with InHandler { @@ -544,7 +542,7 @@ private[stream] object Collect { eventualCurrent.value match { case Some(result) => futureCB(result) - case _ => eventualCurrent.onComplete(futureCB)(ec) + case _ => eventualCurrent.onComplete(futureCB)(ExecutionContexts.parasitic) } } catch { case NonFatal(ex) => @@ -652,8 +650,6 @@ private[stream] object Collect { aggregator = zero } - private def ec = ExecutionContexts.sameThreadExecutionContext - private val futureCB = getAsyncCallback[Try[Out]] { case Success(update) if update != null => aggregator = update @@ -711,7 +707,7 @@ private[stream] object Collect { private def handleAggregatingValue(): Unit = { aggregating.value match { case Some(result) => futureCB(result) // already completed - case _ => aggregating.onComplete(futureCB)(ec) + case _ => aggregating.onComplete(futureCB)(ExecutionContexts.parasitic) } } @@ -1291,7 +1287,7 @@ private[stream] object Collect { buffer.enqueue(holder) future.value match { - case None => future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + case None => future.onComplete(holder)(akka.dispatch.ExecutionContexts.parasitic) case Some(v) => // #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and // run the logic directly on this thread @@ -1405,7 +1401,7 @@ private[stream] object Collect { val future = f(grab(in)) inFlight += 1 future.value match { - case None => future.onComplete(invokeFutureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) + case None => future.onComplete(invokeFutureCB)(akka.dispatch.ExecutionContexts.parasitic) case Some(v) => futureCompleted(v) } } catch { @@ -2285,7 +2281,7 @@ private[stream] object Collect { onFlowFutureComplete(element)(completed) case None => val cb = getAsyncCallback[Try[Flow[I, O, M]]](onFlowFutureComplete(element)) - futureFlow.onComplete(cb.invoke)(ExecutionContexts.sameThreadExecutionContext) + futureFlow.onComplete(cb.invoke)(ExecutionContexts.parasitic) } } catch { case NonFatal(e) => diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala index b95e37dd9f..d69d1510ac 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/TcpStages.scala @@ -86,7 +86,7 @@ import scala.concurrent.{ Future, Promise } thisStage.tell(Unbind, thisStage) } unbindPromise.future - }, unbindPromise.future.map(_ => Done)(ExecutionContexts.sameThreadExecutionContext))) + }, unbindPromise.future.map(_ => Done)(ExecutionContexts.parasitic))) case f: CommandFailed => val ex = new BindFailedException { // cannot modify the actual exception class for compatibility reasons @@ -533,10 +533,7 @@ private[stream] object ConnectionSourceStage { remoteAddress, eagerMaterializer) - ( - logic, - localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))( - ExecutionContexts.sameThreadExecutionContext)) + (logic, localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(ExecutionContexts.parasitic)) } override def toString = s"TCP-to($remoteAddress)" diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index ec89ba2925..12ab4fc8b2 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -263,8 +263,7 @@ object Flow { fallback: function.Creator[M]): Flow[I, O, M] = { import scala.compat.java8.FutureConverters._ val sflow = scaladsl.Flow - .fromGraph(new LazyFlow[I, O, M](t => - flowFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext))) + .fromGraph(new LazyFlow[I, O, M](t => flowFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.parasitic))) .mapMaterializedValue(_ => fallback.create()) new Flow(sflow) } @@ -291,13 +290,9 @@ object Flow { import scala.compat.java8.FutureConverters._ val sflow = scaladsl.Flow - .lazyInitAsync(() => flowFactory.create().toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)) - .mapMaterializedValue( - fut => - fut - .map(_.fold[Optional[M]](Optional.empty())(m => Optional.ofNullable(m)))( - ExecutionContexts.sameThreadExecutionContext) - .toJava) + .lazyInitAsync(() => flowFactory.create().toScala.map(_.asScala)(ExecutionContexts.parasitic)) + .mapMaterializedValue(fut => + fut.map(_.fold[Optional[M]](Optional.empty())(m => Optional.ofNullable(m)))(ExecutionContexts.parasitic).toJava) new Flow(sflow) } @@ -353,8 +348,7 @@ object Flow { def lazyCompletionStageFlow[I, O, M]( create: Creator[CompletionStage[Flow[I, O, M]]]): Flow[I, O, CompletionStage[M]] = scaladsl.Flow - .lazyFutureFlow[I, O, M](() => - create.create().toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)) + .lazyFutureFlow[I, O, M](() => create.create().toScala.map(_.asScala)(ExecutionContexts.parasitic)) .mapMaterializedValue(_.toJava) .asJava diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala index 0b392baeff..1bc96ba2e6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Queue.scala @@ -8,6 +8,7 @@ import java.util.Optional import java.util.concurrent.CompletionStage import akka.Done +import akka.dispatch.ExecutionContexts import akka.stream.QueueOfferResult import scala.compat.java8.FutureConverters._ @@ -128,10 +129,9 @@ object SinkQueueWithCancel { // would have been better to add `asScala` in SinkQueueWithCancel trait, but not doing // that for backwards compatibility reasons new akka.stream.scaladsl.SinkQueueWithCancel[T] { - import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext => same } override def pull(): Future[Option[T]] = - queue.pull().toScala.map(_.asScala)(same) + queue.pull().toScala.map(_.asScala)(ExecutionContexts.parasitic) override def cancel(): Unit = queue.cancel() } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index d5c0880a34..9e4638e8e5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -116,7 +116,7 @@ object Sink { f: function.Function[T, CompletionStage[Void]]): Sink[T, CompletionStage[Done]] = new Sink( scaladsl.Sink - .foreachAsync(parallelism)((x: T) => f(x).toScala.map(_ => ())(ExecutionContexts.sameThreadExecutionContext)) + .foreachAsync(parallelism)((x: T) => f(x).toScala.map(_ => ())(ExecutionContexts.parasitic)) .toCompletionStage()) /** @@ -163,10 +163,7 @@ object Sink { * See also [[head]]. */ def headOption[In](): Sink[In, CompletionStage[Optional[In]]] = - new Sink( - scaladsl.Sink - .headOption[In] - .mapMaterializedValue(_.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava)) + new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue(_.map(_.asJava)(ExecutionContexts.parasitic).toJava)) /** * A `Sink` that materializes into a `CompletionStage` of the last value received. @@ -186,10 +183,7 @@ object Sink { * See also [[head]], [[takeLast]]. */ def lastOption[In](): Sink[In, CompletionStage[Optional[In]]] = - new Sink( - scaladsl.Sink - .lastOption[In] - .mapMaterializedValue(_.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava)) + new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue(_.map(_.asJava)(ExecutionContexts.parasitic).toJava)) /** * A `Sink` that materializes into a a `CompletionStage` of `List` containing the last `n` collected elements. @@ -203,7 +197,7 @@ object Sink { new Sink( scaladsl.Sink .takeLast[In](n) - .mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava)) + .mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.parasitic).toJava)) } /** @@ -219,9 +213,7 @@ object Sink { def seq[In]: Sink[In, CompletionStage[java.util.List[In]]] = { import akka.util.ccompat.JavaConverters._ new Sink( - scaladsl.Sink - .seq[In] - .mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava)) + scaladsl.Sink.seq[In].mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.parasitic).toJava)) } /** @@ -389,7 +381,7 @@ object Sink { new Sink( scaladsl.Sink .lazyInit[T, M]( - t => sinkFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext), + t => sinkFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.parasitic), () => fallback.create()) .mapMaterializedValue(_.toJava)) @@ -406,13 +398,9 @@ object Sink { def lazyInitAsync[T, M]( sinkFactory: function.Creator[CompletionStage[Sink[T, M]]]): Sink[T, CompletionStage[Optional[M]]] = { val sSink = scaladsl.Sink - .lazyInitAsync[T, M](() => - sinkFactory.create().toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)) - .mapMaterializedValue( - fut => - fut - .map(_.fold(Optional.empty[M]())(m => Optional.ofNullable(m)))(ExecutionContexts.sameThreadExecutionContext) - .toJava) + .lazyInitAsync[T, M](() => sinkFactory.create().toScala.map(_.asScala)(ExecutionContexts.parasitic)) + .mapMaterializedValue(fut => + fut.map(_.fold(Optional.empty[M]())(m => Optional.ofNullable(m)))(ExecutionContexts.parasitic).toJava) new Sink(sSink) } @@ -449,7 +437,7 @@ object Sink { */ def lazyCompletionStageSink[T, M](create: Creator[CompletionStage[Sink[T, M]]]): Sink[T, CompletionStage[M]] = new Sink(scaladsl.Sink.lazyFutureSink { () => - create.create().toScala.map(_.asScala)((ExecutionContexts.sameThreadExecutionContext)) + create.create().toScala.map(_.asScala)((ExecutionContexts.parasitic)) }).mapMaterializedValue(_.toJava) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 6adafd8768..3efcc013bb 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -61,7 +61,7 @@ object Source { new Source(scaladsl.Source.maybe[T].mapMaterializedValue { scalaOptionPromise: Promise[Option[T]] => val javaOptionPromise = new CompletableFuture[Optional[T]]() scalaOptionPromise.completeWith( - javaOptionPromise.toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)) + javaOptionPromise.toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.parasitic)) javaOptionPromise }) @@ -255,7 +255,7 @@ object Source { */ def unfoldAsync[S, E](s: S, f: function.Function[S, CompletionStage[Optional[Pair[S, E]]]]): Source[E, NotUsed] = new Source(scaladsl.Source.unfoldAsync(s)((s: S) => - f.apply(s).toScala.map(_.asScala.map(_.toScala))(akka.dispatch.ExecutionContexts.sameThreadExecutionContext))) + f.apply(s).toScala.map(_.asScala.map(_.toScala))(akka.dispatch.ExecutionContexts.parasitic))) /** * Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`. @@ -305,7 +305,7 @@ object Source { */ def completionStageSource[T, M](completionStageSource: CompletionStage[Source[T, M]]): Source[T, CompletionStage[M]] = scaladsl.Source - .futureSource(completionStageSource.toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)) + .futureSource(completionStageSource.toScala.map(_.asScala)(ExecutionContexts.parasitic)) .mapMaterializedValue(_.toJava) .asJava @@ -766,7 +766,7 @@ object Source { new Source( scaladsl.Source.unfoldResourceAsync[T, S]( () => create.create().toScala, - (s: S) => read.apply(s).toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext), + (s: S) => read.apply(s).toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.parasitic), (s: S) => close.apply(s).toScala)) /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala index 98e17483cc..f720e1d2e2 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Tcp.scala @@ -139,7 +139,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider { class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { import Tcp._ - import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext => ec } + import akka.dispatch.ExecutionContexts.parasitic private lazy val delegate: scaladsl.Tcp = scaladsl.Tcp(system) @@ -175,7 +175,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { delegate .bind(interface, port, backlog, immutableSeq(options), halfClose, optionalDurationToScala(idleTimeout)) .map(new IncomingConnection(_)) - .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava)) /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`. @@ -221,7 +221,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { delegate .bind(interface, port) .map(new IncomingConnection(_)) - .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava)) /** * Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. @@ -259,7 +259,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { halfClose, optionalDurationToScala(connectTimeout), optionalDurationToScala(idleTimeout)) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava)) /** * Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint. @@ -309,7 +309,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { Flow.fromGraph( delegate .outgoingConnection(new InetSocketAddress(host, port)) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava)) /** * Creates an [[Tcp.OutgoingConnection]] with TLS. @@ -330,7 +330,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { Flow.fromGraph( delegate .outgoingTlsConnection(host, port, sslContext, negotiateNewSession) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava)) /** * Creates an [[Tcp.OutgoingConnection]] with TLS. @@ -363,7 +363,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { immutableSeq(options), connectTimeout, idleTimeout) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava)) /** * Creates an [[Tcp.OutgoingConnection]] with TLS. @@ -381,7 +381,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { Flow.fromGraph( delegate .outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => createSSLEngine.get()) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava)) /** * Creates an [[Tcp.OutgoingConnection]] with TLS. @@ -417,7 +417,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { case Some(t) => Failure(t) }, closing) - .mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava)) + .mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava)) } /** @@ -447,7 +447,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { delegate .bindTls(interface, port, sslContext, negotiateNewSession, backlog, immutableSeq(options), idleTimeout) .map(new IncomingConnection(_)) - .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava)) /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` @@ -468,7 +468,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { delegate .bindTls(interface, port, sslContext, negotiateNewSession) .map(new IncomingConnection(_)) - .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava)) /** * Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint` @@ -484,7 +484,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { delegate .bindWithTls(interface, port, createSSLEngine = () => createSSLEngine.get()) .map(new IncomingConnection(_)) - .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava)) } /** @@ -518,7 +518,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension { }, closing) .map(new IncomingConnection(_)) - .mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava)) + .mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava)) } private def optionalDurationToScala(duration: Optional[java.time.Duration]) = { diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 382d4a72e2..c3d79c9e34 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -605,7 +605,7 @@ object Flow { @deprecated("Use 'Flow.lazyFutureFlow' instead", "2.6.0") def lazyInitAsync[I, O, M](flowFactory: () => Future[Flow[I, O, M]]): Flow[I, O, Future[Option[M]]] = Flow.fromGraph(new LazyFlow[I, O, M](_ => flowFactory())).mapMaterializedValue { v => - implicit val ec = akka.dispatch.ExecutionContexts.sameThreadExecutionContext + implicit val ec = akka.dispatch.ExecutionContexts.parasitic v.map[Option[M]](Some.apply _).recover { case _: NeverMaterializedException => None } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala index 109f42a4d2..343904d5e6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -73,7 +73,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { */ def mapAsync[Out2](parallelism: Int)(f: Out => Future[Out2]): Repr[Out2, Ctx] = via(flow.mapAsync(parallelism) { - case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContexts.sameThreadExecutionContext) + case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContexts.parasitic) }) /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala index b425b9280d..7902877bfc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Queue.scala @@ -8,13 +8,13 @@ import java.util.Optional import java.util.concurrent.CompletionStage import scala.concurrent.Future - import akka.Done import akka.stream.QueueOfferResult + import scala.compat.java8.FutureConverters._ import scala.compat.java8.OptionConverters._ - import akka.annotation.InternalApi +import akka.dispatch.ExecutionContexts /** * This trait allows to have a queue as a data source for some stream. @@ -145,9 +145,8 @@ object SinkQueueWithCancel { */ @InternalApi private[akka] def asJava[T](queue: SinkQueueWithCancel[T]): akka.stream.javadsl.SinkQueueWithCancel[T] = new akka.stream.javadsl.SinkQueueWithCancel[T] { - import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext => same } override def pull(): CompletionStage[Optional[T]] = - queue.pull().map(_.asJava)(same).toJava + queue.pull().map(_.asJava)(ExecutionContexts.parasitic).toJava override def cancel(): Unit = queue.cancel() } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index 91e2ec0a12..713cb6deb3 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -194,8 +194,7 @@ object Sink { .fromGraph(new HeadOptionStage[T]) .withAttributes(DefaultAttributes.headSink) .mapMaterializedValue(e => - e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))( - ExecutionContexts.sameThreadExecutionContext)) + e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.parasitic)) /** * A `Sink` that materializes into a `Future` of the optional first value received. @@ -217,7 +216,7 @@ object Sink { def last[T]: Sink[T, Future[T]] = { Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastSink).mapMaterializedValue { e => e.map(_.headOption.getOrElse(throw new NoSuchElementException("last of empty stream")))( - ExecutionContexts.sameThreadExecutionContext) + ExecutionContexts.parasitic) } } @@ -230,7 +229,7 @@ object Sink { */ def lastOption[T]: Sink[T, Future[Option[T]]] = { Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastOptionSink).mapMaterializedValue { e => - e.map(_.headOption)(ExecutionContexts.sameThreadExecutionContext) + e.map(_.headOption)(ExecutionContexts.parasitic) } } @@ -607,8 +606,7 @@ object Sink { def lazyInit[T, M](sinkFactory: T => Future[Sink[T, M]], fallback: () => M): Sink[T, Future[M]] = Sink .fromGraph(new LazySink[T, M](sinkFactory)) - .mapMaterializedValue( - _.recover { case _: NeverMaterializedException => fallback() }(ExecutionContexts.sameThreadExecutionContext)) + .mapMaterializedValue(_.recover { case _: NeverMaterializedException => fallback() }(ExecutionContexts.parasitic)) /** * Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements, @@ -622,7 +620,7 @@ object Sink { @deprecated("Use 'Sink.lazyFutureSink' instead", "2.6.0") def lazyInitAsync[T, M](sinkFactory: () => Future[Sink[T, M]]): Sink[T, Future[Option[M]]] = Sink.fromGraph(new LazySink[T, M](_ => sinkFactory())).mapMaterializedValue { m => - implicit val ec = ExecutionContexts.sameThreadExecutionContext + implicit val ec = ExecutionContexts.parasitic m.map(Option.apply _).recover { case _: NeverMaterializedException => None } } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 833a4d288d..6610e726db 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -6,23 +6,25 @@ package akka.testkit import java.lang.reflect.Modifier -import org.scalactic.{ CanEqual, TypeCheckedTripleEquals } - -import language.postfixOps -import org.scalatest.BeforeAndAfterAll -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.AnyWordSpecLike import akka.actor.ActorSystem -import akka.event.{ Logging, LoggingAdapter } - -import scala.concurrent.duration._ -import scala.concurrent.Future - -import com.typesafe.config.{ Config, ConfigFactory } import akka.dispatch.Dispatchers +import akka.event.Logging +import akka.event.LoggingAdapter import akka.testkit.TestEvent._ +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import org.scalactic.CanEqual +import org.scalactic.TypeCheckedTripleEquals +import org.scalatest.BeforeAndAfterAll import org.scalatest.concurrent.ScalaFutures -import org.scalatest.time.{ Millis, Span } +import org.scalatest.matchers.should.Matchers +import org.scalatest.time.Millis +import org.scalatest.time.Span +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.language.postfixOps object AkkaSpec { val testConf: Config = ConfigFactory.parseString("""