From 8d8c7dbada23c07d2ad57499b61e7b85fb38a2f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 17 Oct 2019 09:30:40 +0200 Subject: [PATCH] Add names for input and output boundaries --- .../akka/stream/impl/PhasedFusingActorMaterializer.scala | 4 +++- akka-stream/src/main/scala/akka/stream/impl/Stages.scala | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala index a10f6880fc..800ff9fdb0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PhasedFusingActorMaterializer.scala @@ -23,6 +23,7 @@ import akka.event.Logging import akka.event.LoggingAdapter import akka.stream.Attributes.InputBuffer import akka.stream._ +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.AtomicModule import akka.stream.impl.fusing.ActorGraphInterpreter.ActorOutputBoundary import akka.stream.impl.fusing.ActorGraphInterpreter.BatchingActorInputBoundary @@ -750,7 +751,7 @@ private final case class SavedIslandData( val boundary = new ActorOutputBoundary(shell, out.toString) logics.add(boundary) boundary.stageId = logics.size() - 1 - boundary.attributes = logic.attributes + boundary.attributes = logic.attributes.and(DefaultAttributes.outputBoundary) val connection = outConn() boundary.portToConn(boundary.in.id) = connection @@ -774,6 +775,7 @@ private final case class SavedIslandData( new BatchingActorInputBoundary(bufferSize, shell, publisher, connection.inOwner.toString) logics.add(boundary) boundary.stageId = logics.size() - 1 + boundary.attributes = connection.inOwner.attributes.and(DefaultAttributes.inputBoundary) boundary.portToConn(boundary.out.id + boundary.inCount) = connection connection.outHandler = boundary.handlers(0).asInstanceOf[OutHandler] diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index cf995e6202..e4c4078c3b 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -142,6 +142,9 @@ import akka.stream._ val inputStreamSink = name("inputStreamSink") and IODispatcher val fileSink = name("fileSink") and IODispatcher val fromJavaStream = name("fromJavaStream") + + val inputBoundary = name("input-boundary") + val outputBoundary = name("output-boundary") } }