parent
6a22cdebfc
commit
c4417a8c39
19 changed files with 209 additions and 52 deletions
|
|
@ -3,5 +3,8 @@ akka {
|
||||||
actor {
|
actor {
|
||||||
serialize-creators = on
|
serialize-creators = on
|
||||||
serialize-messages = on
|
serialize-messages = on
|
||||||
|
default-dispatcher.throughput = 1 // Amplify the effects of fuzzing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stream.materializer.debug.fuzzing-mode = on
|
||||||
}
|
}
|
||||||
|
|
@ -384,6 +384,8 @@ class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
||||||
"ignore pull while completing" in new PortTestSetup {
|
"ignore pull while completing" in new PortTestSetup {
|
||||||
out.complete()
|
out.complete()
|
||||||
in.pull()
|
in.pull()
|
||||||
|
// While the pull event is not enqueued at this point, we should still report the state correctly
|
||||||
|
in.hasBeenPulled should be(true)
|
||||||
|
|
||||||
stepAll()
|
stepAll()
|
||||||
|
|
||||||
|
|
@ -648,6 +650,8 @@ class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
||||||
|
|
||||||
in.cancel()
|
in.cancel()
|
||||||
out.push(0)
|
out.push(0)
|
||||||
|
// While the push event is not enqueued at this point, we should still report the state correctly
|
||||||
|
out.isAvailable should be(false)
|
||||||
|
|
||||||
stepAll()
|
stepAll()
|
||||||
|
|
||||||
|
|
@ -1066,6 +1070,7 @@ class GraphInterpreterPortsSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
||||||
"ignore pull while failing" in new PortTestSetup {
|
"ignore pull while failing" in new PortTestSetup {
|
||||||
out.fail(TE("test"))
|
out.fail(TE("test"))
|
||||||
in.pull()
|
in.pull()
|
||||||
|
in.hasBeenPulled should be(true)
|
||||||
|
|
||||||
stepAll()
|
stepAll()
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,8 @@
|
||||||
*/
|
*/
|
||||||
package akka.stream.impl.fusing
|
package akka.stream.impl.fusing
|
||||||
|
|
||||||
import akka.stream.Attributes
|
import akka.stream.{ OverflowStrategy, Attributes }
|
||||||
|
import akka.stream.stage.AbstractStage.PushPullGraphStage
|
||||||
import akka.stream.testkit.AkkaSpec
|
import akka.stream.testkit.AkkaSpec
|
||||||
import akka.stream.scaladsl.{ Merge, Broadcast, Balance, Zip }
|
import akka.stream.scaladsl.{ Merge, Broadcast, Balance, Zip }
|
||||||
import GraphInterpreter._
|
import GraphInterpreter._
|
||||||
|
|
@ -341,6 +342,43 @@ class GraphInterpreterSpec extends AkkaSpec with GraphInterpreterSpecKit {
|
||||||
// The cycle is now empty
|
// The cycle is now empty
|
||||||
interpreter.isSuspended should be(false)
|
interpreter.isSuspended should be(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"implement buffer" in new TestSetup {
|
||||||
|
val source = new UpstreamProbe[String]("source")
|
||||||
|
val sink = new DownstreamProbe[String]("sink")
|
||||||
|
val buffer = new PushPullGraphStage[String, String, Unit](
|
||||||
|
(_) ⇒ new Buffer[String](2, OverflowStrategy.backpressure),
|
||||||
|
Attributes.none)
|
||||||
|
|
||||||
|
builder(buffer)
|
||||||
|
.connect(source, buffer.shape.inlet)
|
||||||
|
.connect(buffer.shape.outlet, sink)
|
||||||
|
.init()
|
||||||
|
|
||||||
|
stepAll()
|
||||||
|
lastEvents() should ===(Set(RequestOne(source)))
|
||||||
|
|
||||||
|
sink.requestOne()
|
||||||
|
lastEvents() should ===(Set.empty)
|
||||||
|
|
||||||
|
source.onNext("A")
|
||||||
|
lastEvents() should ===(Set(RequestOne(source), OnNext(sink, "A")))
|
||||||
|
|
||||||
|
source.onNext("B")
|
||||||
|
lastEvents() should ===(Set(RequestOne(source)))
|
||||||
|
|
||||||
|
source.onNext("C", eventLimit = 0)
|
||||||
|
sink.requestOne()
|
||||||
|
lastEvents() should ===(Set(OnNext(sink, "B"), RequestOne(source)))
|
||||||
|
|
||||||
|
sink.requestOne(eventLimit = 0)
|
||||||
|
source.onComplete(eventLimit = 3)
|
||||||
|
lastEvents() should ===(Set(OnNext(sink, "C")))
|
||||||
|
|
||||||
|
sink.requestOne()
|
||||||
|
lastEvents() should ===(Set(OnComplete(sink)))
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ trait GraphInterpreterSpecKit {
|
||||||
in.id = 0
|
in.id = 0
|
||||||
}
|
}
|
||||||
|
|
||||||
class AssemblyBuilder(stages: Seq[GraphStage[_ <: Shape]]) {
|
class AssemblyBuilder(stages: Seq[GraphStageWithMaterializedValue[_ <: Shape, _]]) {
|
||||||
var upstreams = Vector.empty[(UpstreamBoundaryStageLogic[_], Inlet[_])]
|
var upstreams = Vector.empty[(UpstreamBoundaryStageLogic[_], Inlet[_])]
|
||||||
var downstreams = Vector.empty[(Outlet[_], DownstreamBoundaryStageLogic[_])]
|
var downstreams = Vector.empty[(Outlet[_], DownstreamBoundaryStageLogic[_])]
|
||||||
var connections = Vector.empty[(Outlet[_], Inlet[_])]
|
var connections = Vector.empty[(Outlet[_], Inlet[_])]
|
||||||
|
|
@ -71,7 +71,8 @@ trait GraphInterpreterSpecKit {
|
||||||
val assembly = buildAssembly()
|
val assembly = buildAssembly()
|
||||||
|
|
||||||
val (inHandlers, outHandlers, logics, _) = assembly.materialize(Attributes.none)
|
val (inHandlers, outHandlers, logics, _) = assembly.materialize(Attributes.none)
|
||||||
_interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, (_, _, _) ⇒ ())
|
_interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics,
|
||||||
|
(_, _, _) ⇒ (), fuzzingMode = false)
|
||||||
|
|
||||||
for ((upstream, i) ← upstreams.zipWithIndex) {
|
for ((upstream, i) ← upstreams.zipWithIndex) {
|
||||||
_interpreter.attachUpstreamBoundary(i, upstream._1)
|
_interpreter.attachUpstreamBoundary(i, upstream._1)
|
||||||
|
|
@ -87,10 +88,11 @@ trait GraphInterpreterSpecKit {
|
||||||
|
|
||||||
def manualInit(assembly: GraphAssembly): Unit = {
|
def manualInit(assembly: GraphAssembly): Unit = {
|
||||||
val (inHandlers, outHandlers, logics, _) = assembly.materialize(Attributes.none)
|
val (inHandlers, outHandlers, logics, _) = assembly.materialize(Attributes.none)
|
||||||
_interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics, (_, _, _) ⇒ ())
|
_interpreter = new GraphInterpreter(assembly, NoMaterializer, NoLogging, inHandlers, outHandlers, logics,
|
||||||
|
(_, _, _) ⇒ (), fuzzingMode = false)
|
||||||
}
|
}
|
||||||
|
|
||||||
def builder(stages: GraphStage[_ <: Shape]*): AssemblyBuilder = new AssemblyBuilder(stages)
|
def builder(stages: GraphStageWithMaterializedValue[_ <: Shape, _]*): AssemblyBuilder = new AssemblyBuilder(stages)
|
||||||
}
|
}
|
||||||
|
|
||||||
abstract class TestSetup extends Builder {
|
abstract class TestSetup extends Builder {
|
||||||
|
|
@ -132,6 +134,18 @@ trait GraphInterpreterSpecKit {
|
||||||
push(out, elem)
|
push(out, elem)
|
||||||
interpreter.execute(eventLimit)
|
interpreter.execute(eventLimit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def onComplete(eventLimit: Int = Int.MaxValue): Unit = {
|
||||||
|
if (GraphInterpreter.Debug) println(s"----- COMPLETE $this")
|
||||||
|
complete(out)
|
||||||
|
interpreter.execute(eventLimit)
|
||||||
|
}
|
||||||
|
|
||||||
|
def onFailure(eventLimit: Int = Int.MaxValue, ex: Throwable): Unit = {
|
||||||
|
if (GraphInterpreter.Debug) println(s"----- FAIL $this")
|
||||||
|
fail(out, ex)
|
||||||
|
interpreter.execute(eventLimit)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class DownstreamProbe[T](override val toString: String) extends DownstreamBoundaryStageLogic[T] {
|
class DownstreamProbe[T](override val toString: String) extends DownstreamBoundaryStageLogic[T] {
|
||||||
|
|
@ -149,6 +163,12 @@ trait GraphInterpreterSpecKit {
|
||||||
pull(in)
|
pull(in)
|
||||||
interpreter.execute(eventLimit)
|
interpreter.execute(eventLimit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def cancel(eventLimit: Int = Int.MaxValue): Unit = {
|
||||||
|
if (GraphInterpreter.Debug) println(s"----- CANCEL $this")
|
||||||
|
cancel(in)
|
||||||
|
interpreter.execute(eventLimit)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,7 +19,10 @@ import scala.concurrent.duration._
|
||||||
import java.net.BindException
|
import java.net.BindException
|
||||||
import akka.testkit.EventFilter
|
import akka.testkit.EventFilter
|
||||||
|
|
||||||
class TcpSpec extends AkkaSpec("akka.io.tcp.windows-connection-abort-workaround-enabled=auto\nakka.stream.materializer.subscription-timeout.timeout = 3s") with TcpHelper {
|
class TcpSpec extends AkkaSpec(
|
||||||
|
"""
|
||||||
|
|akka.io.tcp.windows-connection-abort-workaround-enabled=auto
|
||||||
|
|akka.stream.materializer.subscription-timeout.timeout = 3s""".stripMargin) with TcpHelper {
|
||||||
var demand = 0L
|
var demand = 0L
|
||||||
|
|
||||||
"Outgoing TCP stream" must {
|
"Outgoing TCP stream" must {
|
||||||
|
|
|
||||||
|
|
@ -6,9 +6,7 @@ package akka.stream.scaladsl
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings, OverflowStrategy }
|
||||||
import akka.stream.ActorMaterializerSettings
|
|
||||||
import akka.stream.OverflowStrategy
|
|
||||||
import akka.stream.OverflowStrategy.Fail.BufferOverflowException
|
import akka.stream.OverflowStrategy.Fail.BufferOverflowException
|
||||||
import akka.stream.testkit._
|
import akka.stream.testkit._
|
||||||
import akka.stream.testkit.scaladsl._
|
import akka.stream.testkit.scaladsl._
|
||||||
|
|
@ -77,6 +75,9 @@ class FlowBufferSpec extends AkkaSpec {
|
||||||
// Fill up buffer
|
// Fill up buffer
|
||||||
for (i ← 1 to 200) publisher.sendNext(i)
|
for (i ← 1 to 200) publisher.sendNext(i)
|
||||||
|
|
||||||
|
// The next request would be otherwise in race with the last onNext in the above loop
|
||||||
|
subscriber.expectNoMsg(500.millis)
|
||||||
|
|
||||||
// drain
|
// drain
|
||||||
for (i ← 101 to 200) {
|
for (i ← 101 to 200) {
|
||||||
sub.request(1)
|
sub.request(1)
|
||||||
|
|
@ -103,6 +104,9 @@ class FlowBufferSpec extends AkkaSpec {
|
||||||
// Fill up buffer
|
// Fill up buffer
|
||||||
for (i ← 1 to 200) publisher.sendNext(i)
|
for (i ← 1 to 200) publisher.sendNext(i)
|
||||||
|
|
||||||
|
// The next request would be otherwise in race with the last onNext in the above loop
|
||||||
|
subscriber.expectNoMsg(500.millis)
|
||||||
|
|
||||||
// drain
|
// drain
|
||||||
for (i ← 1 to 99) {
|
for (i ← 1 to 99) {
|
||||||
sub.request(1)
|
sub.request(1)
|
||||||
|
|
@ -132,6 +136,9 @@ class FlowBufferSpec extends AkkaSpec {
|
||||||
// Fill up buffer
|
// Fill up buffer
|
||||||
for (i ← 1 to 150) publisher.sendNext(i)
|
for (i ← 1 to 150) publisher.sendNext(i)
|
||||||
|
|
||||||
|
// The next request would be otherwise in race with the last onNext in the above loop
|
||||||
|
subscriber.expectNoMsg(500.millis)
|
||||||
|
|
||||||
// drain
|
// drain
|
||||||
for (i ← 101 to 150) {
|
for (i ← 101 to 150) {
|
||||||
sub.request(1)
|
sub.request(1)
|
||||||
|
|
@ -151,9 +158,14 @@ class FlowBufferSpec extends AkkaSpec {
|
||||||
"drop new elements if buffer is full and configured so" in {
|
"drop new elements if buffer is full and configured so" in {
|
||||||
val (publisher, subscriber) = TestSource.probe[Int].buffer(100, overflowStrategy = OverflowStrategy.dropNew).toMat(TestSink.probe[Int])(Keep.both).run()
|
val (publisher, subscriber) = TestSource.probe[Int].buffer(100, overflowStrategy = OverflowStrategy.dropNew).toMat(TestSink.probe[Int])(Keep.both).run()
|
||||||
|
|
||||||
|
subscriber.ensureSubscription()
|
||||||
|
|
||||||
// Fill up buffer
|
// Fill up buffer
|
||||||
for (i ← 1 to 150) publisher.sendNext(i)
|
for (i ← 1 to 150) publisher.sendNext(i)
|
||||||
|
|
||||||
|
// The next request would be otherwise in race with the last onNext in the above loop
|
||||||
|
subscriber.expectNoMsg(500.millis)
|
||||||
|
|
||||||
// drain
|
// drain
|
||||||
for (i ← 1 to 100) {
|
for (i ← 1 to 100) {
|
||||||
subscriber.requestNext(i)
|
subscriber.requestNext(i)
|
||||||
|
|
@ -205,6 +217,8 @@ class FlowBufferSpec extends AkkaSpec {
|
||||||
// Fill up buffer
|
// Fill up buffer
|
||||||
for (i ← 1 to 200) publisher.sendNext(i)
|
for (i ← 1 to 200) publisher.sendNext(i)
|
||||||
|
|
||||||
|
// The request below is in race otherwise with the onNext(200) above
|
||||||
|
subscriber.expectNoMsg(500.millis)
|
||||||
sub.request(1)
|
sub.request(1)
|
||||||
subscriber.expectNext(200)
|
subscriber.expectNext(200)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -70,17 +70,19 @@ class FlowConflateSpec extends AkkaSpec {
|
||||||
subscriber.expectNext(1)
|
subscriber.expectNext(1)
|
||||||
|
|
||||||
sub.request(1)
|
sub.request(1)
|
||||||
subscriber.expectNoMsg(1.second)
|
subscriber.expectNoMsg(500.millis)
|
||||||
publisher.sendNext(2)
|
publisher.sendNext(2)
|
||||||
subscriber.expectNext(2)
|
subscriber.expectNext(2)
|
||||||
|
|
||||||
publisher.sendNext(3)
|
publisher.sendNext(3)
|
||||||
publisher.sendNext(4)
|
publisher.sendNext(4)
|
||||||
|
// The request can be in race with the above onNext(4) so the result would be either 3 or 7.
|
||||||
|
subscriber.expectNoMsg(500.millis)
|
||||||
sub.request(1)
|
sub.request(1)
|
||||||
subscriber.expectNext(7)
|
subscriber.expectNext(7)
|
||||||
|
|
||||||
sub.request(1)
|
sub.request(1)
|
||||||
subscriber.expectNoMsg(1.second)
|
subscriber.expectNoMsg(500.millis)
|
||||||
sub.cancel()
|
sub.cancel()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,8 +7,7 @@ import scala.concurrent.Await
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||||
|
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
|
||||||
import akka.stream.ActorMaterializerSettings
|
|
||||||
|
|
||||||
import akka.stream.testkit._
|
import akka.stream.testkit._
|
||||||
|
|
||||||
|
|
@ -22,6 +21,9 @@ class FlowExpandSpec extends AkkaSpec {
|
||||||
"Expand" must {
|
"Expand" must {
|
||||||
|
|
||||||
"pass-through elements unchanged when there is no rate difference" in {
|
"pass-through elements unchanged when there is no rate difference" in {
|
||||||
|
// Shadow the fuzzed materializer (see the ordering guarantee needed by the for loop below).
|
||||||
|
implicit val materializer = ActorMaterializer(settings.withFuzzing(false))
|
||||||
|
|
||||||
val publisher = TestPublisher.probe[Int]()
|
val publisher = TestPublisher.probe[Int]()
|
||||||
val subscriber = TestSubscriber.probe[Int]()
|
val subscriber = TestSubscriber.probe[Int]()
|
||||||
|
|
||||||
|
|
@ -51,6 +53,9 @@ class FlowExpandSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
publisher.sendNext(-42)
|
publisher.sendNext(-42)
|
||||||
|
|
||||||
|
// The request below is otherwise in race with the above sendNext
|
||||||
|
subscriber.expectNoMsg(500.millis)
|
||||||
subscriber.requestNext(-42)
|
subscriber.requestNext(-42)
|
||||||
|
|
||||||
subscriber.cancel()
|
subscriber.cancel()
|
||||||
|
|
@ -69,6 +74,9 @@ class FlowExpandSpec extends AkkaSpec {
|
||||||
publisher.sendNext(2)
|
publisher.sendNext(2)
|
||||||
publisher.sendComplete()
|
publisher.sendComplete()
|
||||||
|
|
||||||
|
// The request below is otherwise in race with the above sendNext(2) (and completion)
|
||||||
|
subscriber.expectNoMsg(500.millis)
|
||||||
|
|
||||||
subscriber.requestNext(2)
|
subscriber.requestNext(2)
|
||||||
subscriber.expectComplete()
|
subscriber.expectComplete()
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -137,10 +137,10 @@ class FlowIdleInjectSpec extends AkkaSpec {
|
||||||
Source(upstream).keepAlive(1.second, () ⇒ 0).runWith(Sink(downstream))
|
Source(upstream).keepAlive(1.second, () ⇒ 0).runWith(Sink(downstream))
|
||||||
|
|
||||||
downstream.request(2)
|
downstream.request(2)
|
||||||
downstream.expectNoMsg(0.5.second)
|
downstream.expectNoMsg(500.millis)
|
||||||
downstream.expectNext(0)
|
downstream.expectNext(0)
|
||||||
|
|
||||||
downstream.expectNoMsg(0.5 second)
|
downstream.expectNoMsg(500.millis)
|
||||||
downstream.expectNext(0)
|
downstream.expectNext(0)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -247,7 +247,7 @@ class FlowMapAsyncSpec extends AkkaSpec with ScalaFutures {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
val N = 100000
|
val N = 10000
|
||||||
Source(1 to N)
|
Source(1 to N)
|
||||||
.mapAsync(parallelism)(i ⇒ deferred())
|
.mapAsync(parallelism)(i ⇒ deferred())
|
||||||
.runFold(0)((c, _) ⇒ c + 1)
|
.runFold(0)((c, _) ⇒ c + 1)
|
||||||
|
|
|
||||||
|
|
@ -7,7 +7,7 @@ import scala.concurrent.Await
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.util.control.NoStackTrace
|
import scala.util.control.NoStackTrace
|
||||||
import akka.stream.ActorMaterializer
|
import akka.stream.{ ActorMaterializerSettings, ActorMaterializer }
|
||||||
import akka.stream.testkit._
|
import akka.stream.testkit._
|
||||||
import akka.stream.testkit.scaladsl._
|
import akka.stream.testkit.scaladsl._
|
||||||
import akka.stream.testkit.Utils._
|
import akka.stream.testkit.Utils._
|
||||||
|
|
@ -235,7 +235,7 @@ class FlowMapAsyncUnorderedSpec extends AkkaSpec with ScalaFutures with Conversi
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
val N = 100000
|
val N = 10000
|
||||||
Source(1 to N)
|
Source(1 to N)
|
||||||
.mapAsyncUnordered(parallelism)(i ⇒ deferred())
|
.mapAsyncUnordered(parallelism)(i ⇒ deferred())
|
||||||
.runFold(0)((c, _) ⇒ c + 1)
|
.runFold(0)((c, _) ⇒ c + 1)
|
||||||
|
|
|
||||||
|
|
@ -32,8 +32,13 @@ class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape
|
||||||
|
|
||||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) {
|
||||||
var pending = 1
|
var pending = 1
|
||||||
|
// Without this field the completion signalling would take one extra pull
|
||||||
|
var willShutDown = false
|
||||||
|
|
||||||
private def pushAll(): Unit = push(out, zipper([#grab(in0)#]))
|
private def pushAll(): Unit = {
|
||||||
|
push(out, zipper([#grab(in0)#]))
|
||||||
|
if (willShutDown) completeStage()
|
||||||
|
}
|
||||||
|
|
||||||
[#setHandler(in0, new InHandler {
|
[#setHandler(in0, new InHandler {
|
||||||
override def onPush(): Unit = {
|
override def onPush(): Unit = {
|
||||||
|
|
@ -43,6 +48,7 @@ class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape
|
||||||
|
|
||||||
override def onUpstreamFinish(): Unit = {
|
override def onUpstreamFinish(): Unit = {
|
||||||
if (!isAvailable(in0)) completeStage()
|
if (!isAvailable(in0)) completeStage()
|
||||||
|
willShutDown = true
|
||||||
}
|
}
|
||||||
|
|
||||||
})#
|
})#
|
||||||
|
|
@ -51,9 +57,11 @@ class ZipWith1[[#A1#], O] (zipper: ([#A1#]) ⇒ O) extends GraphStage[FanInShape
|
||||||
setHandler(out, new OutHandler {
|
setHandler(out, new OutHandler {
|
||||||
override def onPull(): Unit = {
|
override def onPull(): Unit = {
|
||||||
pending = shape.inlets.size
|
pending = shape.inlets.size
|
||||||
[#if (!isClosed(in0)) pull(in0)
|
if (willShutDown) completeStage()
|
||||||
else completeStage()#
|
else {
|
||||||
]
|
[#pull(in0)#
|
||||||
|
]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -40,6 +40,17 @@ akka {
|
||||||
|
|
||||||
# Maximum number of elements emitted in batch if downstream signals large demand
|
# Maximum number of elements emitted in batch if downstream signals large demand
|
||||||
output-burst-limit = 1000
|
output-burst-limit = 1000
|
||||||
|
|
||||||
|
debug {
|
||||||
|
# Enables the fuzzing mode which increases the chance of race conditions
|
||||||
|
# by aggressively reordering events and making certain operations more
|
||||||
|
# concurrent than usual.
|
||||||
|
# This setting is for testing purposes, NEVER enable this in a production
|
||||||
|
# environment!
|
||||||
|
# To get the best results, try combining this setting with a throughput
|
||||||
|
# of 1 on the corresponding dispatchers.
|
||||||
|
fuzzing-mode = off
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# Fully qualified config path which holds the dispatcher configuration
|
# Fully qualified config path which holds the dispatcher configuration
|
||||||
|
|
|
||||||
|
|
@ -198,10 +198,11 @@ object ActorMaterializerSettings {
|
||||||
supervisionDecider: Supervision.Decider,
|
supervisionDecider: Supervision.Decider,
|
||||||
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
|
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
|
||||||
debugLogging: Boolean,
|
debugLogging: Boolean,
|
||||||
outputBurstLimit: Int) =
|
outputBurstLimit: Int,
|
||||||
|
fuzzingMode: Boolean) =
|
||||||
new ActorMaterializerSettings(
|
new ActorMaterializerSettings(
|
||||||
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
|
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
|
||||||
outputBurstLimit)
|
outputBurstLimit, fuzzingMode)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create [[ActorMaterializerSettings]].
|
* Create [[ActorMaterializerSettings]].
|
||||||
|
|
@ -226,7 +227,8 @@ object ActorMaterializerSettings {
|
||||||
supervisionDecider = Supervision.stoppingDecider,
|
supervisionDecider = Supervision.stoppingDecider,
|
||||||
subscriptionTimeoutSettings = StreamSubscriptionTimeoutSettings(config),
|
subscriptionTimeoutSettings = StreamSubscriptionTimeoutSettings(config),
|
||||||
debugLogging = config.getBoolean("debug-logging"),
|
debugLogging = config.getBoolean("debug-logging"),
|
||||||
outputBurstLimit = config.getInt("output-burst-limit"))
|
outputBurstLimit = config.getInt("output-burst-limit"),
|
||||||
|
fuzzingMode = config.getBoolean("debug.fuzzing-mode"))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API
|
* Java API
|
||||||
|
|
@ -245,6 +247,7 @@ object ActorMaterializerSettings {
|
||||||
*/
|
*/
|
||||||
def create(config: Config): ActorMaterializerSettings =
|
def create(config: Config): ActorMaterializerSettings =
|
||||||
apply(config)
|
apply(config)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -260,7 +263,8 @@ final class ActorMaterializerSettings(
|
||||||
val supervisionDecider: Supervision.Decider,
|
val supervisionDecider: Supervision.Decider,
|
||||||
val subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
|
val subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
|
||||||
val debugLogging: Boolean,
|
val debugLogging: Boolean,
|
||||||
val outputBurstLimit: Int) {
|
val outputBurstLimit: Int,
|
||||||
|
val fuzzingMode: Boolean) {
|
||||||
|
|
||||||
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
|
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
|
||||||
|
|
||||||
|
|
@ -274,24 +278,31 @@ final class ActorMaterializerSettings(
|
||||||
supervisionDecider: Supervision.Decider = this.supervisionDecider,
|
supervisionDecider: Supervision.Decider = this.supervisionDecider,
|
||||||
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings = this.subscriptionTimeoutSettings,
|
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings = this.subscriptionTimeoutSettings,
|
||||||
debugLogging: Boolean = this.debugLogging,
|
debugLogging: Boolean = this.debugLogging,
|
||||||
outputBurstLimit: Int = this.outputBurstLimit) =
|
outputBurstLimit: Int = this.outputBurstLimit,
|
||||||
|
fuzzingMode: Boolean = this.fuzzingMode) =
|
||||||
new ActorMaterializerSettings(
|
new ActorMaterializerSettings(
|
||||||
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
|
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
|
||||||
outputBurstLimit)
|
outputBurstLimit, fuzzingMode)
|
||||||
|
|
||||||
def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings =
|
def withInputBuffer(initialSize: Int, maxSize: Int): ActorMaterializerSettings = {
|
||||||
copy(initialInputBufferSize = initialSize, maxInputBufferSize = maxSize)
|
if (initialSize == this.initialInputBufferSize && maxSize == this.maxInputBufferSize) this
|
||||||
|
else copy(initialInputBufferSize = initialSize, maxInputBufferSize = maxSize)
|
||||||
|
}
|
||||||
|
|
||||||
def withDispatcher(dispatcher: String): ActorMaterializerSettings =
|
def withDispatcher(dispatcher: String): ActorMaterializerSettings = {
|
||||||
copy(dispatcher = dispatcher)
|
if (this.dispatcher == dispatcher) this
|
||||||
|
else copy(dispatcher = dispatcher)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Scala API: Decides how exceptions from application code are to be handled, unless
|
* Scala API: Decides how exceptions from application code are to be handled, unless
|
||||||
* overridden for specific flows of the stream operations with
|
* overridden for specific flows of the stream operations with
|
||||||
* [[akka.stream.Attributes#supervisionStrategy]].
|
* [[akka.stream.Attributes#supervisionStrategy]].
|
||||||
*/
|
*/
|
||||||
def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings =
|
def withSupervisionStrategy(decider: Supervision.Decider): ActorMaterializerSettings = {
|
||||||
copy(supervisionDecider = decider)
|
if (decider eq this.supervisionDecider) this
|
||||||
|
else copy(supervisionDecider = decider)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Java API: Decides how exceptions from application code are to be handled, unless
|
* Java API: Decides how exceptions from application code are to be handled, unless
|
||||||
|
|
@ -308,8 +319,15 @@ final class ActorMaterializerSettings(
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
def withDebugLogging(enable: Boolean): ActorMaterializerSettings =
|
def withFuzzing(enable: Boolean): ActorMaterializerSettings = {
|
||||||
copy(debugLogging = enable)
|
if (enable == this.fuzzingMode) this
|
||||||
|
else copy(fuzzingMode = enable)
|
||||||
|
}
|
||||||
|
|
||||||
|
def withDebugLogging(enable: Boolean): ActorMaterializerSettings = {
|
||||||
|
if (enable == this.debugLogging) this
|
||||||
|
else copy(debugLogging = enable)
|
||||||
|
}
|
||||||
|
|
||||||
private def requirePowerOfTwo(n: Integer, name: String): Unit = {
|
private def requirePowerOfTwo(n: Integer, name: String): Unit = {
|
||||||
require(n > 0, s"$name must be > 0")
|
require(n > 0, s"$name must be > 0")
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ package akka.stream.impl
|
||||||
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
|
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
|
||||||
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
|
import akka.event.Logging
|
||||||
import akka.dispatch.Dispatchers
|
import akka.dispatch.Dispatchers
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
import akka.stream._
|
import akka.stream._
|
||||||
|
|
@ -29,6 +30,12 @@ private[akka] case class ActorMaterializerImpl(system: ActorSystem,
|
||||||
flowNameCounter: AtomicLong,
|
flowNameCounter: AtomicLong,
|
||||||
namePrefix: String) extends ActorMaterializer {
|
namePrefix: String) extends ActorMaterializer {
|
||||||
import akka.stream.impl.Stages._
|
import akka.stream.impl.Stages._
|
||||||
|
private val logger = Logging.getLogger(system, this)
|
||||||
|
|
||||||
|
if (settings.fuzzingMode) {
|
||||||
|
logger.warning("Fuzzing mode is enabled on this system. If you see this warning on your production system then " +
|
||||||
|
"set akka.materializer.debug.fuzzing-mode to off.")
|
||||||
|
}
|
||||||
|
|
||||||
override def shutdown(): Unit =
|
override def shutdown(): Unit =
|
||||||
if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
|
if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill
|
||||||
|
|
|
||||||
|
|
@ -196,13 +196,18 @@ private[stream] object Timers {
|
||||||
pull(in)
|
pull(in)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def onUpstreamFinish(): Unit = {
|
||||||
|
if (!isAvailable(in)) completeStage()
|
||||||
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
setHandler(out, new OutHandler {
|
setHandler(out, new OutHandler {
|
||||||
override def onPull(): Unit = {
|
override def onPull(): Unit = {
|
||||||
if (isAvailable(in)) {
|
if (isAvailable(in)) {
|
||||||
push(out, grab(in))
|
push(out, grab(in))
|
||||||
pull(in)
|
if (isClosed(in)) completeStage()
|
||||||
|
else pull(in)
|
||||||
} else {
|
} else {
|
||||||
if (nextDeadline.isOverdue()) {
|
if (nextDeadline.isOverdue()) {
|
||||||
nextDeadline = Deadline.now + timeout
|
nextDeadline = Deadline.now + timeout
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,7 @@
|
||||||
*/
|
*/
|
||||||
package akka.stream.impl.fusing
|
package akka.stream.impl.fusing
|
||||||
|
|
||||||
import java.util.concurrent.TimeoutException
|
import java.util.concurrent.{ ThreadLocalRandom, TimeoutException }
|
||||||
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
|
|
@ -305,7 +305,8 @@ private[stream] class ActorGraphInterpreter(
|
||||||
inHandlers,
|
inHandlers,
|
||||||
outHandlers,
|
outHandlers,
|
||||||
logics,
|
logics,
|
||||||
(logic, event, handler) ⇒ self ! AsyncInput(logic, event, handler))
|
(logic, event, handler) ⇒ self ! AsyncInput(logic, event, handler),
|
||||||
|
settings.fuzzingMode)
|
||||||
|
|
||||||
private val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _))
|
private val inputs = Array.tabulate(shape.inlets.size)(new BatchingActorInputBoundary(settings.maxInputBufferSize, _))
|
||||||
private val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _))
|
private val outputs = Array.tabulate(shape.outlets.size)(new ActorOutputBoundary(self, _))
|
||||||
|
|
@ -396,7 +397,14 @@ private[stream] class ActorGraphInterpreter(
|
||||||
|
|
||||||
private def runBatch(): Unit = {
|
private def runBatch(): Unit = {
|
||||||
try {
|
try {
|
||||||
interpreter.execute(eventLimit)
|
val effectiveLimit = {
|
||||||
|
if (!settings.fuzzingMode) eventLimit
|
||||||
|
else {
|
||||||
|
if (ThreadLocalRandom.current().nextBoolean()) Thread.`yield`()
|
||||||
|
ThreadLocalRandom.current().nextInt(2) // 1 or zero events to be processed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
interpreter.execute(effectiveLimit)
|
||||||
if (interpreter.isCompleted) {
|
if (interpreter.isCompleted) {
|
||||||
// Cannot stop right away if not completely subscribed
|
// Cannot stop right away if not completely subscribed
|
||||||
if (subscribesPending == 0) context.stop(self)
|
if (subscribesPending == 0) context.stop(self)
|
||||||
|
|
|
||||||
|
|
@ -10,6 +10,7 @@ import akka.stream.stage._
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
import akka.stream._
|
import akka.stream._
|
||||||
|
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -332,7 +333,8 @@ private[stream] final class GraphInterpreter(
|
||||||
val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection
|
val inHandlers: Array[InHandler], // Lookup table for the InHandler of a connection
|
||||||
val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection
|
val outHandlers: Array[OutHandler], // Lookup table for the outHandler of the connection
|
||||||
val logics: Array[GraphStageLogic], // Array of stage logics
|
val logics: Array[GraphStageLogic], // Array of stage logics
|
||||||
val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit) {
|
val onAsyncInput: (GraphStageLogic, Any, (Any) ⇒ Unit) ⇒ Unit,
|
||||||
|
val fuzzingMode: Boolean) {
|
||||||
import GraphInterpreter._
|
import GraphInterpreter._
|
||||||
|
|
||||||
// Maintains additional information for events, basically elements in-flight, or failure.
|
// Maintains additional information for events, basically elements in-flight, or failure.
|
||||||
|
|
@ -579,14 +581,17 @@ private[stream] final class GraphInterpreter(
|
||||||
}
|
}
|
||||||
|
|
||||||
private def dequeue(): Int = {
|
private def dequeue(): Int = {
|
||||||
if (queueHead == queueTail) NoEvent
|
val idx = queueHead & mask
|
||||||
else {
|
if (fuzzingMode) {
|
||||||
val idx = queueHead & mask
|
val swapWith = (ThreadLocalRandom.current.nextInt(queueTail - queueHead) + queueHead) & mask
|
||||||
val elem = eventQueue(idx)
|
val ev = eventQueue(swapWith)
|
||||||
eventQueue(idx) = NoEvent
|
eventQueue(swapWith) = eventQueue(idx)
|
||||||
queueHead += 1
|
eventQueue(idx) = ev
|
||||||
elem
|
|
||||||
}
|
}
|
||||||
|
val elem = eventQueue(idx)
|
||||||
|
eventQueue(idx) = NoEvent
|
||||||
|
queueHead += 1
|
||||||
|
elem
|
||||||
}
|
}
|
||||||
|
|
||||||
private def enqueue(connection: Int): Unit = {
|
private def enqueue(connection: Int): Unit = {
|
||||||
|
|
@ -624,8 +629,9 @@ private[stream] final class GraphInterpreter(
|
||||||
}
|
}
|
||||||
|
|
||||||
private[stream] def push(connection: Int, elem: Any): Unit = {
|
private[stream] def push(connection: Int, elem: Any): Unit = {
|
||||||
if ((portStates(connection) & InClosed) == 0) {
|
val currentState = portStates(connection)
|
||||||
portStates(connection) ^= PushStartFlip
|
portStates(connection) = currentState ^ PushStartFlip
|
||||||
|
if ((currentState & InClosed) == 0) {
|
||||||
connectionSlots(connection) = elem
|
connectionSlots(connection) = elem
|
||||||
enqueue(connection)
|
enqueue(connection)
|
||||||
}
|
}
|
||||||
|
|
@ -633,8 +639,8 @@ private[stream] final class GraphInterpreter(
|
||||||
|
|
||||||
private[stream] def pull(connection: Int): Unit = {
|
private[stream] def pull(connection: Int): Unit = {
|
||||||
val currentState = portStates(connection)
|
val currentState = portStates(connection)
|
||||||
|
portStates(connection) = currentState ^ PullStartFlip
|
||||||
if ((currentState & OutClosed) == 0) {
|
if ((currentState & OutClosed) == 0) {
|
||||||
portStates(connection) = currentState ^ PullStartFlip
|
|
||||||
enqueue(connection)
|
enqueue(connection)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -144,7 +144,8 @@ private[akka] class IteratorInterpreter[I, O](val input: Iterator[I], val ops: S
|
||||||
inHandlers,
|
inHandlers,
|
||||||
outHandlers,
|
outHandlers,
|
||||||
logics,
|
logics,
|
||||||
(_, _, _) ⇒ throw new UnsupportedOperationException("IteratorInterpreter does not support asynchronous events."))
|
(_, _, _) ⇒ throw new UnsupportedOperationException("IteratorInterpreter does not support asynchronous events."),
|
||||||
|
fuzzingMode = false)
|
||||||
interpreter.attachUpstreamBoundary(0, upstream)
|
interpreter.attachUpstreamBoundary(0, upstream)
|
||||||
interpreter.attachDownstreamBoundary(ops.length, downstream)
|
interpreter.attachDownstreamBoundary(ops.length, downstream)
|
||||||
interpreter.init()
|
interpreter.init()
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue