Materializer settings as attributes (#27499)

* Replace MaterializerSettings with Attributes #25559 
 * Field access to settings deprecated to make stages use attributes instead
 * Internal stages updated to use attributes
 * Docs on ActorMaterializerSettings updated to recommend away from using it
 * Verify all stages stopped after each testcase in FlowGroupBySpec
 * Subscription timeout attributes merged into one
This commit is contained in:
Johan Andrén 2019-09-04 13:37:06 +02:00 committed by GitHub
parent b9a879d722
commit aca63ea198
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
132 changed files with 1596 additions and 1116 deletions

View file

@ -6,15 +6,6 @@ package akka.stream.impl
import java.util.function.BinaryOperator
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal
import akka.NotUsed
import akka.actor.ActorRef
import akka.actor.Props
@ -22,6 +13,7 @@ import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.stream.ActorAttributes.StreamSubscriptionTimeout
import akka.stream.Attributes.InputBuffer
import akka.stream._
import akka.stream.impl.QueueSink.Output
@ -36,6 +28,16 @@ import akka.util.ccompat._
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal
/**
* INTERNAL API
*/
@ -93,10 +95,13 @@ import org.reactivestreams.Subscriber
val proc = new VirtualPublisher[In]
context.materializer match {
case am: ActorMaterializer =>
if (am.settings.subscriptionTimeoutSettings.mode != StreamSubscriptionTimeoutTerminationMode.noop)
am.scheduleOnce(am.settings.subscriptionTimeoutSettings.timeout, new Runnable {
def run(): Unit = proc.onSubscriptionTimeout(am)
val StreamSubscriptionTimeout(timeout, mode) =
context.effectiveAttributes.mandatoryAttribute[StreamSubscriptionTimeout]
if (mode != StreamSubscriptionTimeoutTerminationMode.noop) {
am.scheduleOnce(timeout, new Runnable {
def run(): Unit = proc.onSubscriptionTimeout(am, mode)
})
}
case _ => // not possible to setup timeout
}
(proc, proc)
@ -116,9 +121,7 @@ import org.reactivestreams.Subscriber
override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = {
val actorMaterializer = ActorMaterializerHelper.downcast(context.materializer)
val impl = actorMaterializer.actorOf(
context,
FanoutProcessorImpl.props(context.effectiveAttributes, actorMaterializer.settings))
val impl = actorMaterializer.actorOf(context, FanoutProcessorImpl.props(context.effectiveAttributes))
val fanoutProcessor = new ActorProcessor[In, In](impl)
// Resolve cyclic dependency with actor. This MUST be the first message no matter what.
impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]])
@ -356,7 +359,7 @@ import org.reactivestreams.Subscriber
override def preStart(): Unit = {
// Allocates one additional element to hold stream
// closed/failure indicators
buffer = Buffer(maxBuffer + 1, materializer)
buffer = Buffer(maxBuffer + 1, inheritedAttributes)
setKeepGoing(true)
pull(in)
}