diff --git a/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java b/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java index a313dca4e5..c3e525c161 100644 --- a/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java +++ b/akka-docs/rst/java/code/docs/stream/GraphStageDocTest.java @@ -384,7 +384,8 @@ public class GraphStageDocTest extends AbstractJavaTest { //#async-side-channel - // will close upstream when the future completes + // will close upstream in all materializations of the stage instance + // when the completion stage completes public class KillSwitch extends GraphStage> { private final CompletionStage switchF; diff --git a/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala b/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala index 60b97b82c5..70b87d3913 100644 --- a/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/stream/GraphStageDocSpec.scala @@ -274,7 +274,8 @@ class GraphStageDocSpec extends AkkaSpec { import system.dispatcher //#async-side-channel - // will close upstream when the future completes + // will close upstream in all materializations of the graph stage instance + // when the future completes class KillSwitch[A](switch: Future[Unit]) extends GraphStage[FlowShape[A, A]] { val in = Inlet[A]("KillSwitch.in") diff --git a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala index f671d5bdd6..0a20fc95d9 100644 --- a/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/impl/util/StreamUtils.scala @@ -115,10 +115,11 @@ private[http] object StreamUtils { def limitByteChunksStage(maxBytesPerChunk: Int): GraphStage[FlowShape[ByteString, ByteString]] = new SimpleLinearGraphStage[ByteString] { override def initialAttributes = Attributes.name("limitByteChunksStage") - var remaining = ByteString.empty override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { + var remaining = ByteString.empty + def splitAndPush(elem: ByteString): Unit = { val toPush = remaining.take(maxBytesPerChunk) val toKeep = remaining.drop(maxBytesPerChunk) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala index ce1d05b2c1..e14248e50a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -76,9 +76,9 @@ private[stream] object Timers { } final class Idle[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { - private var nextDeadline: Deadline = Deadline.now + timeout override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + private var nextDeadline: Deadline = Deadline.now + timeout setHandler(in, new InHandler { override def onPush(): Unit = { nextDeadline = Deadline.now + timeout