From 8c48393cd3dbfecb36b17baa2dd6a1ae7a8d5f09 Mon Sep 17 00:00:00 2001
From: "He-Pin(kerr)"
Date: Fri, 19 Sep 2025 22:17:20 +0800
Subject: [PATCH] chore: Remove pekko.dispatch.ExecutionContexts.parasitic
 (#2208)

* chore: Remove pekko.dispatch.ExecutionContexts.parasitic

* .
---
 .../pekko/actor/CoordinatedShutdownSpec.scala |  5 ++--
 .../pekko/dispatch/ExecutionContextSpec.scala |  2 +-
 .../SameThreadExecutionContextSpec.scala      |  7 ++---
 .../pekko/io/TcpIntegrationSpecSupport.scala  |  4 +--
 .../typed/internal/ActorContextImpl.scala     |  4 +--
 .../typed/internal/TimerSchedulerImpl.scala   |  8 ++---
 .../internal/adapter/ActorSystemAdapter.scala |  2 +-
 .../future-converters.excludes               |  3 ++
 .../internal/SameThreadExecutionContext.scala | 29 ------------------
 .../internal/SameThreadExecutionContext.scala | 30 -------------------
 .../apache/pekko/actor/ActorSelection.scala   |  4 +--
 .../pekko/actor/CoordinatedShutdown.scala     |  5 ++--
 .../org/apache/pekko/dispatch/Future.scala    | 17 -----------
 .../org/apache/pekko/pattern/AskSupport.scala |  4 +--
 .../apache/pekko/pattern/CircuitBreaker.scala |  2 +-
 .../pekko/pattern/FutureTimeoutSupport.scala  |  2 +-
 .../pekko/pattern/GracefulStopSupport.scala   |  4 +--
 .../apache/pekko/pattern/StatusReply.scala    |  4 +--
 .../routing/ScatterGatherFirstCompleted.scala |  4 +--
 .../cluster/sharding/ShardCoordinator.scala   |  4 +--
 .../journal/japi/AsyncRecovery.scala          |  6 ++--
 .../journal/japi/AsyncWriteJournal.scala      |  6 ++--
 .../snapshot/japi/SnapshotStore.scala         | 10 +++---
 .../pekko/remote/artery/Handshake.scala       |  4 +--
 .../remote/artery/SystemMessageDelivery.scala |  4 +--
 .../artery/tcp/ArteryTcpTransport.scala       |  3 +-
 .../transport/ThrottlerTransportAdapter.scala |  4 +--
 .../pekko/remote/artery/TestContext.scala     |  4 +--
 .../stream/impl/SubInletOutletSpec.scala      |  6 ++--
 .../FlowFlatMapConcatParallelismSpec.scala    |  4 +--
 .../pekko/stream/scaladsl/SinkSpec.scala      |  4 +--
 .../stream/typed/scaladsl/ActorFlow.scala     |  6 ++--
 .../org/apache/pekko/stream/KillSwitch.scala  |  2 +-
 .../pekko/stream/MapAsyncPartitioned.scala    |  4 +--
 .../pekko/stream/impl/MaybeSource.scala       |  4 +--
 .../pekko/stream/impl/QueueSource.scala       |  2 +-
 .../org/apache/pekko/stream/impl/Sinks.scala  |  6 ++--
 .../org/apache/pekko/stream/impl/Unfold.scala |  2 +-
 .../impl/UnfoldResourceSourceAsync.scala      |  2 +-
 .../stream/impl/fusing/FlattenConcat.scala    |  2 +-
 .../pekko/stream/impl/fusing/FutureFlow.scala |  4 +--
 .../stream/impl/fusing/GraphStages.scala      |  6 ++--
 .../stream/impl/fusing/LazyFutureSource.scala |  4 +--
 .../apache/pekko/stream/impl/fusing/Ops.scala | 12 ++++----
 .../pekko/stream/impl/io/TcpStages.scala      |  6 ++--
 .../apache/pekko/stream/javadsl/Flow.scala    |  6 ++--
 .../apache/pekko/stream/javadsl/Queue.scala   |  4 +--
 .../apache/pekko/stream/javadsl/Sink.scala    | 20 ++++++-------
 .../apache/pekko/stream/javadsl/Source.scala  |  8 ++---
 .../org/apache/pekko/stream/javadsl/Tcp.scala |  2 +-
 .../apache/pekko/stream/scaladsl/Flow.scala   |  3 +-
 .../stream/scaladsl/FlowWithContextOps.scala  |  8 ++---
 .../apache/pekko/stream/scaladsl/Queue.scala  |  4 +--
 .../apache/pekko/stream/scaladsl/Sink.scala   |  8 ++---
 54 files changed, 124 insertions(+), 200 deletions(-)
 delete mode 100644 actor/src/main/scala-2.13/org/apache/pekko/dispatch/internal/SameThreadExecutionContext.scala
 delete mode 100644 actor/src/main/scala-3/org/apache/pekko/dispatch/internal/SameThreadExecutionContext.scala

diff --git a/actor-tests/src/test/scala/org/apache/pekko/actor/CoordinatedShutdownSpec.scala
b/actor-tests/src/test/scala/org/apache/pekko/actor/CoordinatedShutdownSpec.scala index d2d7554421..9cb9e5ca75 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/actor/CoordinatedShutdownSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/actor/CoordinatedShutdownSpec.scala @@ -18,7 +18,6 @@ import pekko.ConfigurationException import pekko.Done import pekko.actor.CoordinatedShutdown.Phase import pekko.actor.CoordinatedShutdown.UnknownReason -import pekko.dispatch.ExecutionContexts import pekko.testkit.PekkoSpec import pekko.testkit.EventFilter import pekko.testkit.TestKit @@ -323,10 +322,10 @@ class CoordinatedShutdownSpec Future { testProbe.ref ! BMessage("concurrentB") Done - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) } Done - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) val cancellationFut: Future[Done] = { val cancellables = (0 until 20).map { _ => diff --git a/actor-tests/src/test/scala/org/apache/pekko/dispatch/ExecutionContextSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ExecutionContextSpec.scala index 0463b8c595..7cc6c33f12 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/dispatch/ExecutionContextSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/dispatch/ExecutionContextSpec.scala @@ -151,7 +151,7 @@ class ExecutionContextSpec extends PekkoSpec with DefaultTimeout { } "work with same-thread executor plus blocking" in { - val ec = pekko.dispatch.ExecutionContexts.parasitic + val ec = scala.concurrent.ExecutionContext.parasitic var x = 0 ec.execute(new Runnable { override def run = { diff --git a/actor-tests/src/test/scala/org/apache/pekko/dispatch/SameThreadExecutionContextSpec.scala b/actor-tests/src/test/scala/org/apache/pekko/dispatch/SameThreadExecutionContextSpec.scala index 740afaea67..f4c7c04e88 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/dispatch/SameThreadExecutionContextSpec.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/dispatch/SameThreadExecutionContextSpec.scala @@ -20,7 +20,6 @@ import org.scalatest.matchers.should.Matchers import org.apache.pekko import pekko.Done -import pekko.dispatch.internal.SameThreadExecutionContext import pekko.testkit.PekkoSpec class SameThreadExecutionContextSpec extends PekkoSpec with Matchers { @@ -28,7 +27,7 @@ class SameThreadExecutionContextSpec extends PekkoSpec with Matchers { "The SameThreadExecutionContext" should { "return a Scala specific version" in { - val ec = SameThreadExecutionContext() + val ec = ExecutionContext.parasitic // in Scala 2.13 and higher parasitic is available ec.getClass.getName should ===("scala.concurrent.ExecutionContext$parasitic$") } @@ -40,7 +39,7 @@ class SameThreadExecutionContextSpec extends PekkoSpec with Matchers { .map { _ => Thread.currentThread().getName }(system.dispatcher) - .map(firstName => firstName -> Thread.currentThread().getName)(SameThreadExecutionContext()) + .map(firstName => firstName -> Thread.currentThread().getName)(ExecutionContext.parasitic) promise.success(Done) val (threadName1, threadName2) = futureThreadNames.futureValue @@ -54,7 +53,7 @@ class SameThreadExecutionContextSpec extends PekkoSpec with Matchers { .map { _ => Thread.currentThread().getName }(ExecutionContext.global) - .map(firstName => firstName -> Thread.currentThread().getName)(SameThreadExecutionContext()) + .map(firstName => firstName -> Thread.currentThread().getName)(ExecutionContext.parasitic) promise.success(Done) val (threadName1, threadName2) = futureThreadNames.futureValue diff --git 
a/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpecSupport.scala b/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpecSupport.scala index 25f76c5b81..170d949fca 100644 --- a/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpecSupport.scala +++ b/actor-tests/src/test/scala/org/apache/pekko/io/TcpIntegrationSpecSupport.scala @@ -21,7 +21,7 @@ import Tcp._ import org.apache.pekko import pekko.actor.ActorRef import pekko.actor.ActorSystem -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.io.Inet.SocketOption import pekko.testkit.{ PekkoSpec, TestProbe } import pekko.testkit.SocketUtil.temporaryServerAddress @@ -35,7 +35,7 @@ trait TcpIntegrationSpecSupport { this: PekkoSpec => // terminate clientSystem after server system system.whenTerminated.onComplete { _ => res.terminate() - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) res } else system val bindHandler = TestProbe() diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/ActorContextImpl.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/ActorContextImpl.scala index ec875b6d2d..171b9a4f00 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/ActorContextImpl.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/ActorContextImpl.scala @@ -28,7 +28,7 @@ import org.apache.pekko import pekko.actor.Address import pekko.actor.typed.internal.adapter.ActorSystemAdapter import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.pattern.StatusReply import pekko.util.BoxedType import pekko.util.JavaDurationConverters._ @@ -277,7 +277,7 @@ import scala.util.Success // Scala API impl def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit = { - future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))(ExecutionContexts.parasitic) + future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))(ExecutionContext.parasitic) } // Java API impl diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala index b9b09d7605..71ac6fec3b 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/TimerSchedulerImpl.scala @@ -20,7 +20,7 @@ import org.apache.pekko import pekko.actor.{ Cancellable, NotInfluenceReceiveTimeout } import pekko.actor.typed.scaladsl.{ ActorContext, LoggerOps } import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.util.OptionVal import org.slf4j.Logger @@ -123,13 +123,13 @@ import scala.concurrent.duration.FiniteDuration val task = mode match { case SingleMode => - ctx.system.scheduler.scheduleOnce(delay, () => ctx.self.unsafeUpcast ! timerMsg)(ExecutionContexts.parasitic) + ctx.system.scheduler.scheduleOnce(delay, () => ctx.self.unsafeUpcast ! timerMsg)(ExecutionContext.parasitic) case m: FixedDelayMode => ctx.system.scheduler.scheduleWithFixedDelay(m.initialDelay, delay)(() => ctx.self.unsafeUpcast ! timerMsg)( - ExecutionContexts.parasitic) + ExecutionContext.parasitic) case m: FixedRateMode => ctx.system.scheduler.scheduleAtFixedRate(m.initialDelay, delay)(() => ctx.self.unsafeUpcast ! 
timerMsg)( - ExecutionContexts.parasitic) + ExecutionContext.parasitic) } val nextTimer = Timer(key, msg, mode.repeat, nextGen, task) diff --git a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala index 039da8f7be..0dcfff735f 100644 --- a/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala +++ b/actor-typed/src/main/scala/org/apache/pekko/actor/typed/internal/adapter/ActorSystemAdapter.scala @@ -114,7 +114,7 @@ import pekko.util.FutureConverters._ override def uptime: Long = classicSystem.uptime override def printTree: String = system.printTree - import org.apache.pekko.dispatch.ExecutionContexts.parasitic + import scala.concurrent.ExecutionContext.parasitic override def terminate(): Unit = system.terminate() override lazy val whenTerminated: scala.concurrent.Future[pekko.Done] = diff --git a/actor/src/main/mima-filters/2.0.x.backwards.excludes/future-converters.excludes b/actor/src/main/mima-filters/2.0.x.backwards.excludes/future-converters.excludes index 7e7d8dc784..d602e4f310 100644 --- a/actor/src/main/mima-filters/2.0.x.backwards.excludes/future-converters.excludes +++ b/actor/src/main/mima-filters/2.0.x.backwards.excludes/future-converters.excludes @@ -20,3 +20,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.Future ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.FutureConverters#CompletionStageOps.asScala") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.FutureConverters#FutureOps.asJava$extension") ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.FutureConverters#FutureOps.asJava") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.internal.SameThreadExecutionContext") +ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.internal.SameThreadExecutionContext$") +ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ExecutionContexts.parasitic") diff --git a/actor/src/main/scala-2.13/org/apache/pekko/dispatch/internal/SameThreadExecutionContext.scala b/actor/src/main/scala-2.13/org/apache/pekko/dispatch/internal/SameThreadExecutionContext.scala deleted file mode 100644 index 983f155df3..0000000000 --- a/actor/src/main/scala-2.13/org/apache/pekko/dispatch/internal/SameThreadExecutionContext.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2009-2022 Lightbend Inc. - */ - -package org.apache.pekko.dispatch.internal - -import scala.concurrent.ExecutionContext - -import org.apache.pekko -import pekko.annotation.InternalApi - -/** - * Factory to create same thread ec. 
Not intended to be called from any other site than to create [[pekko.dispatch.ExecutionContexts#parasitic]] - * - * INTERNAL API - */ -@InternalApi -private[dispatch] object SameThreadExecutionContext { - @inline def apply(): ExecutionContext = ExecutionContext.parasitic -} diff --git a/actor/src/main/scala-3/org/apache/pekko/dispatch/internal/SameThreadExecutionContext.scala b/actor/src/main/scala-3/org/apache/pekko/dispatch/internal/SameThreadExecutionContext.scala deleted file mode 100644 index 7cf421236d..0000000000 --- a/actor/src/main/scala-3/org/apache/pekko/dispatch/internal/SameThreadExecutionContext.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * license agreements; and to You under the Apache License, version 2.0: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * This file is part of the Apache Pekko project, which was derived from Akka. - */ - -/* - * Copyright (C) 2009-2022 Lightbend Inc. - */ - -package org.apache.pekko.dispatch.internal - -import scala.concurrent.ExecutionContext - -import org.apache.pekko -import pekko.annotation.InternalApi - -/** - * Factory to create same thread ec. Not intended to be called from any other site than to create [[pekko.dispatch.ExecutionContexts#parasitic]] - * - * INTERNAL API - */ -@InternalApi -private[dispatch] object SameThreadExecutionContext { - inline def apply(): ExecutionContext = ExecutionContext.parasitic - -} diff --git a/actor/src/main/scala/org/apache/pekko/actor/ActorSelection.scala b/actor/src/main/scala/org/apache/pekko/actor/ActorSelection.scala index e20602b447..80d5142d0d 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/ActorSelection.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/ActorSelection.scala @@ -27,7 +27,7 @@ import scala.util.Success import scala.annotation.nowarn import org.apache.pekko -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.pattern.ask import pekko.routing.MurmurHash import pekko.util.{ Helpers, JavaDurationConverters, Timeout } @@ -75,7 +75,7 @@ abstract class ActorSelection extends Serializable { * [[ActorRef]]. 
*/ def resolveOne()(implicit timeout: Timeout): Future[ActorRef] = { - implicit val ec = ExecutionContexts.parasitic + implicit val ec = ExecutionContext.parasitic val p = Promise[ActorRef]() this.ask(Identify(None)).onComplete { case Success(ActorIdentity(_, Some(ref))) => p.success(ref) diff --git a/actor/src/main/scala/org/apache/pekko/actor/CoordinatedShutdown.scala b/actor/src/main/scala/org/apache/pekko/actor/CoordinatedShutdown.scala index 088b212568..d1db9000bf 100644 --- a/actor/src/main/scala/org/apache/pekko/actor/CoordinatedShutdown.scala +++ b/actor/src/main/scala/org/apache/pekko/actor/CoordinatedShutdown.scala @@ -32,7 +32,6 @@ import com.typesafe.config.ConfigFactory import org.apache.pekko import pekko.Done import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts import pekko.event.Logging import pekko.pattern.after import pekko.util.OptionConverters._ @@ -267,7 +266,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi system.whenTerminated.map { _ => if (exitJvm && !runningJvmHook) System.exit(exitCode) Done - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) } else if (exitJvm) { System.exit(exitCode) Future.successful(Done) @@ -493,7 +492,7 @@ final class CoordinatedShutdown private[pekko] ( override val size: Int = tasks.size override def run(recoverEnabled: Boolean)(implicit ec: ExecutionContext): Future[Done] = { - Future.sequence(tasks.map(_.run(recoverEnabled))).map(_ => Done)(ExecutionContexts.parasitic) + Future.sequence(tasks.map(_.run(recoverEnabled))).map(_ => Done)(ExecutionContext.parasitic) } // This method may be run multiple times during the compare-and-set loop of ConcurrentHashMap, so it must be side-effect-free diff --git a/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala b/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala index 833b2d5bd6..0871e6c028 100644 --- a/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala +++ b/actor/src/main/scala/org/apache/pekko/dispatch/Future.scala @@ -20,8 +20,6 @@ import java.util.concurrent.CompletionStage import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, ExecutionContextExecutorService, Future, Promise } import org.apache.pekko -import pekko.annotation.InternalStableApi -import pekko.dispatch.internal.SameThreadExecutionContext import pekko.japi.function.Procedure /** @@ -77,21 +75,6 @@ object ExecutionContexts { * @return a reference to the global ExecutionContext */ def global(): ExecutionContextExecutor = ExecutionContext.global - - /** - * INTERNAL API - * - * WARNING: Not A General Purpose ExecutionContext! - * - * This is an execution context which runs everything on the calling thread. - * It is very useful for actions which are known to be non-blocking and - * non-throwing in order to save a round-trip to the thread pool. 
- * - * Once Scala 2.12 is no longer supported this can be dropped in favour of directly using `ExecutionContext.parasitic` - */ - @InternalStableApi - private[pekko] val parasitic: ExecutionContext = SameThreadExecutionContext() - } /** diff --git a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala index 54ddadca92..515461f475 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/AskSupport.scala @@ -26,7 +26,7 @@ import scala.util.control.NoStackTrace import org.apache.pekko import pekko.actor._ import pekko.annotation.{ InternalApi, InternalStableApi } -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.dispatch.sysmsg._ import pekko.util.{ unused, ByteString, Timeout } @@ -717,7 +717,7 @@ private[pekko] object PromiseActorRef { val result = Promise[Any]() val scheduler = provider.guardian.underlying.system.scheduler val a = new PromiseActorRef(provider, result, messageClassName, refPathPrefix) - implicit val ec = ExecutionContexts.parasitic + implicit val ec = ExecutionContext.parasitic val f = scheduler.scheduleOnce(timeout.duration) { val timedOut = result.tryComplete { val wasSentBy = if (sender == ActorRef.noSender) "" else s" was sent by [$sender]" diff --git a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala index f9e73f7b7a..72e10a1d79 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala @@ -28,7 +28,7 @@ import scala.util.control.NonFatal import org.apache.pekko import pekko.PekkoException import pekko.actor.{ ExtendedActorSystem, Scheduler } -import pekko.dispatch.ExecutionContexts.parasitic +import scala.concurrent.ExecutionContext.parasitic import pekko.pattern.internal.{ CircuitBreakerNoopTelemetry, CircuitBreakerTelemetry } import pekko.annotation.InternalApi import pekko.util.FutureConverters._ diff --git a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala index 298a177105..52709500f6 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/FutureTimeoutSupport.scala @@ -116,7 +116,7 @@ trait FutureTimeoutSupport { future.onComplete { result => timeout.cancel() p.tryComplete(result) - }(pekko.dispatch.ExecutionContexts.parasitic) + }(scala.concurrent.ExecutionContext.parasitic) p.future } } diff --git a/actor/src/main/scala/org/apache/pekko/pattern/GracefulStopSupport.scala b/actor/src/main/scala/org/apache/pekko/pattern/GracefulStopSupport.scala index aa1e83931f..e7dfcf7f09 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/GracefulStopSupport.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/GracefulStopSupport.scala @@ -18,7 +18,7 @@ import scala.concurrent.duration.FiniteDuration import org.apache.pekko import pekko.actor._ -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.dispatch.sysmsg.{ Unwatch, Watch } import pekko.util.Timeout @@ -65,6 +65,6 @@ trait GracefulStopSupport { ref.result.future.transform({ case Terminated(t) if t.path == target.path => true case _ => { internalTarget.sendSystemMessage(Unwatch(target, ref)); false } - }, t => { 
internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContexts.parasitic) + }, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContext.parasitic) } } diff --git a/actor/src/main/scala/org/apache/pekko/pattern/StatusReply.scala b/actor/src/main/scala/org/apache/pekko/pattern/StatusReply.scala index 93c739d12b..6a2aa767a2 100644 --- a/actor/src/main/scala/org/apache/pekko/pattern/StatusReply.scala +++ b/actor/src/main/scala/org/apache/pekko/pattern/StatusReply.scala @@ -22,7 +22,7 @@ import org.apache.pekko import pekko.Done import pekko.actor.InvalidMessageException import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext /** * Generic top-level message type for replies that signal failure or success. Convenient to use together with the @@ -180,5 +180,5 @@ object StatusReply { ScalaFailure(new IllegalArgumentException(s"Unexpected status reply success value: $unexpected")) } case fail @ ScalaFailure(_) => fail.asInstanceOf[Try[T]] - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) } diff --git a/actor/src/main/scala/org/apache/pekko/routing/ScatterGatherFirstCompleted.scala b/actor/src/main/scala/org/apache/pekko/routing/ScatterGatherFirstCompleted.scala index 5a24ae78a0..5acec9c72b 100644 --- a/actor/src/main/scala/org/apache/pekko/routing/ScatterGatherFirstCompleted.scala +++ b/actor/src/main/scala/org/apache/pekko/routing/ScatterGatherFirstCompleted.scala @@ -27,7 +27,7 @@ import pekko.actor.ActorRef import pekko.actor.ActorSystem import pekko.actor.SupervisorStrategy import pekko.dispatch.Dispatchers -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.japi.Util.immutableSeq import pekko.pattern.ask import pekko.pattern.pipe @@ -57,7 +57,7 @@ private[pekko] final case class ScatterGatherFirstCompletedRoutees( extends Routee { override def send(message: Any, sender: ActorRef): Unit = { - implicit val ec = ExecutionContexts.parasitic + implicit val ec = ExecutionContext.parasitic if (routees.isEmpty) { val reply = Future.failed(new TimeoutException("Timeout due to no routees")) reply.pipeTo(sender) diff --git a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala index b94a7e2cfb..5efad4bb56 100644 --- a/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala +++ b/cluster-sharding/src/main/scala/org/apache/pekko/cluster/sharding/ShardCoordinator.scala @@ -39,7 +39,7 @@ import pekko.cluster.sharding.internal.{ RememberEntitiesCoordinatorStore, RememberEntitiesProvider } -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.event.{ BusLogging, Logging } import pekko.pattern.{ pipe, AskTimeoutException } import pekko.persistence._ @@ -219,7 +219,7 @@ object ShardCoordinator { currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]], rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = { import pekko.util.ccompat.JavaConverters._ - implicit val ec = ExecutionContexts.parasitic + implicit val ec = ExecutionContext.parasitic rebalance(currentShardAllocations.asJava, rebalanceInProgress.asJava).map(_.asScala.toSet) } diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncRecovery.scala b/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncRecovery.scala index 
5621134175..8fb9fcbefd 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncRecovery.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncRecovery.scala @@ -19,7 +19,7 @@ import scala.concurrent.Future import org.apache.pekko import pekko.actor.Actor -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.persistence.PersistentRepr import pekko.persistence.journal.{ AsyncRecovery => SAsyncReplay } import pekko.util.ConstantFun.scalaAnyToUnit @@ -35,10 +35,10 @@ abstract class AsyncRecovery extends SAsyncReplay with AsyncRecoveryPlugin { thi doAsyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max, new Consumer[PersistentRepr] { def accept(p: PersistentRepr) = replayCallback(p) - }).asScala.map(scalaAnyToUnit)(ExecutionContexts.parasitic) + }).asScala.map(scalaAnyToUnit)(ExecutionContext.parasitic) final def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] = doAsyncReadHighestSequenceNr(persistenceId, fromSequenceNr: Long) .asScala - .map(_.longValue)(ExecutionContexts.parasitic) + .map(_.longValue)(ExecutionContext.parasitic) } diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala b/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala index 7d2b0e2b2b..4da28a61c5 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/journal/japi/AsyncWriteJournal.scala @@ -19,7 +19,7 @@ import scala.util.Failure import scala.util.Try import org.apache.pekko -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.persistence._ import pekko.persistence.journal.{ AsyncWriteJournal => SAsyncWriteJournal } import pekko.util.ConstantFun.scalaAnyToUnit @@ -40,9 +40,9 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w else successUnit } .to(immutable.IndexedSeq) - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = { - doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).asScala.map(scalaAnyToUnit)(ExecutionContexts.parasitic) + doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).asScala.map(scalaAnyToUnit)(ExecutionContext.parasitic) } } diff --git a/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala b/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala index cfab91bca9..7ff382d8c3 100644 --- a/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala +++ b/persistence/src/main/scala/org/apache/pekko/persistence/snapshot/japi/SnapshotStore.scala @@ -16,7 +16,7 @@ package org.apache.pekko.persistence.snapshot.japi import scala.concurrent.Future import org.apache.pekko -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.persistence._ import pekko.persistence.snapshot.{ SnapshotStore => SSnapshotStore } import pekko.util.ConstantFun.scalaAnyToUnit @@ -31,17 +31,17 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin { persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = { import pekko.util.OptionConverters._ - doLoadAsync(persistenceId, 
criteria).asScala.map(_.toScala)(ExecutionContexts.parasitic) + doLoadAsync(persistenceId, criteria).asScala.map(_.toScala)(ExecutionContext.parasitic) } override final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] = - doSaveAsync(metadata, snapshot).asScala.map(scalaAnyToUnit)(ExecutionContexts.parasitic) + doSaveAsync(metadata, snapshot).asScala.map(scalaAnyToUnit)(ExecutionContext.parasitic) override final def deleteAsync(metadata: SnapshotMetadata): Future[Unit] = - doDeleteAsync(metadata).asScala.map(scalaAnyToUnit)(ExecutionContexts.parasitic) + doDeleteAsync(metadata).asScala.map(scalaAnyToUnit)(ExecutionContext.parasitic) override final def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] = doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).asScala.map(scalaAnyToUnit)( - ExecutionContexts.parasitic) + ExecutionContext.parasitic) } diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/Handshake.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/Handshake.scala index bb372849aa..913027ba89 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/Handshake.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/Handshake.scala @@ -21,7 +21,7 @@ import org.apache.pekko import pekko.Done import pekko.actor.ActorSystem import pekko.actor.Address -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.remote.UniqueAddress import pekko.stream.Attributes import pekko.stream.FlowShape @@ -308,7 +308,7 @@ private[remote] class InboundHandshake(inboundContext: InboundContext, inControl // periodically. thenInside() case None => - first.onComplete(_ => runInStage.invoke(thenInside))(ExecutionContexts.parasitic) + first.onComplete(_ => runInStage.invoke(thenInside))(ExecutionContext.parasitic) } } diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/SystemMessageDelivery.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/SystemMessageDelivery.scala index 2bcbc21afa..bdf339beb1 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/SystemMessageDelivery.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/SystemMessageDelivery.scala @@ -26,7 +26,7 @@ import org.apache.pekko import pekko.Done import pekko.actor.ActorRef import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.dispatch.sysmsg.SystemMessage import pekko.event.Logging import pekko.remote.UniqueAddress @@ -119,7 +119,7 @@ import pekko.util.PrettyDuration.PrettyPrintableDuration if (isAvailable(out)) pull(in) // onPull from downstream already called } - outboundContext.controlSubject.attach(this).foreach(callback.invoke)(ExecutionContexts.parasitic) + outboundContext.controlSubject.attach(this).foreach(callback.invoke)(ExecutionContext.parasitic) } override def postStop(): Unit = { diff --git a/remote/src/main/scala/org/apache/pekko/remote/artery/tcp/ArteryTcpTransport.scala b/remote/src/main/scala/org/apache/pekko/remote/artery/tcp/ArteryTcpTransport.scala index 965d399ccf..5f1879bcd2 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/artery/tcp/ArteryTcpTransport.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/artery/tcp/ArteryTcpTransport.scala @@ -33,7 +33,6 @@ import pekko.Done import pekko.NotUsed import pekko.actor.ActorSystem import pekko.actor.ExtendedActorSystem -import pekko.dispatch.ExecutionContexts import 
pekko.event.Logging import pekko.remote.RemoteActorRefProvider import pekko.remote.RemoteLogMarker @@ -279,7 +278,7 @@ private[remote] class ArteryTcpTransport( s"Failed to bind TCP to [$bindHost:$bindPort] due to: " + e.getMessage, e)) - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) // only on initial startup, when ActorSystem is starting val b = Await.result(binding, settings.Bind.BindTimeout) diff --git a/remote/src/main/scala/org/apache/pekko/remote/transport/ThrottlerTransportAdapter.scala b/remote/src/main/scala/org/apache/pekko/remote/transport/ThrottlerTransportAdapter.scala index c6a044d1fb..cf5a535c9c 100644 --- a/remote/src/main/scala/org/apache/pekko/remote/transport/ThrottlerTransportAdapter.scala +++ b/remote/src/main/scala/org/apache/pekko/remote/transport/ThrottlerTransportAdapter.scala @@ -29,7 +29,7 @@ import scala.annotation.nowarn import org.apache.pekko import pekko.actor._ import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.dispatch.sysmsg.{ Unwatch, Watch } import pekko.event.LoggingAdapter import pekko.pattern.{ ask, pipe, PromiseActorRef } @@ -393,7 +393,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) SetThrottleAck case _ => throw new IllegalArgumentException() // won't happen, compiler exhaustiveness check pleaser - }, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContexts.parasitic) + }, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContext.parasitic) } } diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/TestContext.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/TestContext.scala index bbbb2b8c17..b83fec37e3 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/TestContext.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/TestContext.scala @@ -25,7 +25,7 @@ import org.apache.pekko import pekko.Done import pekko.actor.ActorRef import pekko.actor.Address -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.remote.UniqueAddress import pekko.remote.artery.InboundControlJunction.ControlMessageObserver import pekko.remote.artery.InboundControlJunction.ControlMessageSubject @@ -65,7 +65,7 @@ private[remote] class TestInboundContext( val done = a.completeHandshake(peer) done.foreach { _ => associationsByUid.put(peer.uid, a) - }(ExecutionContexts.parasitic) + }(ExecutionContext.parasitic) done } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/SubInletOutletSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/SubInletOutletSpec.scala index 6b2e440341..619bef28fb 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/impl/SubInletOutletSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/impl/SubInletOutletSpec.scala @@ -19,7 +19,7 @@ import scala.util.Success import org.apache.pekko import pekko.Done import pekko.NotUsed -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.stream.Attributes import pekko.stream.FlowShape import pekko.stream.Inlet @@ -62,7 +62,7 @@ class SubInletOutletSpec extends StreamSpec { override def preStart(): Unit = { sideChannel .watchTermination() { (_, done) => - done.onComplete(c => subCompletion = c)(ExecutionContexts.parasitic) + done.onComplete(c => subCompletion = c)(ExecutionContext.parasitic) NotUsed } 
.runWith(Sink.fromGraph(subIn.sink)) @@ -169,7 +169,7 @@ class SubInletOutletSpec extends StreamSpec { Source .fromGraph(subOut.source) .runWith(Sink.ignore) - .onComplete(t => subCompletion = t)(ExecutionContexts.parasitic) + .onComplete(t => subCompletion = t)(ExecutionContext.parasitic) subOut.setHandler(new OutHandler { override def onPull(): Unit = pull(in) }) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala index 30995d31d2..996f702d74 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/FlowFlatMapConcatParallelismSpec.scala @@ -96,7 +96,7 @@ class FlowFlatMapConcatParallelismSpec extends StreamSpec(""" Source(sources) .flatMapConcat(i, identity(_)) // scala 2.12 can't infer the type of identity .runWith(Sink.seq) - .map(_.sum)(pekko.dispatch.ExecutionContexts.parasitic) + .map(_.sum)(scala.concurrent.ExecutionContext.parasitic) .futureValue shouldBe sum } } @@ -114,7 +114,7 @@ class FlowFlatMapConcatParallelismSpec extends StreamSpec(""" (current, current) }, _ => None) .runWith(Sink.seq) - .map(_.sum)(pekko.dispatch.ExecutionContexts.parasitic) + .map(_.sum)(scala.concurrent.ExecutionContext.parasitic) .futureValue shouldBe sum } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala index e81c44168a..38c62edd73 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/scaladsl/SinkSpec.scala @@ -142,7 +142,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { "combine many sinks to one" in { val source = Source(List(0, 1, 2, 3, 4, 5)) - implicit val ex = org.apache.pekko.dispatch.ExecutionContexts.parasitic + implicit val ex = scala.concurrent.ExecutionContext.parasitic val sink = Sink .combine( List( @@ -155,7 +155,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures { } "combine two sinks with combineMat" in { - implicit val ex = org.apache.pekko.dispatch.ExecutionContexts.parasitic + implicit val ex = scala.concurrent.ExecutionContext.parasitic Source(List(0, 1, 2, 3, 4, 5)) .toMat(Sink.combineMat(Sink.reduce[Int]((a, b) => a + b), Sink.reduce[Int]((a, b) => a + b))(Broadcast[Int](_))( (f1, f2) => { diff --git a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorFlow.scala b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorFlow.scala index 1d4ddb9906..833d5b085b 100644 --- a/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorFlow.scala +++ b/stream-typed/src/main/scala/org/apache/pekko/stream/typed/scaladsl/ActorFlow.scala @@ -18,7 +18,7 @@ import scala.concurrent.Future import org.apache.pekko import pekko.NotUsed import pekko.actor.typed.ActorRef -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.pattern.{ AskTimeoutException, StatusReply } import pekko.stream._ import pekko.stream.scaladsl._ @@ -182,7 +182,7 @@ object ActorFlow { implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] = askImpl[(I, Ctx), Q, A, (A, Ctx)](parallelism)(ref)( (in, r) => makeMessage(in._1, r), - (in, o: Future[A]) => o.map(a => a -> 
in._2)(ExecutionContexts.parasitic)) + (in, o: Future[A]) => o.map(a => a -> in._2)(ExecutionContext.parasitic)) /** * Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a [[pekko.pattern.StatusReply#success]] response @@ -202,7 +202,7 @@ object ActorFlow { makeMessage: (I, ActorRef[StatusReply[A]]) => Q)(implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] = { askImpl[(I, Ctx), Q, StatusReply[A], (StatusReply[A], Ctx)](parallelism)(ref)( (in, r) => makeMessage(in._1, r), - (in, o: Future[StatusReply[A]]) => o.map(a => a -> in._2)(ExecutionContexts.parasitic)).map { + (in, o: Future[StatusReply[A]]) => o.map(a => a -> in._2)(ExecutionContext.parasitic)).map { case (StatusReply.Success(a), ctx) => a.asInstanceOf[A] -> ctx case (StatusReply.Error(err), _) => throw err case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser diff --git a/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala b/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala index 2d7f8a19c3..cd120bf0e1 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/KillSwitch.scala @@ -70,7 +70,7 @@ object KillSwitches { case _ => // callback.invoke is a simple actor send, so it is fine to run on the invoking thread terminationSignal.onComplete(getAsyncCallback[Try[Done]](onSwitch).invoke)( - pekko.dispatch.ExecutionContexts.parasitic) + scala.concurrent.ExecutionContext.parasitic) } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala index 84b07a1c03..c8852c83b8 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala @@ -24,7 +24,7 @@ import scala.util.control.{ NoStackTrace, NonFatal } import org.apache.pekko import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.stream.ActorAttributes.SupervisionStrategy import pekko.stream.stage._ import pekko.util.OptionVal @@ -157,7 +157,7 @@ private[stream] final class MapAsyncPartitioned[In, Out, Partition]( partitionsInProgress += partition future.value match { - case None => future.onComplete(holder)(ExecutionContexts.parasitic) + case None => future.onComplete(holder)(ExecutionContext.parasitic) case Some(v) => // #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and // run the logic directly on this thread diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/MaybeSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/MaybeSource.scala index 659b9a580d..56d1d0ebe8 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/MaybeSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/MaybeSource.scala @@ -18,7 +18,7 @@ import scala.util.Try import org.apache.pekko import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.stream._ import pekko.stream.impl.Stages.DefaultAttributes import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler } @@ -49,7 +49,7 @@ import pekko.util.OptionVal handleCompletion(value) case None => // callback on future completion - promise.future.onComplete(getAsyncCallback(handleCompletion).invoke)(ExecutionContexts.parasitic) + 
promise.future.onComplete(getAsyncCallback(handleCompletion).invoke)(ExecutionContext.parasitic) } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala index 4b2e5c4bfc..d2b7e37dbd 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/QueueSource.scala @@ -220,7 +220,7 @@ import pekko.stream.stage._ .onComplete { case scala.util.Success(_) => case scala.util.Failure(e) => p.tryFailure(e) - }(pekko.dispatch.ExecutionContexts.parasitic) + }(scala.concurrent.ExecutionContext.parasitic) p.future } override def complete(): Unit = callback.invoke(Completion) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala index 380b27e2db..2aeb19e862 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Sinks.scala @@ -28,7 +28,7 @@ import org.apache.pekko import pekko.NotUsed import pekko.annotation.DoNotInherit import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.event.Logging import pekko.stream._ import pekko.stream.ActorAttributes.StreamSubscriptionTimeout @@ -385,7 +385,7 @@ import org.reactivestreams.Subscriber .foreach { case NonFatal(e) => p.tryFailure(e) case _ => () - }(pekko.dispatch.ExecutionContexts.parasitic) + }(scala.concurrent.ExecutionContext.parasitic) p.future } override def cancel(): Unit = { @@ -561,7 +561,7 @@ import org.reactivestreams.Subscriber failStage(e) } try { - sinkFactory(element).onComplete(cb.invoke)(ExecutionContexts.parasitic) + sinkFactory(element).onComplete(cb.invoke)(ExecutionContext.parasitic) } catch { case NonFatal(e) => promise.failure(e) diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala index 20003b3f7d..44cf73d6d7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/Unfold.scala @@ -111,7 +111,7 @@ private[pekko] final class UnfoldJava[S, E](s: S, f: function.Function[S, Option future.value match { case Some(value) => handle(value) case None => - future.onComplete(asyncHandler)(pekko.dispatch.ExecutionContexts.parasitic) + future.onComplete(asyncHandler)(scala.concurrent.ExecutionContext.parasitic) } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala index f7a6719b48..0905adb913 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/UnfoldResourceSourceAsync.scala @@ -20,7 +20,7 @@ import scala.util.control.NonFatal import org.apache.pekko import pekko.Done import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts.parasitic +import scala.concurrent.ExecutionContext.parasitic import pekko.stream._ import pekko.stream.ActorAttributes.SupervisionStrategy import pekko.stream.Attributes.SourceLocation diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala index dfaae6d434..9c8aa5b825 100644 --- 
a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FlattenConcat.scala @@ -232,7 +232,7 @@ private[pekko] final class FlattenConcat[T, M](parallelism: Int) private def addPendingFutureElem(future: Future[T]): Unit = { val inflightSource = new InflightPendingFutureSource[T](invokeCb) - future.onComplete(inflightSource)(pekko.dispatch.ExecutionContexts.parasitic) + future.onComplete(inflightSource)(scala.concurrent.ExecutionContext.parasitic) queue.enqueue(inflightSource) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FutureFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FutureFlow.scala index 6535747d97..2333c25c93 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FutureFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/FutureFlow.scala @@ -19,7 +19,7 @@ import scala.util.control.NonFatal import org.apache.pekko import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.stream.{ AbruptStageTerminationException, Attributes, @@ -62,7 +62,7 @@ import pekko.util.OptionVal Initializing.onFuture(tryFlow) case None => val cb = getAsyncCallback(Initializing.onFuture) - futureFlow.onComplete(cb.invoke)(ExecutionContexts.parasitic) + futureFlow.onComplete(cb.invoke)(ExecutionContext.parasitic) // in case both ports are closed before future completion setKeepGoing(true) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala index f3319b0a7e..c54e81f0a9 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/GraphStages.scala @@ -24,7 +24,7 @@ import org.apache.pekko import pekko.Done import pekko.actor.Cancellable import pekko.annotation.InternalApi -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.event.Logging import pekko.stream.{ Shape, _ } import pekko.stream.FlowMonitorState._ @@ -316,7 +316,7 @@ import pekko.stream.stage._ onFutureSourceCompleted(it) case _ => val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]](onFutureSourceCompleted).invoke _ - futureSource.onComplete(cb)(ExecutionContexts.parasitic) // could be optimised FastFuture-like + futureSource.onComplete(cb)(ExecutionContext.parasitic) // could be optimised FastFuture-like } // initial handler (until future completes) @@ -397,7 +397,7 @@ import pekko.stream.stage._ handle(completed) case None => val cb = getAsyncCallback[Try[T]](handle).invoke _ - future.onComplete(cb)(ExecutionContexts.parasitic) + future.onComplete(cb)(ExecutionContext.parasitic) } } diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazyFutureSource.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazyFutureSource.scala index 23f48ec892..cd543d4ce7 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazyFutureSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/LazyFutureSource.scala @@ -21,7 +21,7 @@ import scala.concurrent.Future import scala.util.Try import org.apache.pekko -import pekko.dispatch.ExecutionContexts +import scala.concurrent.ExecutionContext import pekko.stream.Attributes import pekko.stream.Attributes.SourceLocation import pekko.stream.Outlet @@ -50,7 
+50,7 @@ private[pekko] final class LazyFutureSource[T](f: () => Future[T]) extends Graph
         case Some(result) => handle(result)
         case None =>
           val cb = getAsyncCallback[Try[T]](handle).invoke _
-          future.onComplete(cb)(ExecutionContexts.parasitic)
+          future.onComplete(cb)(ExecutionContext.parasitic)
       }
     }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
index 63f83777f9..ea8c833f25 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/fusing/Ops.scala
@@ -468,7 +468,7 @@ private[stream] object Collect {
 @InternalApi
 private[pekko] final case class ScanAsync[In, Out](zero: Out, f: (Out, In) => Future[Out])
     extends GraphStage[FlowShape[In, Out]] {
-  import pekko.dispatch.ExecutionContexts
+  import scala.concurrent.ExecutionContext
   val in = Inlet[In]("ScanAsync.in")
   val out = Outlet[Out]("ScanAsync.out")
@@ -560,7 +560,7 @@ private[stream] object Collect {
             eventualCurrent.value match {
               case Some(result) => futureCB(result)
-              case _ => eventualCurrent.onComplete(futureCB)(ExecutionContexts.parasitic)
+              case _ => eventualCurrent.onComplete(futureCB)(ExecutionContext.parasitic)
             }
           } catch {
             case NonFatal(ex) =>
@@ -660,7 +660,7 @@ private[stream] object Collect {
 @InternalApi
 private[pekko] final class FoldAsync[In, Out](zero: Out, f: (Out, In) => Future[Out])
     extends GraphStage[FlowShape[In, Out]] {
-  import pekko.dispatch.ExecutionContexts
+  import scala.concurrent.ExecutionContext
   val in = Inlet[In]("FoldAsync.in")
   val out = Outlet[Out]("FoldAsync.out")
@@ -740,7 +740,7 @@ private[stream] object Collect {
     private def handleAggregatingValue(): Unit = {
       aggregating.value match {
         case Some(result) => futureCB(result) // already completed
-        case _ => aggregating.onComplete(futureCB)(ExecutionContexts.parasitic)
+        case _ => aggregating.onComplete(futureCB)(ExecutionContext.parasitic)
       }
     }
@@ -1321,7 +1321,7 @@ private[stream] object Collect {
           buffer.enqueue(holder)
           future.value match {
-            case None => future.onComplete(holder)(pekko.dispatch.ExecutionContexts.parasitic)
+            case None => future.onComplete(holder)(scala.concurrent.ExecutionContext.parasitic)
             case Some(v) =>
               // #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
               // run the logic directly on this thread
@@ -1438,7 +1438,7 @@ private[stream] object Collect {
           val future = f(grab(in))
           inFlight += 1
           future.value match {
-            case None => future.onComplete(invokeFutureCB)(pekko.dispatch.ExecutionContexts.parasitic)
+            case None => future.onComplete(invokeFutureCB)(scala.concurrent.ExecutionContext.parasitic)
             case Some(v) => futureCompleted(v)
           }
         } catch {
diff --git a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala
index c130e19d78..e7b87b1b8c 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/impl/io/TcpStages.scala
@@ -26,7 +26,7 @@ import org.apache.pekko
 import pekko.{ Done, NotUsed }
 import pekko.actor.{ ActorRef, Terminated }
 import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
+import scala.concurrent.ExecutionContext
 import pekko.io.Inet.SocketOption
 import pekko.io.Tcp
 import pekko.io.Tcp._
@@ -99,7 +99,7 @@ import pekko.util.ByteString
               thisStage.tell(Unbind, thisStage)
             }
             unbindPromise.future
-          }, unbindPromise.future.map(_ => Done)(ExecutionContexts.parasitic)))
+          }, unbindPromise.future.map(_ => Done)(ExecutionContext.parasitic)))
         case f: CommandFailed =>
           val ex = new BindFailedException {
             // cannot modify the actual exception class for compatibility reasons
@@ -591,7 +591,7 @@ private[stream] object ConnectionSourceStage {
       remoteAddress,
       eagerMaterializer)
-    (logic, localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(ExecutionContexts.parasitic))
+    (logic, localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(ExecutionContext.parasitic))
   }
   override def toString = s"TCP-to($remoteAddress)"
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
index 197223a46b..43b5461459 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala
@@ -27,7 +27,7 @@ import pekko.Done
 import pekko.NotUsed
 import pekko.actor.ActorRef
 import pekko.actor.ClassicActorSystemProvider
-import pekko.dispatch.ExecutionContexts
+import scala.concurrent.ExecutionContext
 import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
 import pekko.japi.Pair
 import pekko.japi.function
@@ -275,7 +275,7 @@ object Flow {
   def completionStageFlow[I, O, M](flow: CompletionStage[Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = {
     import pekko.util.FutureConverters._
     val sflow =
-      scaladsl.Flow.futureFlow(flow.asScala.map(_.asScala)(ExecutionContexts.parasitic)).mapMaterializedValue(_.asJava)
+      scaladsl.Flow.futureFlow(flow.asScala.map(_.asScala)(ExecutionContext.parasitic)).mapMaterializedValue(_.asJava)
     new javadsl.Flow(sflow)
   }
@@ -328,7 +328,7 @@ object Flow {
   def lazyCompletionStageFlow[I, O, M](
       create: Creator[CompletionStage[Flow[I, O, M]]]): Flow[I, O, CompletionStage[M]] =
     scaladsl.Flow
-      .lazyFutureFlow[I, O, M](() => create.create().asScala.map(_.asScala)(ExecutionContexts.parasitic))
+      .lazyFutureFlow[I, O, M](() => create.create().asScala.map(_.asScala)(ExecutionContext.parasitic))
       .mapMaterializedValue(_.asJava)
       .asJava
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Queue.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Queue.scala
index 87a8dccbb7..d8c433cf7b 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Queue.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Queue.scala
@@ -20,7 +20,7 @@ import scala.concurrent.Future
 import org.apache.pekko
 import pekko.Done
-import pekko.dispatch.ExecutionContexts
+import scala.concurrent.ExecutionContext
 import pekko.stream.QueueOfferResult
 import pekko.util.FutureConverters._
 import pekko.util.OptionConverters._
@@ -141,7 +141,7 @@ object SinkQueueWithCancel {
     new pekko.stream.scaladsl.SinkQueueWithCancel[T] {
       override def pull(): Future[Option[T]] =
-        queue.pull().asScala.map(_.toScala)(ExecutionContexts.parasitic)
+        queue.pull().asScala.map(_.toScala)(ExecutionContext.parasitic)
       override def cancel(): Unit = queue.cancel()
     }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
index 76db0ef98c..bc5e1306ac 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Sink.scala
@@ -24,7 +24,7 @@ import scala.util.Try
 import org.apache.pekko
 import pekko._
 import pekko.actor.{ ActorRef, ClassicActorSystemProvider, Status }
-import pekko.dispatch.ExecutionContexts
+import scala.concurrent.ExecutionContext
 import pekko.japi.function
 import pekko.japi.function.Creator
 import pekko.stream._
@@ -96,7 +96,7 @@ object Sink {
   def forall[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
     import pekko.util.FutureConverters._
     new Sink(scaladsl.Sink.forall[In](p.test)
-      .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
+      .mapMaterializedValue(_.map(Boolean.box)(ExecutionContext.parasitic).asJava))
   }
   /**
@@ -121,7 +121,7 @@ object Sink {
   def none[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
     import pekko.util.FutureConverters._
     new Sink(scaladsl.Sink.none[In](p.test)
-      .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
+      .mapMaterializedValue(_.map(Boolean.box)(ExecutionContext.parasitic).asJava))
   }
   /**
@@ -146,7 +146,7 @@ object Sink {
   def exists[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
     import pekko.util.FutureConverters._
     new Sink(scaladsl.Sink.exists[In](p.test)
-      .mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
+      .mapMaterializedValue(_.map(Boolean.box)(ExecutionContext.parasitic).asJava))
   }
   /**
@@ -231,7 +231,7 @@ object Sink {
       f: function.Function[T, CompletionStage[Void]]): Sink[T, CompletionStage[Done]] =
     new Sink(
       scaladsl.Sink
-        .foreachAsync(parallelism)((x: T) => f(x).asScala.map(scalaAnyToUnit)(ExecutionContexts.parasitic))
+        .foreachAsync(parallelism)((x: T) => f(x).asScala.map(scalaAnyToUnit)(ExecutionContext.parasitic))
         .toCompletionStage())
   /**
@@ -260,7 +260,7 @@ object Sink {
    * See also [[head]].
    */
   def headOption[In](): Sink[In, CompletionStage[Optional[In]]] =
-    new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue(_.map(_.toJava)(ExecutionContexts.parasitic).asJava))
+    new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue(_.map(_.toJava)(ExecutionContext.parasitic).asJava))
   /**
    * A `Sink` that materializes into a `CompletionStage` of the last value received.
@@ -280,7 +280,7 @@ object Sink {
    * See also [[head]], [[takeLast]].
    */
   def lastOption[In](): Sink[In, CompletionStage[Optional[In]]] =
-    new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue(_.map(_.toJava)(ExecutionContexts.parasitic).asJava))
+    new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue(_.map(_.toJava)(ExecutionContext.parasitic).asJava))
   /**
   * A `Sink` that materializes into a `CompletionStage` of `List` containing the last `n` collected elements.
@@ -294,7 +294,7 @@ object Sink {
     new Sink(
       scaladsl.Sink
        .takeLast[In](n)
-        .mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.parasitic).asJava))
+        .mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContext.parasitic).asJava))
   }
   /**
@@ -310,7 +310,7 @@ object Sink {
   def seq[In]: Sink[In, CompletionStage[java.util.List[In]]] = {
     import pekko.util.ccompat.JavaConverters._
     new Sink(
-      scaladsl.Sink.seq[In].mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.parasitic).asJava))
+      scaladsl.Sink.seq[In].mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContext.parasitic).asJava))
   }
   /**
@@ -512,7 +512,7 @@ object Sink {
    */
   def lazyCompletionStageSink[T, M](create: Creator[CompletionStage[Sink[T, M]]]): Sink[T, CompletionStage[M]] =
     new Sink(scaladsl.Sink.lazyFutureSink { () =>
-      create.create().asScala.map(_.asScala)(ExecutionContexts.parasitic)
+      create.create().asScala.map(_.asScala)(ExecutionContext.parasitic)
     }).mapMaterializedValue(_.asJava)
 }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
index 1f14162908..db56847b21 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import org.apache.pekko
 import pekko.{ Done, NotUsed }
 import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
-import pekko.dispatch.ExecutionContexts
+import scala.concurrent.ExecutionContext
 import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
 import pekko.japi.{ function, JavaPartialFunction, Pair }
 import pekko.japi.function.Creator
@@ -71,7 +71,7 @@ object Source {
     new Source(scaladsl.Source.maybe[T].mapMaterializedValue { (scalaOptionPromise: Promise[Option[T]]) =>
       val javaOptionPromise = new CompletableFuture[Optional[T]]()
       scalaOptionPromise.completeWith(
-        javaOptionPromise.asScala.map(_.toScala)(pekko.dispatch.ExecutionContexts.parasitic))
+        javaOptionPromise.asScala.map(_.toScala)(scala.concurrent.ExecutionContext.parasitic))
       javaOptionPromise
     })
@@ -313,7 +313,7 @@ object Source {
    */
  def completionStageSource[T, M](completionStageSource: CompletionStage[Source[T, M]]): Source[T, CompletionStage[M]] =
    scaladsl.Source
-      .futureSource(completionStageSource.asScala.map(_.asScala)(ExecutionContexts.parasitic))
+      .futureSource(completionStageSource.asScala.map(_.asScala)(ExecutionContext.parasitic))
      .mapMaterializedValue(_.asJava)
      .asJava
@@ -773,7 +773,7 @@ object Source {
     new Source(
       scaladsl.Source.unfoldResourceAsync[T, R](
         () => create.create().asScala,
-        (resource: R) => read.apply(resource).asScala.map(_.toScala)(pekko.dispatch.ExecutionContexts.parasitic),
+        (resource: R) => read.apply(resource).asScala.map(_.toScala)(scala.concurrent.ExecutionContext.parasitic),
         (resource: R) => close.apply(resource).asScala))
   /**
diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
index 7255a02e6a..53b618bcae 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Tcp.scala
@@ -143,7 +143,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
 class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
   import Tcp._
-  import org.apache.pekko.dispatch.ExecutionContexts.parasitic
+  import scala.concurrent.ExecutionContext.parasitic
   private lazy val delegate: scaladsl.Tcp = scaladsl.Tcp(system)
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
index 708efdbb5a..4e2030b254 100755
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala
@@ -754,7 +754,8 @@ object Flow {
       case Seq(a) =>
         val f: Flow[I, O, Future[M]] = futureFlow(create()
-          .map(Flow[I].prepend(Source.single(a)).viaMat(_)(Keep.right))(pekko.dispatch.ExecutionContexts.parasitic))
+          .map(Flow[I].prepend(Source.single(a)).viaMat(_)(Keep.right))(
+            scala.concurrent.ExecutionContext.parasitic))
         f
       case Nil =>
         val f: Flow[I, O, Future[M]] = Flow[I]
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
index 54e2dbfaf2..f568602337 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContextOps.scala
@@ -21,7 +21,7 @@ import scala.concurrent.duration.FiniteDuration
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.annotation.ApiMayChange
-import pekko.dispatch.ExecutionContexts
+import scala.concurrent.ExecutionContext
 import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
 import pekko.stream._
 import pekko.stream.impl.Throttle
@@ -141,7 +141,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
    */
   def mapAsync[Out2](parallelism: Int)(f: Out => Future[Out2]): Repr[Out2, Ctx] =
     via(flow.mapAsync(parallelism) {
-      case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContexts.parasitic)
+      case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContext.parasitic)
     })
   /**
@@ -155,7 +155,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
       f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = {
     via(flow[Out, Ctx].mapAsyncPartitioned(parallelism)(pair => partitioner(pair._1)) { (pair, partition) =>
-      f(pair._1, partition).map((_, pair._2))(ExecutionContexts.parasitic)
+      f(pair._1, partition).map((_, pair._2))(ExecutionContext.parasitic)
     })
   }
@@ -170,7 +170,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
       f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = {
     via(flow[Out, Ctx].mapAsyncPartitionedUnordered(parallelism)(pair => partitioner(pair._1)) { (pair, partition) =>
-      f(pair._1, partition).map((_, pair._2))(ExecutionContexts.parasitic)
+      f(pair._1, partition).map((_, pair._2))(ExecutionContext.parasitic)
     })
   }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Queue.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Queue.scala
index 07caf01432..ebdeda7db4 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Queue.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Queue.scala
@@ -21,7 +21,7 @@ import scala.concurrent.Future
 import org.apache.pekko
 import pekko.Done
 import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
+import scala.concurrent.ExecutionContext
 import pekko.stream.QueueOfferResult
 import pekko.util.FutureConverters._
 import pekko.util.OptionConverters._
@@ -157,7 +157,7 @@ object SinkQueueWithCancel {
       queue: SinkQueueWithCancel[T]): pekko.stream.javadsl.SinkQueueWithCancel[T] =
     new pekko.stream.javadsl.SinkQueueWithCancel[T] {
       override def pull(): CompletionStage[Optional[T]] =
-        queue.pull().map(_.toJava)(ExecutionContexts.parasitic).asJava
+        queue.pull().map(_.toJava)(ExecutionContext.parasitic).asJava
       override def cancel(): Unit = queue.cancel()
     }
 }
diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
index 5112a69db2..d92f87a547 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Sink.scala
@@ -23,7 +23,7 @@ import org.apache.pekko
 import pekko.{ util, Done, NotUsed }
 import pekko.actor.ActorRef
 import pekko.annotation.InternalApi
-import pekko.dispatch.ExecutionContexts
+import scala.concurrent.ExecutionContext
 import pekko.stream._
 import pekko.stream.impl._
 import pekko.stream.impl.Stages.DefaultAttributes
@@ -201,7 +201,7 @@ object Sink {
       .fromGraph(new HeadOptionStage[T])
       .withAttributes(DefaultAttributes.headSink)
       .mapMaterializedValue(e =>
-        e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.parasitic))
+        e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContext.parasitic))
   /**
    * A `Sink` that materializes into a `Future` of the optional first value received.
@@ -223,7 +223,7 @@ object Sink {
   def last[T]: Sink[T, Future[T]] = {
     Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastSink).mapMaterializedValue { e =>
       e.map(_.headOption.getOrElse(throw new NoSuchElementException("last of empty stream")))(
-        ExecutionContexts.parasitic)
+        ExecutionContext.parasitic)
     }
   }
@@ -236,7 +236,7 @@ object Sink {
    */
   def lastOption[T]: Sink[T, Future[Option[T]]] = {
     Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastOptionSink).mapMaterializedValue { e =>
-      e.map(_.headOption)(ExecutionContexts.parasitic)
+      e.map(_.headOption)(ExecutionContext.parasitic)
     }
   }
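
Note: every hunk above applies the same mechanical substitution, replacing the removed helper
pekko.dispatch.ExecutionContexts.parasitic with the standard library's
scala.concurrent.ExecutionContext.parasitic (available since Scala 2.13). A minimal sketch of the
same migration in user code follows; the future and object names here are illustrative only and do
not come from this patch.

    import scala.concurrent.{ ExecutionContext, Future }

    object ParasiticMigrationSketch {
      // Illustrative future; stands in for any already-asynchronous result.
      def lookup(): Future[Int] = Future.successful(42)

      // Before: lookup().map(_ + 1)(org.apache.pekko.dispatch.ExecutionContexts.parasitic)
      // After: the standard-library parasitic context keeps the cheap transformation
      // on the thread that completes the future, avoiding a dispatcher hop.
      def incremented(): Future[Int] =
        lookup().map(_ + 1)(ExecutionContext.parasitic)
    }

ExecutionContext.parasitic runs its tasks on the thread that completes the future (or on the caller
if the value is already available), so it is only suitable for quick, non-blocking callbacks such as
the wrappers and converters touched in the hunks above.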