+str 20129 add lazySink (#20579)

This commit is contained in:
Alexander Golubev 2016-07-07 07:01:28 -04:00 committed by Konrad Malawski
parent b6f6438e96
commit e0562abba9
5 changed files with 283 additions and 5 deletions

View file

@ -0,0 +1,147 @@
/**
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.stream.scaladsl
import java.util.concurrent.TimeoutException
import akka.stream.ActorAttributes.supervisionStrategy
import akka.stream.Supervision._
import akka.stream._
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink
import akka.testkit.AkkaSpec
import scala.concurrent.{ Promise, Future, Await }
import scala.concurrent.duration._
class LazySinkSpec extends AkkaSpec {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 1, maxSize = 1)
implicit val materializer = ActorMaterializer(settings)
val fallback = () fail("Must not call fallback function")
val ex = TE("")
"A LazySink" must {
"work in happy case" in assertAllStagesStopped {
val futureProbe = Source(0 to 10).runWith(Sink.lazyInit[Int, Probe[Int]](_ Future.successful(TestSink.probe[Int]), fallback))
val probe = Await.result(futureProbe, 300.millis)
probe.request(100)
(0 to 10).foreach(probe.expectNext)
}
"work with slow sink init" in assertAllStagesStopped {
val p = Promise[Sink[Int, Probe[Int]]]()
val sourceProbe = TestPublisher.manualProbe[Int]()
val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ p.future, fallback))
val sourceSub = sourceProbe.expectSubscription()
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
sourceSub.expectRequest(1)
sourceProbe.expectNoMsg(200.millis)
a[TimeoutException] shouldBe thrownBy { Await.result(futureProbe, 200.millis) }
p.success(TestSink.probe[Int])
val probe = Await.result(futureProbe, 300.millis)
probe.request(100)
probe.expectNext(0)
(1 to 10).foreach(i {
sourceSub.sendNext(i)
probe.expectNext(i)
})
sourceSub.sendComplete()
}
"complete when there was no elements in stream" in assertAllStagesStopped {
val futureProbe = Source.empty.runWith(Sink.lazyInit[Int, Future[Int]](_ Future.successful(Sink.fold[Int, Int](0)(_ + _)), () Future.successful(0)))
val futureResult = Await.result(futureProbe, 300.millis)
Await.result(futureResult, 300.millis) should ===(0)
}
"complete normally when upstream is completed" in assertAllStagesStopped {
val futureProbe = Source.single(1).runWith(Sink.lazyInit[Int, Probe[Int]](_ Future.successful(TestSink.probe[Int]), fallback))
val futureResult = Await.result(futureProbe, 300.millis)
futureResult.request(1)
.expectNext(1)
.expectComplete()
}
"failed gracefully when sink factory method failed" in assertAllStagesStopped {
val sourceProbe = TestPublisher.manualProbe[Int]()
val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ throw ex, fallback))
val sourceSub = sourceProbe.expectSubscription()
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
sourceSub.expectCancellation()
a[RuntimeException] shouldBe thrownBy { Await.result(futureProbe, 300.millis) }
}
"failed gracefully when upstream failed" in assertAllStagesStopped {
val sourceProbe = TestPublisher.manualProbe[Int]()
val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ Future.successful(TestSink.probe[Int]), fallback))
val sourceSub = sourceProbe.expectSubscription()
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
val probe = Await.result(futureProbe, 300.millis)
probe.request(1)
.expectNext(0)
sourceSub.sendError(ex)
probe.expectError(ex)
}
"failed gracefully when factory future failed" in assertAllStagesStopped {
val sourceProbe = TestPublisher.manualProbe[Int]()
val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ Future.failed(ex), fallback)
.withAttributes(supervisionStrategy(stoppingDecider)))
val sourceSub = sourceProbe.expectSubscription()
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
a[TE] shouldBe thrownBy { Await.result(futureProbe, 300.millis) }
}
"cancel upstream when internal sink is cancelled" in assertAllStagesStopped {
val sourceProbe = TestPublisher.manualProbe[Int]()
val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](_ Future.successful(TestSink.probe[Int]), fallback))
val sourceSub = sourceProbe.expectSubscription()
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
sourceSub.expectRequest(1)
val probe = Await.result(futureProbe, 300.millis)
probe.request(1)
.expectNext(0)
probe.cancel()
sourceSub.expectCancellation()
}
"continue if supervision is resume" in assertAllStagesStopped {
val sourceProbe = TestPublisher.manualProbe[Int]()
val futureProbe = Source.fromPublisher(sourceProbe).runWith(Sink.lazyInit[Int, Probe[Int]](a
if (a == 0) throw ex else Future.successful(TestSink.probe[Int]), fallback)
.withAttributes(supervisionStrategy(resumingDecider)))
val sourceSub = sourceProbe.expectSubscription()
sourceSub.expectRequest(1)
sourceSub.sendNext(0)
sourceSub.expectRequest(1)
sourceSub.sendNext(1)
val probe = Await.result(futureProbe, 300.millis)
probe.request(1)
probe.expectNext(1)
probe.cancel()
}
"fail future when zero throws exception" in assertAllStagesStopped {
val futureProbe = Source.empty.runWith(Sink.lazyInit[Int, Future[Int]](_ Future.successful(Sink.fold[Int, Int](0)(_ + _)), () throw ex))
a[TE] shouldBe thrownBy { Await.result(futureProbe, 300.millis) }
}
}
}

View file

@ -3,7 +3,11 @@
*/
package akka.stream.impl
import akka.dispatch.ExecutionContexts
import akka.stream.ActorAttributes.SupervisionStrategy
import akka.stream.Supervision.{ stoppingDecider, Stop }
import akka.stream.impl.QueueSink.{ Output, Pull }
import akka.stream.impl.fusing.GraphInterpreter
import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Actor, Props }
import akka.stream.Attributes.InputBuffer
@ -20,11 +24,11 @@ import akka.stream.stage._
import org.reactivestreams.{ Publisher, Subscriber }
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable
import scala.concurrent.{ Promise, Future }
import scala.concurrent.{ ExecutionContext, Promise, Future }
import scala.language.postfixOps
import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try }
import akka.stream.scaladsl.{ SinkQueueWithCancel, SinkQueue }
import akka.stream.scaladsl.{ Source, Sink, SinkQueueWithCancel, SinkQueue }
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters._
import scala.compat.java8.OptionConverters._
@ -444,3 +448,101 @@ private[akka] final class ReducerState[T, R](val collector: java.util.stream.Col
def finish(): R = collector.finisher().apply(reduced)
}
/**
* INTERNAL API
*/
final private[stream] class LazySink[T, M](sinkFactory: T Future[Sink[T, M]], zeroMat: () M) extends GraphStageWithMaterializedValue[SinkShape[T], Future[M]] {
val in = Inlet[T]("lazySink.in")
override def initialAttributes = DefaultAttributes.lazySink
override val shape: SinkShape[T] = SinkShape.of(in)
override def toString: String = "LazySink"
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
lazy val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(stoppingDecider)
val promise = Promise[M]()
val stageLogic = new GraphStageLogic(shape) with InHandler {
override def preStart(): Unit = pull(in)
override def onPush(): Unit = {
try {
val element = grab(in)
val cb: AsyncCallback[Try[Sink[T, M]]] = getAsyncCallback {
case Success(sink) initInternalSource(sink, element)
case Failure(e) failure(e)
}
sinkFactory(element).onComplete { cb.invoke }(ExecutionContexts.sameThreadExecutionContext)
} catch {
case NonFatal(e) decider(e) match {
case Supervision.Stop failure(e)
case _ pull(in)
}
}
}
private def failure(ex: Throwable): Unit = {
failStage(ex)
promise.failure(ex)
}
override def onUpstreamFinish(): Unit = {
completeStage()
promise.tryComplete(Try(zeroMat()))
}
override def onUpstreamFailure(ex: Throwable): Unit = failure(ex)
setHandler(in, this)
private def initInternalSource(sink: Sink[T, M], firstElement: T): Unit = {
val sourceOut = new SubSourceOutlet[T]("LazySink")
var completed = false
def switchToFirstElementHandlers(): Unit = {
sourceOut.setHandler(new OutHandler {
override def onPull(): Unit = {
sourceOut.push(firstElement)
if (completed) internalSourceComplete() else switchToFinalHandlers()
}
override def onDownstreamFinish(): Unit = internalSourceComplete()
})
setHandler(in, new InHandler {
override def onPush(): Unit = sourceOut.push(grab(in))
override def onUpstreamFinish(): Unit = {
setKeepGoing(true)
completed = true
}
override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
})
}
def switchToFinalHandlers(): Unit = {
sourceOut.setHandler(new OutHandler {
override def onPull(): Unit = pull(in)
override def onDownstreamFinish(): Unit = internalSourceComplete()
})
setHandler(in, new InHandler {
override def onPush(): Unit = sourceOut.push(grab(in))
override def onUpstreamFinish(): Unit = internalSourceComplete()
override def onUpstreamFailure(ex: Throwable): Unit = internalSourceFailure(ex)
})
}
def internalSourceComplete(): Unit = {
sourceOut.complete()
completeStage()
}
def internalSourceFailure(ex: Throwable): Unit = {
sourceOut.fail(ex)
failStage(ex)
}
switchToFirstElementHandlers()
promise.trySuccess(Source.fromGraph(sourceOut.source).runWith(sink)(interpreter.subFusingMaterializer))
}
}
(stageLogic, promise.future)
}
}

View file

@ -127,6 +127,7 @@ private[stream] object Stages {
val actorRefWithAck = name("actorRefWithAckSink")
val actorSubscriberSink = name("actorSubscriberSink")
val queueSink = name("queueSink")
val lazySink = name("lazySink")
val outputStreamSink = name("outputStreamSink") and IODispatcher
val inputStreamSink = name("inputStreamSink") and IODispatcher
val fileSink = name("fileSink") and IODispatcher

View file

@ -8,15 +8,14 @@ import akka.{ Done, NotUsed }
import akka.actor.{ ActorRef, Props }
import akka.dispatch.ExecutionContexts
import akka.japi.function
import akka.stream.impl.StreamLayout
import akka.stream.impl.{ LazySink, StreamLayout, SinkQueueAdapter }
import akka.stream.{ javadsl, scaladsl, _ }
import org.reactivestreams.{ Publisher, Subscriber }
import scala.compat.java8.OptionConverters._
import scala.concurrent.{ Future, ExecutionContext }
import scala.util.Try
import java.util.concurrent.CompletionStage
import scala.compat.java8.FutureConverters.FutureOps
import akka.stream.impl.SinkQueueAdapter
import scala.compat.java8.FutureConverters._
/** Java API */
object Sink {
@ -247,6 +246,21 @@ object Sink {
*/
def queue[T](): Sink[T, SinkQueueWithCancel[T]] =
new Sink(scaladsl.Sink.queue[T]().mapMaterializedValue(new SinkQueueAdapter(_)))
/**
* Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements,
* because of completion or error.
*
* If `sinkFactory` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Stop]] the `Future` will be completed with failure. For all other supervision options it will
* try to create sink with next element
*
* `fallback` will be executed when there was no elements and completed is received from upstream.
*/
def lazyInit[T, M](sinkFactory: function.Function[T, CompletionStage[Sink[T, M]]], fallback: function.Creator[M]): Sink[T, CompletionStage[M]] =
new Sink(scaladsl.Sink.lazyInit[T, M](
t sinkFactory.apply(t).toScala.map(_.asScala)(ExecutionContexts.sameThreadExecutionContext),
() fallback.create()).mapMaterializedValue(_.toJava))
}
/**

View file

@ -348,4 +348,18 @@ object Sink {
*/
def queue[T](): Sink[T, SinkQueueWithCancel[T]] =
Sink.fromGraph(new QueueSink())
/**
* Creates a real `Sink` upon receiving the first element. Internal `Sink` will not be created if there are no elements,
* because of completion or error.
*
* If `sinkFactory` throws an exception and the supervision decision is
* [[akka.stream.Supervision.Stop]] the `Future` will be completed with failure. For all other supervision options it will
* try to create sink with next element
*
* `fallback` will be executed when there was no elements and completed is received from upstream.
*/
def lazyInit[T, M](sinkFactory: T Future[Sink[T, M]], fallback: () M): Sink[T, Future[M]] =
Sink.fromGraph(new LazySink(sinkFactory, fallback))
}