=str #15250 Use BlackholeConsumer for Flow.consume()

* last operator in front of consume() was treated specially, running
  in impl.ActorConsumer instead of the ordinary processors
This commit is contained in:
Patrik Nordwall 2014-05-22 08:40:41 +02:00
parent f93e3c21c0
commit ecd469e539
8 changed files with 60 additions and 202 deletions

View file

@ -46,7 +46,7 @@ object FlowMaterializer {
* steps are split up into asynchronous regions is implementation * steps are split up into asynchronous regions is implementation
* dependent. * dependent.
*/ */
abstract class FlowMaterializer { abstract class FlowMaterializer(val settings: MaterializerSettings) {
/** /**
* The `namePrefix` is used as the first part of the names of the actors running * The `namePrefix` is used as the first part of the names of the actors running
@ -59,21 +59,12 @@ abstract class FlowMaterializer {
* ops are stored in reverse order * ops are stored in reverse order
*/ */
private[akka] def toProducer[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Producer[O] private[akka] def toProducer[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Producer[O]
/**
* INTERNAL API
*/
private[akka] def consume[I](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Unit
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] def ductProduceTo[In, Out](consumer: Consumer[Out], ops: List[Ast.AstNode]): Consumer[In] private[akka] def ductProduceTo[In, Out](consumer: Consumer[Out], ops: List[Ast.AstNode]): Consumer[In]
/**
* INTERNAL API
*/
private[akka] def ductConsume[In](ops: List[Ast.AstNode]): Consumer[In]
/** /**
* INTERNAL API * INTERNAL API
*/ */

View file

@ -27,7 +27,7 @@ import akka.stream.actor.ActorConsumer
* INTERNAL API * INTERNAL API
*/ */
private[akka] object Ast { private[akka] object Ast {
trait AstNode { sealed trait AstNode {
def name: String def name: String
} }
@ -120,10 +120,10 @@ private[akka] object ActorBasedFlowMaterializer {
* INTERNAL API * INTERNAL API
*/ */
private[akka] class ActorBasedFlowMaterializer( private[akka] class ActorBasedFlowMaterializer(
val settings: MaterializerSettings, settings: MaterializerSettings,
_context: ActorRefFactory, _context: ActorRefFactory,
namePrefix: String) namePrefix: String)
extends FlowMaterializer { extends FlowMaterializer(settings) {
import Ast._ import Ast._
import ActorBasedFlowMaterializer._ import ActorBasedFlowMaterializer._
@ -187,26 +187,6 @@ private[akka] class ActorBasedFlowMaterializer(
override def onNext(element: Any) = List(element) override def onNext(element: Any) = List(element)
}) })
override def consume[I](producerNode: ProducerNode[I], ops: List[AstNode]): Unit = {
val flowName = createFlowName()
val consumer = consume(ops, flowName)
producerNode.createProducer(this, flowName).produceTo(consumer.asInstanceOf[Consumer[I]])
}
private def consume[In, Out](ops: List[Ast.AstNode], flowName: String): Consumer[In] = {
val c = ops match {
case Nil
ActorConsumer[Any](context.actorOf(ActorConsumerProps.props(settings, blackholeTransform),
name = s"$flowName-1-consume"))
case head :: tail
val opsSize = ops.size
val c = ActorConsumer[Any](context.actorOf(ActorConsumerProps.props(settings, head),
name = s"$flowName-$opsSize-${head.name}"))
processorChain(c, tail, flowName, ops.size - 1)
}
c.asInstanceOf[Consumer[In]]
}
def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] =
new ActorProcessor(context.actorOf(ActorProcessor.props(settings, op), new ActorProcessor(context.actorOf(ActorProcessor.props(settings, op),
name = s"$flowName-$n-${op.name}")) name = s"$flowName-$n-${op.name}"))
@ -214,9 +194,6 @@ private[akka] class ActorBasedFlowMaterializer(
override def ductProduceTo[In, Out](consumer: Consumer[Out], ops: List[Ast.AstNode]): Consumer[In] = override def ductProduceTo[In, Out](consumer: Consumer[Out], ops: List[Ast.AstNode]): Consumer[In] =
processorChain(consumer, ops, createFlowName(), ops.size).asInstanceOf[Consumer[In]] processorChain(consumer, ops, createFlowName(), ops.size).asInstanceOf[Consumer[In]]
override def ductConsume[In](ops: List[Ast.AstNode]): Consumer[In] =
consume(ops, createFlowName)
override def ductBuild[In, Out](ops: List[Ast.AstNode]): (Consumer[In], Producer[Out]) = { override def ductBuild[In, Out](ops: List[Ast.AstNode]): (Consumer[In], Producer[Out]) = {
val flowName = createFlowName() val flowName = createFlowName()
if (ops.isEmpty) { if (ops.isEmpty) {

View file

@ -1,146 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success }
import scala.util.control.NonFatal
import org.reactivestreams.api.Consumer
import org.reactivestreams.spi.{ Subscriber, Subscription }
import Ast.{ AstNode, Transform }
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, actorRef2Scala }
import akka.stream.MaterializerSettings
import akka.stream.Transformer
import akka.stream.actor.ActorConsumer.{ OnNext, OnError, OnComplete, OnSubscribe }
/**
* INTERNAL API
*/
private[akka] object ActorConsumerProps {
import Ast._
def props(settings: MaterializerSettings, op: AstNode) = op match {
case t: Transform Props(new TransformActorConsumer(settings, t.transformer)) withDispatcher (settings.dispatcher)
}
}
/**
* INTERNAL API
*/
private[akka] abstract class AbstractActorConsumer(val settings: MaterializerSettings) extends Actor with SoftShutdown {
import ActorProcessor._
import ActorBasedFlowMaterializer._
/**
* Consume one element synchronously: the Actor mailbox is the queue.
*/
def onNext(elem: Any): Unit
/**
* Must call shutdown() eventually.
*/
def onError(e: Throwable): Unit
/**
* Must call shutdown() eventually.
*/
def onComplete(): Unit
/**
* Terminate processing after the current message; will cancel the subscription if necessary.
*/
def shutdown(): Unit = softShutdown()
context.setReceiveTimeout(settings.upstreamSubscriptionTimeout)
final def receive = {
case OnSubscribe(sub)
context.setReceiveTimeout(Duration.Undefined)
subscription = Some(sub)
requestMore()
context.become(active)
case OnError(cause)
withCtx(context)(onError(cause))
case OnComplete
withCtx(context)(onComplete())
}
private var subscription: Option[Subscription] = None
private val highWatermark = settings.maximumInputBufferSize
private val lowWatermark = Math.max(1, highWatermark / 2)
private var requested = 0
private def requestMore(): Unit =
if (requested < lowWatermark) {
val amount = highWatermark - requested
subscription.get.requestMore(amount)
requested += amount
}
private def gotOne(): Unit = {
requested -= 1
requestMore()
}
final def active: Receive = {
case OnSubscribe(sub) sub.cancel()
case OnNext(elem) { gotOne(); withCtx(context)(onNext(elem)) }
case OnError(cause) { subscription = None; withCtx(context)(onError(cause)) }
case OnComplete { subscription = None; withCtx(context)(onComplete()) }
}
override def postStop(): Unit = {
subscription foreach (_.cancel())
}
}
/**
* INTERNAL API
*/
private[akka] class TransformActorConsumer(_settings: MaterializerSettings, transformer: Transformer[Any, Any]) extends AbstractActorConsumer(_settings) with ActorLogging {
var error: Option[Throwable] = None // Null is the proper default here
var hasCleanupRun = false
private var onCompleteCalled = false
private def callOnComplete(): Unit = {
if (!onCompleteCalled) {
onCompleteCalled = true
try transformer.onTermination(error)
catch { case NonFatal(e) log.error(e, "failure during onTermination") }
shutdown()
}
}
override def onNext(elem: Any): Unit = {
transformer.onNext(elem)
if (transformer.isComplete)
callOnComplete()
}
override def onError(cause: Throwable): Unit = {
try {
transformer.onError(cause)
error = Some(cause)
onComplete()
} catch {
case NonFatal(e)
log.error(e, "terminating due to onError")
shutdown()
}
}
override def onComplete(): Unit = {
callOnComplete()
}
override def softShutdown(): Unit = {
transformer.cleanup()
hasCleanupRun = true // for postStop
super.softShutdown()
}
override def postStop(): Unit = {
try super.postStop() finally if (!hasCleanupRun) transformer.cleanup()
}
}

View file

@ -0,0 +1,49 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import org.reactivestreams.api.Consumer
import org.reactivestreams.spi.Subscription
import org.reactivestreams.spi.Subscriber
/**
* INTERNAL API
*/
private[akka] class BlackholeConsumer[T](highWatermark: Int) extends Consumer[T] with Subscriber[T] {
private val lowWatermark = Math.max(1, highWatermark / 2)
private var requested = 0
private var subscription: Subscription = _
override def getSubscriber: Subscriber[T] = this
override def onSubscribe(sub: Subscription): Unit = {
subscription = sub
subscription.requestMore(1)
}
override def onError(cause: Throwable): Unit = ()
override def onComplete(): Unit = ()
override def onNext(element: T): Unit = {
gotOne()
subscription.requestMore(1)
}
private def gotOne(): Unit = {
requested -= 1
requestMore()
}
private def requestMore(): Unit =
if (requested < lowWatermark) {
val amount = highWatermark - requested
subscription.requestMore(amount)
requested += amount
}
}

View file

@ -47,7 +47,8 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops:
p.future p.future
} }
override def consume(materializer: FlowMaterializer): Unit = materializer.consume(producerNode, ops) override def consume(materializer: FlowMaterializer): Unit =
produceTo(materializer, new BlackholeConsumer(materializer.settings.maximumInputBufferSize))
override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] Unit): Unit = override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] Unit): Unit =
transform(new Transformer[O, Unit] { transform(new Transformer[O, Unit] {
@ -88,7 +89,7 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[
materializer.ductProduceTo(consumer, ops) materializer.ductProduceTo(consumer, ops)
override def consume(materializer: FlowMaterializer): Consumer[In] = override def consume(materializer: FlowMaterializer): Consumer[In] =
materializer.ductConsume(ops) produceTo(materializer, new BlackholeConsumer(materializer.settings.maximumInputBufferSize))
override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] Unit): Consumer[In] = override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] Unit): Consumer[In] =
transform(new Transformer[Out, Unit] { transform(new Transformer[Out, Unit] {

View file

@ -28,7 +28,7 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran
} }
object NeedsInputAndDemandOrCompletion extends TransferState { object NeedsInputAndDemandOrCompletion extends TransferState {
def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandAvailable) || primaryInputs.inputsDepleted def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandAvailable) || transformer.isComplete || primaryInputs.inputsDepleted
def isCompleted = false def isCompleted = false
} }

View file

@ -1,35 +1,20 @@
package akka.stream.javadsl; package akka.stream.javadsl;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Test; import org.junit.Test;
import static org.junit.Assert.assertEquals;
import org.reactivestreams.api.Consumer; import org.reactivestreams.api.Consumer;
import org.reactivestreams.api.Producer; import org.reactivestreams.api.Producer;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration; import scala.concurrent.duration.FiniteDuration;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.japi.Function; import akka.japi.Function;
import akka.japi.Function2; import akka.japi.Function2;
import akka.japi.Procedure; import akka.japi.Procedure;
import akka.japi.Util;
import akka.stream.FlowMaterializer; import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings; import akka.stream.MaterializerSettings;
import akka.stream.Transformer;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;

View file

@ -14,6 +14,7 @@ import akka.stream.scaladsl.Flow
import akka.testkit.TestProbe import akka.testkit.TestProbe
import scala.util.Try import scala.util.Try
import scala.util.Success import scala.util.Success
import scala.util.control.NoStackTrace
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
@ -45,7 +46,7 @@ class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ } Flow(p).onComplete(materializer) { onCompleteProbe.ref ! _ }
val proc = p.expectSubscription val proc = p.expectSubscription
proc.expectRequestMore() proc.expectRequestMore()
val ex = new RuntimeException("ex") val ex = new RuntimeException("ex") with NoStackTrace
proc.sendError(ex) proc.sendError(ex)
onCompleteProbe.expectMsg(Failure(ex)) onCompleteProbe.expectMsg(Failure(ex))
onCompleteProbe.expectNoMsg(100.millis) onCompleteProbe.expectNoMsg(100.millis)