pekko/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala

669 lines
21 KiB
Scala
Raw Normal View History

/*
* Copyright (C) 2014-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import java.util.function.BinaryOperator
import akka.NotUsed
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.stream.ActorAttributes.StreamSubscriptionTimeout
2015-12-04 09:37:32 -05:00
import akka.stream.Attributes.InputBuffer
import akka.stream._
import akka.stream.impl.QueueSink.Output
import akka.stream.impl.QueueSink.Pull
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.AtomicModule
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.SinkQueueWithCancel
import akka.stream.scaladsl.Source
2016-01-16 12:17:19 -05:00
import akka.stream.stage._
Various scala-2.13.0-M5 fixes fix akka-actor-tests compile errors some tests still fail though Fix test failures in akka-actor-test Manually work arround missing implicit Factory[Nothing, Seq[Nothing]] see https://github.com/scala/scala-collection-compat/issues/137 akka-remote scalafix changes Fix shutdownAll compile error test:akka-remote scalafix changes akka-multi-node-testkit scalafix Fix akka-remote-tests multi-jvm compile errors akka-stream-tests/test:scalafix Fix test:akka-stream-tests Crude implementation of ByteString.map scalafix akka-actor-typed, akka-actor-typed-tests akka-actor-typed-tests compile and succeed scalafix akka-camel scalafix akka-cluster akka-cluster compile & test scalafix akka-cluster-metrics Fix akka-cluster-metrics scalafix akka-cluster-tools akka-cluster-tools compile and test scalafix akka-distributed-data akka-distributed-data fixes scalafix akka-persistence scalafix akka-cluster-sharding fix akka-cluster-sharding scalafix akka-contrib Fix akka-cluster-sharding-typed test scalafix akka-docs Use scala-stm 0.9 (released for M5) akka-docs Remove dependency on collections-compat Cherry-pick the relevant constructs to our own private utils Shorten 'scala.collections.immutable' by importing it Duplicate 'immutable' imports Use 'foreach' on futures Replace MapLike with regular Map Internal API markers Simplify ccompat by moving PackageShared into object Since we don't currently need to differentiate between 2.11 and Avoid relying on 'union' (and ++) being left-biased Fix akka-actor/doc by removing -Ywarn-unused Make more things more private Copyright headers Use 'unsorted' to go from SortedSet to Set Duplicate import Use onComplete rather than failed.foreach Clarify why we partly duplicate scala-collection-compat
2018-11-22 16:18:10 +01:00
import akka.util.ccompat._
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
2017-11-29 13:49:31 -03:00
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.collection.mutable
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import scala.util.control.NonFatal
/**
 * INTERNAL API
 *
 * Base class of the sink modules in this file whose consuming end is produced
 * by [[SinkModule#create]] at materialization time.
 */
@DoNotInherit private[akka] abstract class SinkModule[-In, Mat](val shape: SinkShape[In])
    extends AtomicModule[SinkShape[In], Mat] {

  /**
   * Create the Subscriber or VirtualPublisher that consumes the incoming
   * stream, plus the materialized value. Since Subscriber and VirtualPublisher
   * do not share a common supertype apart from AnyRef this is what the type
   * union devolves into; unfortunately we do not have union types at our
   * disposal at this point.
   */
  def create(context: MaterializationContext): (AnyRef, Mat)

  /** The attributes of this module; they are handed to the traversal builder. */
  def attributes: Attributes

  /** Builds a linear traversal for this module and turns it into an island tagged `SinkModuleIslandTag`. */
  override def traversalBuilder: TraversalBuilder = {
    val linear = LinearTraversalBuilder.fromModule(this, attributes)
    linear.makeIsland(SinkModuleIslandTag)
  }

  // This is okay since the only caller of this method is right below.
  // TODO: Remove this, no longer needed
  protected def newInstance(s: SinkShape[In] @uncheckedVariance): SinkModule[In, Mat]

  /**
   * Returns the shape to use together with `attr`: the current shape when `attr`
   * carries no name or the same name as this module, otherwise a copy whose inlet
   * is named after the new name with an ".in" suffix.
   */
  protected def amendShape(attr: Attributes): SinkShape[In] = {
    val currentName = traversalBuilder.attributes.nameOrDefault(null)
    attr.nameOrDefault(null) match {
      case null                               => shape
      case unchanged if currentName == unchanged => shape
      case renamed                            => shape.copy(in = Inlet(renamed + ".in"))
    }
  }

  /** Short name used by `toString`. */
  protected def label: String = Logging.simpleName(this)

  final override def toString: String = {
    val identity = System.identityHashCode(this)
    f"$label [$identity%08x]"
  }
}
/**
 * INTERNAL API
 * Holds the downstream-most [[org.reactivestreams.Publisher]] interface of the materialized flow.
 * The stream will not have any subscribers attached at this point, which means that after prefetching
 * elements to fill the internal buffers it will assert back-pressure until
 * a subscriber connects and creates demand for elements to be emitted.
 */
