diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index a10f6880fc..800ff9fdb0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -23,6 +23,7 @@ import akka.event.Logging import akka.event.LoggingAdapter import akka.stream.Attributes.InputBuffer import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.fusing.ActorGraphInterpreter.ActorOutputBoundary import akka.stream.impl.fusing.ActorGraphInterpreter.BatchingActorInputBoundary @@ -750,7 +751,7 @@ private final case class SavedIslandData( val boundary = new ActorOutputBoundary(shell, out.toString) logics.add(boundary) boundary.stageId = logics.size() - 1 - boundary.attributes = logic.attributes + boundary.attributes = logic.attributes.and(DefaultAttributes.outputBoundary) val connection = outConn() boundary.portToConn(boundary.in.id) = connection @@ -774,6 +775,7 @@ private final case class SavedIslandData( new BatchingActorInputBoundary(bufferSize, shell, publisher, connection.inOwner.toString) logics.add(boundary) boundary.stageId = logics.size() - 1 + boundary.attributes = connection.inOwner.attributes.and(DefaultAttributes.inputBoundary) boundary.portToConn(boundary.out.id + boundary.inCount) = connection connection.outHandler = boundary.handlers(0).asInstanceOf[OutHandler] diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index cf995e6202..e4c4078c3b 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -142,6 +142,9 @@ import akka.stream._ val inputStreamSink = name("inputStreamSink") and IODispatcher val fileSink = name("fileSink") and IODispatcher val fromJavaStream = name("fromJavaStream") + + val inputBoundary = name("input-boundary") + val outputBoundary = name("output-boundary") } }