include initialAttributes of GraphStage, #22463

* these must also be included via setAttributes because
  because it will create island for async (dispatcher attribute)
* better actor name of GraphStageIsland, we need it in tests for
  lookup of the actor
* this unlocks OutputStreamSourceSpec and InputStreamSinkSpec
This commit is contained in:
Patrik Nordwall 2017-03-07 19:40:50 +01:00
parent d5e117bfc9
commit 1a74b43cd6
8 changed files with 49 additions and 31 deletions

View file

@ -13,7 +13,7 @@ import akka.stream._
import akka.stream.impl.fusing.GraphInterpreterShell import akka.stream.impl.fusing.GraphInterpreterShell
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContextExecutor} import scala.concurrent.{ Await, ExecutionContextExecutor }
/** /**
* ExtendedActorMaterializer used by subtypes which materializer using GraphInterpreterShell * ExtendedActorMaterializer used by subtypes which materializer using GraphInterpreterShell

View file

@ -24,6 +24,9 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
import scala.collection.immutable.Map import scala.collection.immutable.Map
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContextExecutor import scala.concurrent.ExecutionContextExecutor
import scala.annotation.tailrec
import akka.stream.impl.fusing.GraphInterpreter.DownstreamBoundaryStageLogic
import akka.stream.impl.fusing.GraphInterpreter.UpstreamBoundaryStageLogic
object PhasedFusingActorMaterializer { object PhasedFusingActorMaterializer {
@ -656,11 +659,23 @@ final class GraphStageIsland(
case _ case _
val props = ActorGraphInterpreter.props(shell) val props = ActorGraphInterpreter.props(shell)
.withDispatcher(effectiveSettings.dispatcher) .withDispatcher(effectiveSettings.dispatcher)
materializer.actorOf(props, fullIslandName)
materializer.actorOf(props, islandName)
} }
} }
private def fullIslandName: String = {
@tailrec def findUsefulName(i: Int): String = {
if (i == logics.size) islandName
else logics.get(i) match {
case _: DownstreamBoundaryStageLogic[_] | _: UpstreamBoundaryStageLogic[_]
findUsefulName(i + 1)
case _
islandName + "-" + logics.get(i).attributes.nameOrDefault()
}
}
findUsefulName(0)
}
override def toString: String = "GraphStagePhase" override def toString: String = "GraphStagePhase"
} }
@ -714,8 +729,8 @@ final class SinkModulePhase(materializer: PhasedFusingActorMaterializer, islandN
override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = { override def takePublisher(slot: Int, publisher: Publisher[Any]): Unit = {
subscriberOrVirtualPublisher match { subscriberOrVirtualPublisher match {
case v: VirtualPublisher[Any] v.registerPublisher(publisher) case v: VirtualPublisher[_] v.registerPublisher(publisher)
case s: Subscriber[Any] publisher.subscribe(s) case s: Subscriber[Any] @unchecked publisher.subscribe(s)
} }
} }

View file

@ -357,5 +357,5 @@ final case class ProcessorModule[In, Out, Mat](
override def toString: String = f"ProcessorModule [${System.identityHashCode(this)}%08x]" override def toString: String = f"ProcessorModule [${System.identityHashCode(this)}%08x]"
override private[stream] def traversalBuilder = override private[stream] def traversalBuilder =
LinearTraversalBuilder.fromModule(this).makeIsland(ProcessorModuleIslandTag) LinearTraversalBuilder.fromModule(this, attributes).makeIsland(ProcessorModuleIslandTag)
} }

View file

@ -156,23 +156,26 @@ object TraversalBuilder {
/** /**
* Create a generic traversal builder starting from an atomic module. * Create a generic traversal builder starting from an atomic module.
*/ */
def atomic(module: AtomicModule[Shape, Any], attributes: Attributes = Attributes.none): TraversalBuilder = { def atomic(module: AtomicModule[Shape, Any], attributes: Attributes): TraversalBuilder = {
initShape(module.shape) initShape(module.shape)
if (module.shape.outlets.isEmpty) { val builder =
val b = CompletedTraversalBuilder( if (module.shape.outlets.isEmpty) {
traversalSoFar = MaterializeAtomic(module, Array.ofDim[Int](module.shape.outlets.size)), val b = CompletedTraversalBuilder(
inSlots = module.shape.inlets.size, traversalSoFar = MaterializeAtomic(module, Array.ofDim[Int](module.shape.outlets.size)),
inToOffset = module.shape.inlets.map(in in in.id).toMap, inSlots = module.shape.inlets.size,
attributes) inToOffset = module.shape.inlets.map(in in in.id).toMap,
b Attributes.none)
} else { b
AtomicTraversalBuilder( } else {
module, AtomicTraversalBuilder(
Array.ofDim[Int](module.shape.outlets.size), module,
module.shape.outlets.size, Array.ofDim[Int](module.shape.outlets.size),
attributes) module.shape.outlets.size,
} Attributes.none)
}
// important to use setAttributes because it will create island for async (dispatcher attribute)
builder.setAttributes(attributes)
} }
def printTraversal(t: Traversal, indent: Int = 0): Unit = { def printTraversal(t: Traversal, indent: Int = 0): Unit = {
@ -477,7 +480,7 @@ object LinearTraversalBuilder {
* Create a traversal builder specialized for linear graphs. This is designed to be much faster and lightweight * Create a traversal builder specialized for linear graphs. This is designed to be much faster and lightweight
* than its generic counterpart. It can be freely mixed with the generic builder in both ways. * than its generic counterpart. It can be freely mixed with the generic builder in both ways.
*/ */
def fromModule(module: AtomicModule[Shape, Any], attributes: Attributes = Attributes.none): LinearTraversalBuilder = { def fromModule(module: AtomicModule[Shape, Any], attributes: Attributes): LinearTraversalBuilder = {
require(module.shape.inlets.size <= 1, "Modules with more than one input port cannot be linear.") require(module.shape.inlets.size <= 1, "Modules with more than one input port cannot be linear.")
require(module.shape.outlets.size <= 1, "Modules with more than one input port cannot be linear.") require(module.shape.outlets.size <= 1, "Modules with more than one input port cannot be linear.")
TraversalBuilder.initShape(module.shape) TraversalBuilder.initShape(module.shape)

View file

@ -36,7 +36,7 @@ final case class GraphStageModule[+S <: Shape @uncheckedVariance, +M](
if (attributes ne this.attributes) new GraphStageModule(shape, attributes, stage) if (attributes ne this.attributes) new GraphStageModule(shape, attributes, stage)
else this else this
override private[stream] def traversalBuilder = LinearTraversalBuilder.fromModule(this) override private[stream] def traversalBuilder = LinearTraversalBuilder.fromModule(this, attributes)
override def toString: String = f"GraphStage($stage) [${System.identityHashCode(this)}%08x]" override def toString: String = f"GraphStage($stage) [${System.identityHashCode(this)}%08x]"
} }

View file

@ -28,7 +28,7 @@ private[stream] final case class TlsModule(plainIn: Inlet[SslTlsOutbound], plain
override def toString: String = f"TlsModule($closing) [${System.identityHashCode(this)}%08x]" override def toString: String = f"TlsModule($closing) [${System.identityHashCode(this)}%08x]"
override private[stream] def traversalBuilder = TraversalBuilder.atomic(this).makeIsland(TlsModuleIslandTag) override private[stream] def traversalBuilder = TraversalBuilder.atomic(this, attributes).makeIsland(TlsModuleIslandTag)
} }
/** /**

View file

@ -30,8 +30,7 @@ import scala.compat.java8.FutureConverters._
*/ */
final class Source[+Out, +Mat]( final class Source[+Out, +Mat](
override val traversalBuilder: LinearTraversalBuilder, override val traversalBuilder: LinearTraversalBuilder,
override val shape: SourceShape[Out] override val shape: SourceShape[Out])
)
extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] { extends FlowOpsMat[Out, Mat] with Graph[SourceShape[Out], Mat] {
override type Repr[+O] = Source[O, Mat @uncheckedVariance] override type Repr[+O] = Source[O, Mat @uncheckedVariance]
@ -53,8 +52,7 @@ final class Source[+Out, +Mat](
new Source[T, Mat3]( new Source[T, Mat3](
traversalBuilder.append(toAppend, flow.shape, combine), traversalBuilder.append(toAppend, flow.shape, combine),
SourceShape(flow.shape.out) SourceShape(flow.shape.out))
)
} }
/** /**
@ -157,8 +155,7 @@ final class Source[+Out, +Mat](
*/ */
override def async: Repr[Out] = new Source( override def async: Repr[Out] = new Source(
traversalBuilder.makeIsland(GraphStageTag), traversalBuilder.makeIsland(GraphStageTag),
shape shape)
)
/** /**
* Converts this Scala DSL element to it's Java DSL counterpart. * Converts this Scala DSL element to it's Java DSL counterpart.

View file

@ -28,7 +28,10 @@ abstract class GraphStageWithMaterializedValue[+S <: Shape, +M] extends Graph[S,
protected def initialAttributes: Attributes = Attributes.none protected def initialAttributes: Attributes = Attributes.none
final override lazy val traversalBuilder: TraversalBuilder = TraversalBuilder.atomic(GraphStageModule(shape, initialAttributes, this)) final override lazy val traversalBuilder: TraversalBuilder = {
val attr = initialAttributes
TraversalBuilder.atomic(GraphStageModule(shape, attr, this), attr)
}
final override def withAttributes(attr: Attributes): Graph[S, M] = new Graph[S, M] { final override def withAttributes(attr: Attributes): Graph[S, M] = new Graph[S, M] {
override def shape = GraphStageWithMaterializedValue.this.shape override def shape = GraphStageWithMaterializedValue.this.shape