diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index dbeaf3a7a6..cf49c7f157 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -4,9 +4,9 @@ package akka.actor.dungeon -import akka.dispatch.sysmsg.{Unwatch, Watch, DeathWatchNotification} -import akka.event.Logging.{Warning, Debug} -import akka.actor.{InternalActorRef, Address, Terminated, Actor, ActorRefScope, ActorCell, ActorRef, MinimalActorRef} +import akka.dispatch.sysmsg.{ Unwatch, Watch, DeathWatchNotification } +import akka.event.Logging.{ Warning, Debug } +import akka.actor.{ InternalActorRef, Address, Terminated, Actor, ActorRefScope, ActorCell, ActorRef, MinimalActorRef } import akka.event.AddressTerminatedTopic private[akka] trait DeathWatch { this: ActorCell ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala index 152bd2d8c4..a8be0c1ac1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala @@ -7,6 +7,7 @@ import akka.stream.ActorAttributes.supervisionStrategy import akka.stream.Attributes.inputBuffer import akka.stream.Supervision.{ resumingDecider, restartingDecider } import akka.stream.testkit.Utils.TE +import akka.testkit.TestLatch import scala.concurrent.Await import scala.concurrent.duration._ @@ -143,10 +144,17 @@ class FlowConflateSpec extends AkkaSpec { "restart when `seed` throws and a restartingDecider is used" in { val sourceProbe = TestPublisher.probe[Int]() val sinkProbe = TestSubscriber.probe[Int]() + val exceptionLatch = TestLatch() val future = Source.fromPublisher(sourceProbe) - .conflateWithSeed(seed = i ⇒ - if (i % 2 == 0) throw TE("I hate even seed numbers") else i)(aggregate = (sum, i) ⇒ sum + i) + .conflateWithSeed { i ⇒ + if (i % 2 == 0) { + exceptionLatch.open() + throw TE("I hate even seed numbers") + } else i + } { (sum, i) ⇒ + sum + i + } .withAttributes(supervisionStrategy(restartingDecider)) .to(Sink.fromSubscriber(sinkProbe)) .withAttributes(inputBuffer(initial = 1, max = 1)) @@ -154,6 +162,7 @@ class FlowConflateSpec extends AkkaSpec { val sub = sourceProbe.expectSubscription() val sinkSub = sinkProbe.expectSubscription() + // push the first value sub.expectRequest(1) sub.sendNext(1) @@ -165,6 +174,11 @@ class FlowConflateSpec extends AkkaSpec { sub.expectRequest(1) sub.sendNext(2) + + // make sure the seed exception happened + // before going any further + Await.result(exceptionLatch, 3.seconds) + sub.expectRequest(1) sub.sendNext(3) @@ -174,10 +188,13 @@ class FlowConflateSpec extends AkkaSpec { } "restart when `aggregate` throws and a restartingDecider is used" in { + val latch = TestLatch() val conflate = Flow[String] .conflateWithSeed(seed = i ⇒ i)((state, elem) ⇒ - if (elem == "two") throw TE("two is a three letter word") - else state + elem) + if (elem == "two") { + latch.open() + throw TE("two is a three letter word") + } else state + elem) .withAttributes(supervisionStrategy(restartingDecider)) val sourceProbe = TestPublisher.probe[String]() @@ -190,6 +207,7 @@ class FlowConflateSpec extends AkkaSpec { .run() val sub = sourceProbe.expectSubscription() + sub.expectRequest(4) sub.sendNext("one") sub.sendNext("two") @@ -197,7 +215,8 @@ class FlowConflateSpec extends AkkaSpec { sub.sendComplete() // "one" should be lost - sinkProbe.requestNext() shouldEqual ("three") + Await.ready(latch, 3.seconds) + sinkProbe.requestNext() should ===("three") } @@ -205,10 +224,16 @@ class FlowConflateSpec extends AkkaSpec { val sourceProbe = TestPublisher.probe[Int]() val sinkProbe = TestSubscriber.probe[Vector[Int]]() + val saw4Latch = TestLatch() val future = Source.fromPublisher(sourceProbe) .conflateWithSeed(seed = i ⇒ Vector(i))((state, elem) ⇒ - if (elem == 2) throw TE("three is a four letter word") else state :+ elem) + if (elem == 2) { + throw TE("three is a four letter word") + } else { + if (elem == 4) saw4Latch.open() + state :+ elem + }) .withAttributes(supervisionStrategy(resumingDecider)) .to(Sink.fromSubscriber(sinkProbe)) .withAttributes(inputBuffer(initial = 1, max = 1)) @@ -233,7 +258,9 @@ class FlowConflateSpec extends AkkaSpec { // and consume it, so that the next element // will trigger seed + Await.ready(saw4Latch, 3.seconds) sinkSub.request(1) + sinkProbe.expectNext(Vector(1, 3, 4)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 669bff29c6..8494890c78 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -452,8 +452,10 @@ private[akka] final case class Batch[In, Out](max: Long, costFn: In ⇒ Long, se private var pending: In = null.asInstanceOf[In] private def flush(): Unit = { - push(out, agg) - left = max + if (agg != null) { + push(out, agg) + left = max + } if (pending != null) { try { agg = seed(pending)