chore: Remove pekko.dispatch.ExecutionContexts.parasitic (#2208)

* chore: Remove pekko.dispatch.ExecutionContexts.parasitic

* .
This commit is contained in: (branch/tag list not captured in this extraction)
Author: He-Pin (kerr), 2025-09-19 22:17:20 +08:00; committed by GitHub
parent 5acdace08c
commit 8c48393cd3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
54 changed files with 124 additions and 200 deletions

View file

@ -18,7 +18,6 @@ import pekko.ConfigurationException
import pekko.Done
import pekko.actor.CoordinatedShutdown.Phase
import pekko.actor.CoordinatedShutdown.UnknownReason
import pekko.dispatch.ExecutionContexts
import pekko.testkit.PekkoSpec
import pekko.testkit.EventFilter
import pekko.testkit.TestKit
@ -323,10 +322,10 @@ class CoordinatedShutdownSpec
Future {
testProbe.ref ! BMessage("concurrentB")
Done
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}
Done
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
val cancellationFut: Future[Done] = {
val cancellables = (0 until 20).map { _ =>

View file

@ -151,7 +151,7 @@ class ExecutionContextSpec extends PekkoSpec with DefaultTimeout {
}
"work with same-thread executor plus blocking" in {
val ec = pekko.dispatch.ExecutionContexts.parasitic
val ec = scala.concurrent.ExecutionContext.parasitic
var x = 0
ec.execute(new Runnable {
override def run = {

View file

@ -20,7 +20,6 @@ import org.scalatest.matchers.should.Matchers
import org.apache.pekko
import pekko.Done
import pekko.dispatch.internal.SameThreadExecutionContext
import pekko.testkit.PekkoSpec
class SameThreadExecutionContextSpec extends PekkoSpec with Matchers {
@ -28,7 +27,7 @@ class SameThreadExecutionContextSpec extends PekkoSpec with Matchers {
"The SameThreadExecutionContext" should {
"return a Scala specific version" in {
val ec = SameThreadExecutionContext()
val ec = ExecutionContext.parasitic
// in Scala 2.13 and higher parasitic is available
ec.getClass.getName should ===("scala.concurrent.ExecutionContext$parasitic$")
}
@ -40,7 +39,7 @@ class SameThreadExecutionContextSpec extends PekkoSpec with Matchers {
.map { _ =>
Thread.currentThread().getName
}(system.dispatcher)
.map(firstName => firstName -> Thread.currentThread().getName)(SameThreadExecutionContext())
.map(firstName => firstName -> Thread.currentThread().getName)(ExecutionContext.parasitic)
promise.success(Done)
val (threadName1, threadName2) = futureThreadNames.futureValue
@ -54,7 +53,7 @@ class SameThreadExecutionContextSpec extends PekkoSpec with Matchers {
.map { _ =>
Thread.currentThread().getName
}(ExecutionContext.global)
.map(firstName => firstName -> Thread.currentThread().getName)(SameThreadExecutionContext())
.map(firstName => firstName -> Thread.currentThread().getName)(ExecutionContext.parasitic)
promise.success(Done)
val (threadName1, threadName2) = futureThreadNames.futureValue

View file

@ -21,7 +21,7 @@ import Tcp._
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.actor.ActorSystem
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.io.Inet.SocketOption
import pekko.testkit.{ PekkoSpec, TestProbe }
import pekko.testkit.SocketUtil.temporaryServerAddress
@ -35,7 +35,7 @@ trait TcpIntegrationSpecSupport { this: PekkoSpec =>
// terminate clientSystem after server system
system.whenTerminated.onComplete { _ =>
res.terminate()
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
res
} else system
val bindHandler = TestProbe()

View file

@ -28,7 +28,7 @@ import org.apache.pekko
import pekko.actor.Address
import pekko.actor.typed.internal.adapter.ActorSystemAdapter
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.pattern.StatusReply
import pekko.util.BoxedType
import pekko.util.JavaDurationConverters._
@ -277,7 +277,7 @@ import scala.util.Success
// Scala API impl
def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit = {
future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))(ExecutionContexts.parasitic)
future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))(ExecutionContext.parasitic)
}
// Java API impl

View file

@ -20,7 +20,7 @@ import org.apache.pekko
import pekko.actor.{ Cancellable, NotInfluenceReceiveTimeout }
import pekko.actor.typed.scaladsl.{ ActorContext, LoggerOps }
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.util.OptionVal
import org.slf4j.Logger
@ -123,13 +123,13 @@ import scala.concurrent.duration.FiniteDuration
val task = mode match {
case SingleMode =>
ctx.system.scheduler.scheduleOnce(delay, () => ctx.self.unsafeUpcast ! timerMsg)(ExecutionContexts.parasitic)
ctx.system.scheduler.scheduleOnce(delay, () => ctx.self.unsafeUpcast ! timerMsg)(ExecutionContext.parasitic)
case m: FixedDelayMode =>
ctx.system.scheduler.scheduleWithFixedDelay(m.initialDelay, delay)(() => ctx.self.unsafeUpcast ! timerMsg)(
ExecutionContexts.parasitic)
ExecutionContext.parasitic)
case m: FixedRateMode =>
ctx.system.scheduler.scheduleAtFixedRate(m.initialDelay, delay)(() => ctx.self.unsafeUpcast ! timerMsg)(
ExecutionContexts.parasitic)
ExecutionContext.parasitic)
}
val nextTimer = Timer(key, msg, mode.repeat, nextGen, task)

View file

@ -114,7 +114,7 @@ import pekko.util.FutureConverters._
override def uptime: Long = classicSystem.uptime
override def printTree: String = system.printTree
import org.apache.pekko.dispatch.ExecutionContexts.parasitic
import scala.concurrent.ExecutionContext.parasitic
override def terminate(): Unit = system.terminate()
override lazy val whenTerminated: scala.concurrent.Future[pekko.Done] =

View file

@ -20,3 +20,6 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.Future
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.FutureConverters#CompletionStageOps.asScala")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.FutureConverters#FutureOps.asJava$extension")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.util.FutureConverters#FutureOps.asJava")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.internal.SameThreadExecutionContext")
ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.dispatch.internal.SameThreadExecutionContext$")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.pekko.dispatch.ExecutionContexts.parasitic")

View file

@ -1,29 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.dispatch.internal
import scala.concurrent.ExecutionContext
import org.apache.pekko
import pekko.annotation.InternalApi
/**
 * Factory for the same-thread (calling-thread) ExecutionContext.
 *
 * Not intended to be called from any site other than the creation of
 * [[pekko.dispatch.ExecutionContexts#parasitic]].
 *
 * INTERNAL API
 */
@InternalApi
private[dispatch] object SameThreadExecutionContext {
  // Delegates directly to Scala's built-in parasitic EC (available since Scala 2.13);
  // @inline hints the optimizer to inline this trivial accessor.
  @inline def apply(): ExecutionContext = ExecutionContext.parasitic
}

View file

@ -1,30 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package org.apache.pekko.dispatch.internal
import scala.concurrent.ExecutionContext
import org.apache.pekko
import pekko.annotation.InternalApi
/**
 * Factory for the same-thread (calling-thread) ExecutionContext.
 *
 * Not intended to be called from any site other than the creation of
 * [[pekko.dispatch.ExecutionContexts#parasitic]].
 *
 * INTERNAL API
 */
@InternalApi
private[dispatch] object SameThreadExecutionContext {
  // Delegates directly to Scala's built-in parasitic EC; Scala 3's `inline def`
  // guarantees the call is inlined at each use site.
  inline def apply(): ExecutionContext = ExecutionContext.parasitic
}

View file

@ -27,7 +27,7 @@ import scala.util.Success
import scala.annotation.nowarn
import org.apache.pekko
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.pattern.ask
import pekko.routing.MurmurHash
import pekko.util.{ Helpers, JavaDurationConverters, Timeout }
@ -75,7 +75,7 @@ abstract class ActorSelection extends Serializable {
* [[ActorRef]].
*/
def resolveOne()(implicit timeout: Timeout): Future[ActorRef] = {
implicit val ec = ExecutionContexts.parasitic
implicit val ec = ExecutionContext.parasitic
val p = Promise[ActorRef]()
this.ask(Identify(None)).onComplete {
case Success(ActorIdentity(_, Some(ref))) => p.success(ref)

View file

@ -32,7 +32,6 @@ import com.typesafe.config.ConfigFactory
import org.apache.pekko
import pekko.Done
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.event.Logging
import pekko.pattern.after
import pekko.util.OptionConverters._
@ -267,7 +266,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
system.whenTerminated.map { _ =>
if (exitJvm && !runningJvmHook) System.exit(exitCode)
Done
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
} else if (exitJvm) {
System.exit(exitCode)
Future.successful(Done)
@ -493,7 +492,7 @@ final class CoordinatedShutdown private[pekko] (
override val size: Int = tasks.size
override def run(recoverEnabled: Boolean)(implicit ec: ExecutionContext): Future[Done] = {
Future.sequence(tasks.map(_.run(recoverEnabled))).map(_ => Done)(ExecutionContexts.parasitic)
Future.sequence(tasks.map(_.run(recoverEnabled))).map(_ => Done)(ExecutionContext.parasitic)
}
// This method may be run multiple times during the compare-and-set loop of ConcurrentHashMap, so it must be side-effect-free

View file

@ -20,8 +20,6 @@ import java.util.concurrent.CompletionStage
import scala.concurrent.{ ExecutionContext, ExecutionContextExecutor, ExecutionContextExecutorService, Future, Promise }
import org.apache.pekko
import pekko.annotation.InternalStableApi
import pekko.dispatch.internal.SameThreadExecutionContext
import pekko.japi.function.Procedure
/**
@ -77,21 +75,6 @@ object ExecutionContexts {
* @return a reference to the global ExecutionContext
*/
def global(): ExecutionContextExecutor = ExecutionContext.global
/**
* INTERNAL API
*
* WARNING: Not A General Purpose ExecutionContext!
*
* This is an execution context which runs everything on the calling thread.
* It is very useful for actions which are known to be non-blocking and
* non-throwing in order to save a round-trip to the thread pool.
*
* Once Scala 2.12 is no longer supported this can be dropped in favour of directly using `ExecutionContext.parasitic`
*/
@InternalStableApi
private[pekko] val parasitic: ExecutionContext = SameThreadExecutionContext()
}
/**

View file

@ -26,7 +26,7 @@ import scala.util.control.NoStackTrace
import org.apache.pekko
import pekko.actor._
import pekko.annotation.{ InternalApi, InternalStableApi }
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.dispatch.sysmsg._
import pekko.util.{ unused, ByteString, Timeout }
@ -717,7 +717,7 @@ private[pekko] object PromiseActorRef {
val result = Promise[Any]()
val scheduler = provider.guardian.underlying.system.scheduler
val a = new PromiseActorRef(provider, result, messageClassName, refPathPrefix)
implicit val ec = ExecutionContexts.parasitic
implicit val ec = ExecutionContext.parasitic
val f = scheduler.scheduleOnce(timeout.duration) {
val timedOut = result.tryComplete {
val wasSentBy = if (sender == ActorRef.noSender) "" else s" was sent by [$sender]"

View file

@ -28,7 +28,7 @@ import scala.util.control.NonFatal
import org.apache.pekko
import pekko.PekkoException
import pekko.actor.{ ExtendedActorSystem, Scheduler }
import pekko.dispatch.ExecutionContexts.parasitic
import scala.concurrent.ExecutionContext.parasitic
import pekko.pattern.internal.{ CircuitBreakerNoopTelemetry, CircuitBreakerTelemetry }
import pekko.annotation.InternalApi
import pekko.util.FutureConverters._

View file

@ -116,7 +116,7 @@ trait FutureTimeoutSupport {
future.onComplete { result =>
timeout.cancel()
p.tryComplete(result)
}(pekko.dispatch.ExecutionContexts.parasitic)
}(scala.concurrent.ExecutionContext.parasitic)
p.future
}
}

View file

@ -18,7 +18,7 @@ import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.actor._
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.dispatch.sysmsg.{ Unwatch, Watch }
import pekko.util.Timeout
@ -65,6 +65,6 @@ trait GracefulStopSupport {
ref.result.future.transform({
case Terminated(t) if t.path == target.path => true
case _ => { internalTarget.sendSystemMessage(Unwatch(target, ref)); false }
}, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContexts.parasitic)
}, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContext.parasitic)
}
}

View file

@ -22,7 +22,7 @@ import org.apache.pekko
import pekko.Done
import pekko.actor.InvalidMessageException
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
/**
* Generic top-level message type for replies that signal failure or success. Convenient to use together with the
@ -180,5 +180,5 @@ object StatusReply {
ScalaFailure(new IllegalArgumentException(s"Unexpected status reply success value: $unexpected"))
}
case fail @ ScalaFailure(_) => fail.asInstanceOf[Try[T]]
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
}

View file

@ -27,7 +27,7 @@ import pekko.actor.ActorRef
import pekko.actor.ActorSystem
import pekko.actor.SupervisorStrategy
import pekko.dispatch.Dispatchers
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.japi.Util.immutableSeq
import pekko.pattern.ask
import pekko.pattern.pipe
@ -57,7 +57,7 @@ private[pekko] final case class ScatterGatherFirstCompletedRoutees(
extends Routee {
override def send(message: Any, sender: ActorRef): Unit = {
implicit val ec = ExecutionContexts.parasitic
implicit val ec = ExecutionContext.parasitic
if (routees.isEmpty) {
val reply = Future.failed(new TimeoutException("Timeout due to no routees"))
reply.pipeTo(sender)

View file

@ -39,7 +39,7 @@ import pekko.cluster.sharding.internal.{
RememberEntitiesCoordinatorStore,
RememberEntitiesProvider
}
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.event.{ BusLogging, Logging }
import pekko.pattern.{ pipe, AskTimeoutException }
import pekko.persistence._
@ -219,7 +219,7 @@ object ShardCoordinator {
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
import pekko.util.ccompat.JavaConverters._
implicit val ec = ExecutionContexts.parasitic
implicit val ec = ExecutionContext.parasitic
rebalance(currentShardAllocations.asJava, rebalanceInProgress.asJava).map(_.asScala.toSet)
}

View file

@ -19,7 +19,7 @@ import scala.concurrent.Future
import org.apache.pekko
import pekko.actor.Actor
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.persistence.PersistentRepr
import pekko.persistence.journal.{ AsyncRecovery => SAsyncReplay }
import pekko.util.ConstantFun.scalaAnyToUnit
@ -35,10 +35,10 @@ abstract class AsyncRecovery extends SAsyncReplay with AsyncRecoveryPlugin { thi
doAsyncReplayMessages(persistenceId, fromSequenceNr, toSequenceNr, max,
new Consumer[PersistentRepr] {
def accept(p: PersistentRepr) = replayCallback(p)
}).asScala.map(scalaAnyToUnit)(ExecutionContexts.parasitic)
}).asScala.map(scalaAnyToUnit)(ExecutionContext.parasitic)
final def asyncReadHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): Future[Long] =
doAsyncReadHighestSequenceNr(persistenceId, fromSequenceNr: Long)
.asScala
.map(_.longValue)(ExecutionContexts.parasitic)
.map(_.longValue)(ExecutionContext.parasitic)
}

View file

@ -19,7 +19,7 @@ import scala.util.Failure
import scala.util.Try
import org.apache.pekko
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.persistence._
import pekko.persistence.journal.{ AsyncWriteJournal => SAsyncWriteJournal }
import pekko.util.ConstantFun.scalaAnyToUnit
@ -40,9 +40,9 @@ abstract class AsyncWriteJournal extends AsyncRecovery with SAsyncWriteJournal w
else successUnit
}
.to(immutable.IndexedSeq)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
final def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: Long): Future[Unit] = {
doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).asScala.map(scalaAnyToUnit)(ExecutionContexts.parasitic)
doAsyncDeleteMessagesTo(persistenceId, toSequenceNr).asScala.map(scalaAnyToUnit)(ExecutionContext.parasitic)
}
}

View file

@ -16,7 +16,7 @@ package org.apache.pekko.persistence.snapshot.japi
import scala.concurrent.Future
import org.apache.pekko
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.persistence._
import pekko.persistence.snapshot.{ SnapshotStore => SSnapshotStore }
import pekko.util.ConstantFun.scalaAnyToUnit
@ -31,17 +31,17 @@ abstract class SnapshotStore extends SSnapshotStore with SnapshotStorePlugin {
persistenceId: String,
criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]] = {
import pekko.util.OptionConverters._
doLoadAsync(persistenceId, criteria).asScala.map(_.toScala)(ExecutionContexts.parasitic)
doLoadAsync(persistenceId, criteria).asScala.map(_.toScala)(ExecutionContext.parasitic)
}
override final def saveAsync(metadata: SnapshotMetadata, snapshot: Any): Future[Unit] =
doSaveAsync(metadata, snapshot).asScala.map(scalaAnyToUnit)(ExecutionContexts.parasitic)
doSaveAsync(metadata, snapshot).asScala.map(scalaAnyToUnit)(ExecutionContext.parasitic)
override final def deleteAsync(metadata: SnapshotMetadata): Future[Unit] =
doDeleteAsync(metadata).asScala.map(scalaAnyToUnit)(ExecutionContexts.parasitic)
doDeleteAsync(metadata).asScala.map(scalaAnyToUnit)(ExecutionContext.parasitic)
override final def deleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Unit] =
doDeleteAsync(persistenceId: String, criteria: SnapshotSelectionCriteria).asScala.map(scalaAnyToUnit)(
ExecutionContexts.parasitic)
ExecutionContext.parasitic)
}

View file

@ -21,7 +21,7 @@ import org.apache.pekko
import pekko.Done
import pekko.actor.ActorSystem
import pekko.actor.Address
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.remote.UniqueAddress
import pekko.stream.Attributes
import pekko.stream.FlowShape
@ -308,7 +308,7 @@ private[remote] class InboundHandshake(inboundContext: InboundContext, inControl
// periodically.
thenInside()
case None =>
first.onComplete(_ => runInStage.invoke(thenInside))(ExecutionContexts.parasitic)
first.onComplete(_ => runInStage.invoke(thenInside))(ExecutionContext.parasitic)
}
}

View file

@ -26,7 +26,7 @@ import org.apache.pekko
import pekko.Done
import pekko.actor.ActorRef
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.dispatch.sysmsg.SystemMessage
import pekko.event.Logging
import pekko.remote.UniqueAddress
@ -119,7 +119,7 @@ import pekko.util.PrettyDuration.PrettyPrintableDuration
if (isAvailable(out))
pull(in) // onPull from downstream already called
}
outboundContext.controlSubject.attach(this).foreach(callback.invoke)(ExecutionContexts.parasitic)
outboundContext.controlSubject.attach(this).foreach(callback.invoke)(ExecutionContext.parasitic)
}
override def postStop(): Unit = {

View file

@ -33,7 +33,6 @@ import pekko.Done
import pekko.NotUsed
import pekko.actor.ActorSystem
import pekko.actor.ExtendedActorSystem
import pekko.dispatch.ExecutionContexts
import pekko.event.Logging
import pekko.remote.RemoteActorRefProvider
import pekko.remote.RemoteLogMarker
@ -279,7 +278,7 @@ private[remote] class ArteryTcpTransport(
s"Failed to bind TCP to [$bindHost:$bindPort] due to: " +
e.getMessage,
e))
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
// only on initial startup, when ActorSystem is starting
val b = Await.result(binding, settings.Bind.BindTimeout)

View file

@ -29,7 +29,7 @@ import scala.annotation.nowarn
import org.apache.pekko
import pekko.actor._
import pekko.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics }
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.dispatch.sysmsg.{ Unwatch, Watch }
import pekko.event.LoggingAdapter
import pekko.pattern.{ ask, pipe, PromiseActorRef }
@ -393,7 +393,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport)
SetThrottleAck
case _ =>
throw new IllegalArgumentException() // won't happen, compiler exhaustiveness check pleaser
}, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContexts.parasitic)
}, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContext.parasitic)
}
}

View file

@ -25,7 +25,7 @@ import org.apache.pekko
import pekko.Done
import pekko.actor.ActorRef
import pekko.actor.Address
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.remote.UniqueAddress
import pekko.remote.artery.InboundControlJunction.ControlMessageObserver
import pekko.remote.artery.InboundControlJunction.ControlMessageSubject
@ -65,7 +65,7 @@ private[remote] class TestInboundContext(
val done = a.completeHandshake(peer)
done.foreach { _ =>
associationsByUid.put(peer.uid, a)
}(ExecutionContexts.parasitic)
}(ExecutionContext.parasitic)
done
}

View file

@ -19,7 +19,7 @@ import scala.util.Success
import org.apache.pekko
import pekko.Done
import pekko.NotUsed
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.stream.Attributes
import pekko.stream.FlowShape
import pekko.stream.Inlet
@ -62,7 +62,7 @@ class SubInletOutletSpec extends StreamSpec {
override def preStart(): Unit = {
sideChannel
.watchTermination() { (_, done) =>
done.onComplete(c => subCompletion = c)(ExecutionContexts.parasitic)
done.onComplete(c => subCompletion = c)(ExecutionContext.parasitic)
NotUsed
}
.runWith(Sink.fromGraph(subIn.sink))
@ -169,7 +169,7 @@ class SubInletOutletSpec extends StreamSpec {
Source
.fromGraph(subOut.source)
.runWith(Sink.ignore)
.onComplete(t => subCompletion = t)(ExecutionContexts.parasitic)
.onComplete(t => subCompletion = t)(ExecutionContext.parasitic)
subOut.setHandler(new OutHandler {
override def onPull(): Unit = pull(in)
})

View file

@ -96,7 +96,7 @@ class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
Source(sources)
.flatMapConcat(i, identity(_)) // scala 2.12 can't infer the type of identity
.runWith(Sink.seq)
.map(_.sum)(pekko.dispatch.ExecutionContexts.parasitic)
.map(_.sum)(scala.concurrent.ExecutionContext.parasitic)
.futureValue shouldBe sum
}
}
@ -114,7 +114,7 @@ class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
(current, current)
}, _ => None)
.runWith(Sink.seq)
.map(_.sum)(pekko.dispatch.ExecutionContexts.parasitic)
.map(_.sum)(scala.concurrent.ExecutionContext.parasitic)
.futureValue shouldBe sum
}
}

View file

@ -142,7 +142,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
"combine many sinks to one" in {
val source = Source(List(0, 1, 2, 3, 4, 5))
implicit val ex = org.apache.pekko.dispatch.ExecutionContexts.parasitic
implicit val ex = scala.concurrent.ExecutionContext.parasitic
val sink = Sink
.combine(
List(
@ -155,7 +155,7 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
"combine two sinks with combineMat" in {
implicit val ex = org.apache.pekko.dispatch.ExecutionContexts.parasitic
implicit val ex = scala.concurrent.ExecutionContext.parasitic
Source(List(0, 1, 2, 3, 4, 5))
.toMat(Sink.combineMat(Sink.reduce[Int]((a, b) => a + b), Sink.reduce[Int]((a, b) => a + b))(Broadcast[Int](_))(
(f1, f2) => {

View file

@ -18,7 +18,7 @@ import scala.concurrent.Future
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.typed.ActorRef
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.pattern.{ AskTimeoutException, StatusReply }
import pekko.stream._
import pekko.stream.scaladsl._
@ -182,7 +182,7 @@ object ActorFlow {
implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] =
askImpl[(I, Ctx), Q, A, (A, Ctx)](parallelism)(ref)(
(in, r) => makeMessage(in._1, r),
(in, o: Future[A]) => o.map(a => a -> in._2)(ExecutionContexts.parasitic))
(in, o: Future[A]) => o.map(a => a -> in._2)(ExecutionContext.parasitic))
/**
* Use for messages whose response is known to be a [[pekko.pattern.StatusReply]]. When a [[pekko.pattern.StatusReply#success]] response
@ -202,7 +202,7 @@ object ActorFlow {
makeMessage: (I, ActorRef[StatusReply[A]]) => Q)(implicit timeout: Timeout): Flow[(I, Ctx), (A, Ctx), NotUsed] = {
askImpl[(I, Ctx), Q, StatusReply[A], (StatusReply[A], Ctx)](parallelism)(ref)(
(in, r) => makeMessage(in._1, r),
(in, o: Future[StatusReply[A]]) => o.map(a => a -> in._2)(ExecutionContexts.parasitic)).map {
(in, o: Future[StatusReply[A]]) => o.map(a => a -> in._2)(ExecutionContext.parasitic)).map {
case (StatusReply.Success(a), ctx) => a.asInstanceOf[A] -> ctx
case (StatusReply.Error(err), _) => throw err
case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser

View file

@ -70,7 +70,7 @@ object KillSwitches {
case _ =>
// callback.invoke is a simple actor send, so it is fine to run on the invoking thread
terminationSignal.onComplete(getAsyncCallback[Try[Done]](onSwitch).invoke)(
pekko.dispatch.ExecutionContexts.parasitic)
scala.concurrent.ExecutionContext.parasitic)
}
}

View file

@ -24,7 +24,7 @@ import scala.util.control.{ NoStackTrace, NonFatal }
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.stage._
import pekko.util.OptionVal
@ -157,7 +157,7 @@ private[stream] final class MapAsyncPartitioned[In, Out, Partition](
partitionsInProgress += partition
future.value match {
case None => future.onComplete(holder)(ExecutionContexts.parasitic)
case None => future.onComplete(holder)(ExecutionContext.parasitic)
case Some(v) =>
// #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
// run the logic directly on this thread

View file

@ -18,7 +18,7 @@ import scala.util.Try
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.stream._
import pekko.stream.impl.Stages.DefaultAttributes
import pekko.stream.stage.{ GraphStageLogic, GraphStageWithMaterializedValue, OutHandler }
@ -49,7 +49,7 @@ import pekko.util.OptionVal
handleCompletion(value)
case None =>
// callback on future completion
promise.future.onComplete(getAsyncCallback(handleCompletion).invoke)(ExecutionContexts.parasitic)
promise.future.onComplete(getAsyncCallback(handleCompletion).invoke)(ExecutionContext.parasitic)
}
}

View file

@ -220,7 +220,7 @@ import pekko.stream.stage._
.onComplete {
case scala.util.Success(_) =>
case scala.util.Failure(e) => p.tryFailure(e)
}(pekko.dispatch.ExecutionContexts.parasitic)
}(scala.concurrent.ExecutionContext.parasitic)
p.future
}
override def complete(): Unit = callback.invoke(Completion)

View file

@ -28,7 +28,7 @@ import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.DoNotInherit
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.event.Logging
import pekko.stream._
import pekko.stream.ActorAttributes.StreamSubscriptionTimeout
@ -385,7 +385,7 @@ import org.reactivestreams.Subscriber
.foreach {
case NonFatal(e) => p.tryFailure(e)
case _ => ()
}(pekko.dispatch.ExecutionContexts.parasitic)
}(scala.concurrent.ExecutionContext.parasitic)
p.future
}
override def cancel(): Unit = {
@ -561,7 +561,7 @@ import org.reactivestreams.Subscriber
failStage(e)
}
try {
sinkFactory(element).onComplete(cb.invoke)(ExecutionContexts.parasitic)
sinkFactory(element).onComplete(cb.invoke)(ExecutionContext.parasitic)
} catch {
case NonFatal(e) =>
promise.failure(e)

View file

@ -111,7 +111,7 @@ private[pekko] final class UnfoldJava[S, E](s: S, f: function.Function[S, Option
future.value match {
case Some(value) => handle(value)
case None =>
future.onComplete(asyncHandler)(pekko.dispatch.ExecutionContexts.parasitic)
future.onComplete(asyncHandler)(scala.concurrent.ExecutionContext.parasitic)
}
}

View file

@ -20,7 +20,7 @@ import scala.util.control.NonFatal
import org.apache.pekko
import pekko.Done
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts.parasitic
import scala.concurrent.ExecutionContext.parasitic
import pekko.stream._
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.Attributes.SourceLocation

View file

@ -232,7 +232,7 @@ private[pekko] final class FlattenConcat[T, M](parallelism: Int)
private def addPendingFutureElem(future: Future[T]): Unit = {
val inflightSource = new InflightPendingFutureSource[T](invokeCb)
future.onComplete(inflightSource)(pekko.dispatch.ExecutionContexts.parasitic)
future.onComplete(inflightSource)(scala.concurrent.ExecutionContext.parasitic)
queue.enqueue(inflightSource)
}

View file

@ -19,7 +19,7 @@ import scala.util.control.NonFatal
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.stream.{
AbruptStageTerminationException,
Attributes,
@ -62,7 +62,7 @@ import pekko.util.OptionVal
Initializing.onFuture(tryFlow)
case None =>
val cb = getAsyncCallback(Initializing.onFuture)
futureFlow.onComplete(cb.invoke)(ExecutionContexts.parasitic)
futureFlow.onComplete(cb.invoke)(ExecutionContext.parasitic)
// in case both ports are closed before future completion
setKeepGoing(true)
}

View file

@ -24,7 +24,7 @@ import org.apache.pekko
import pekko.Done
import pekko.actor.Cancellable
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.event.Logging
import pekko.stream.{ Shape, _ }
import pekko.stream.FlowMonitorState._
@ -316,7 +316,7 @@ import pekko.stream.stage._
onFutureSourceCompleted(it)
case _ =>
val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]](onFutureSourceCompleted).invoke _
futureSource.onComplete(cb)(ExecutionContexts.parasitic) // could be optimised FastFuture-like
futureSource.onComplete(cb)(ExecutionContext.parasitic) // could be optimised FastFuture-like
}
// initial handler (until future completes)
@ -397,7 +397,7 @@ import pekko.stream.stage._
handle(completed)
case None =>
val cb = getAsyncCallback[Try[T]](handle).invoke _
future.onComplete(cb)(ExecutionContexts.parasitic)
future.onComplete(cb)(ExecutionContext.parasitic)
}
}

View file

@ -21,7 +21,7 @@ import scala.concurrent.Future
import scala.util.Try
import org.apache.pekko
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.stream.Attributes
import pekko.stream.Attributes.SourceLocation
import pekko.stream.Outlet
@ -50,7 +50,7 @@ private[pekko] final class LazyFutureSource[T](f: () => Future[T]) extends Graph
case Some(result) => handle(result)
case None =>
val cb = getAsyncCallback[Try[T]](handle).invoke _
future.onComplete(cb)(ExecutionContexts.parasitic)
future.onComplete(cb)(ExecutionContext.parasitic)
}
}

View file

@ -468,7 +468,7 @@ private[stream] object Collect {
@InternalApi private[pekko] final case class ScanAsync[In, Out](zero: Out, f: (Out, In) => Future[Out])
extends GraphStage[FlowShape[In, Out]] {
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
val in = Inlet[In]("ScanAsync.in")
val out = Outlet[Out]("ScanAsync.out")
@ -560,7 +560,7 @@ private[stream] object Collect {
eventualCurrent.value match {
case Some(result) => futureCB(result)
case _ => eventualCurrent.onComplete(futureCB)(ExecutionContexts.parasitic)
case _ => eventualCurrent.onComplete(futureCB)(ExecutionContext.parasitic)
}
} catch {
case NonFatal(ex) =>
@ -660,7 +660,7 @@ private[stream] object Collect {
@InternalApi private[pekko] final class FoldAsync[In, Out](zero: Out, f: (Out, In) => Future[Out])
extends GraphStage[FlowShape[In, Out]] {
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
val in = Inlet[In]("FoldAsync.in")
val out = Outlet[Out]("FoldAsync.out")
@ -740,7 +740,7 @@ private[stream] object Collect {
private def handleAggregatingValue(): Unit = {
aggregating.value match {
case Some(result) => futureCB(result) // already completed
case _ => aggregating.onComplete(futureCB)(ExecutionContexts.parasitic)
case _ => aggregating.onComplete(futureCB)(ExecutionContext.parasitic)
}
}
@ -1321,7 +1321,7 @@ private[stream] object Collect {
buffer.enqueue(holder)
future.value match {
case None => future.onComplete(holder)(pekko.dispatch.ExecutionContexts.parasitic)
case None => future.onComplete(holder)(scala.concurrent.ExecutionContext.parasitic)
case Some(v) =>
// #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
// run the logic directly on this thread
@ -1438,7 +1438,7 @@ private[stream] object Collect {
val future = f(grab(in))
inFlight += 1
future.value match {
case None => future.onComplete(invokeFutureCB)(pekko.dispatch.ExecutionContexts.parasitic)
case None => future.onComplete(invokeFutureCB)(scala.concurrent.ExecutionContext.parasitic)
case Some(v) => futureCompleted(v)
}
} catch {

View file

@ -26,7 +26,7 @@ import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Terminated }
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.io.Inet.SocketOption
import pekko.io.Tcp
import pekko.io.Tcp._
@ -99,7 +99,7 @@ import pekko.util.ByteString
thisStage.tell(Unbind, thisStage)
}
unbindPromise.future
}, unbindPromise.future.map(_ => Done)(ExecutionContexts.parasitic)))
}, unbindPromise.future.map(_ => Done)(ExecutionContext.parasitic)))
case f: CommandFailed =>
val ex = new BindFailedException {
// cannot modify the actual exception class for compatibility reasons
@ -591,7 +591,7 @@ private[stream] object ConnectionSourceStage {
remoteAddress,
eagerMaterializer)
(logic, localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(ExecutionContexts.parasitic))
(logic, localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(ExecutionContext.parasitic))
}
override def toString = s"TCP-to($remoteAddress)"

View file

@ -27,7 +27,7 @@ import pekko.Done
import pekko.NotUsed
import pekko.actor.ActorRef
import pekko.actor.ClassicActorSystemProvider
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.Pair
import pekko.japi.function
@ -275,7 +275,7 @@ object Flow {
def completionStageFlow[I, O, M](flow: CompletionStage[Flow[I, O, M]]): Flow[I, O, CompletionStage[M]] = {
import pekko.util.FutureConverters._
val sflow =
scaladsl.Flow.futureFlow(flow.asScala.map(_.asScala)(ExecutionContexts.parasitic)).mapMaterializedValue(_.asJava)
scaladsl.Flow.futureFlow(flow.asScala.map(_.asScala)(ExecutionContext.parasitic)).mapMaterializedValue(_.asJava)
new javadsl.Flow(sflow)
}
@ -328,7 +328,7 @@ object Flow {
def lazyCompletionStageFlow[I, O, M](
create: Creator[CompletionStage[Flow[I, O, M]]]): Flow[I, O, CompletionStage[M]] =
scaladsl.Flow
.lazyFutureFlow[I, O, M](() => create.create().asScala.map(_.asScala)(ExecutionContexts.parasitic))
.lazyFutureFlow[I, O, M](() => create.create().asScala.map(_.asScala)(ExecutionContext.parasitic))
.mapMaterializedValue(_.asJava)
.asJava

View file

@ -20,7 +20,7 @@ import scala.concurrent.Future
import org.apache.pekko
import pekko.Done
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.stream.QueueOfferResult
import pekko.util.FutureConverters._
import pekko.util.OptionConverters._
@ -141,7 +141,7 @@ object SinkQueueWithCancel {
new pekko.stream.scaladsl.SinkQueueWithCancel[T] {
override def pull(): Future[Option[T]] =
queue.pull().asScala.map(_.toScala)(ExecutionContexts.parasitic)
queue.pull().asScala.map(_.toScala)(ExecutionContext.parasitic)
override def cancel(): Unit = queue.cancel()
}

View file

@ -24,7 +24,7 @@ import scala.util.Try
import org.apache.pekko
import pekko._
import pekko.actor.{ ActorRef, ClassicActorSystemProvider, Status }
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.japi.function
import pekko.japi.function.Creator
import pekko.stream._
@ -96,7 +96,7 @@ object Sink {
def forall[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
import pekko.util.FutureConverters._
new Sink(scaladsl.Sink.forall[In](p.test)
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContext.parasitic).asJava))
}
/**
@ -121,7 +121,7 @@ object Sink {
def none[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
import pekko.util.FutureConverters._
new Sink(scaladsl.Sink.none[In](p.test)
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContext.parasitic).asJava))
}
/**
@ -146,7 +146,7 @@ object Sink {
def exists[In](p: function.Predicate[In]): javadsl.Sink[In, CompletionStage[java.lang.Boolean]] = {
import pekko.util.FutureConverters._
new Sink(scaladsl.Sink.exists[In](p.test)
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContexts.parasitic).asJava))
.mapMaterializedValue(_.map(Boolean.box)(ExecutionContext.parasitic).asJava))
}
/**
@ -231,7 +231,7 @@ object Sink {
f: function.Function[T, CompletionStage[Void]]): Sink[T, CompletionStage[Done]] =
new Sink(
scaladsl.Sink
.foreachAsync(parallelism)((x: T) => f(x).asScala.map(scalaAnyToUnit)(ExecutionContexts.parasitic))
.foreachAsync(parallelism)((x: T) => f(x).asScala.map(scalaAnyToUnit)(ExecutionContext.parasitic))
.toCompletionStage())
/**
@ -260,7 +260,7 @@ object Sink {
* See also [[head]].
*/
def headOption[In](): Sink[In, CompletionStage[Optional[In]]] =
new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue(_.map(_.toJava)(ExecutionContexts.parasitic).asJava))
new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue(_.map(_.toJava)(ExecutionContext.parasitic).asJava))
/**
* A `Sink` that materializes into a `CompletionStage` of the last value received.
@ -280,7 +280,7 @@ object Sink {
* See also [[head]], [[takeLast]].
*/
def lastOption[In](): Sink[In, CompletionStage[Optional[In]]] =
new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue(_.map(_.toJava)(ExecutionContexts.parasitic).asJava))
new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue(_.map(_.toJava)(ExecutionContext.parasitic).asJava))
/**
* A `Sink` that materializes into a `CompletionStage` of `List<In>` containing the last `n` collected elements.
@ -294,7 +294,7 @@ object Sink {
new Sink(
scaladsl.Sink
.takeLast[In](n)
.mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.parasitic).asJava))
.mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContext.parasitic).asJava))
}
/**
@ -310,7 +310,7 @@ object Sink {
def seq[In]: Sink[In, CompletionStage[java.util.List[In]]] = {
import pekko.util.ccompat.JavaConverters._
new Sink(
scaladsl.Sink.seq[In].mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.parasitic).asJava))
scaladsl.Sink.seq[In].mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContext.parasitic).asJava))
}
/**
@ -512,7 +512,7 @@ object Sink {
*/
def lazyCompletionStageSink[T, M](create: Creator[CompletionStage[Sink[T, M]]]): Sink[T, CompletionStage[M]] =
new Sink(scaladsl.Sink.lazyFutureSink { () =>
create.create().asScala.map(_.asScala)(ExecutionContexts.parasitic)
create.create().asScala.map(_.asScala)(ExecutionContext.parasitic)
}).mapMaterializedValue(_.asJava)
}

View file

@ -26,7 +26,7 @@ import scala.reflect.ClassTag
import org.apache.pekko
import pekko.{ Done, NotUsed }
import pekko.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider }
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.japi.{ function, JavaPartialFunction, Pair }
import pekko.japi.function.Creator
@ -71,7 +71,7 @@ object Source {
new Source(scaladsl.Source.maybe[T].mapMaterializedValue { (scalaOptionPromise: Promise[Option[T]]) =>
val javaOptionPromise = new CompletableFuture[Optional[T]]()
scalaOptionPromise.completeWith(
javaOptionPromise.asScala.map(_.toScala)(pekko.dispatch.ExecutionContexts.parasitic))
javaOptionPromise.asScala.map(_.toScala)(scala.concurrent.ExecutionContext.parasitic))
javaOptionPromise
})
@ -313,7 +313,7 @@ object Source {
*/
def completionStageSource[T, M](completionStageSource: CompletionStage[Source[T, M]]): Source[T, CompletionStage[M]] =
scaladsl.Source
.futureSource(completionStageSource.asScala.map(_.asScala)(ExecutionContexts.parasitic))
.futureSource(completionStageSource.asScala.map(_.asScala)(ExecutionContext.parasitic))
.mapMaterializedValue(_.asJava)
.asJava
@ -773,7 +773,7 @@ object Source {
new Source(
scaladsl.Source.unfoldResourceAsync[T, R](
() => create.create().asScala,
(resource: R) => read.apply(resource).asScala.map(_.toScala)(pekko.dispatch.ExecutionContexts.parasitic),
(resource: R) => read.apply(resource).asScala.map(_.toScala)(scala.concurrent.ExecutionContext.parasitic),
(resource: R) => close.apply(resource).asScala))
/**

View file

@ -143,7 +143,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
class Tcp(system: ExtendedActorSystem) extends pekko.actor.Extension {
import Tcp._
import org.apache.pekko.dispatch.ExecutionContexts.parasitic
import scala.concurrent.ExecutionContext.parasitic
private lazy val delegate: scaladsl.Tcp = scaladsl.Tcp(system)

View file

@ -754,7 +754,8 @@ object Flow {
case Seq(a) =>
val f: Flow[I, O, Future[M]] =
futureFlow(create()
.map(Flow[I].prepend(Source.single(a)).viaMat(_)(Keep.right))(pekko.dispatch.ExecutionContexts.parasitic))
.map(Flow[I].prepend(Source.single(a)).viaMat(_)(Keep.right))(
scala.concurrent.ExecutionContext.parasitic))
f
case Nil =>
val f: Flow[I, O, Future[M]] = Flow[I]

View file

@ -21,7 +21,7 @@ import scala.concurrent.duration.FiniteDuration
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.ApiMayChange
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter }
import pekko.stream._
import pekko.stream.impl.Throttle
@ -141,7 +141,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
*/
def mapAsync[Out2](parallelism: Int)(f: Out => Future[Out2]): Repr[Out2, Ctx] =
via(flow.mapAsync(parallelism) {
case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContexts.parasitic)
case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContext.parasitic)
})
/**
@ -155,7 +155,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = {
via(flow[Out, Ctx].mapAsyncPartitioned(parallelism)(pair => partitioner(pair._1)) {
(pair, partition) =>
f(pair._1, partition).map((_, pair._2))(ExecutionContexts.parasitic)
f(pair._1, partition).map((_, pair._2))(ExecutionContext.parasitic)
})
}
@ -170,7 +170,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = {
via(flow[Out, Ctx].mapAsyncPartitionedUnordered(parallelism)(pair => partitioner(pair._1)) {
(pair, partition) =>
f(pair._1, partition).map((_, pair._2))(ExecutionContexts.parasitic)
f(pair._1, partition).map((_, pair._2))(ExecutionContext.parasitic)
})
}

View file

@ -21,7 +21,7 @@ import scala.concurrent.Future
import org.apache.pekko
import pekko.Done
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.stream.QueueOfferResult
import pekko.util.FutureConverters._
import pekko.util.OptionConverters._
@ -157,7 +157,7 @@ object SinkQueueWithCancel {
queue: SinkQueueWithCancel[T]): pekko.stream.javadsl.SinkQueueWithCancel[T] =
new pekko.stream.javadsl.SinkQueueWithCancel[T] {
override def pull(): CompletionStage[Optional[T]] =
queue.pull().map(_.toJava)(ExecutionContexts.parasitic).asJava
queue.pull().map(_.toJava)(ExecutionContext.parasitic).asJava
override def cancel(): Unit = queue.cancel()
}
}

View file

@ -23,7 +23,7 @@ import org.apache.pekko
import pekko.{ util, Done, NotUsed }
import pekko.actor.ActorRef
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import scala.concurrent.ExecutionContext
import pekko.stream._
import pekko.stream.impl._
import pekko.stream.impl.Stages.DefaultAttributes
@ -201,7 +201,7 @@ object Sink {
.fromGraph(new HeadOptionStage[T])
.withAttributes(DefaultAttributes.headSink)
.mapMaterializedValue(e =>
e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.parasitic))
e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContext.parasitic))
/**
* A `Sink` that materializes into a `Future` of the optional first value received.
@ -223,7 +223,7 @@ object Sink {
def last[T]: Sink[T, Future[T]] = {
Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastSink).mapMaterializedValue { e =>
e.map(_.headOption.getOrElse(throw new NoSuchElementException("last of empty stream")))(
ExecutionContexts.parasitic)
ExecutionContext.parasitic)
}
}
@ -236,7 +236,7 @@ object Sink {
*/
def lastOption[T]: Sink[T, Future[Option[T]]] = {
Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastOptionSink).mapMaterializedValue { e =>
e.map(_.headOption)(ExecutionContexts.parasitic)
e.map(_.headOption)(ExecutionContext.parasitic)
}
}