@InternalApi private[akka] class PublisherSink[In](val attributes: Attributes, shape: SinkShape[In])
    extends SinkModule[In, Publisher[In]](shape) {

  /*
   * This method is the reason why SinkModule.create may return something that is
   * not a Subscriber: a VirtualPublisher is used in order to avoid the immediate
   * subscription a VirtualProcessor would perform (and it also saves overhead).
   *
   * The same VirtualPublisher instance is returned both as the consuming end and
   * as the materialized value.
   */
  override def create(context: MaterializationContext): (AnyRef, Publisher[In]) = {
    val virtualPublisher = new VirtualPublisher[In]

    context.materializer match {
      case actorMaterializer: ActorMaterializer =>
        val StreamSubscriptionTimeout(delay, terminationMode) =
          context.effectiveAttributes.mandatoryAttribute[StreamSubscriptionTimeout]
        // only schedule the timeout check when the configured mode actually does something
        if (terminationMode != StreamSubscriptionTimeoutTerminationMode.noop) {
          val onTimeout = new Runnable {
            def run(): Unit = virtualPublisher.onSubscriptionTimeout(actorMaterializer, terminationMode)
          }
          actorMaterializer.scheduleOnce(delay, onTimeout)
        }
      case _ => // not possible to setup timeout
    }

    (virtualPublisher, virtualPublisher)
  }

  /** Same attributes, different shape. */
  override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] =
    new PublisherSink[In](attributes, shape)

  /** New attributes; the shape is adjusted through `amendShape`. */
  override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] =
    new PublisherSink[In](attr, amendShape(attr))
}
/**
 * INTERNAL API
 *
 * Sink module backed by a `FanoutProcessorImpl` actor; the processor wrapping that
 * actor is both the consuming end and the materialized [[org.reactivestreams.Publisher]].
 */
@InternalApi private[akka] final class FanoutPublisherSink[In](val attributes: Attributes, shape: SinkShape[In])
    extends SinkModule[In, Publisher[In]](shape) {

  override def create(context: MaterializationContext): (Subscriber[In], Publisher[In]) = {
    val processorActor =
      context.materializer.actorOf(context, FanoutProcessorImpl.props(context.effectiveAttributes))
    val processor = new ActorProcessor[In, In](processorActor)

    // Resolve cyclic dependency with actor. This MUST be the first message no matter what.
    val exposed = ExposedPublisher(processor.asInstanceOf[ActorPublisher[Any]])
    processorActor ! exposed

    (processor, processor)
  }

  /** Same attributes, different shape. */
  override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Publisher[In]] =
    new FanoutPublisherSink[In](attributes, shape)

  /** New attributes; the shape is adjusted through `amendShape`. */
  override def withAttributes(attr: Attributes): SinkModule[In, Publisher[In]] =
    new FanoutPublisherSink[In](attr, amendShape(attr))
}
/**
 * INTERNAL API
 * Attaches a subscriber to this stream.
 *
 * The given subscriber is returned unchanged from `create`; the materialized value is `NotUsed`.
 */
@InternalApi private[akka] final class SubscriberSink[In](
    subscriber: Subscriber[In],
    val attributes: Attributes,
    shape: SinkShape[In])
    extends SinkModule[In, NotUsed](shape) {

  // nothing to allocate: the consuming end is the subscriber handed to the constructor
  override def create(context: MaterializationContext) = (subscriber, NotUsed)

  /** Same subscriber and attributes, different shape. */
  override protected def newInstance(shape: SinkShape[In]): SinkModule[In, NotUsed] =
    new SubscriberSink[In](subscriber, attributes, shape)

  /** Same subscriber with new attributes; the shape is adjusted through `amendShape`. */
  override def withAttributes(attr: Attributes): SinkModule[In, NotUsed] =
    new SubscriberSink[In](subscriber, attr, amendShape(attr))
}
/**
 * INTERNAL API
 * A sink that immediately cancels its upstream upon materialization.
 *
 * Every materialization gets its own `CancellingSubscriber`; the materialized value is `NotUsed`.
 */
@InternalApi private[akka] final class CancelSink(val attributes: Attributes, shape: SinkShape[Any])
    extends SinkModule[Any, NotUsed](shape) {

  override def create(context: MaterializationContext): (Subscriber[Any], NotUsed) = {
    val cancelling = new CancellingSubscriber[Any]
    (cancelling, NotUsed)
  }

  /** Same attributes, different shape. */
  override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, NotUsed] =
    new CancelSink(attributes, shape)

  /** New attributes; the shape is adjusted through `amendShape`. */
  override def withAttributes(attr: Attributes): SinkModule[Any, NotUsed] =
    new CancelSink(attr, amendShape(attr))
}
/**
 * INTERNAL API
 *
 * Sink stage that remembers at most the last `n` elements it has received and
 * materializes a `Future` of them.
 *
 * The future is
 *  - completed with the retained elements (oldest first) when upstream finishes,
 *  - failed with the upstream failure when upstream fails,
 *  - failed with an `AbruptStageTerminationException` when the stage is stopped
 *    before either of the above happened.
 *
 * @param n number of trailing elements to keep, must be greater than 0
 * @throws IllegalArgumentException if `n` is not greater than 0
 */
@InternalApi private[akka] final class TakeLastStage[T](n: Int)
    extends GraphStageWithMaterializedValue[SinkShape[T], Future[immutable.Seq[T]]] {
  if (n <= 0)
    throw new IllegalArgumentException("requirement failed: n must be greater than 0")

  val in: Inlet[T] = Inlet("takeLast.in")

  override val shape: SinkShape[T] = SinkShape.of(in)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    val p: Promise[immutable.Seq[T]] = Promise()
    (new GraphStageLogic(shape) with InHandler {
      private[this] val buffer = mutable.Queue.empty[T]
      // number of elements currently retained, never grows beyond n
      private[this] var count = 0

      override def preStart(): Unit = pull(in)

      override def onPush(): Unit = {
        buffer.enqueue(grab(in))
        if (count < n)
          count += 1
        else
          buffer.dequeue() // already holding n elements: drop the oldest one
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        val elements = buffer.toList
        buffer.clear()
        p.trySuccess(elements)
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        p.tryFailure(ex)
        failStage(ex)
      }

      // Without this the materialized future would stay incomplete forever when the
      // stage is stopped without upstream having finished or failed.
      override def postStop(): Unit = {
        if (!p.isCompleted) p.failure(new AbruptStageTerminationException(this))
      }

      setHandler(in, this)
    }, p.future)
  }

  override def toString: String = "TakeLastStage"
}
/**
 * INTERNAL API
 *
 * Sink stage materializing a `Future[Option[T]]`: the first element wrapped in an
 * `Option` if one arrives, `None` if upstream finishes first, or a failure if
 * upstream fails or the stage stops before the future was completed.
 */
@InternalApi private[akka] final class HeadOptionStage[T]
    extends GraphStageWithMaterializedValue[SinkShape[T], Future[Option[T]]] {

  val in: Inlet[T] = Inlet("headOption.in")

  override val shape: SinkShape[T] = SinkShape.of(in)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    val head: Promise[Option[T]] = Promise()

    val logic = new GraphStageLogic(shape) with InHandler {
      setHandler(in, this)

      override def preStart(): Unit = pull(in)

      // the first element is all that is needed, the stage completes right after it
      override def onPush(): Unit = {
        val first = Option(grab(in))
        head.trySuccess(first)
        completeStage()
      }

      override def onUpstreamFinish(): Unit = {
        head.trySuccess(None)
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        head.tryFailure(ex)
        failStage(ex)
      }

      // stopped without any of the handlers above having completed the promise
      override def postStop(): Unit =
        if (!head.isCompleted) head.failure(new AbruptStageTerminationException(this))
    }

    (logic, head.future)
  }

  override def toString: String = "HeadOptionStage"
}
2015-12-04 09:37:32 -05:00
/**
 * INTERNAL API
 *
 * Sink stage that adds every incoming element to a builder obtained from the
 * implicit factory and materializes a `Future` of the built collection. The future
 * is completed when upstream finishes and failed when upstream fails or the stage
 * stops before the future was completed.
 */
