Race condition fix for Test{Sink/Source}Stage #20737
This commit is contained in:
parent
31e61d0fb3
commit
80daa2f647
2 changed files with 88 additions and 23 deletions
|
|
@ -1,24 +1,45 @@
|
|||
package akka.stream.testkit
|
||||
|
||||
import akka.actor.NoSerializationVerificationNeeded
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.stream.stage.{ OutHandler, GraphStageWithMaterializedValue, InHandler }
|
||||
import akka.stream.scaladsl.{ Sink, Source }
|
||||
import akka.stream.stage.{ GraphStageWithMaterializedValue, InHandler, OutHandler }
|
||||
import akka.stream._
|
||||
import akka.testkit.TestProbe
|
||||
|
||||
object GraphStageMessages {
|
||||
case object Push extends NoSerializationVerificationNeeded
|
||||
case object UpstreamFinish extends NoSerializationVerificationNeeded
|
||||
case class Failure(ex: Throwable) extends NoSerializationVerificationNeeded
|
||||
import scala.util.control.NonFatal
|
||||
|
||||
case object Pull extends NoSerializationVerificationNeeded
|
||||
case object DownstreamFinish extends NoSerializationVerificationNeeded
|
||||
/**
|
||||
* Messages emitted after the corresponding `stageUnderTest` methods has been invoked.
|
||||
*/
|
||||
object GraphStageMessages {
|
||||
sealed trait StageMessage
|
||||
case object Push extends StageMessage with NoSerializationVerificationNeeded
|
||||
case object UpstreamFinish extends StageMessage with NoSerializationVerificationNeeded
|
||||
case class Failure(ex: Throwable) extends StageMessage with NoSerializationVerificationNeeded
|
||||
|
||||
case object Pull extends StageMessage with NoSerializationVerificationNeeded
|
||||
case object DownstreamFinish extends StageMessage with NoSerializationVerificationNeeded
|
||||
|
||||
/**
|
||||
* Sent to the probe when the stage callback threw an exception
|
||||
* @param operation The operation that failed
|
||||
*/
|
||||
case class StageFailure(operation: StageMessage, exception: Throwable)
|
||||
}
|
||||
|
||||
object TestSinkStage {
|
||||
|
||||
/**
|
||||
* Creates a sink out of the `stageUnderTest` that will inform the `probe`
|
||||
* of graph stage events and callbacks by sending it the various messages found under
|
||||
* [[GraphStageMessages]].
|
||||
*
|
||||
* This allows for creation of a "normal" stream ending with the sink while still being
|
||||
* able to assert internal events.
|
||||
*/
|
||||
def apply[T, M](
|
||||
stageUnderTest: GraphStageWithMaterializedValue[SinkShape[T], M],
|
||||
probe: TestProbe) = new TestSinkStage(stageUnderTest, probe)
|
||||
probe: TestProbe): Sink[T, M] = Sink.fromGraph(new TestSinkStage(stageUnderTest, probe))
|
||||
}
|
||||
|
||||
private[testkit] class TestSinkStage[T, M](
|
||||
|
|
@ -36,16 +57,35 @@ private[testkit] class TestSinkStage[T, M](
|
|||
val inHandler = logic.handlers(in.id).asInstanceOf[InHandler]
|
||||
logic.handlers(in.id) = new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
probe.ref ! GraphStageMessages.Push
|
||||
inHandler.onPush()
|
||||
try {
|
||||
inHandler.onPush()
|
||||
probe.ref ! GraphStageMessages.Push
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
probe.ref ! GraphStageMessages.StageFailure(GraphStageMessages.Push, ex)
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
probe.ref ! GraphStageMessages.UpstreamFinish
|
||||
inHandler.onUpstreamFinish()
|
||||
try {
|
||||
inHandler.onUpstreamFinish()
|
||||
probe.ref ! GraphStageMessages.UpstreamFinish
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
probe.ref ! GraphStageMessages.StageFailure(GraphStageMessages.UpstreamFinish, ex)
|
||||
throw ex
|
||||
}
|
||||
|
||||
}
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||
probe.ref ! GraphStageMessages.Failure(ex)
|
||||
inHandler.onUpstreamFailure(ex)
|
||||
try {
|
||||
inHandler.onUpstreamFailure(ex)
|
||||
probe.ref ! GraphStageMessages.Failure(ex)
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
probe.ref ! GraphStageMessages.StageFailure(GraphStageMessages.Failure(ex), ex)
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
}
|
||||
(logic, mat)
|
||||
|
|
@ -53,9 +93,19 @@ private[testkit] class TestSinkStage[T, M](
|
|||
}
|
||||
|
||||
object TestSourceStage {
|
||||
|
||||
/**
|
||||
* Creates a source out of the `stageUnderTest` that will inform the `probe`
|
||||
* of graph stage events and callbacks by sending it the various messages found under
|
||||
* [[GraphStageMessages]].
|
||||
*
|
||||
* This allows for creation of a "normal" stream starting with the source while still being
|
||||
* able to assert internal events.
|
||||
*/
|
||||
def apply[T, M](
|
||||
stageUnderTest: GraphStageWithMaterializedValue[SourceShape[T], M],
|
||||
probe: TestProbe) = Source.fromGraph(new TestSourceStage(stageUnderTest, probe))
|
||||
probe: TestProbe): Source[T, M] =
|
||||
Source.fromGraph(new TestSourceStage(stageUnderTest, probe))
|
||||
}
|
||||
|
||||
private[testkit] class TestSourceStage[T, M](
|
||||
|
|
@ -73,12 +123,24 @@ private[testkit] class TestSourceStage[T, M](
|
|||
val outHandler = logic.handlers(out.id).asInstanceOf[OutHandler]
|
||||
logic.handlers(out.id) = new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
probe.ref ! GraphStageMessages.Pull
|
||||
outHandler.onPull()
|
||||
try {
|
||||
outHandler.onPull()
|
||||
probe.ref ! GraphStageMessages.Pull
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
probe.ref ! GraphStageMessages.StageFailure(GraphStageMessages.Pull, ex)
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
override def onDownstreamFinish(): Unit = {
|
||||
probe.ref ! GraphStageMessages.DownstreamFinish
|
||||
outHandler.onDownstreamFinish()
|
||||
try {
|
||||
outHandler.onDownstreamFinish()
|
||||
probe.ref ! GraphStageMessages.DownstreamFinish
|
||||
} catch {
|
||||
case NonFatal(ex) ⇒
|
||||
probe.ref ! GraphStageMessages.StageFailure(GraphStageMessages.DownstreamFinish, ex)
|
||||
throw ex
|
||||
}
|
||||
}
|
||||
}
|
||||
(logic, mat)
|
||||
|
|
|
|||
|
|
@ -485,7 +485,7 @@ object MiMa extends AutoPlugin {
|
|||
// #20553 Tree flattening should be separate from Fusing
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo"),
|
||||
ProblemFilters.exclude[MissingClassProblem]("akka.stream.Fusing$StructuralInfo$")
|
||||
),
|
||||
),
|
||||
"2.4.14" -> Seq(
|
||||
// #21423 removal of deprecated stages (in 2.5.x)
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.Source.transform"),
|
||||
|
|
@ -532,11 +532,14 @@ object MiMa extends AutoPlugin {
|
|||
|
||||
// # 21944
|
||||
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.cluster.ClusterEvent#ReachabilityEvent.member"),
|
||||
|
||||
|
||||
// #21645 durable distributed data
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.props"),
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.WriteAggregator.this"),
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.write")
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.cluster.ddata.Replicator.write"),
|
||||
|
||||
// #20737 aligned test sink and test source stage factory methods types
|
||||
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.TestSinkStage.apply")
|
||||
)
|
||||
)
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue