diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala index ae3666a875..b816d73d97 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/Utils.scala @@ -7,6 +7,7 @@ import akka.stream.impl._ import akka.testkit.TestProbe import com.typesafe.config.ConfigFactory import scala.concurrent.duration._ +import scala.util.Try import scala.util.control.NoStackTrace object Utils { @@ -19,9 +20,10 @@ object Utils { def assertAllStagesStopped[T](block: ⇒ T)(implicit materializer: FlowMaterializer): T = materializer match { case impl: ActorFlowMaterializerImpl ⇒ - impl.supervisor ! StreamSupervisor.StopChildren - val result = block val probe = TestProbe()(impl.system) + probe.send(impl.supervisor, StreamSupervisor.StopChildren) + probe.expectMsg(StreamSupervisor.StoppedChildren) + val result = block probe.within(5.seconds) { probe.awaitAssert { impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 46e6265a92..3e7586188f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -4,11 +4,11 @@ package akka.stream.scaladsl import scala.collection.immutable -import scala.concurrent.Await +import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ +import scala.util.Try import scala.util.control.NoStackTrace -import akka.stream.ActorFlowMaterializer -import akka.stream.ActorFlowMaterializerSettings +import akka.stream.{ OperationAttributes, ActorFlowMaterializer, ActorFlowMaterializerSettings } import org.reactivestreams.Subscriber import akka.stream.testkit._ import akka.stream.testkit.Utils._ @@ -26,7 +26,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { def newHeadSink = Sink.head[(immutable.Seq[Int], Source[Int, _])] - "work on empty input" in { + "work on empty input" in assertAllStagesStopped { val futureSink = newHeadSink val fut = Source.empty.prefixAndTail(10).runWith(futureSink) val (prefix, tailFlow) = Await.result(fut, 3.seconds) @@ -36,7 +36,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { tailSubscriber.expectSubscriptionAndComplete() } - "work on short input" in { + "work on short input" in assertAllStagesStopped { val futureSink = newHeadSink val fut = Source(List(1, 2, 3)).prefixAndTail(10).runWith(futureSink) val (prefix, tailFlow) = Await.result(fut, 3.seconds) @@ -68,7 +68,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { Await.result(fut2, 3.seconds) should be(1 to 10) } - "handle negative take count" in { + "handle negative take count" in assertAllStagesStopped { val futureSink = newHeadSink val fut = Source(1 to 10).prefixAndTail(-1).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) @@ -192,6 +192,23 @@ class FlowPrefixAndTailSpec extends AkkaSpec { upsub.expectCancellation() } + "work even if tail subscriber arrives after substream completion" in { + val pub = TestPublisher.manualProbe[Int]() + val sub = TestSubscriber.manualProbe[Int]() + + val f = Source(pub).prefixAndTail(1).runWith(Sink.head) + val s = pub.expectSubscription() + s.sendNext(0) + + val (_, tail) = Await.result(f, 3.seconds) + + val tailPub = tail.runWith(Sink.publisher) + s.sendComplete() + + tailPub.subscribe(sub) + sub.expectSubscriptionAndComplete() + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala index 637ed30a84..ee1671dedc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorFlowMaterializerImpl.scala @@ -256,6 +256,8 @@ private[akka] object StreamSupervisor { final case class Children(children: Set[ActorRef]) /** Testing purpose */ final case object StopChildren + /** Testing purpose */ + final case object StoppedChildren } private[akka] class StreamSupervisor(settings: ActorFlowMaterializerSettings) extends Actor { @@ -267,8 +269,10 @@ private[akka] class StreamSupervisor(settings: ActorFlowMaterializerSettings) ex case Materialize(props, name) ⇒ val impl = context.actorOf(props, name) sender() ! impl - case GetChildren ⇒ sender() ! Children(context.children.toSet) - case StopChildren ⇒ context.children.foreach(context.stop) + case GetChildren ⇒ sender() ! Children(context.children.toSet) + case StopChildren ⇒ + context.children.foreach(context.stop) + sender() ! StoppedChildren } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index acc4158d88..82e4a1aca9 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -201,7 +201,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D } } - override def isClosed: Boolean = downstreamCompleted + override def isClosed: Boolean = downstreamCompleted && (subscriber ne null) protected def createSubscription(): Subscription = new ActorSubscription(actor, subscriber) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index 19a13fac44..b943b32ed3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -241,7 +241,7 @@ private[akka] abstract class FanIn(val settings: ActorFlowMaterializerSettings, protected def fail(e: Throwable): Unit = { if (settings.debugLogging) log.debug("fail due to: {}", e.getMessage) - inputBunch.cancel() + nextPhase(completedPhase) primaryOutputs.error(e) pump() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala index dc70e5e092..37a40c799c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala @@ -50,7 +50,7 @@ private[akka] class PrefixAndTailImpl(_settings: ActorFlowMaterializerSettings, } def emitEmptyTail(): Unit = { - primaryOutputs.enqueueOutputElement((taken, Source(EmptyPublisher[Any]))) + primaryOutputs.enqueueOutputElement((taken, Source.empty)) nextPhase(completedPhase) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 28be6e724b..6ff69ff9ae 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -82,9 +82,11 @@ private[akka] object MultiStreamOutputProcessor { private def closePublisher(withState: CompletedState): Unit = { subscriptionTimeout.cancel() state.getAndSet(withState) match { - case Attached(sub) ⇒ closeSubscriber(sub, withState) case _: CompletedState ⇒ throw new IllegalStateException("Attempted to double shutdown publisher") - case Open ⇒ // No action needed + case Attached(sub) ⇒ + if (subscriber eq null) tryOnSubscribe(sub, CancelledSubscription) + closeSubscriber(sub, withState) + case Open ⇒ // No action needed } }