@InternalApi private[akka] final class SeqStage[T, That](implicit cbf: Factory[T, That with immutable.Iterable[_]])
    extends GraphStageWithMaterializedValue[SinkShape[T], Future[That]] {

  val in = Inlet[T]("seq.in")

  override val shape: SinkShape[T] = SinkShape.of(in)

  override protected def initialAttributes: Attributes = DefaultAttributes.seqSink

  override def toString: String = "SeqStage"

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    val collected: Promise[That] = Promise()

    val logic = new GraphStageLogic(shape) with InHandler {
      val builder = cbf.newBuilder

      setHandler(in, this)

      override def preStart(): Unit = pull(in)

      override def onPush(): Unit = {
        builder += grab(in)
        pull(in)
      }

      override def onUpstreamFinish(): Unit = {
        collected.trySuccess(builder.result())
        completeStage()
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        collected.tryFailure(ex)
        failStage(ex)
      }

      // stopped without upstream having finished or failed
      override def postStop(): Unit =
        if (!collected.isCompleted) collected.failure(new AbruptStageTerminationException(this))
    }

    (logic, collected.future)
  }
}
/**
 * INTERNAL API
 *
 * Messages sent from the materialized queue handle to the [[QueueSink]] stage logic.
 */
@InternalApi private[akka] object QueueSink {

  /** Common type of the messages understood by the stage. */
  sealed trait Output[+T]

  /** Asks for the next element; `promise` is completed with the outcome. */
  final case class Pull[T](promise: Promise[Option[T]]) extends Output[T]

  /** Asks the stage to complete itself. */
  case object Cancel extends Output[Nothing]
}
2015-12-04 09:37:32 -05:00
/**
 * INTERNAL API
 *
 * Sink stage whose materialized value is a `SinkQueueWithCancel`: elements are
 * buffered inside the stage and handed out one at a time through `pull()`.
 *
 * Only one `pull()` may be outstanding at a time; a second one issued before the
 * previous future was resolved is failed with an `IllegalStateException`.
 * A `pull()` that is still waiting for an element when the stage stops (for example
 * after `cancel()`) is failed with an `AbruptStageTerminationException` instead of
 * being left incomplete forever.
 */
