Clean up same thread execution contexts #26690
* deprecate internal sameThread ec and use a new one for all internal use sites * Use the respective Scala version standard library "same thread" ec * fallback to the old inline impl on 2.12 when reflection isn't possible
This commit is contained in:
parent
59ce257209
commit
d26453b5e8
45 changed files with 248 additions and 165 deletions
|
|
@ -20,7 +20,6 @@ import com.typesafe.config.{ Config, ConfigFactory }
|
|||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.language.postfixOps
|
||||
import scala.util.Properties
|
||||
|
||||
object ActorSystemSpec {
|
||||
|
||||
|
|
@ -120,15 +119,6 @@ class ActorSystemSpec extends AkkaSpec(ActorSystemSpec.config) with ImplicitSend
|
|||
|
||||
"An ActorSystem" must {
|
||||
|
||||
"use scala.concurrent InternalCallbackExecutor/parasitic" in {
|
||||
val ec = system.asInstanceOf[ActorSystemImpl].internalCallingThreadExecutionContext
|
||||
val scalaVersion = Properties.versionNumberString
|
||||
if (scalaVersion.startsWith("2.13") && scalaVersion != "2.13.0-M5")
|
||||
ec.getClass.getName should ===("scala.concurrent.ExecutionContext$parasitic$")
|
||||
else
|
||||
ec.getClass.getName should ===("scala.concurrent.Future$InternalCallbackExecutor$")
|
||||
}
|
||||
|
||||
"reject invalid names" in {
|
||||
for (n <- Seq(
|
||||
"-hallowelt",
|
||||
|
|
|
|||
|
|
@ -308,10 +308,10 @@ class CoordinatedShutdownSpec
|
|||
Future {
|
||||
testProbe.ref ! BMessage("concurrentB")
|
||||
Done
|
||||
}(ExecutionContexts.sameThreadExecutionContext)
|
||||
}(ExecutionContexts.parasitic)
|
||||
}
|
||||
Done
|
||||
}(ExecutionContexts.sameThreadExecutionContext)
|
||||
}(ExecutionContexts.parasitic)
|
||||
|
||||
val cancellationFut: Future[Done] = {
|
||||
val cancellables = (0 until 20).map { _ =>
|
||||
|
|
|
|||
|
|
@ -139,7 +139,7 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
|
|||
}
|
||||
|
||||
"work with same-thread executor plus blocking" in {
|
||||
val ec = akka.dispatch.ExecutionContexts.sameThreadExecutionContext
|
||||
val ec = akka.dispatch.ExecutionContexts.parasitic
|
||||
var x = 0
|
||||
ec.execute(new Runnable {
|
||||
override def run = {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,59 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch
|
||||
|
||||
import akka.Done
|
||||
import akka.dispatch.internal.SameThreadExecutionContext
|
||||
import akka.testkit.AkkaSpec
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.Promise
|
||||
|
||||
class SameThreadExecutionContextSpec extends AkkaSpec with Matchers {
|
||||
|
||||
"The SameThreadExecutionContext" should {
|
||||
|
||||
"return a Scala specific version" in {
|
||||
val ec = SameThreadExecutionContext()
|
||||
if (util.Properties.versionNumberString.startsWith("2.12")) {
|
||||
ec.getClass.getName should ===("scala.concurrent.Future$InternalCallbackExecutor$")
|
||||
} else {
|
||||
// in 2.13 and higher parasitic is available
|
||||
ec.getClass.getName should ===("scala.concurrent.ExecutionContext$parasitic$")
|
||||
}
|
||||
}
|
||||
|
||||
"should run follow up future operations in the same dispatcher" in {
|
||||
// covered by the respective impl test suites for sure but just in case
|
||||
val promise = Promise[Done]()
|
||||
val futureThreadNames = promise.future
|
||||
.map { _ =>
|
||||
Thread.currentThread().getName
|
||||
}(system.dispatcher)
|
||||
.map(firstName => firstName -> Thread.currentThread().getName)(SameThreadExecutionContext())
|
||||
|
||||
promise.success(Done)
|
||||
val (threadName1, threadName2) = futureThreadNames.futureValue
|
||||
threadName1 should ===(threadName2)
|
||||
}
|
||||
|
||||
"should run follow up future operations in the same execution context" in {
|
||||
// covered by the respective impl test suites for sure but just in case
|
||||
val promise = Promise[Done]()
|
||||
val futureThreadNames = promise.future
|
||||
.map { _ =>
|
||||
Thread.currentThread().getName
|
||||
}(ExecutionContext.global)
|
||||
.map(firstName => firstName -> Thread.currentThread().getName)(SameThreadExecutionContext())
|
||||
|
||||
promise.success(Done)
|
||||
val (threadName1, threadName2) = futureThreadNames.futureValue
|
||||
threadName1 should ===(threadName2)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -23,7 +23,7 @@ trait TcpIntegrationSpecSupport { _: AkkaSpec =>
|
|||
// terminate clientSystem after server system
|
||||
system.whenTerminated.onComplete { _ =>
|
||||
res.terminate()
|
||||
}(ExecutionContexts.sameThreadExecutionContext)
|
||||
}(ExecutionContexts.parasitic)
|
||||
res
|
||||
} else system
|
||||
val bindHandler = TestProbe()
|
||||
|
|
|
|||
|
|
@ -211,8 +211,7 @@ import org.slf4j.LoggerFactory
|
|||
|
||||
// Scala API impl
|
||||
def pipeToSelf[Value](future: Future[Value])(mapResult: Try[Value] => T): Unit = {
|
||||
future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))(
|
||||
ExecutionContexts.sameThreadExecutionContext)
|
||||
future.onComplete(value => self.unsafeUpcast ! AdaptMessage(value, mapResult))(ExecutionContexts.parasitic)
|
||||
}
|
||||
|
||||
// Java API impl
|
||||
|
|
|
|||
|
|
@ -103,14 +103,13 @@ import org.slf4j.Logger
|
|||
|
||||
val task = mode match {
|
||||
case SingleMode =>
|
||||
ctx.system.scheduler
|
||||
.scheduleOnce(delay, () => ctx.self.unsafeUpcast ! timerMsg)(ExecutionContexts.sameThreadExecutionContext)
|
||||
ctx.system.scheduler.scheduleOnce(delay, () => ctx.self.unsafeUpcast ! timerMsg)(ExecutionContexts.parasitic)
|
||||
case FixedDelayMode =>
|
||||
ctx.system.scheduler.scheduleWithFixedDelay(delay, delay)(() => ctx.self.unsafeUpcast ! timerMsg)(
|
||||
ExecutionContexts.sameThreadExecutionContext)
|
||||
ExecutionContexts.parasitic)
|
||||
case FixedRateMode =>
|
||||
ctx.system.scheduler.scheduleAtFixedRate(delay, delay)(() => ctx.self.unsafeUpcast ! timerMsg)(
|
||||
ExecutionContexts.sameThreadExecutionContext)
|
||||
ExecutionContexts.parasitic)
|
||||
}
|
||||
|
||||
val nextTimer = Timer(key, msg, mode.repeat, nextGen, task)
|
||||
|
|
|
|||
|
|
@ -97,11 +97,11 @@ import org.slf4j.{ Logger, LoggerFactory }
|
|||
override def uptime: Long = classicSystem.uptime
|
||||
override def printTree: String = system.printTree
|
||||
|
||||
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
|
||||
import akka.dispatch.ExecutionContexts.parasitic
|
||||
|
||||
override def terminate(): Unit = system.terminate()
|
||||
override lazy val whenTerminated: scala.concurrent.Future[akka.Done] =
|
||||
system.whenTerminated.map(_ => Done)(sameThreadExecutionContext)
|
||||
system.whenTerminated.map(_ => Done)(parasitic)
|
||||
override lazy val getWhenTerminated: CompletionStage[akka.Done] =
|
||||
FutureConverters.toJava(whenTerminated)
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,3 @@
|
|||
# Internals changed
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.actor.ActorSystemImpl.internalCallingThreadExecutionContext")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.pattern.PromiseActorRef.internalCallingThreadExecutionContext")
|
||||
|
|
@ -0,0 +1,19 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch.internal
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
|
||||
/**
|
||||
* Factory to create same thread ec. Not intended to be called from any other site than to create [[akka.dispatch.ExecutionContexts#parasitic]]
|
||||
*
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[dispatch] object SameThreadExecutionContext {
|
||||
def apply(): ExecutionContext = ExecutionContext.parasitic
|
||||
}
|
||||
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||
*/
|
||||
|
||||
package akka.dispatch.internal
|
||||
|
||||
import akka.actor.ReflectiveDynamicAccess
|
||||
import akka.annotation.InternalApi
|
||||
import akka.dispatch.BatchingExecutor
|
||||
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
/**
|
||||
* Factory to create same thread ec. Not intended to be called from any other site than to create [[akka.dispatch.ExecutionContexts#parasitic]]
|
||||
*
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[dispatch] object SameThreadExecutionContext {
|
||||
def apply(): ExecutionContext = {
|
||||
try {
|
||||
// we don't want to introduce a dependency on the actor system to use the same thread execution context
|
||||
val dynamicAccess = new ReflectiveDynamicAccess(getClass.getClassLoader)
|
||||
dynamicAccess.getObjectFor[ExecutionContext]("scala.concurrent.Future$InternalCallbackExecutor$").get
|
||||
} catch {
|
||||
case NonFatal(_) =>
|
||||
// fallback to custom impl in case reflection is not available/possible
|
||||
new ExecutionContext with BatchingExecutor {
|
||||
override protected def unbatchedExecute(runnable: Runnable): Unit = runnable.run()
|
||||
override protected def resubmitOnBlock: Boolean = false // No point since we execute on same thread
|
||||
override def reportFailure(t: Throwable): Unit =
|
||||
throw new IllegalStateException("exception in sameThreadExecutionContext", t)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -68,7 +68,7 @@ abstract class ActorSelection extends Serializable {
|
|||
* [[ActorRef]].
|
||||
*/
|
||||
def resolveOne()(implicit timeout: Timeout): Future[ActorRef] = {
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
implicit val ec = ExecutionContexts.parasitic
|
||||
val p = Promise[ActorRef]()
|
||||
this.ask(Identify(None)).onComplete {
|
||||
case Success(ActorIdentity(_, Some(ref))) => p.success(ref)
|
||||
|
|
|
|||
|
|
@ -965,14 +965,6 @@ private[akka] class ActorSystemImpl(
|
|||
|
||||
val dispatcher: ExecutionContextExecutor = dispatchers.defaultGlobalDispatcher
|
||||
|
||||
val internalCallingThreadExecutionContext: ExecutionContext =
|
||||
dynamicAccess
|
||||
.getObjectFor[ExecutionContext]("scala.concurrent.Future$InternalCallbackExecutor$")
|
||||
.getOrElse(
|
||||
dynamicAccess
|
||||
.getObjectFor[ExecutionContext]("scala.concurrent.ExecutionContext$parasitic$")
|
||||
.getOrElse(ExecutionContexts.sameThreadExecutionContext))
|
||||
|
||||
private[this] final val terminationCallbacks = new TerminationCallbacks(provider.terminationFuture)(dispatcher)
|
||||
|
||||
override def whenTerminated: Future[Terminated] = terminationCallbacks.terminationFuture
|
||||
|
|
|
|||
|
|
@ -251,7 +251,7 @@ object CoordinatedShutdown extends ExtensionId[CoordinatedShutdown] with Extensi
|
|||
system.whenTerminated.map { _ =>
|
||||
if (exitJvm && !runningJvmHook) System.exit(exitCode)
|
||||
Done
|
||||
}(ExecutionContexts.sameThreadExecutionContext)
|
||||
}(ExecutionContexts.parasitic)
|
||||
} else if (exitJvm) {
|
||||
System.exit(exitCode)
|
||||
Future.successful(Done)
|
||||
|
|
@ -458,7 +458,7 @@ final class CoordinatedShutdown private[akka] (
|
|||
override val size: Int = tasks.size
|
||||
|
||||
override def run(recoverEnabled: Boolean)(implicit ec: ExecutionContext): Future[Done] = {
|
||||
Future.sequence(tasks.map(_.run(recoverEnabled))).map(_ => Done)(ExecutionContexts.sameThreadExecutionContext)
|
||||
Future.sequence(tasks.map(_.run(recoverEnabled))).map(_ => Done)(ExecutionContexts.parasitic)
|
||||
}
|
||||
|
||||
// This method may be run multiple times during the compare-and-set loop of ConcurrentHashMap, so it must be side-effect-free
|
||||
|
|
|
|||
|
|
@ -16,7 +16,9 @@ import scala.util.{ Failure, Success, Try }
|
|||
import java.util.concurrent.CompletionStage
|
||||
import java.util.concurrent.CompletableFuture
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
import akka.compat
|
||||
import akka.dispatch.internal.SameThreadExecutionContext
|
||||
import akka.util.unused
|
||||
import com.github.ghik.silencer.silent
|
||||
|
||||
|
|
@ -80,13 +82,25 @@ object ExecutionContexts {
|
|||
* This is an execution context which runs everything on the calling thread.
|
||||
* It is very useful for actions which are known to be non-blocking and
|
||||
* non-throwing in order to save a round-trip to the thread pool.
|
||||
*
|
||||
* INTERNAL API
|
||||
*/
|
||||
// Once Scala 2.12 is no longer supported this can be dropped in favour of directly using [[ExecutionContext.parasitic]]
|
||||
@InternalApi
|
||||
private[akka] val parasitic: ExecutionContext = SameThreadExecutionContext()
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
@deprecated("Use ExecutionContexts.parasitic instead", "2.6.4")
|
||||
private[akka] object sameThreadExecutionContext extends ExecutionContext with BatchingExecutor {
|
||||
override protected def unbatchedExecute(runnable: Runnable): Unit = runnable.run()
|
||||
override protected def unbatchedExecute(runnable: Runnable): Unit = parasitic.execute(runnable)
|
||||
override protected def resubmitOnBlock: Boolean = false // No point since we execute on same thread
|
||||
override def reportFailure(t: Throwable): Unit =
|
||||
throw new IllegalStateException("exception in sameThreadExecutionContext", t)
|
||||
parasitic.reportFailure(t)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -8,12 +8,13 @@ import java.util.concurrent.TimeoutException
|
|||
|
||||
import akka.actor._
|
||||
import akka.annotation.InternalApi
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.dispatch.sysmsg._
|
||||
import akka.util.{ Timeout, Unsafe }
|
||||
import com.github.ghik.silencer.silent
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.{ ExecutionContext, Future, Promise }
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.language.implicitConversions
|
||||
import scala.util.{ Failure, Success }
|
||||
|
||||
|
|
@ -543,9 +544,6 @@ private[akka] final class PromiseActorRef private (
|
|||
|
||||
override def getParent: InternalActorRef = provider.tempContainer
|
||||
|
||||
def internalCallingThreadExecutionContext: ExecutionContext =
|
||||
provider.guardian.underlying.systemImpl.internalCallingThreadExecutionContext
|
||||
|
||||
/**
|
||||
* Contract of this method:
|
||||
* Must always return the same ActorPath, which must have
|
||||
|
|
@ -657,7 +655,7 @@ private[akka] object PromiseActorRef {
|
|||
val result = Promise[Any]()
|
||||
val scheduler = provider.guardian.underlying.system.scheduler
|
||||
val a = new PromiseActorRef(provider, result, messageClassName)
|
||||
implicit val ec = a.internalCallingThreadExecutionContext
|
||||
implicit val ec = ExecutionContexts.parasitic
|
||||
val f = scheduler.scheduleOnce(timeout.duration) {
|
||||
result.tryComplete {
|
||||
val wasSentBy = if (sender == ActorRef.noSender) "" else s" was sent by [$sender]"
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ import scala.concurrent.duration._
|
|||
import scala.concurrent.TimeoutException
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
|
||||
import akka.dispatch.ExecutionContexts.parasitic
|
||||
import com.github.ghik.silencer.silent
|
||||
|
||||
import scala.compat.java8.FutureConverters
|
||||
|
|
@ -49,7 +49,7 @@ object CircuitBreaker {
|
|||
maxFailures: Int,
|
||||
callTimeout: FiniteDuration,
|
||||
resetTimeout: FiniteDuration): CircuitBreaker =
|
||||
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(sameThreadExecutionContext)
|
||||
new CircuitBreaker(scheduler, maxFailures, callTimeout, resetTimeout)(parasitic)
|
||||
|
||||
/**
|
||||
* Java API: Create a new CircuitBreaker.
|
||||
|
|
@ -751,8 +751,6 @@ class CircuitBreaker(
|
|||
val start = System.nanoTime()
|
||||
val p = Promise[T]()
|
||||
|
||||
implicit val ec = sameThreadExecutionContext
|
||||
|
||||
p.future.onComplete { fResult =>
|
||||
if (defineFailureFn(fResult)) {
|
||||
callFails()
|
||||
|
|
@ -760,13 +758,13 @@ class CircuitBreaker(
|
|||
notifyCallSuccessListeners(start)
|
||||
callSucceeds()
|
||||
}
|
||||
}
|
||||
}(parasitic)
|
||||
|
||||
val timeout = scheduler.scheduleOnce(callTimeout) {
|
||||
if (p.tryFailure(timeoutEx)) {
|
||||
notifyCallTimeoutListeners(start)
|
||||
}
|
||||
}
|
||||
}(parasitic)
|
||||
|
||||
materialize(body).onComplete {
|
||||
case Success(result) =>
|
||||
|
|
@ -777,7 +775,7 @@ class CircuitBreaker(
|
|||
notifyCallFailureListeners(start)
|
||||
}
|
||||
timeout.cancel
|
||||
}
|
||||
}(parasitic)
|
||||
p.future
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,8 +5,10 @@
|
|||
package akka.pattern
|
||||
|
||||
import akka.actor._
|
||||
import akka.util.{ Timeout }
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.util.Timeout
|
||||
import akka.dispatch.sysmsg.{ Unwatch, Watch }
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
|
|
@ -52,6 +54,6 @@ trait GracefulStopSupport {
|
|||
ref.result.future.transform({
|
||||
case Terminated(t) if t.path == target.path => true
|
||||
case _ => { internalTarget.sendSystemMessage(Unwatch(target, ref)); false }
|
||||
}, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ref.internalCallingThreadExecutionContext)
|
||||
}, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContexts.parasitic)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,13 +43,12 @@ private[akka] final case class ScatterGatherFirstCompletedRoutees(
|
|||
within: FiniteDuration)
|
||||
extends Routee {
|
||||
|
||||
override def send(message: Any, sender: ActorRef): Unit =
|
||||
override def send(message: Any, sender: ActorRef): Unit = {
|
||||
implicit val ec = ExecutionContexts.parasitic
|
||||
if (routees.isEmpty) {
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
val reply = Future.failed(new TimeoutException("Timeout due to no routees"))
|
||||
reply.pipeTo(sender)
|
||||
} else {
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
implicit val timeout = Timeout(within)
|
||||
val promise = Promise[Any]()
|
||||
routees.foreach {
|
||||
|
|
@ -63,6 +62,7 @@ private[akka] final case class ScatterGatherFirstCompletedRoutees(
|
|||
promise.future.pipeTo(sender)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A router pool that broadcasts the message to all routees, and replies with the first response.
|
||||
|
|
|
|||
|
|
@ -138,7 +138,7 @@ object ShardCoordinator {
|
|||
currentShardAllocations: Map[ActorRef, immutable.IndexedSeq[ShardId]],
|
||||
rebalanceInProgress: Set[ShardId]): Future[Set[ShardId]] = {
|
||||
import akka.util.ccompat.JavaConverters._
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
implicit val ec = ExecutionContexts.parasitic
|
||||
rebalance(currentShardAllocations.asJava, rebalanceInProgress.asJava).map(_.asScala.toSet)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -295,7 +295,7 @@ private[remote] class InboundHandshake(inboundContext: InboundContext, inControl
|
|||
// periodically.
|
||||
thenInside()
|
||||
case None =>
|
||||
first.onComplete(_ => runInStage.invoke(thenInside))(ExecutionContexts.sameThreadExecutionContext)
|
||||
first.onComplete(_ => runInStage.invoke(thenInside))(ExecutionContexts.parasitic)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -109,9 +109,7 @@ import akka.util.PrettyDuration.PrettyPrintableDuration
|
|||
if (isAvailable(out))
|
||||
pull(in) // onPull from downstream already called
|
||||
}
|
||||
outboundContext.controlSubject
|
||||
.attach(this)
|
||||
.foreach(callback.invoke)(ExecutionContexts.sameThreadExecutionContext)
|
||||
outboundContext.controlSubject.attach(this).foreach(callback.invoke)(ExecutionContexts.parasitic)
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
|
|
|
|||
|
|
@ -273,7 +273,7 @@ private[remote] class ArteryTcpTransport(
|
|||
s"Failed to bind TCP to [$bindHost:$bindPort] due to: " +
|
||||
e.getMessage,
|
||||
e))
|
||||
}(ExecutionContexts.sameThreadExecutionContext)
|
||||
}(ExecutionContexts.parasitic)
|
||||
|
||||
// only on initial startup, when ActorSystem is starting
|
||||
val b = Await.result(binding, settings.Bind.BindTimeout)
|
||||
|
|
|
|||
|
|
@ -22,6 +22,8 @@ import akka.util.{ ByteString, Timeout }
|
|||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import akka.dispatch.ExecutionContexts
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable.Queue
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
|
|
@ -361,7 +363,7 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport)
|
|||
ref.result.future.transform({
|
||||
case Terminated(t) if t.path == target.path => SetThrottleAck
|
||||
case SetThrottleAck => { internalTarget.sendSystemMessage(Unwatch(target, ref)); SetThrottleAck }
|
||||
}, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ref.internalCallingThreadExecutionContext)
|
||||
}, t => { internalTarget.sendSystemMessage(Unwatch(target, ref)); t })(ExecutionContexts.parasitic)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ private[remote] class TestInboundContext(
|
|||
val done = a.completeHandshake(peer)
|
||||
done.foreach { _ =>
|
||||
associationsByUid.put(peer.uid, a)
|
||||
}(ExecutionContexts.sameThreadExecutionContext)
|
||||
}(ExecutionContexts.parasitic)
|
||||
done
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -204,9 +204,9 @@ class FileSinkSpec extends StreamSpec(UnboundedMailboxConfig) with ScalaFutures
|
|||
.lazyInitAsync(() => Future.successful(FileIO.toPath(f)))
|
||||
// map a Future[Option[Future[IOResult]]] into a Future[Option[IOResult]]
|
||||
.mapMaterializedValue(_.flatMap {
|
||||
case Some(future) => future.map(Some(_))(ExecutionContexts.sameThreadExecutionContext)
|
||||
case Some(future) => future.map(Some(_))(ExecutionContexts.parasitic)
|
||||
case None => Future.successful(None)
|
||||
}(ExecutionContexts.sameThreadExecutionContext)))
|
||||
}(ExecutionContexts.parasitic)))
|
||||
|
||||
Await.result(completion, 3.seconds)
|
||||
checkFileContents(f, TestLines.head)
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ object KillSwitches {
|
|||
case _ =>
|
||||
// callback.invoke is a simple actor send, so it is fine to run on the invoking thread
|
||||
terminationSignal.onComplete(getAsyncCallback[Try[Done]](onSwitch).invoke)(
|
||||
akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
akka.dispatch.ExecutionContexts.parasitic)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -39,8 +39,7 @@ import scala.util.Try
|
|||
handleCompletion(value)
|
||||
case None =>
|
||||
// callback on future completion
|
||||
promise.future.onComplete(getAsyncCallback(handleCompletion).invoke)(
|
||||
ExecutionContexts.sameThreadExecutionContext)
|
||||
promise.future.onComplete(getAsyncCallback(handleCompletion).invoke)(ExecutionContexts.parasitic)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -223,7 +223,7 @@ import scala.concurrent.{ Future, Promise }
|
|||
.onComplete {
|
||||
case scala.util.Success(_) =>
|
||||
case scala.util.Failure(e) => p.tryFailure(e)
|
||||
}(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
}(akka.dispatch.ExecutionContexts.parasitic)
|
||||
p.future
|
||||
}
|
||||
override def complete(): Unit = callback.invoke(Completion)
|
||||
|
|
|
|||
|
|
@ -398,7 +398,7 @@ import scala.util.control.NonFatal
|
|||
.foreach {
|
||||
case NonFatal(e) => p.tryFailure(e)
|
||||
case _ => ()
|
||||
}(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
}(akka.dispatch.ExecutionContexts.parasitic)
|
||||
p.future
|
||||
}
|
||||
override def cancel(): Unit = {
|
||||
|
|
@ -572,7 +572,7 @@ import scala.util.control.NonFatal
|
|||
failStage(e)
|
||||
}
|
||||
try {
|
||||
sinkFactory(element).onComplete(cb.invoke)(ExecutionContexts.sameThreadExecutionContext)
|
||||
sinkFactory(element).onComplete(cb.invoke)(ExecutionContexts.parasitic)
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
promise.failure(e)
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ import scala.util.{ Failure, Success, Try }
|
|||
asyncHandler = ac.invoke
|
||||
}
|
||||
|
||||
def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
def onPull(): Unit = f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.parasitic)
|
||||
|
||||
setHandler(out, this)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package akka.stream.impl
|
|||
|
||||
import akka.Done
|
||||
import akka.annotation.InternalApi
|
||||
import akka.dispatch.ExecutionContexts.sameThreadExecutionContext
|
||||
import akka.dispatch.ExecutionContexts.parasitic
|
||||
import akka.stream.ActorAttributes.SupervisionStrategy
|
||||
import akka.stream._
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
|
|
@ -83,7 +83,7 @@ import scala.util.control.NonFatal
|
|||
state match {
|
||||
case Some(resource) =>
|
||||
try {
|
||||
readData(resource).onComplete(readCallback)(sameThreadExecutionContext)
|
||||
readData(resource).onComplete(readCallback)(parasitic)
|
||||
} catch errorHandler
|
||||
case None =>
|
||||
// we got a pull but there is no open resource, we are either
|
||||
|
|
|
|||
|
|
@ -303,7 +303,7 @@ import scala.concurrent.{ Future, Promise }
|
|||
onFutureSourceCompleted(it)
|
||||
case _ =>
|
||||
val cb = getAsyncCallback[Try[Graph[SourceShape[T], M]]](onFutureSourceCompleted).invoke _
|
||||
futureSource.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext) // could be optimised FastFuture-like
|
||||
futureSource.onComplete(cb)(ExecutionContexts.parasitic) // could be optimised FastFuture-like
|
||||
}
|
||||
|
||||
// initial handler (until future completes)
|
||||
|
|
@ -387,7 +387,7 @@ import scala.concurrent.{ Future, Promise }
|
|||
onFutureCompleted(completed)
|
||||
case None =>
|
||||
val cb = getAsyncCallback[Try[T]](onFutureCompleted).invoke _
|
||||
future.onComplete(cb)(ExecutionContexts.sameThreadExecutionContext)
|
||||
future.onComplete(cb)(ExecutionContexts.parasitic)
|
||||
}
|
||||
|
||||
def onFutureCompleted(result: Try[T]): Unit = {
|
||||
|
|
|
|||
|
|
@ -469,8 +469,6 @@ private[stream] object Collect {
|
|||
private var current: Out = zero
|
||||
private var elementHandled: Boolean = false
|
||||
|
||||
private def ec = ExecutionContexts.sameThreadExecutionContext
|
||||
|
||||
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
|
||||
|
||||
private val ZeroHandler: OutHandler with InHandler = new OutHandler with InHandler {
|
||||
|
|
@ -544,7 +542,7 @@ private[stream] object Collect {
|
|||
|
||||
eventualCurrent.value match {
|
||||
case Some(result) => futureCB(result)
|
||||
case _ => eventualCurrent.onComplete(futureCB)(ec)
|
||||
case _ => eventualCurrent.onComplete(futureCB)(ExecutionContexts.parasitic)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(ex) =>
|
||||
|
|
@ -652,8 +650,6 @@ private[stream] object Collect {
|
|||
aggregator = zero
|
||||
}
|
||||
|
||||
private def ec = ExecutionContexts.sameThreadExecutionContext
|
||||
|
||||
private val futureCB = getAsyncCallback[Try[Out]] {
|
||||
case Success(update) if update != null =>
|
||||
aggregator = update
|
||||
|
|
@ -711,7 +707,7 @@ private[stream] object Collect {
|
|||
private def handleAggregatingValue(): Unit = {
|
||||
aggregating.value match {
|
||||
case Some(result) => futureCB(result) // already completed
|
||||
case _ => aggregating.onComplete(futureCB)(ec)
|
||||
case _ => aggregating.onComplete(futureCB)(ExecutionContexts.parasitic)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1291,7 +1287,7 @@ private[stream] object Collect {
|
|||
buffer.enqueue(holder)
|
||||
|
||||
future.value match {
|
||||
case None => future.onComplete(holder)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
case None => future.onComplete(holder)(akka.dispatch.ExecutionContexts.parasitic)
|
||||
case Some(v) =>
|
||||
// #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
|
||||
// run the logic directly on this thread
|
||||
|
|
@ -1405,7 +1401,7 @@ private[stream] object Collect {
|
|||
val future = f(grab(in))
|
||||
inFlight += 1
|
||||
future.value match {
|
||||
case None => future.onComplete(invokeFutureCB)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
|
||||
case None => future.onComplete(invokeFutureCB)(akka.dispatch.ExecutionContexts.parasitic)
|
||||
case Some(v) => futureCompleted(v)
|
||||
}
|
||||
} catch {
|
||||
|
|
@ -2285,7 +2281,7 @@ private[stream] object Collect {
|
|||
onFlowFutureComplete(element)(completed)
|
||||
case None =>
|
||||
val cb = getAsyncCallback[Try[Flow[I, O, M]]](onFlowFutureComplete(element))
|
||||
futureFlow.onComplete(cb.invoke)(ExecutionContexts.sameThreadExecutionContext)
|
||||
futureFlow.onComplete(cb.invoke)(ExecutionContexts.parasitic)
|
||||
}
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ import scala.concurrent.{ Future, Promise }
|
|||
thisStage.tell(Unbind, thisStage)
|
||||
}
|
||||
unbindPromise.future
|
||||
}, unbindPromise.future.map(_ => Done)(ExecutionContexts.sameThreadExecutionContext)))
|
||||
}, unbindPromise.future.map(_ => Done)(ExecutionContexts.parasitic)))
|
||||
case f: CommandFailed =>
|
||||
val ex = new BindFailedException {
|
||||
// cannot modify the actual exception class for compatibility reasons
|
||||
|
|
@ -533,10 +533,7 @@ private[stream] object ConnectionSourceStage {
|
|||
remoteAddress,
|
||||
eagerMaterializer)
|
||||
|
||||
(
|
||||
logic,
|
||||
localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(
|
||||
ExecutionContexts.sameThreadExecutionContext))
|
||||
(logic, localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(ExecutionContexts.parasitic))
|
||||
}
|
||||
|
||||
override def toString = s"TCP-to($remoteAddress)"
|
||||
|
|
|
|||
|
|
@ -263,8 +263,7 @@ object Flow {
|
|||
fallback: function.Creator[M]): Flow[I, O, M] = {
|
||||
import scala.compat.java8.FutureConverters._
|
||||
val sflow = scaladsl.Flow
|
||||
.fromGraph(new LazyFlow[I, O, M](t =>
|
||||
flowFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext)))
|
||||
.fromGraph(new LazyFlow[I, O, M](t => flowFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.parasitic)))
|
||||
.mapMaterializedValue(_ => fallback.create())
|
||||
new Flow(sflow)
|
||||
}
|
||||
|
|
@ -291,13 +290,9 @@ object Flow {
|
|||
import scala.compat.java8.FutureConverters._
|
||||
|
||||
val sflow = scaladsl.Flow
|
||||
.lazyInitAsync(() => flowFactory.create().toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext))
|
||||
.mapMaterializedValue(
|
||||
fut =>
|
||||
fut
|
||||
.map(_.fold[Optional[M]](Optional.empty())(m => Optional.ofNullable(m)))(
|
||||
ExecutionContexts.sameThreadExecutionContext)
|
||||
.toJava)
|
||||
.lazyInitAsync(() => flowFactory.create().toScala.map(_.asScala)(ExecutionContexts.parasitic))
|
||||
.mapMaterializedValue(fut =>
|
||||
fut.map(_.fold[Optional[M]](Optional.empty())(m => Optional.ofNullable(m)))(ExecutionContexts.parasitic).toJava)
|
||||
new Flow(sflow)
|
||||
}
|
||||
|
||||
|
|
@ -353,8 +348,7 @@ object Flow {
|
|||
def lazyCompletionStageFlow[I, O, M](
|
||||
create: Creator[CompletionStage[Flow[I, O, M]]]): Flow[I, O, CompletionStage[M]] =
|
||||
scaladsl.Flow
|
||||
.lazyFutureFlow[I, O, M](() =>
|
||||
create.create().toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext))
|
||||
.lazyFutureFlow[I, O, M](() => create.create().toScala.map(_.asScala)(ExecutionContexts.parasitic))
|
||||
.mapMaterializedValue(_.toJava)
|
||||
.asJava
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import java.util.Optional
|
|||
import java.util.concurrent.CompletionStage
|
||||
|
||||
import akka.Done
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.stream.QueueOfferResult
|
||||
|
||||
import scala.compat.java8.FutureConverters._
|
||||
|
|
@ -128,10 +129,9 @@ object SinkQueueWithCancel {
|
|||
// would have been better to add `asScala` in SinkQueueWithCancel trait, but not doing
|
||||
// that for backwards compatibility reasons
|
||||
new akka.stream.scaladsl.SinkQueueWithCancel[T] {
|
||||
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext => same }
|
||||
|
||||
override def pull(): Future[Option[T]] =
|
||||
queue.pull().toScala.map(_.asScala)(same)
|
||||
queue.pull().toScala.map(_.asScala)(ExecutionContexts.parasitic)
|
||||
|
||||
override def cancel(): Unit = queue.cancel()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -116,7 +116,7 @@ object Sink {
|
|||
f: function.Function[T, CompletionStage[Void]]): Sink[T, CompletionStage[Done]] =
|
||||
new Sink(
|
||||
scaladsl.Sink
|
||||
.foreachAsync(parallelism)((x: T) => f(x).toScala.map(_ => ())(ExecutionContexts.sameThreadExecutionContext))
|
||||
.foreachAsync(parallelism)((x: T) => f(x).toScala.map(_ => ())(ExecutionContexts.parasitic))
|
||||
.toCompletionStage())
|
||||
|
||||
/**
|
||||
|
|
@ -163,10 +163,7 @@ object Sink {
|
|||
* See also [[head]].
|
||||
*/
|
||||
def headOption[In](): Sink[In, CompletionStage[Optional[In]]] =
|
||||
new Sink(
|
||||
scaladsl.Sink
|
||||
.headOption[In]
|
||||
.mapMaterializedValue(_.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava))
|
||||
new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue(_.map(_.asJava)(ExecutionContexts.parasitic).toJava))
|
||||
|
||||
/**
|
||||
* A `Sink` that materializes into a `CompletionStage` of the last value received.
|
||||
|
|
@ -186,10 +183,7 @@ object Sink {
|
|||
* See also [[head]], [[takeLast]].
|
||||
*/
|
||||
def lastOption[In](): Sink[In, CompletionStage[Optional[In]]] =
|
||||
new Sink(
|
||||
scaladsl.Sink
|
||||
.lastOption[In]
|
||||
.mapMaterializedValue(_.map(_.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava))
|
||||
new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue(_.map(_.asJava)(ExecutionContexts.parasitic).toJava))
|
||||
|
||||
/**
|
||||
* A `Sink` that materializes into a a `CompletionStage` of `List<In>` containing the last `n` collected elements.
|
||||
|
|
@ -203,7 +197,7 @@ object Sink {
|
|||
new Sink(
|
||||
scaladsl.Sink
|
||||
.takeLast[In](n)
|
||||
.mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava))
|
||||
.mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.parasitic).toJava))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -219,9 +213,7 @@ object Sink {
|
|||
def seq[In]: Sink[In, CompletionStage[java.util.List[In]]] = {
|
||||
import akka.util.ccompat.JavaConverters._
|
||||
new Sink(
|
||||
scaladsl.Sink
|
||||
.seq[In]
|
||||
.mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.sameThreadExecutionContext).toJava))
|
||||
scaladsl.Sink.seq[In].mapMaterializedValue(fut => fut.map(sq => sq.asJava)(ExecutionContexts.parasitic).toJava))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -389,7 +381,7 @@ object Sink {
|
|||
new Sink(
|
||||
scaladsl.Sink
|
||||
.lazyInit[T, M](
|
||||
t => sinkFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext),
|
||||
t => sinkFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.parasitic),
|
||||
() => fallback.create())
|
||||
.mapMaterializedValue(_.toJava))
|
||||
|
||||
|
|
@ -406,13 +398,9 @@ object Sink {
|
|||
def lazyInitAsync[T, M](
|
||||
sinkFactory: function.Creator[CompletionStage[Sink[T, M]]]): Sink[T, CompletionStage[Optional[M]]] = {
|
||||
val sSink = scaladsl.Sink
|
||||
.lazyInitAsync[T, M](() =>
|
||||
sinkFactory.create().toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext))
|
||||
.mapMaterializedValue(
|
||||
fut =>
|
||||
fut
|
||||
.map(_.fold(Optional.empty[M]())(m => Optional.ofNullable(m)))(ExecutionContexts.sameThreadExecutionContext)
|
||||
.toJava)
|
||||
.lazyInitAsync[T, M](() => sinkFactory.create().toScala.map(_.asScala)(ExecutionContexts.parasitic))
|
||||
.mapMaterializedValue(fut =>
|
||||
fut.map(_.fold(Optional.empty[M]())(m => Optional.ofNullable(m)))(ExecutionContexts.parasitic).toJava)
|
||||
new Sink(sSink)
|
||||
}
|
||||
|
||||
|
|
@ -449,7 +437,7 @@ object Sink {
|
|||
*/
|
||||
def lazyCompletionStageSink[T, M](create: Creator[CompletionStage[Sink[T, M]]]): Sink[T, CompletionStage[M]] =
|
||||
new Sink(scaladsl.Sink.lazyFutureSink { () =>
|
||||
create.create().toScala.map(_.asScala)((ExecutionContexts.sameThreadExecutionContext))
|
||||
create.create().toScala.map(_.asScala)((ExecutionContexts.parasitic))
|
||||
}).mapMaterializedValue(_.toJava)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ object Source {
|
|||
new Source(scaladsl.Source.maybe[T].mapMaterializedValue { scalaOptionPromise: Promise[Option[T]] =>
|
||||
val javaOptionPromise = new CompletableFuture[Optional[T]]()
|
||||
scalaOptionPromise.completeWith(
|
||||
javaOptionPromise.toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext))
|
||||
javaOptionPromise.toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.parasitic))
|
||||
|
||||
javaOptionPromise
|
||||
})
|
||||
|
|
@ -255,7 +255,7 @@ object Source {
|
|||
*/
|
||||
def unfoldAsync[S, E](s: S, f: function.Function[S, CompletionStage[Optional[Pair[S, E]]]]): Source[E, NotUsed] =
|
||||
new Source(scaladsl.Source.unfoldAsync(s)((s: S) =>
|
||||
f.apply(s).toScala.map(_.asScala.map(_.toScala))(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)))
|
||||
f.apply(s).toScala.map(_.asScala.map(_.toScala))(akka.dispatch.ExecutionContexts.parasitic)))
|
||||
|
||||
/**
|
||||
* Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`.
|
||||
|
|
@ -305,7 +305,7 @@ object Source {
|
|||
*/
|
||||
def completionStageSource[T, M](completionStageSource: CompletionStage[Source[T, M]]): Source[T, CompletionStage[M]] =
|
||||
scaladsl.Source
|
||||
.futureSource(completionStageSource.toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext))
|
||||
.futureSource(completionStageSource.toScala.map(_.asScala)(ExecutionContexts.parasitic))
|
||||
.mapMaterializedValue(_.toJava)
|
||||
.asJava
|
||||
|
||||
|
|
@ -766,7 +766,7 @@ object Source {
|
|||
new Source(
|
||||
scaladsl.Source.unfoldResourceAsync[T, S](
|
||||
() => create.create().toScala,
|
||||
(s: S) => read.apply(s).toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext),
|
||||
(s: S) => read.apply(s).toScala.map(_.asScala)(akka.dispatch.ExecutionContexts.parasitic),
|
||||
(s: S) => close.apply(s).toScala))
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -139,7 +139,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
|
|||
|
||||
class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
||||
import Tcp._
|
||||
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext => ec }
|
||||
import akka.dispatch.ExecutionContexts.parasitic
|
||||
|
||||
private lazy val delegate: scaladsl.Tcp = scaladsl.Tcp(system)
|
||||
|
||||
|
|
@ -175,7 +175,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
delegate
|
||||
.bind(interface, port, backlog, immutableSeq(options), halfClose, optionalDurationToScala(idleTimeout))
|
||||
.map(new IncomingConnection(_))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava))
|
||||
|
||||
/**
|
||||
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`.
|
||||
|
|
@ -221,7 +221,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
delegate
|
||||
.bind(interface, port)
|
||||
.map(new IncomingConnection(_))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava))
|
||||
|
||||
/**
|
||||
* Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint.
|
||||
|
|
@ -259,7 +259,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
halfClose,
|
||||
optionalDurationToScala(connectTimeout),
|
||||
optionalDurationToScala(idleTimeout))
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava))
|
||||
|
||||
/**
|
||||
* Creates an [[Tcp.OutgoingConnection]] instance representing a prospective TCP client connection to the given endpoint.
|
||||
|
|
@ -309,7 +309,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
Flow.fromGraph(
|
||||
delegate
|
||||
.outgoingConnection(new InetSocketAddress(host, port))
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava))
|
||||
|
||||
/**
|
||||
* Creates an [[Tcp.OutgoingConnection]] with TLS.
|
||||
|
|
@ -330,7 +330,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
Flow.fromGraph(
|
||||
delegate
|
||||
.outgoingTlsConnection(host, port, sslContext, negotiateNewSession)
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava))
|
||||
|
||||
/**
|
||||
* Creates an [[Tcp.OutgoingConnection]] with TLS.
|
||||
|
|
@ -363,7 +363,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
immutableSeq(options),
|
||||
connectTimeout,
|
||||
idleTimeout)
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava))
|
||||
|
||||
/**
|
||||
* Creates an [[Tcp.OutgoingConnection]] with TLS.
|
||||
|
|
@ -381,7 +381,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
Flow.fromGraph(
|
||||
delegate
|
||||
.outgoingConnectionWithTls(remoteAddress, createSSLEngine = () => createSSLEngine.get())
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava))
|
||||
|
||||
/**
|
||||
* Creates an [[Tcp.OutgoingConnection]] with TLS.
|
||||
|
|
@ -417,7 +417,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
case Some(t) => Failure(t)
|
||||
},
|
||||
closing)
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new OutgoingConnection(_))(parasitic).toJava))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -447,7 +447,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
delegate
|
||||
.bindTls(interface, port, sslContext, negotiateNewSession, backlog, immutableSeq(options), idleTimeout)
|
||||
.map(new IncomingConnection(_))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava))
|
||||
|
||||
/**
|
||||
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
|
||||
|
|
@ -468,7 +468,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
delegate
|
||||
.bindTls(interface, port, sslContext, negotiateNewSession)
|
||||
.map(new IncomingConnection(_))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava))
|
||||
|
||||
/**
|
||||
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
|
||||
|
|
@ -484,7 +484,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
delegate
|
||||
.bindWithTls(interface, port, createSSLEngine = () => createSSLEngine.get())
|
||||
.map(new IncomingConnection(_))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava))
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -518,7 +518,7 @@ class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
|
|||
},
|
||||
closing)
|
||||
.map(new IncomingConnection(_))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(ec).toJava))
|
||||
.mapMaterializedValue(_.map(new ServerBinding(_))(parasitic).toJava))
|
||||
}
|
||||
|
||||
private def optionalDurationToScala(duration: Optional[java.time.Duration]) = {
|
||||
|
|
|
|||
|
|
@ -605,7 +605,7 @@ object Flow {
|
|||
@deprecated("Use 'Flow.lazyFutureFlow' instead", "2.6.0")
|
||||
def lazyInitAsync[I, O, M](flowFactory: () => Future[Flow[I, O, M]]): Flow[I, O, Future[Option[M]]] =
|
||||
Flow.fromGraph(new LazyFlow[I, O, M](_ => flowFactory())).mapMaterializedValue { v =>
|
||||
implicit val ec = akka.dispatch.ExecutionContexts.sameThreadExecutionContext
|
||||
implicit val ec = akka.dispatch.ExecutionContexts.parasitic
|
||||
v.map[Option[M]](Some.apply _).recover { case _: NeverMaterializedException => None }
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
|
|||
*/
|
||||
def mapAsync[Out2](parallelism: Int)(f: Out => Future[Out2]): Repr[Out2, Ctx] =
|
||||
via(flow.mapAsync(parallelism) {
|
||||
case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContexts.sameThreadExecutionContext)
|
||||
case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContexts.parasitic)
|
||||
})
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -8,13 +8,13 @@ import java.util.Optional
|
|||
import java.util.concurrent.CompletionStage
|
||||
|
||||
import scala.concurrent.Future
|
||||
|
||||
import akka.Done
|
||||
import akka.stream.QueueOfferResult
|
||||
|
||||
import scala.compat.java8.FutureConverters._
|
||||
import scala.compat.java8.OptionConverters._
|
||||
|
||||
import akka.annotation.InternalApi
|
||||
import akka.dispatch.ExecutionContexts
|
||||
|
||||
/**
|
||||
* This trait allows to have a queue as a data source for some stream.
|
||||
|
|
@ -145,9 +145,8 @@ object SinkQueueWithCancel {
|
|||
*/
|
||||
@InternalApi private[akka] def asJava[T](queue: SinkQueueWithCancel[T]): akka.stream.javadsl.SinkQueueWithCancel[T] =
|
||||
new akka.stream.javadsl.SinkQueueWithCancel[T] {
|
||||
import akka.dispatch.ExecutionContexts.{ sameThreadExecutionContext => same }
|
||||
override def pull(): CompletionStage[Optional[T]] =
|
||||
queue.pull().map(_.asJava)(same).toJava
|
||||
queue.pull().map(_.asJava)(ExecutionContexts.parasitic).toJava
|
||||
override def cancel(): Unit = queue.cancel()
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -194,8 +194,7 @@ object Sink {
|
|||
.fromGraph(new HeadOptionStage[T])
|
||||
.withAttributes(DefaultAttributes.headSink)
|
||||
.mapMaterializedValue(e =>
|
||||
e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(
|
||||
ExecutionContexts.sameThreadExecutionContext))
|
||||
e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.parasitic))
|
||||
|
||||
/**
|
||||
* A `Sink` that materializes into a `Future` of the optional first value received.
|
||||
|
|
@ -217,7 +216,7 @@ object Sink {
|
|||
def last[T]: Sink[T, Future[T]] = {
|
||||
Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastSink).mapMaterializedValue { e =>
|
||||
e.map(_.headOption.getOrElse(throw new NoSuchElementException("last of empty stream")))(
|
||||
ExecutionContexts.sameThreadExecutionContext)
|
||||
ExecutionContexts.parasitic)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -230,7 +229,7 @@ object Sink {
|
|||
*/
|
||||
def lastOption[T]: Sink[T, Future[Option[T]]] = {
|
||||
Sink.fromGraph(new TakeLastStage[T](1)).withAttributes(DefaultAttributes.lastOptionSink).mapMaterializedValue { e =>
|
||||
e.map(_.headOption)(ExecutionContexts.sameThreadExecutionContext)
|
||||
e.map(_.headOption)(ExecutionContexts.parasitic)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -607,8 +606,7 @@ object Sink {
|
|||
def lazyInit[T, M](sinkFactory: T => Future[Sink[T, M]], fallback: () => M): Sink[T, Future[M]] =
|
||||
Sink
|
||||
.fromGraph(new LazySink[T, M](sinkFactory))
|
||||
.mapMaterializedValue(
|
||||
_.recover { case _: NeverMaterializedException => fallback() }(ExecutionContexts.sameThreadExecutionContext))
|
||||
.mapMaterializedValue(_.recover { case _: NeverMaterializedException => fallback() }(ExecutionContexts.parasitic))
|
||||
|
||||
/**
|
||||
* Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements,
|
||||
|
|
@ -622,7 +620,7 @@ object Sink {
|
|||
@deprecated("Use 'Sink.lazyFutureSink' instead", "2.6.0")
|
||||
def lazyInitAsync[T, M](sinkFactory: () => Future[Sink[T, M]]): Sink[T, Future[Option[M]]] =
|
||||
Sink.fromGraph(new LazySink[T, M](_ => sinkFactory())).mapMaterializedValue { m =>
|
||||
implicit val ec = ExecutionContexts.sameThreadExecutionContext
|
||||
implicit val ec = ExecutionContexts.parasitic
|
||||
m.map(Option.apply _).recover { case _: NeverMaterializedException => None }
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,23 +6,25 @@ package akka.testkit
|
|||
|
||||
import java.lang.reflect.Modifier
|
||||
|
||||
import org.scalactic.{ CanEqual, TypeCheckedTripleEquals }
|
||||
|
||||
import language.postfixOps
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.{ Logging, LoggingAdapter }
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.Future
|
||||
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
import akka.dispatch.Dispatchers
|
||||
import akka.event.Logging
|
||||
import akka.event.LoggingAdapter
|
||||
import akka.testkit.TestEvent._
|
||||
import com.typesafe.config.Config
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalactic.CanEqual
|
||||
import org.scalactic.TypeCheckedTripleEquals
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
import org.scalatest.time.{ Millis, Span }
|
||||
import org.scalatest.matchers.should.Matchers
|
||||
import org.scalatest.time.Millis
|
||||
import org.scalatest.time.Span
|
||||
import org.scalatest.wordspec.AnyWordSpecLike
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration._
|
||||
import scala.language.postfixOps
|
||||
|
||||
object AkkaSpec {
|
||||
val testConf: Config = ConfigFactory.parseString("""
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue