=str #17453: Fix substream RS compliance, awaitAllStagesStopped and Fanin leak

This commit is contained in:
Endre Sándor Varga 2015-05-14 12:21:47 +02:00
parent 6d08cd48c6
commit 8a7f6a357d
7 changed files with 40 additions and 15 deletions

View file

@ -7,6 +7,7 @@ import akka.stream.impl._
import akka.testkit.TestProbe import akka.testkit.TestProbe
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
object Utils { object Utils {
@ -19,9 +20,10 @@ object Utils {
def assertAllStagesStopped[T](block: T)(implicit materializer: FlowMaterializer): T = def assertAllStagesStopped[T](block: T)(implicit materializer: FlowMaterializer): T =
materializer match { materializer match {
case impl: ActorFlowMaterializerImpl case impl: ActorFlowMaterializerImpl
impl.supervisor ! StreamSupervisor.StopChildren
val result = block
val probe = TestProbe()(impl.system) val probe = TestProbe()(impl.system)
probe.send(impl.supervisor, StreamSupervisor.StopChildren)
probe.expectMsg(StreamSupervisor.StoppedChildren)
val result = block
probe.within(5.seconds) { probe.within(5.seconds) {
probe.awaitAssert { probe.awaitAssert {
impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref) impl.supervisor.tell(StreamSupervisor.GetChildren, probe.ref)

View file

@ -4,11 +4,11 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Await import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.Try
import scala.util.control.NoStackTrace import scala.util.control.NoStackTrace
import akka.stream.ActorFlowMaterializer import akka.stream.{ OperationAttributes, ActorFlowMaterializer, ActorFlowMaterializerSettings }
import akka.stream.ActorFlowMaterializerSettings
import org.reactivestreams.Subscriber import org.reactivestreams.Subscriber
import akka.stream.testkit._ import akka.stream.testkit._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
@ -26,7 +26,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
def newHeadSink = Sink.head[(immutable.Seq[Int], Source[Int, _])] def newHeadSink = Sink.head[(immutable.Seq[Int], Source[Int, _])]
"work on empty input" in { "work on empty input" in assertAllStagesStopped {
val futureSink = newHeadSink val futureSink = newHeadSink
val fut = Source.empty.prefixAndTail(10).runWith(futureSink) val fut = Source.empty.prefixAndTail(10).runWith(futureSink)
val (prefix, tailFlow) = Await.result(fut, 3.seconds) val (prefix, tailFlow) = Await.result(fut, 3.seconds)
@ -36,7 +36,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
tailSubscriber.expectSubscriptionAndComplete() tailSubscriber.expectSubscriptionAndComplete()
} }
"work on short input" in { "work on short input" in assertAllStagesStopped {
val futureSink = newHeadSink val futureSink = newHeadSink
val fut = Source(List(1, 2, 3)).prefixAndTail(10).runWith(futureSink) val fut = Source(List(1, 2, 3)).prefixAndTail(10).runWith(futureSink)
val (prefix, tailFlow) = Await.result(fut, 3.seconds) val (prefix, tailFlow) = Await.result(fut, 3.seconds)
@ -68,7 +68,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
Await.result(fut2, 3.seconds) should be(1 to 10) Await.result(fut2, 3.seconds) should be(1 to 10)
} }
"handle negative take count" in { "handle negative take count" in assertAllStagesStopped {
val futureSink = newHeadSink val futureSink = newHeadSink
val fut = Source(1 to 10).prefixAndTail(-1).runWith(futureSink) val fut = Source(1 to 10).prefixAndTail(-1).runWith(futureSink)
val (takes, tail) = Await.result(fut, 3.seconds) val (takes, tail) = Await.result(fut, 3.seconds)
@ -192,6 +192,23 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
upsub.expectCancellation() upsub.expectCancellation()
} }
"work even if tail subscriber arrives after substream completion" in {
val pub = TestPublisher.manualProbe[Int]()
val sub = TestSubscriber.manualProbe[Int]()
val f = Source(pub).prefixAndTail(1).runWith(Sink.head)
val s = pub.expectSubscription()
s.sendNext(0)
val (_, tail) = Await.result(f, 3.seconds)
val tailPub = tail.runWith(Sink.publisher)
s.sendComplete()
tailPub.subscribe(sub)
sub.expectSubscriptionAndComplete()
}
} }
} }

View file

@ -256,6 +256,8 @@ private[akka] object StreamSupervisor {
final case class Children(children: Set[ActorRef]) final case class Children(children: Set[ActorRef])
/** Testing purpose */ /** Testing purpose */
final case object StopChildren final case object StopChildren
/** Testing purpose */
final case object StoppedChildren
} }
private[akka] class StreamSupervisor(settings: ActorFlowMaterializerSettings) extends Actor { private[akka] class StreamSupervisor(settings: ActorFlowMaterializerSettings) extends Actor {
@ -267,8 +269,10 @@ private[akka] class StreamSupervisor(settings: ActorFlowMaterializerSettings) ex
case Materialize(props, name) case Materialize(props, name)
val impl = context.actorOf(props, name) val impl = context.actorOf(props, name)
sender() ! impl sender() ! impl
case GetChildren sender() ! Children(context.children.toSet) case GetChildren sender() ! Children(context.children.toSet)
case StopChildren context.children.foreach(context.stop) case StopChildren
context.children.foreach(context.stop)
sender() ! StoppedChildren
} }
} }

View file

@ -201,7 +201,7 @@ private[akka] class SimpleOutputs(val actor: ActorRef, val pump: Pump) extends D
} }
} }
override def isClosed: Boolean = downstreamCompleted override def isClosed: Boolean = downstreamCompleted && (subscriber ne null)
protected def createSubscription(): Subscription = new ActorSubscription(actor, subscriber) protected def createSubscription(): Subscription = new ActorSubscription(actor, subscriber)

View file

@ -241,7 +241,7 @@ private[akka] abstract class FanIn(val settings: ActorFlowMaterializerSettings,
protected def fail(e: Throwable): Unit = { protected def fail(e: Throwable): Unit = {
if (settings.debugLogging) if (settings.debugLogging)
log.debug("fail due to: {}", e.getMessage) log.debug("fail due to: {}", e.getMessage)
inputBunch.cancel() nextPhase(completedPhase)
primaryOutputs.error(e) primaryOutputs.error(e)
pump() pump()
} }

View file

@ -50,7 +50,7 @@ private[akka] class PrefixAndTailImpl(_settings: ActorFlowMaterializerSettings,
} }
def emitEmptyTail(): Unit = { def emitEmptyTail(): Unit = {
primaryOutputs.enqueueOutputElement((taken, Source(EmptyPublisher[Any]))) primaryOutputs.enqueueOutputElement((taken, Source.empty))
nextPhase(completedPhase) nextPhase(completedPhase)
} }

View file

@ -82,9 +82,11 @@ private[akka] object MultiStreamOutputProcessor {
private def closePublisher(withState: CompletedState): Unit = { private def closePublisher(withState: CompletedState): Unit = {
subscriptionTimeout.cancel() subscriptionTimeout.cancel()
state.getAndSet(withState) match { state.getAndSet(withState) match {
case Attached(sub) closeSubscriber(sub, withState)
case _: CompletedState throw new IllegalStateException("Attempted to double shutdown publisher") case _: CompletedState throw new IllegalStateException("Attempted to double shutdown publisher")
case Open // No action needed case Attached(sub)
if (subscriber eq null) tryOnSubscribe(sub, CancelledSubscription)
closeSubscriber(sub, withState)
case Open // No action needed
} }
} }