@InternalApi private[akka] final class QueueSink[T]()
    extends GraphStageWithMaterializedValue[SinkShape[T], SinkQueueWithCancel[T]] {
  type Requested[E] = Promise[Option[E]]

  val in = Inlet[T]("queueSink.in")

  override def initialAttributes = DefaultAttributes.queueSink

  override val shape: SinkShape[T] = SinkShape.of(in)

  override def toString: String = "QueueSink"

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
    val stageLogic = new GraphStageLogic(shape) with InHandler with SinkQueueWithCancel[T] {
      // an element, end of stream (Success(None)) or the upstream failure
      type Received[E] = Try[Option[E]]

      val maxBuffer = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max
      require(maxBuffer > 0, "Buffer size must be greater than 0")

      var buffer: Buffer[Received[T]] = _
      // the single pull() that arrived while the buffer was empty, if any
      var currentRequest: Option[Requested[T]] = None

      override def preStart(): Unit = {
        // Allocates one additional element to hold stream
        // closed/failure indicators
        buffer = Buffer(maxBuffer + 1, inheritedAttributes)
        setKeepGoing(true)
        pull(in)
      }

      // A request parked in currentRequest is only ever completed from enqueueAndNotify;
      // once the stage has stopped that can no longer happen, so fail it here.
      // tryFailure makes this a no-op should the promise already be completed.
      override def postStop(): Unit = {
        currentRequest match {
          case Some(pending) =>
            pending.tryFailure(new AbruptStageTerminationException(this))
            currentRequest = None
          case None => //do nothing
        }
      }

      private val callback = getAsyncCallback[Output[T]] {
        case QueueSink.Pull(pullPromise) =>
          currentRequest match {
            case Some(_) =>
              pullPromise.failure(
                new IllegalStateException(
                  "You have to wait for previous future to be resolved to send another request"))
            case None =>
              if (buffer.isEmpty) currentRequest = Some(pullPromise)
              else {
                // onPush stopped pulling when the buffer reached maxBuffer; resume now that a slot is freed
                if (buffer.used == maxBuffer) tryPull(in)
                sendDownstream(pullPromise)
              }
          }
        case QueueSink.Cancel => completeStage()
      }

      /** Completes `promise` with the oldest buffered entry and stops the stage on a terminal entry. */
      def sendDownstream(promise: Requested[T]): Unit = {
        val e = buffer.dequeue()
        promise.complete(e)
        e match {
          case Success(_: Some[_]) => //do nothing
          case Success(None)       => completeStage()
          case Failure(t)          => failStage(t)
        }
      }

      /** Buffers `requested` and, if a pull() is waiting, serves it right away. */
      def enqueueAndNotify(requested: Received[T]): Unit = {
        buffer.enqueue(requested)
        currentRequest match {
          case Some(p) =>
            sendDownstream(p)
            currentRequest = None
          case None => //do nothing
        }
      }

      def onPush(): Unit = {
        enqueueAndNotify(Success(Some(grab(in))))
        if (buffer.used < maxBuffer) pull(in)
      }

      override def onUpstreamFinish(): Unit = enqueueAndNotify(Success(None))

      override def onUpstreamFailure(ex: Throwable): Unit = enqueueAndNotify(Failure(ex))

      setHandler(in, this)

      // SinkQueueWithCancel impl
      override def pull(): Future[Option[T]] = {
        val p = Promise[Option[T]]()
        // if the callback itself could not be delivered, propagate that failure to the caller
        callback
          .invokeWithFeedback(Pull(p))
          .failed
          .foreach {
            case NonFatal(e) => p.tryFailure(e)
            case _           => ()
          }(akka.dispatch.ExecutionContexts.sameThreadExecutionContext)
        p.future
      }

      override def cancel(): Unit = {
        callback.invoke(QueueSink.Cancel)
      }
    }

    (stageLogic, stageLogic)
  }
}
/**
 * INTERNAL API
 *
 * Helper class to be able to express collection as a fold using mutable data without
 * accidentally sharing state between materializations
 */
@InternalApi private[akka] trait CollectorState[T, R] {

  /** Feeds one element into the collection and returns the state to continue with. */
  def update(elem: T): CollectorState[T, R]

  /** The container holding what has been accumulated so far. */
  def accumulated(): Any

  /** Produces the final result from what has been accumulated. */
  def finish(): R
}
/**
 * INTERNAL API
 *
 * Helper class to be able to express collection as a fold using mutable data
 *
 * Initial state of the fold: it holds no collector of its own and asks
 * `collectorFactory` for a new one whenever one of its methods is called.
 */
@InternalApi private[akka] final class FirstCollectorState[T, R](
    collectorFactory: () => java.util.stream.Collector[T, Any, R])
    extends CollectorState[T, R] {

  override def update(elem: T): CollectorState[T, R] = {
    // on first update, return a new mutable collector to ensure not
    // sharing collector between streams
    val freshCollector = collectorFactory()
    val accept = freshCollector.accumulator()
    val container = freshCollector.supplier().get()
    accept.accept(container, elem)
    new MutableCollectorState(freshCollector, accept, container)
  }

  override def accumulated(): Any = {
    // only called if it is asked about accumulated before accepting a first element
    collectorFactory().supplier().get()
  }

  override def finish(): R = {
    // only called if completed without elements
    val freshCollector = collectorFactory()
    val finisher = freshCollector.finisher()
    val emptyContainer = freshCollector.supplier().get()
    finisher.apply(emptyContainer)
  }
}
2016-01-14 15:22:25 +01:00
/**
 * INTERNAL API
 *
 * Helper class to be able to express collection as a fold using mutable data
 *
 * State that owns a collector together with the container it accumulates into;
 * `update` mutates that container in place and returns this same instance.
 */
