Add names for input and output boundaries

This commit is contained in:
Johan Andrén 2019-10-17 09:30:40 +02:00
parent c13fa76ab5
commit 8d8c7dbada
2 changed files with 6 additions and 1 deletions

View file

@ -23,6 +23,7 @@ import akka.event.Logging
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.stream.Attributes.InputBuffer import akka.stream.Attributes.InputBuffer
import akka.stream._ import akka.stream._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.impl.fusing.ActorGraphInterpreter.ActorOutputBoundary import akka.stream.impl.fusing.ActorGraphInterpreter.ActorOutputBoundary
import akka.stream.impl.fusing.ActorGraphInterpreter.BatchingActorInputBoundary import akka.stream.impl.fusing.ActorGraphInterpreter.BatchingActorInputBoundary
@ -750,7 +751,7 @@ private final case class SavedIslandData(
val boundary = new ActorOutputBoundary(shell, out.toString) val boundary = new ActorOutputBoundary(shell, out.toString)
logics.add(boundary) logics.add(boundary)
boundary.stageId = logics.size() - 1 boundary.stageId = logics.size() - 1
boundary.attributes = logic.attributes boundary.attributes = logic.attributes.and(DefaultAttributes.outputBoundary)
val connection = outConn() val connection = outConn()
boundary.portToConn(boundary.in.id) = connection boundary.portToConn(boundary.in.id) = connection
@ -774,6 +775,7 @@ private final case class SavedIslandData(
new BatchingActorInputBoundary(bufferSize, shell, publisher, connection.inOwner.toString) new BatchingActorInputBoundary(bufferSize, shell, publisher, connection.inOwner.toString)
logics.add(boundary) logics.add(boundary)
boundary.stageId = logics.size() - 1 boundary.stageId = logics.size() - 1
boundary.attributes = connection.inOwner.attributes.and(DefaultAttributes.inputBoundary)
boundary.portToConn(boundary.out.id + boundary.inCount) = connection boundary.portToConn(boundary.out.id + boundary.inCount) = connection
connection.outHandler = boundary.handlers(0).asInstanceOf[OutHandler] connection.outHandler = boundary.handlers(0).asInstanceOf[OutHandler]

View file

@ -142,6 +142,9 @@ import akka.stream._
val inputStreamSink = name("inputStreamSink") and IODispatcher val inputStreamSink = name("inputStreamSink") and IODispatcher
val fileSink = name("fileSink") and IODispatcher val fileSink = name("fileSink") and IODispatcher
val fromJavaStream = name("fromJavaStream") val fromJavaStream = name("fromJavaStream")
val inputBoundary = name("input-boundary")
val outputBoundary = name("output-boundary")
} }
} }