From 80daa2f6474ba68d22dee2df95955a7d2f0863b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20=22Party=20Cannon=22=20Andr=C3=A9n?= Date: Thu, 8 Dec 2016 13:18:29 +0100 Subject: [PATCH] Race condition fix for Test{Sink/Source}Stage #20737 --- .../akka/stream/testkit/TestGraphStage.scala | 102 ++++++++++++++---- project/MiMa.scala | 9 +- 2 files changed, 88 insertions(+), 23 deletions(-) diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/TestGraphStage.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/TestGraphStage.scala index 6db3b858e3..7afb1f4c0d 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/TestGraphStage.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/TestGraphStage.scala @@ -1,24 +1,45 @@ package akka.stream.testkit import akka.actor.NoSerializationVerificationNeeded -import akka.stream.scaladsl.Source -import akka.stream.stage.{ OutHandler, GraphStageWithMaterializedValue, InHandler } +import akka.stream.scaladsl.{ Sink, Source } +import akka.stream.stage.{ GraphStageWithMaterializedValue, InHandler, OutHandler } import akka.stream._ import akka.testkit.TestProbe -object GraphStageMessages { - case object Push extends NoSerializationVerificationNeeded - case object UpstreamFinish extends NoSerializationVerificationNeeded - case class Failure(ex: Throwable) extends NoSerializationVerificationNeeded +import scala.util.control.NonFatal - case object Pull extends NoSerializationVerificationNeeded - case object DownstreamFinish extends NoSerializationVerificationNeeded +/** + * Messages emitted after the corresponding `stageUnderTest` methods has been invoked. + */ +object GraphStageMessages { + sealed trait StageMessage + case object Push extends StageMessage with NoSerializationVerificationNeeded + case object UpstreamFinish extends StageMessage with NoSerializationVerificationNeeded + case class Failure(ex: Throwable) extends StageMessage with NoSerializationVerificationNeeded + + case object Pull extends StageMessage with NoSerializationVerificationNeeded + case object DownstreamFinish extends StageMessage with NoSerializationVerificationNeeded + + /** + * Sent to the probe when the stage callback threw an exception + * @param operation The operation that failed + */ + case class StageFailure(operation: StageMessage, exception: Throwable) } object TestSinkStage { + + /** + * Creates a sink out of the `stageUnderTest` that will inform the `probe` + * of graph stage events and callbacks by sending it the various messages found under + * [[GraphStageMessages]]. + * + * This allows for creation of a "normal" stream ending with the sink while still being + * able to assert internal events. + */ def apply[T, M]( stageUnderTest: GraphStageWithMaterializedValue[SinkShape[T], M], - probe: TestProbe) = new TestSinkStage(stageUnderTest, probe) + probe: TestProbe): Sink[T, M] = Sink.fromGraph(new TestSinkStage(stageUnderTest, probe)) } private[testkit] class TestSinkStage[T, M]( @@ -36,16 +57,35 @@ private[testkit] class TestSinkStage[T, M]( val inHandler = logic.handlers(in.id).asInstanceOf[InHandler] logic.handlers(in.id) = new InHandler { override def onPush(): Unit = { - probe.ref ! GraphStageMessages.Push - inHandler.onPush() + try { + inHandler.onPush() + probe.ref ! GraphStageMessages.Push + } catch { + case NonFatal(ex) ⇒ + probe.ref ! GraphStageMessages.StageFailure(GraphStageMessages.Push, ex) + throw ex + } } override def onUpstreamFinish(): Unit = { - probe.ref ! GraphStageMessages.UpstreamFinish - inHandler.onUpstreamFinish() + try { + inHandler.onUpstreamFinish() + probe.ref ! GraphStageMessages.UpstreamFinish + } catch { + case NonFatal(ex) ⇒ + probe.ref ! GraphStageMessages.StageFailure(GraphStageMessages.UpstreamFinish, ex) + throw ex + } + } override def onUpstreamFailure(ex: Throwable): Unit = { - probe.ref ! GraphStageMessages.Failure(ex) - inHandler.onUpstreamFailure(ex) + try { + inHandler.onUpstreamFailure(ex) + probe.ref ! GraphStageMessages.Failure(ex) + } catch { + case NonFatal(ex) ⇒ + probe.ref ! GraphStageMessages.StageFailure(GraphStageMessages.Failure(ex), ex) + throw ex + } } } (logic, mat) @@ -53,9 +93,19 @@ private[testkit] class TestSinkStage[T, M]( } object TestSourceStage { + + /** + * Creates a source out of the `stageUnderTest` that will inform the `probe` + * of graph stage events and callbacks by sending it the various messages found under + * [[GraphStageMessages]]. + * + * This allows for creation of a "normal" stream starting with the source while still being + * able to assert internal events. + */ def apply[T, M]( stageUnderTest: GraphStageWithMaterializedValue[SourceShape[T], M], - probe: TestProbe) = Source.fromGraph(new TestSourceStage(stageUnderTest, probe)) + probe: TestProbe): Source[T, M] = + Source.fromGraph(new TestSourceStage(stageUnderTest, probe)) } private[testkit] class TestSourceStage[T, M]( @@ -73,12 +123,24 @@ private[testkit] class TestSourceStage[T, M]( val outHandler = logic.handlers(out.id).asInstanceOf[OutHandler] logic.handlers(out.id) = new OutHandler { override def onPull(): Unit = { - probe.ref ! GraphStageMessages.Pull - outHandler.onPull() + try { + outHandler.onPull() + probe.ref ! GraphStageMessages.Pull + } catch { + case NonFatal(ex) ⇒ + probe.ref ! GraphStageMessages.StageFailure(GraphStageMessages.Pull, ex) + throw ex + } } override def onDownstreamFinish(): Unit = { - probe.ref ! GraphStageMessages.DownstreamFinish - outHandler.onDownstreamFinish() + try { + outHandler.onDownstreamFinish() + probe.ref ! GraphStageMessages.DownstreamFinish + } catch { + case NonFatal(ex) ⇒ + probe.ref ! GraphStageMessages.StageFailure(GraphStageMessages.DownstreamFinish, ex) + throw ex + } } } (logic, mat) diff --git a/project/MiMa.scala b/project/MiMa.scala index ea7fc7bd23..bd083f266e 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -485,7 +485,7 @@ object MiMa extends AutoPlugin { // #20553 Tree flattening should be separate from Fusing ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo"), ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo$") - ), + ), "2.4.14" -> Seq( // #21423 removal of deprecated stages (in 2.5.x) ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.transform"), @@ -532,11 +532,14 @@ object MiMa extends AutoPlugin { // # 21944 ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ClusterEvent#ReachabilityEvent.member"), - + // #21645 durable distributed data ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.props"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.this"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.write") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.write"), + + // #20737 aligned test sink and test source stage factory methods types + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.TestSinkStage.apply") ) ) }