From 4325f8bacd06833d5866258db4e137c1f79763d3 Mon Sep 17 00:00:00 2001 From: Christian Schmitt Date: Fri, 18 Nov 2016 10:25:06 +0100 Subject: [PATCH 1/2] asOutputStream did never complete on close and exposed a unsafe callback #20503, #21864 StreamConverters.asOutputStream did never complete if the buffer size was greater than zero, this lead to a bad state, where the close was unblocked, but the stage was never completed. Also the OutputStream uses getAsyncCallback which had the problem that it always lead to races, if flush or close was called to early. --- .../stream/io/OutputStreamSourceSpec.scala | 47 +++++++++++++++++-- .../impl/io/OutputStreamSourceStage.scala | 19 ++++---- 2 files changed, 51 insertions(+), 15 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala index 806f31be51..f4894780fb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala @@ -6,24 +6,31 @@ package akka.stream.io import java.io.IOException import java.lang.management.ManagementFactory import java.util.concurrent.TimeoutException + import akka.actor.ActorSystem -import akka.stream._ import akka.stream.Attributes.inputBuffer +import akka.stream._ +import akka.stream.impl.ActorMaterializerImpl +import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.io.OutputStreamSourceStage -import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor } -import akka.stream.scaladsl.{ Keep, StreamConverters, Sink } +import akka.stream.scaladsl.Keep +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.StreamConverters import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink import akka.testkit.TestProbe import akka.util.ByteString + +import scala.concurrent.Await +import scala.concurrent.Future import scala.concurrent.duration.Duration.Zero import scala.concurrent.duration._ -import scala.concurrent.{ Await, Future } import scala.util.Random class OutputStreamSourceSpec extends StreamSpec(UnboundedMailboxConfig) { + import system.dispatcher val settings = ActorMaterializerSettings(system).withDispatcher("akka.actor.default-dispatcher") @@ -204,5 +211,37 @@ class OutputStreamSourceSpec extends StreamSpec(UnboundedMailboxConfig) { assertNoBlockedThreads() } + + "correctly complete the stage after close" in assertAllStagesStopped { + // actually this was a race, so it only happened in at least one of 20 runs + for (_ ← 1 to 20) { + val sourceProbe = TestProbe() + val (outputStream, probe) = TestSourceStage(new OutputStreamSourceStage(timeout), sourceProbe) + .toMat(TestSink.probe[ByteString])(Keep.both).run + + outputStream.write(1) + outputStream.write(1) + outputStream.write(1) + outputStream.write(1) + outputStream.write(1) + outputStream.write(1) + outputStream.write(1) + outputStream.write(1) + + // await the request, so that the close + // will come before + materializer.scheduleOnce(500.milliseconds, new Runnable { + override def run(): Unit = { + probe.request(8) + probe.expectNextN(8) + + probe.expectComplete() + } + }) + + outputStream.close() + } + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala index 21832b4832..fbae436eff 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamSourceStage.scala @@ -32,9 +32,6 @@ private[stream] object OutputStreamSourceStage { case object Ok extends DownstreamStatus case object Canceled extends DownstreamStatus - sealed trait StageWithCallback { - def wakeUp(msg: AdapterToStageMessage): Future[Unit] - } } final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration) extends GraphStageWithMaterializedValue[SourceShape[ByteString], OutputStream] { @@ -52,7 +49,7 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration val dataQueue = new LinkedBlockingQueue[ByteString](maxBuffer) val downstreamStatus = new AtomicReference[DownstreamStatus](Ok) - val logic = new GraphStageLogic(shape) with StageWithCallback { + val logic = new GraphStageLogic(shape) with CallbackWrapper[(AdapterToStageMessage, Promise[Unit])] { var flush: Option[Promise[Unit]] = None var close: Option[Promise[Unit]] = None @@ -68,9 +65,9 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration private val upstreamCallback: AsyncCallback[(AdapterToStageMessage, Promise[Unit])] = getAsyncCallback(onAsyncMessage) - override def wakeUp(msg: AdapterToStageMessage): Future[Unit] = { + def wakeUp(msg: AdapterToStageMessage): Future[Unit] = { val p = Promise[Unit]() - upstreamCallback.invoke((msg, p)) + this.invoke((msg, p)) p.future } @@ -81,11 +78,7 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration sendResponseIfNeed() case Close ⇒ close = Some(event._2) - if (dataQueue.isEmpty) { - downstreamStatus.set(Canceled) - completeStage() - unblockUpstream() - } else sendResponseIfNeed() + sendResponseIfNeed() } private def unblockUpstream(): Boolean = @@ -96,8 +89,10 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration true case None ⇒ close match { case Some(p) ⇒ + downstreamStatus.set(Canceled) p.complete(Success(())) close = None + completeStage() true case None ⇒ false } @@ -115,6 +110,7 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration override def preStart(): Unit = { dispatcher = ActorMaterializerHelper.downcast(materializer).system.dispatchers.lookup(dispatcherId) super.preStart() + initCallback(upstreamCallback.invoke) } setHandler(out, new OutHandler { @@ -151,6 +147,7 @@ final private[stream] class OutputStreamSourceStage(writeTimeout: FiniteDuration super.postStop() } } + (logic, new OutputStreamAdapter(dataQueue, downstreamStatus, logic.wakeUp, writeTimeout)) } } From 6b03383ddbbe4f69926d4a801fcc3952729dfb38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andre=CC=81n?= Date: Mon, 28 Nov 2016 11:01:44 +0100 Subject: [PATCH 2/2] Test case more concistently exercising the race without taking 8s #21864 --- .../stream/io/OutputStreamSourceSpec.scala | 40 +++++++------------ 1 file changed, 14 insertions(+), 26 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala index f4894780fb..4b3f8c4c8e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/OutputStreamSourceSpec.scala @@ -14,9 +14,7 @@ import akka.stream.impl.ActorMaterializerImpl import akka.stream.impl.StreamSupervisor import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.io.OutputStreamSourceStage -import akka.stream.scaladsl.Keep -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.StreamConverters +import akka.stream.scaladsl.{ Keep, Sink, Source, StreamConverters } import akka.stream.testkit.Utils._ import akka.stream.testkit._ import akka.stream.testkit.scaladsl.TestSink @@ -214,33 +212,23 @@ class OutputStreamSourceSpec extends StreamSpec(UnboundedMailboxConfig) { "correctly complete the stage after close" in assertAllStagesStopped { // actually this was a race, so it only happened in at least one of 20 runs - for (_ ← 1 to 20) { - val sourceProbe = TestProbe() - val (outputStream, probe) = TestSourceStage(new OutputStreamSourceStage(timeout), sourceProbe) - .toMat(TestSink.probe[ByteString])(Keep.both).run - outputStream.write(1) - outputStream.write(1) - outputStream.write(1) - outputStream.write(1) - outputStream.write(1) - outputStream.write(1) - outputStream.write(1) - outputStream.write(1) - - // await the request, so that the close - // will come before - materializer.scheduleOnce(500.milliseconds, new Runnable { - override def run(): Unit = { - probe.request(8) - probe.expectNextN(8) - - probe.expectComplete() - } - }) + val bufSize = 4 + val sourceProbe = TestProbe() + val (outputStream, probe) = StreamConverters.asOutputStream(timeout) + .addAttributes(Attributes.inputBuffer(bufSize, bufSize)) + .toMat(TestSink.probe[ByteString])(Keep.both).run + // fill the buffer up + (1 to (bufSize - 1)).foreach(outputStream.write) + Future { outputStream.close() } + // here is the race, has the elements reached the stage buffer yet? + Thread.sleep(500) + probe.request(bufSize - 1) + probe.expectNextN(bufSize - 1) + probe.expectComplete() } }