@InternalApi private[akka] final class MutableCollectorState[T, R](
    collector: java.util.stream.Collector[T, Any, R],
    accumulator: java.util.function.BiConsumer[Any, T],
    val accumulated: Any)
    extends CollectorState[T, R] {

  override def update(elem: T): CollectorState[T, R] = {
    accumulator.accept(accumulated, elem)
    this
  }

  override def finish(): R = {
    // applies the collector's finisher to the container the elements were accumulated into
    val finisher = collector.finisher()
    finisher.apply(accumulated)
  }
}
/**
 * INTERNAL API
 *
 * Helper class to be able to express reduce as a fold for parallel collector without
 * accidentally sharing state between materializations
 */
@InternalApi private[akka] trait ReducerState[T, R] {

  /** Produces the final result from what has been reduced so far. */
  def finish(): R

  /** Merges one more batch into the reduction and returns the state to continue with. */
  def update(batch: Any): ReducerState[T, R]
}
/**
 * INTERNAL API
 *
 * Helper class to be able to express reduce as a fold for parallel collector
 *
 * Initial state of the reduction: it holds no collector of its own and asks
 * `collectorFactory` for a new one whenever one of its methods is called.
 */
@InternalApi private[akka] final class FirstReducerState[T, R](
    collectorFactory: () => java.util.stream.Collector[T, Any, R])
    extends ReducerState[T, R] {

  /**
   * Takes the first batch as the initial reduced value.
   *
   * @param batch an accumulation container to start the reduction from
   * @return a mutable state holding a newly created collector, its combiner and `batch`
   */
  def update(batch: Any): ReducerState[T, R] = {
    // on first update, return a new mutable collector to ensure not
    // sharing collector between streams
    val collector = collectorFactory()
    val combiner = collector.combiner()
    new MutableReducerState(collector, combiner, batch)
  }

  /**
   * Result for a reduction that never received a batch.
   *
   * The finisher is applied to a fresh, empty container obtained from the collector's
   * supplier (the same way `FirstCollectorState.finish` does it) rather than to `null`.
   */
  def finish(): R = {
    // only called if completed without elements
    val collector = collectorFactory()
    collector.finisher().apply(collector.supplier().get())
  }
}
2016-01-14 15:22:25 +01:00
/**
 * INTERNAL API
 *
 * Helper class to be able to express reduce as a fold for parallel collector
 *
 * State that owns a collector and the value reduced so far; `update` replaces that
 * value with the combiner's result and returns this same instance.
 */
@InternalApi private[akka] final class MutableReducerState[T, R](
    val collector: java.util.stream.Collector[T, Any, R],
    val combiner: BinaryOperator[Any],
    var reduced: Any)
    extends ReducerState[T, R] {

  def update(batch: Any): ReducerState[T, R] = {
    val merged = combiner.apply(reduced, batch)
    reduced = merged
    this
  }

  def finish(): R = {
    val finisher = collector.finisher()
    finisher.apply(reduced)
  }
}
2016-07-07 07:01:28 -04:00
/**
 * INTERNAL API
 *
 * Sink stage that creates the actual sink lazily: `sinkFactory` is called with the
 * first element, and once its future yields a sink that sink is run on a sub-source
 * fed by this stage, starting with the cached first element.
 *
 * The materialized future is completed with the inner sink's materialized value, or
 * failed if the factory throws or fails, if upstream fails before the switch, or with
 * a `NeverMaterializedException` if upstream finishes before any element arrived.
 */
