From ecd469e53930911be88885eec67e3813f2852b87 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 22 May 2014 08:40:41 +0200 Subject: [PATCH] =str #15250 Use BlackholeConsumer for Flow.consume() * last operator in front of consume() was treated specially, running in impl.ActorConsumer instead of the ordinary processors --- .../scala/akka/stream/FlowMaterializer.scala | 11 +- .../impl/ActorBasedFlowMaterializer.scala | 29 +--- .../akka/stream/impl/ActorConsumer.scala | 146 ------------------ .../akka/stream/impl/BlackholeConsumer.scala | 49 ++++++ .../scala/akka/stream/impl/FlowImpl.scala | 5 +- .../stream/impl/SingleStreamProcessors.scala | 2 +- .../java/akka/stream/javadsl/DuctTest.java | 17 +- .../akka/stream/FlowOnCompleteSpec.scala | 3 +- 8 files changed, 60 insertions(+), 202 deletions(-) delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/BlackholeConsumer.scala diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 17f6dcc7bd..079d2137f4 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -46,7 +46,7 @@ object FlowMaterializer { * steps are split up into asynchronous regions is implementation * dependent. */ -abstract class FlowMaterializer { +abstract class FlowMaterializer(val settings: MaterializerSettings) { /** * The `namePrefix` is used as the first part of the names of the actors running @@ -59,21 +59,12 @@ abstract class FlowMaterializer { * ops are stored in reverse order */ private[akka] def toProducer[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Producer[O] - /** - * INTERNAL API - */ - private[akka] def consume[I](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Unit /** * INTERNAL API */ private[akka] def ductProduceTo[In, Out](consumer: Consumer[Out], ops: List[Ast.AstNode]): Consumer[In] - /** - * INTERNAL API - */ - private[akka] def ductConsume[In](ops: List[Ast.AstNode]): Consumer[In] - /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index aeb3aa97f6..69742f7515 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -27,7 +27,7 @@ import akka.stream.actor.ActorConsumer * INTERNAL API */ private[akka] object Ast { - trait AstNode { + sealed trait AstNode { def name: String } @@ -120,10 +120,10 @@ private[akka] object ActorBasedFlowMaterializer { * INTERNAL API */ private[akka] class ActorBasedFlowMaterializer( - val settings: MaterializerSettings, + settings: MaterializerSettings, _context: ActorRefFactory, namePrefix: String) - extends FlowMaterializer { + extends FlowMaterializer(settings) { import Ast._ import ActorBasedFlowMaterializer._ @@ -187,26 +187,6 @@ private[akka] class ActorBasedFlowMaterializer( override def onNext(element: Any) = List(element) }) - override def consume[I](producerNode: ProducerNode[I], ops: List[AstNode]): Unit = { - val flowName = createFlowName() - val consumer = consume(ops, flowName) - producerNode.createProducer(this, flowName).produceTo(consumer.asInstanceOf[Consumer[I]]) - } - - private def consume[In, Out](ops: List[Ast.AstNode], flowName: String): Consumer[In] = { - val c = ops match { - case Nil ⇒ - ActorConsumer[Any](context.actorOf(ActorConsumerProps.props(settings, blackholeTransform), - name = s"$flowName-1-consume")) - case head :: tail ⇒ - val opsSize = ops.size - val c = ActorConsumer[Any](context.actorOf(ActorConsumerProps.props(settings, head), - name = s"$flowName-$opsSize-${head.name}")) - processorChain(c, tail, flowName, ops.size - 1) - } - c.asInstanceOf[Consumer[In]] - } - def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = new ActorProcessor(context.actorOf(ActorProcessor.props(settings, op), name = s"$flowName-$n-${op.name}")) @@ -214,9 +194,6 @@ private[akka] class ActorBasedFlowMaterializer( override def ductProduceTo[In, Out](consumer: Consumer[Out], ops: List[Ast.AstNode]): Consumer[In] = processorChain(consumer, ops, createFlowName(), ops.size).asInstanceOf[Consumer[In]] - override def ductConsume[In](ops: List[Ast.AstNode]): Consumer[In] = - consume(ops, createFlowName) - override def ductBuild[In, Out](ops: List[Ast.AstNode]): (Consumer[In], Producer[Out]) = { val flowName = createFlowName() if (ops.isEmpty) { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala deleted file mode 100644 index 39406f9d9a..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala +++ /dev/null @@ -1,146 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import scala.concurrent.duration.Duration -import scala.util.{ Failure, Success } -import scala.util.control.NonFatal -import org.reactivestreams.api.Consumer -import org.reactivestreams.spi.{ Subscriber, Subscription } -import Ast.{ AstNode, Transform } -import akka.actor.{ Actor, ActorLogging, ActorRef, Props, actorRef2Scala } -import akka.stream.MaterializerSettings -import akka.stream.Transformer -import akka.stream.actor.ActorConsumer.{ OnNext, OnError, OnComplete, OnSubscribe } - -/** - * INTERNAL API - */ -private[akka] object ActorConsumerProps { - import Ast._ - - def props(settings: MaterializerSettings, op: AstNode) = op match { - case t: Transform ⇒ Props(new TransformActorConsumer(settings, t.transformer)) withDispatcher (settings.dispatcher) - } -} - -/** - * INTERNAL API - */ -private[akka] abstract class AbstractActorConsumer(val settings: MaterializerSettings) extends Actor with SoftShutdown { - import ActorProcessor._ - import ActorBasedFlowMaterializer._ - - /** - * Consume one element synchronously: the Actor mailbox is the queue. - */ - def onNext(elem: Any): Unit - - /** - * Must call shutdown() eventually. - */ - def onError(e: Throwable): Unit - - /** - * Must call shutdown() eventually. - */ - def onComplete(): Unit - - /** - * Terminate processing after the current message; will cancel the subscription if necessary. - */ - def shutdown(): Unit = softShutdown() - - context.setReceiveTimeout(settings.upstreamSubscriptionTimeout) - - final def receive = { - case OnSubscribe(sub) ⇒ - context.setReceiveTimeout(Duration.Undefined) - subscription = Some(sub) - requestMore() - context.become(active) - case OnError(cause) ⇒ - withCtx(context)(onError(cause)) - case OnComplete ⇒ - withCtx(context)(onComplete()) - } - - private var subscription: Option[Subscription] = None - - private val highWatermark = settings.maximumInputBufferSize - private val lowWatermark = Math.max(1, highWatermark / 2) - private var requested = 0 - private def requestMore(): Unit = - if (requested < lowWatermark) { - val amount = highWatermark - requested - subscription.get.requestMore(amount) - requested += amount - } - private def gotOne(): Unit = { - requested -= 1 - requestMore() - } - - final def active: Receive = { - case OnSubscribe(sub) ⇒ sub.cancel() - case OnNext(elem) ⇒ { gotOne(); withCtx(context)(onNext(elem)) } - case OnError(cause) ⇒ { subscription = None; withCtx(context)(onError(cause)) } - case OnComplete ⇒ { subscription = None; withCtx(context)(onComplete()) } - } - - override def postStop(): Unit = { - subscription foreach (_.cancel()) - } -} - -/** - * INTERNAL API - */ -private[akka] class TransformActorConsumer(_settings: MaterializerSettings, transformer: Transformer[Any, Any]) extends AbstractActorConsumer(_settings) with ActorLogging { - - var error: Option[Throwable] = None // Null is the proper default here - - var hasCleanupRun = false - private var onCompleteCalled = false - private def callOnComplete(): Unit = { - if (!onCompleteCalled) { - onCompleteCalled = true - try transformer.onTermination(error) - catch { case NonFatal(e) ⇒ log.error(e, "failure during onTermination") } - shutdown() - } - } - - override def onNext(elem: Any): Unit = { - transformer.onNext(elem) - if (transformer.isComplete) - callOnComplete() - } - - override def onError(cause: Throwable): Unit = { - try { - transformer.onError(cause) - error = Some(cause) - onComplete() - } catch { - case NonFatal(e) ⇒ - log.error(e, "terminating due to onError") - shutdown() - } - } - - override def onComplete(): Unit = { - callOnComplete() - } - - override def softShutdown(): Unit = { - transformer.cleanup() - hasCleanupRun = true // for postStop - super.softShutdown() - } - - override def postStop(): Unit = { - try super.postStop() finally if (!hasCleanupRun) transformer.cleanup() - } -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/BlackholeConsumer.scala b/akka-stream/src/main/scala/akka/stream/impl/BlackholeConsumer.scala new file mode 100644 index 0000000000..3ddfc2957b --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/BlackholeConsumer.scala @@ -0,0 +1,49 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import org.reactivestreams.api.Consumer +import org.reactivestreams.spi.Subscription +import org.reactivestreams.spi.Subscriber + +/** + * INTERNAL API + */ + +private[akka] class BlackholeConsumer[T](highWatermark: Int) extends Consumer[T] with Subscriber[T] { + + private val lowWatermark = Math.max(1, highWatermark / 2) + private var requested = 0 + + private var subscription: Subscription = _ + + override def getSubscriber: Subscriber[T] = this + + override def onSubscribe(sub: Subscription): Unit = { + subscription = sub + subscription.requestMore(1) + } + + override def onError(cause: Throwable): Unit = () + + override def onComplete(): Unit = () + + override def onNext(element: T): Unit = { + gotOne() + subscription.requestMore(1) + } + + private def gotOne(): Unit = { + requested -= 1 + requestMore() + } + + private def requestMore(): Unit = + if (requested < lowWatermark) { + val amount = highWatermark - requested + subscription.requestMore(amount) + requested += amount + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index e700a23741..f4338d209d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -47,7 +47,8 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: p.future } - override def consume(materializer: FlowMaterializer): Unit = materializer.consume(producerNode, ops) + override def consume(materializer: FlowMaterializer): Unit = + produceTo(materializer, new BlackholeConsumer(materializer.settings.maximumInputBufferSize)) override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Unit = transform(new Transformer[O, Unit] { @@ -88,7 +89,7 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[ materializer.ductProduceTo(consumer, ops) override def consume(materializer: FlowMaterializer): Consumer[In] = - materializer.ductConsume(ops) + produceTo(materializer, new BlackholeConsumer(materializer.settings.maximumInputBufferSize)) override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Consumer[In] = transform(new Transformer[Out, Unit] { diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala index 3b8922776f..70ebef1f1b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -28,7 +28,7 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran } object NeedsInputAndDemandOrCompletion extends TransferState { - def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandAvailable) || primaryInputs.inputsDepleted + def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandAvailable) || transformer.isComplete || primaryInputs.inputsDepleted def isCompleted = false } diff --git a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java index de847acf2a..bb6df57994 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java @@ -1,35 +1,20 @@ package akka.stream.javadsl; -import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; - import org.junit.ClassRule; import org.junit.Test; - -import static org.junit.Assert.assertEquals; - import org.reactivestreams.api.Consumer; import org.reactivestreams.api.Producer; - -import scala.concurrent.Await; -import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; + import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.japi.Function; import akka.japi.Function2; import akka.japi.Procedure; -import akka.japi.Util; import akka.stream.FlowMaterializer; import akka.stream.MaterializerSettings; -import akka.stream.Transformer; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; diff --git a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala index 8784f41756..6276f8bebc 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala @@ -14,6 +14,7 @@ import akka.stream.scaladsl.Flow import akka.testkit.TestProbe import scala.util.Try import scala.util.Success +import scala.util.control.NoStackTrace @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { @@ -45,7 +46,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ } val proc = p.expectSubscription proc.expectRequestMore() - val ex = new RuntimeException("ex") + val ex = new RuntimeException("ex") with NoStackTrace proc.sendError(ex) onCompleteProbe.expectMsg(Failure(ex)) onCompleteProbe.expectNoMsg(100.millis)