diff --git a/akka-stream-tests/src/test/resources/application.conf b/akka-stream-tests/src/test/resources/reference.conf similarity index 51% rename from akka-stream-tests/src/test/resources/application.conf rename to akka-stream-tests/src/test/resources/reference.conf index 1660c0e30d..e4d5562864 100644 --- a/akka-stream-tests/src/test/resources/application.conf +++ b/akka-stream-tests/src/test/resources/reference.conf @@ -3,5 +3,8 @@ akka { actor { serialize-creators = on serialize-messages = on + default-dispatcher.throughput = 1 // Amplify the effects of fuzzing } + + stream.materializer.debug.fuzzing-mode = on } \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala index 7a48944826..3336dc541c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterPortsSpec.scala @@ -384,6 +384,8 @@ class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit { "ignore pull while completing" in new PortTestSetup { out.complete() in.pull() + // While the pull event is not enqueued at this point, we should still report the state correctly + in.hasBeenPulled should be(true) stepAll() @@ -648,6 +650,8 @@ class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit { in.cancel() out.push(0) + // While the push event is not enqueued at this point, we should still report the state correctly + out.isAvailable should be(false) stepAll() @@ -1066,6 +1070,7 @@ class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit { "ignore pull while failing" in new PortTestSetup { out.fail(TE("test")) in.pull() + in.hasBeenPulled should be(true) stepAll() diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala index 990c64120f..df61390cde 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpec.scala @@ -3,7 +3,8 @@ */ package akka.stream.impl.fusing -import akka.stream.Attributes +import akka.stream.{ OverflowStrategy, Attributes } +import akka.stream.stage.AbstractStage.PushPullGraphStage import akka.stream.testkit.AkkaSpec import akka.stream.scaladsl.{ Merge, Broadcast, Balance, Zip } import GraphInterpreter._ @@ -341,6 +342,43 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit { // The cycle is now empty interpreter.isSuspended should be(false) } + + "implement buffer" in new TestSetup { + val source = new UpstreamProbe[String]("source") + val sink = new DownstreamProbe[String]("sink") + val buffer = new PushPullGraphStage[String, String, Unit]( + (_) ⇒ new Buffer[String](2, OverflowStrategy.backpressure), + Attributes.none) + + builder(buffer) + .connect(source, buffer.shape.inlet) + .connect(buffer.shape.outlet, sink) + .init() + + stepAll() + lastEvents() should ===(Set(RequestOne(source))) + + sink.requestOne() + lastEvents() should ===(Set.empty) + + source.onNext("A") + lastEvents() should ===(Set(RequestOne(source), OnNext(sink, "A"))) + + source.onNext("B") + lastEvents() should ===(Set(RequestOne(source))) + + source.onNext("C", eventLimit = 0) + sink.requestOne() + lastEvents() should ===(Set(OnNext(sink, "B"), RequestOne(source))) + + sink.requestOne(eventLimit = 0) + source.onComplete(eventLimit = 3) + lastEvents() should ===(Set(OnNext(sink, "C"))) + + sink.requestOne() + lastEvents() should ===(Set(OnComplete(sink))) + + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala index 5825ba3c55..9d2642426d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala @@ -32,7 +32,7 @@ trait GraphInterpreterSpecKit { in.id = 0 } - class AssemblyBuilder(stages: Seq[GraphStage[_ <: Shape]]) { + class AssemblyBuilder(stages: Seq[GraphStageWithMaterializedValue[_ <: Shape, _]]) { var upstreams = Vector.empty[(UpstreamBoundaryStageLogic[_], Inlet[_])] var downstreams = Vector.empty[(Outlet[_], DownstreamBoundaryStageLogic[_])] var connections = Vector.empty[(Outlet[_], Inlet[_])] @@ -71,7 +71,8 @@ trait GraphInterpreterSpecKit { val assembly = buildAssembly() val (inHandlers, outHandlers, logics, _) = assembly.materialize(Attributes.none) - _interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) + _interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, + (_, _, _) ⇒ (), fuzzingMode = false) for ((upstream, i) ← upstreams.zipWithIndex) { _interpreter.attachUpstreamBoundary(i, upstream._1) @@ -87,10 +88,11 @@ trait GraphInterpreterSpecKit { def manualInit(assembly: GraphAssembly): Unit = { val (inHandlers, outHandlers, logics, _) = assembly.materialize(Attributes.none) - _interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, (_, _, _) ⇒ ()) + _interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, + (_, _, _) ⇒ (), fuzzingMode = false) } - def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages) + def builder(stages: GraphStageWithMaterializedValue[_ <: Shape, _]*): AssemblyBuilder = new AssemblyBuilder(stages) } abstract class TestSetup extends Builder { @@ -132,6 +134,18 @@ trait GraphInterpreterSpecKit { push(out, elem) interpreter.execute(eventLimit) } + + def onComplete(eventLimit: Int = Int.MaxValue): Unit = { + if (GraphInterpreter.Debug) println(s"----- COMPLETE $this") + complete(out) + interpreter.execute(eventLimit) + } + + def onFailure(eventLimit: Int = Int.MaxValue, ex: Throwable): Unit = { + if (GraphInterpreter.Debug) println(s"----- FAIL $this") + fail(out, ex) + interpreter.execute(eventLimit) + } } class DownstreamProbe[T](override val toString: String) extends DownstreamBoundaryStageLogic[T] { @@ -149,6 +163,12 @@ trait GraphInterpreterSpecKit { pull(in) interpreter.execute(eventLimit) } + + def cancel(eventLimit: Int = Int.MaxValue): Unit = { + if (GraphInterpreter.Debug) println(s"----- CANCEL $this") + cancel(in) + interpreter.execute(eventLimit) + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala index da0f8d34a6..370f74656b 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpSpec.scala @@ -19,7 +19,10 @@ import scala.concurrent.duration._ import java.net.BindException import akka.testkit.EventFilter -class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto\nakka.stream.materializer.subscription-timeout.timeout = 3s") with TcpHelper { +class TcpSpec extends AkkaSpec( + """ + |akka.io.tcp.windows-connection-abort-workaround-enabled=auto + |akka.stream.materializer.subscription-timeout.timeout = 3s""".stripMargin) with TcpHelper { var demand = 0L "Outgoing TCP stream" must { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala index 7a56cdd0e5..afa5219ddc 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala @@ -6,9 +6,7 @@ package akka.stream.scaladsl import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings -import akka.stream.OverflowStrategy +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, OverflowStrategy } import akka.stream.OverflowStrategy.Fail.BufferOverflowException import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ @@ -77,6 +75,9 @@ class FlowBufferSpec extends AkkaSpec { // Fill up buffer for (i ← 1 to 200) publisher.sendNext(i) + // The next request would be otherwise in race with the last onNext in the above loop + subscriber.expectNoMsg(500.millis) + // drain for (i ← 101 to 200) { sub.request(1) @@ -103,6 +104,9 @@ class FlowBufferSpec extends AkkaSpec { // Fill up buffer for (i ← 1 to 200) publisher.sendNext(i) + // The next request would be otherwise in race with the last onNext in the above loop + subscriber.expectNoMsg(500.millis) + // drain for (i ← 1 to 99) { sub.request(1) @@ -132,6 +136,9 @@ class FlowBufferSpec extends AkkaSpec { // Fill up buffer for (i ← 1 to 150) publisher.sendNext(i) + // The next request would be otherwise in race with the last onNext in the above loop + subscriber.expectNoMsg(500.millis) + // drain for (i ← 101 to 150) { sub.request(1) @@ -151,9 +158,14 @@ class FlowBufferSpec extends AkkaSpec { "drop new elements if buffer is full and configured so" in { val (publisher, subscriber) = TestSource.probe[Int].buffer(100, overflowStrategy = OverflowStrategy.dropNew).toMat(TestSink.probe[Int])(Keep.both).run() + subscriber.ensureSubscription() + // Fill up buffer for (i ← 1 to 150) publisher.sendNext(i) + // The next request would be otherwise in race with the last onNext in the above loop + subscriber.expectNoMsg(500.millis) + // drain for (i ← 1 to 100) { subscriber.requestNext(i) @@ -205,6 +217,8 @@ class FlowBufferSpec extends AkkaSpec { // Fill up buffer for (i ← 1 to 200) publisher.sendNext(i) + // The request below is in race otherwise with the onNext(200) above + subscriber.expectNoMsg(500.millis) sub.request(1) subscriber.expectNext(200) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala index aade0269b4..3e2055b78f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala @@ -70,17 +70,19 @@ class FlowConflateSpec extends AkkaSpec { subscriber.expectNext(1) sub.request(1) - subscriber.expectNoMsg(1.second) + subscriber.expectNoMsg(500.millis) publisher.sendNext(2) subscriber.expectNext(2) publisher.sendNext(3) publisher.sendNext(4) + // The request can be in race with the above onNext(4) so the result would be either 3 or 7. + subscriber.expectNoMsg(500.millis) sub.request(1) subscriber.expectNext(7) sub.request(1) - subscriber.expectNoMsg(1.second) + subscriber.expectNoMsg(500.millis) sub.cancel() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala index 5dbedeacd3..0509e13d6f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala @@ -7,8 +7,7 @@ import scala.concurrent.Await import scala.concurrent.duration._ import scala.concurrent.forkjoin.ThreadLocalRandom -import akka.stream.ActorMaterializer -import akka.stream.ActorMaterializerSettings +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit._ @@ -22,6 +21,9 @@ class FlowExpandSpec extends AkkaSpec { "Expand" must { "pass-through elements unchanged when there is no rate difference" in { + // Shadow the fuzzed materializer (see the ordering guarantee needed by the for loop below). + implicit val materializer = ActorMaterializer(settings.withFuzzing(false)) + val publisher = TestPublisher.probe[Int]() val subscriber = TestSubscriber.probe[Int]() @@ -51,6 +53,9 @@ class FlowExpandSpec extends AkkaSpec { } publisher.sendNext(-42) + + // The request below is otherwise in race with the above sendNext + subscriber.expectNoMsg(500.millis) subscriber.requestNext(-42) subscriber.cancel() @@ -69,6 +74,9 @@ class FlowExpandSpec extends AkkaSpec { publisher.sendNext(2) publisher.sendComplete() + // The request below is otherwise in race with the above sendNext(2) (and completion) + subscriber.expectNoMsg(500.millis) + subscriber.requestNext(2) subscriber.expectComplete() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIdleInjectSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIdleInjectSpec.scala index 968e3e8271..f0ba3e27f0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIdleInjectSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIdleInjectSpec.scala @@ -137,10 +137,10 @@ class FlowIdleInjectSpec extends AkkaSpec { Source(upstream).keepAlive(1.second, () ⇒ 0).runWith(Sink(downstream)) downstream.request(2) - downstream.expectNoMsg(0.5.second) + downstream.expectNoMsg(500.millis) downstream.expectNext(0) - downstream.expectNoMsg(0.5 second) + downstream.expectNoMsg(500.millis) downstream.expectNext(0) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala index faf1ee5964..40a84c37b1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncSpec.scala @@ -247,7 +247,7 @@ class FlowMapAsyncSpec extends AkkaSpec with ScalaFutures { } try { - val N = 100000 + val N = 10000 Source(1 to N) .mapAsync(parallelism)(i ⇒ deferred()) .runFold(0)((c, _) ⇒ c + 1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala index 09173520cb..ce82d80096 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapAsyncUnorderedSpec.scala @@ -7,7 +7,7 @@ import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.duration._ import scala.util.control.NoStackTrace -import akka.stream.ActorMaterializer +import akka.stream.{ ActorMaterializerSettings, ActorMaterializer } import akka.stream.testkit._ import akka.stream.testkit.scaladsl._ import akka.stream.testkit.Utils._ @@ -235,7 +235,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec with ScalaFutures with Conversi } try { - val N = 100000 + val N = 10000 Source(1 to N) .mapAsyncUnordered(parallelism)(i ⇒ deferred()) .runFold(0)((c, _) ⇒ c + 1) diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template index fa8b30d93f..2322b9d0cc 100644 --- a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWithApply.scala.template @@ -32,8 +32,13 @@ class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { var pending = 1 + // Without this field the completion signalling would take one extra pull + var willShutDown = false - private def pushAll(): Unit = push(out, zipper([#grab(in0)#])) + private def pushAll(): Unit = { + push(out, zipper([#grab(in0)#])) + if (willShutDown) completeStage() + } [#setHandler(in0, new InHandler { override def onPush(): Unit = { @@ -43,6 +48,7 @@ class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape override def onUpstreamFinish(): Unit = { if (!isAvailable(in0)) completeStage() + willShutDown = true } })# @@ -51,9 +57,11 @@ class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape setHandler(out, new OutHandler { override def onPull(): Unit = { pending = shape.inlets.size - [#if (!isClosed(in0)) pull(in0) - else completeStage()# - ] + if (willShutDown) completeStage() + else { + [#pull(in0)# + ] + } } }) } diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index 0bf4ed2e5d..e0bd4800c2 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -40,6 +40,17 @@ akka { # Maximum number of elements emitted in batch if downstream signals large demand output-burst-limit = 1000 + + debug { + # Enables the fuzzing mode which increases the chance of race conditions + # by aggressively reordering events and making certain operations more + # concurrent than usual. + # This setting is for testing purposes, NEVER enable this in a production + # environment! + # To get the best results, try combining this setting with a throughput + # of 1 on the corresponding dispatchers. + fuzzing-mode = off + } } # Fully qualified config path which holds the dispatcher configuration diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala index e072cfd40e..5067ac614b 100644 --- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala @@ -198,10 +198,11 @@ object ActorMaterializerSettings { supervisionDecider: Supervision.Decider, subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, debugLogging: Boolean, - outputBurstLimit: Int) = + outputBurstLimit: Int, + fuzzingMode: Boolean) = new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, - outputBurstLimit) + outputBurstLimit, fuzzingMode) /** * Create [[ActorMaterializerSettings]]. @@ -226,7 +227,8 @@ object ActorMaterializerSettings { supervisionDecider = Supervision.stoppingDecider, subscriptionTimeoutSettings = StreamSubscriptionTimeoutSettings(config), debugLogging = config.getBoolean("debug-logging"), - outputBurstLimit = config.getInt("output-burst-limit")) + outputBurstLimit = config.getInt("output-burst-limit"), + fuzzingMode = config.getBoolean("debug.fuzzing-mode")) /** * Java API @@ -245,6 +247,7 @@ object ActorMaterializerSettings { */ def create(config: Config): ActorMaterializerSettings = apply(config) + } /** @@ -260,7 +263,8 @@ final class ActorMaterializerSettings( val supervisionDecider: Supervision.Decider, val subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings, val debugLogging: Boolean, - val outputBurstLimit: Int) { + val outputBurstLimit: Int, + val fuzzingMode: Boolean) { require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") @@ -274,24 +278,31 @@ final class ActorMaterializerSettings( supervisionDecider: Supervision.Decider = this.supervisionDecider, subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings = this.subscriptionTimeoutSettings, debugLogging: Boolean = this.debugLogging, - outputBurstLimit: Int = this.outputBurstLimit) = + outputBurstLimit: Int = this.outputBurstLimit, + fuzzingMode: Boolean = this.fuzzingMode) = new ActorMaterializerSettings( initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging, - outputBurstLimit) + outputBurstLimit, fuzzingMode) - def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings = - copy(initialInputBufferSize = initialSize, maxInputBufferSize = maxSize) + def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings = { + if (initialSize == this.initialInputBufferSize && maxSize == this.maxInputBufferSize) this + else copy(initialInputBufferSize = initialSize, maxInputBufferSize = maxSize) + } - def withDispatcher(dispatcher: String): ActorMaterializerSettings = - copy(dispatcher = dispatcher) + def withDispatcher(dispatcher: String): ActorMaterializerSettings = { + if (this.dispatcher == dispatcher) this + else copy(dispatcher = dispatcher) + } /** * Scala API: Decides how exceptions from application code are to be handled, unless * overridden for specific flows of the stream operations with * [[akka.stream.Attributes#supervisionStrategy]]. */ - def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings = - copy(supervisionDecider = decider) + def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings = { + if (decider eq this.supervisionDecider) this + else copy(supervisionDecider = decider) + } /** * Java API: Decides how exceptions from application code are to be handled, unless @@ -308,8 +319,15 @@ final class ActorMaterializerSettings( }) } - def withDebugLogging(enable: Boolean): ActorMaterializerSettings = - copy(debugLogging = enable) + def withFuzzing(enable: Boolean): ActorMaterializerSettings = { + if (enable == this.fuzzingMode) this + else copy(fuzzingMode = enable) + } + + def withDebugLogging(enable: Boolean): ActorMaterializerSettings = { + if (enable == this.debugLogging) this + else copy(debugLogging = enable) + } private def requirePowerOfTwo(n: Integer, name: String): Unit = { require(n > 0, s"$name must be > 0") diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index 84a977cfbd..9e631754f6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -6,6 +6,7 @@ package akka.stream.impl import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong } import akka.actor._ +import akka.event.Logging import akka.dispatch.Dispatchers import akka.pattern.ask import akka.stream._ @@ -29,6 +30,12 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem, flowNameCounter: AtomicLong, namePrefix: String) extends ActorMaterializer { import akka.stream.impl.Stages._ + private val logger = Logging.getLogger(system, this) + + if (settings.fuzzingMode) { + logger.warning("Fuzzing mode is enabled on this system. If you see this warning on your production system then " + + "set akka.materializer.debug.fuzzing-mode to off.") + } override def shutdown(): Unit = if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala index aa0d4160f7..edb1904fcf 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -196,13 +196,18 @@ private[stream] object Timers { pull(in) } } + + override def onUpstreamFinish(): Unit = { + if (!isAvailable(in)) completeStage() + } }) setHandler(out, new OutHandler { override def onPull(): Unit = { if (isAvailable(in)) { push(out, grab(in)) - pull(in) + if (isClosed(in)) completeStage() + else pull(in) } else { if (nextDeadline.isOverdue()) { nextDeadline = Deadline.now + timeout diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala index abe0df2276..6bd988061c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorGraphInterpreter.scala @@ -3,7 +3,7 @@ */ package akka.stream.impl.fusing -import java.util.concurrent.TimeoutException +import java.util.concurrent.{ ThreadLocalRandom, TimeoutException } import akka.actor._ import akka.event.Logging @@ -305,7 +305,8 @@ private[stream] class ActorGraphInterpreter( inHandlers, outHandlers, logics, - (logic, event, handler) ⇒ self ! AsyncInput(logic, event, handler)) + (logic, event, handler) ⇒ self ! AsyncInput(logic, event, handler), + settings.fuzzingMode) private val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _)) private val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _)) @@ -396,7 +397,14 @@ private[stream] class ActorGraphInterpreter( private def runBatch(): Unit = { try { - interpreter.execute(eventLimit) + val effectiveLimit = { + if (!settings.fuzzingMode) eventLimit + else { + if (ThreadLocalRandom.current().nextBoolean()) Thread.`yield`() + ThreadLocalRandom.current().nextInt(2) // 1 or zero events to be processed + } + } + interpreter.execute(effectiveLimit) if (interpreter.isCompleted) { // Cannot stop right away if not completely subscribed if (subscribesPending == 0) context.stop(self) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala index 6dae4b87c5..3f56c8fa8f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/GraphInterpreter.scala @@ -10,6 +10,7 @@ import akka.stream.stage._ import scala.annotation.tailrec import scala.collection.immutable import akka.stream._ +import scala.concurrent.forkjoin.ThreadLocalRandom import scala.util.control.NonFatal /** @@ -332,7 +333,8 @@ private[stream] final class GraphInterpreter( val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection val logics: Array[GraphStageLogic], // Array of stage logics - val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit) { + val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit, + val fuzzingMode: Boolean) { import GraphInterpreter._ // Maintains additional information for events, basically elements in-flight, or failure. @@ -579,14 +581,17 @@ private[stream] final class GraphInterpreter( } private def dequeue(): Int = { - if (queueHead == queueTail) NoEvent - else { - val idx = queueHead & mask - val elem = eventQueue(idx) - eventQueue(idx) = NoEvent - queueHead += 1 - elem + val idx = queueHead & mask + if (fuzzingMode) { + val swapWith = (ThreadLocalRandom.current.nextInt(queueTail - queueHead) + queueHead) & mask + val ev = eventQueue(swapWith) + eventQueue(swapWith) = eventQueue(idx) + eventQueue(idx) = ev } + val elem = eventQueue(idx) + eventQueue(idx) = NoEvent + queueHead += 1 + elem } private def enqueue(connection: Int): Unit = { @@ -624,8 +629,9 @@ private[stream] final class GraphInterpreter( } private[stream] def push(connection: Int, elem: Any): Unit = { - if ((portStates(connection) & InClosed) == 0) { - portStates(connection) ^= PushStartFlip + val currentState = portStates(connection) + portStates(connection) = currentState ^ PushStartFlip + if ((currentState & InClosed) == 0) { connectionSlots(connection) = elem enqueue(connection) } @@ -633,8 +639,8 @@ private[stream] final class GraphInterpreter( private[stream] def pull(connection: Int): Unit = { val currentState = portStates(connection) + portStates(connection) = currentState ^ PullStartFlip if ((currentState & OutClosed) == 0) { - portStates(connection) = currentState ^ PullStartFlip enqueue(connection) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala index e1451ffeb2..a818e87275 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/IteratorInterpreter.scala @@ -144,7 +144,8 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S inHandlers, outHandlers, logics, - (_, _, _) ⇒ throw new UnsupportedOperationException("IteratorInterpreter does not support asynchronous events.")) + (_, _, _) ⇒ throw new UnsupportedOperationException("IteratorInterpreter does not support asynchronous events."), + fuzzingMode = false) interpreter.attachUpstreamBoundary(0, upstream) interpreter.attachDownstreamBoundary(ops.length, downstream) interpreter.init()