@InternalApi final private[stream] class LazySink[T, M](sinkFactory: T => Future[Sink[T, M]])
    extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {

  val in = Inlet[T]("lazySink.in")

  override def initialAttributes = DefaultAttributes.lazySink

  override val shape: SinkShape[T] = SinkShape.of(in)

  override def toString: String = "LazySink"

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[M]) = {
    val promise = Promise[M]()

    val stageLogic = new GraphStageLogic(shape) with InHandler {
      // set as soon as the first element has been grabbed and the factory is about to be called
      var switching = false

      setHandler(in, this)

      override def preStart(): Unit = pull(in)

      override def onPush(): Unit = {
        val element = grab(in)
        switching = true
        val sinkResolved: AsyncCallback[Try[Sink[T, M]]] =
          getAsyncCallback[Try[Sink[T, M]]](outcome => onSinkResolved(outcome, element))
        try {
          sinkFactory(element).onComplete(sinkResolved.invoke)(ExecutionContexts.sameThreadExecutionContext)
        } catch {
          case NonFatal(e) => failPromiseAndStage(e)
        }
      }

      override def onUpstreamFinish(): Unit = {
        // ignore onUpstreamFinish while the stage is switching but setKeepGoing
        if (!switching) {
          promise.failure(new NeverMaterializedException)
          super.onUpstreamFinish()
        } else {
          // there is a cached element -> the stage must not be shut down automatically because isClosed(in) is satisfied
          setKeepGoing(true)
        }
      }

      override def onUpstreamFailure(ex: Throwable): Unit = {
        promise.failure(ex)
        super.onUpstreamFailure(ex)
      }

      // fails the materialized value first and the stage second
      private def failPromiseAndStage(cause: Throwable): Unit = {
        promise.failure(cause)
        failStage(cause)
      }

      // runs inside the stage once the future returned by the factory has completed
      private def onSinkResolved(outcome: Try[Sink[T, M]], firstElement: T): Unit =
        outcome match {
          case Failure(e) =>
            failPromiseAndStage(e)
          case Success(_) if promise.isCompleted =>
          // the lazy sink is no longer needed
          // (there could have been an onUpstreamFailure in the meantime that has completed the promise)
          case Success(sink) =>
            try {
              val mat = switchTo(sink, firstElement)
              promise.success(mat)
              setKeepGoing(true)
            } catch {
              case NonFatal(e) => failPromiseAndStage(e)
            }
        }

      // runs `sink` on a sub-source and rewires the inlet so that elements are forwarded to it
      private def switchTo(sink: Sink[T, M], firstElement: T): M = {
        // false until the cached first element has been handed to the substream
        var cachedElementEmitted = false

        val subOutlet = new SubSourceOutlet[T]("LazySink")
        val matVal = Source.fromGraph(subOutlet.source).runWith(sink)(interpreter.subFusingMaterializer)

        def maybeCompleteStage(): Unit =
          if (isClosed(in) && subOutlet.isClosed) completeStage()

        // The stage must not be shut down automatically; it is completed when maybeCompleteStage decides
        setKeepGoing(true)

        val forwardingHandler = new InHandler {
          override def onPush(): Unit = subOutlet.push(grab(in))

          override def onUpstreamFinish(): Unit =
            // before the cached element went out, completion is propagated from onPull instead
            if (cachedElementEmitted) {
              subOutlet.complete()
              maybeCompleteStage()
            }

          override def onUpstreamFailure(ex: Throwable): Unit = {
            // propagate exception irrespective if the cached element has been pushed or not
            subOutlet.fail(ex)
            // #25410 if we fail the stage here directly, the SubSource may not have been started yet,
            // which can happen if upstream fails immediately after emitting a first value.
            // The SubSource won't be started until the stream shuts down, which means downstream won't see the failure,
            // scheduling it lets the interpreter first start the substream
            getAsyncCallback[Throwable](failStage).invoke(ex)
          }
        }
        setHandler(in, forwardingHandler)

        val substreamHandler = new OutHandler {
          override def onPull(): Unit =
            if (!cachedElementEmitted) {
              // the demand can be satisfied right away by the cached element
              cachedElementEmitted = true
              subOutlet.push(firstElement)
              // in.onUpstreamFinished was not propagated if it arrived before the cached element was pushed
              // -> check if the completion must be propagated now
              if (isClosed(in)) {
                subOutlet.complete()
                maybeCompleteStage()
              }
            } else {
              pull(in)
            }

          override def onDownstreamFinish(cause: Throwable): Unit = {
            if (!isClosed(in)) cancel(in, cause)
            maybeCompleteStage()
          }
        }
        subOutlet.setHandler(substreamHandler)

        matVal
      }
    }

    (stageLogic, promise.future)
  }
}