+str - Adds Sink.last and Sink.lastOption to mirror Sink.head and Sink.headOption
* Renames BlackholeSubscriber to SinkholeSunbscriber * Makes SinkholeSubscriber request Long.MaxValue * SinkholeSink seems like the best name ever
This commit is contained in:
parent
cb8d3c4609
commit
94fe1fb26d
13 changed files with 268 additions and 191 deletions
|
|
@ -1,17 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
|
||||||
*/
|
|
||||||
package akka.stream.tck
|
|
||||||
|
|
||||||
import akka.stream.impl.BlackholeSubscriber
|
|
||||||
import scala.concurrent.Promise
|
|
||||||
import org.reactivestreams.Publisher
|
|
||||||
import org.reactivestreams.Subscriber
|
|
||||||
|
|
||||||
class BlackholeSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
|
|
||||||
|
|
||||||
override def createSubscriber(): Subscriber[Int] = new BlackholeSubscriber[Int](2, Promise[Unit]())
|
|
||||||
|
|
||||||
override def createElement(element: Int): Int = element
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
@ -1,18 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
|
||||||
*/
|
|
||||||
package akka.stream.tck
|
|
||||||
|
|
||||||
import akka.stream.impl.HeadSink
|
|
||||||
import akka.stream.scaladsl._
|
|
||||||
import org.reactivestreams.Subscriber
|
|
||||||
|
|
||||||
import scala.concurrent.Promise
|
|
||||||
|
|
||||||
class HeadOptionSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
|
|
||||||
import HeadSink._
|
|
||||||
|
|
||||||
override def createSubscriber(): Subscriber[Int] = new HeadOptionSinkSubscriber[Int]
|
|
||||||
|
|
||||||
override def createElement(element: Int): Int = element
|
|
||||||
}
|
|
||||||
|
|
@ -0,0 +1,46 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.stream.tck
|
||||||
|
|
||||||
|
import akka.stream.impl.SinkholeSubscriber
|
||||||
|
import org.reactivestreams.tck.{ TestEnvironment, SubscriberWhiteboxVerification }
|
||||||
|
import org.reactivestreams.tck.SubscriberWhiteboxVerification.{ SubscriberPuppet, WhiteboxSubscriberProbe }
|
||||||
|
import org.scalatest.testng.{ TestNGSuiteLike }
|
||||||
|
import java.lang.{ Integer ⇒ JInt }
|
||||||
|
import scala.concurrent.Promise
|
||||||
|
import org.reactivestreams.{ Subscription, Subscriber }
|
||||||
|
|
||||||
|
class SinkholeSubscriberTest extends SubscriberWhiteboxVerification[JInt](new TestEnvironment()) with TestNGSuiteLike {
|
||||||
|
override def createSubscriber(probe: WhiteboxSubscriberProbe[JInt]): Subscriber[JInt] = {
|
||||||
|
new Subscriber[JInt] {
|
||||||
|
val hole = new SinkholeSubscriber[JInt](Promise[Unit]())
|
||||||
|
|
||||||
|
override def onError(t: Throwable): Unit = {
|
||||||
|
hole.onError(t)
|
||||||
|
probe.registerOnError(t)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onSubscribe(s: Subscription): Unit = {
|
||||||
|
probe.registerOnSubscribe(new SubscriberPuppet() {
|
||||||
|
override def triggerRequest(elements: Long): Unit = s.request(elements)
|
||||||
|
override def signalCancel(): Unit = s.cancel()
|
||||||
|
})
|
||||||
|
hole.onSubscribe(s)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onComplete(): Unit = {
|
||||||
|
hole.onComplete()
|
||||||
|
probe.registerOnComplete()
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onNext(t: JInt): Unit = {
|
||||||
|
hole.onNext(t)
|
||||||
|
probe.registerOnNext(t)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def createElement(element: Int): JInt = element
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -15,7 +15,7 @@ import scala.concurrent.Promise
|
||||||
import akka.stream.impl.SinkModule
|
import akka.stream.impl.SinkModule
|
||||||
import akka.stream.impl.StreamLayout.Module
|
import akka.stream.impl.StreamLayout.Module
|
||||||
import org.scalatest.concurrent.ScalaFutures
|
import org.scalatest.concurrent.ScalaFutures
|
||||||
import akka.stream.impl.BlackholeSubscriber
|
import akka.stream.impl.SinkholeSubscriber
|
||||||
|
|
||||||
object AttributesSpec {
|
object AttributesSpec {
|
||||||
|
|
||||||
|
|
@ -26,7 +26,7 @@ object AttributesSpec {
|
||||||
|
|
||||||
final class AttributesSink(val attributes: Attributes, shape: SinkShape[Nothing]) extends SinkModule[Nothing, Future[Attributes]](shape) {
|
final class AttributesSink(val attributes: Attributes, shape: SinkShape[Nothing]) extends SinkModule[Nothing, Future[Attributes]](shape) {
|
||||||
override def create(context: MaterializationContext) =
|
override def create(context: MaterializationContext) =
|
||||||
(new BlackholeSubscriber(0, Promise()), Future.successful(context.effectiveAttributes))
|
(new SinkholeSubscriber(Promise()), Future.successful(context.effectiveAttributes))
|
||||||
|
|
||||||
override protected def newInstance(shape: SinkShape[Nothing]): SinkModule[Nothing, Future[Attributes]] =
|
override protected def newInstance(shape: SinkShape[Nothing]): SinkModule[Nothing, Future[Attributes]] =
|
||||||
new AttributesSink(attributes, shape)
|
new AttributesSink(attributes, shape)
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,6 @@
|
||||||
package akka.stream.scaladsl
|
package akka.stream.scaladsl
|
||||||
|
|
||||||
import org.reactivestreams.Subscriber
|
import org.reactivestreams.Subscriber
|
||||||
|
|
||||||
import scala.concurrent.Await
|
import scala.concurrent.Await
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
@ -49,27 +48,16 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
"yield the first error" in assertAllStagesStopped {
|
"yield the first error" in assertAllStagesStopped {
|
||||||
val p = TestPublisher.manualProbe[Int]()
|
|
||||||
val f = Source(p).runWith(Sink.head)
|
|
||||||
val proc = p.expectSubscription()
|
|
||||||
proc.expectRequest()
|
|
||||||
val ex = new RuntimeException("ex")
|
val ex = new RuntimeException("ex")
|
||||||
proc.sendError(ex)
|
intercept[RuntimeException] {
|
||||||
Await.ready(f, 100.millis)
|
Await.result(Source.failed[Int](ex).runWith(Sink.head), 1.second)
|
||||||
f.value.get should be(Failure(ex))
|
} should be theSameInstanceAs (ex)
|
||||||
}
|
}
|
||||||
|
|
||||||
"yield NoSuchElementExcption for empty stream" in assertAllStagesStopped {
|
"yield NoSuchElementException for empty stream" in assertAllStagesStopped {
|
||||||
val p = TestPublisher.manualProbe[Int]()
|
intercept[NoSuchElementException] {
|
||||||
val f = Source(p).runWith(Sink.head)
|
Await.result(Source.empty[Int].runWith(Sink.head), 1.second)
|
||||||
val proc = p.expectSubscription()
|
}.getMessage should be("head of empty stream")
|
||||||
proc.expectRequest()
|
|
||||||
proc.sendComplete()
|
|
||||||
Await.ready(f, 100.millis)
|
|
||||||
f.value.get match {
|
|
||||||
case Failure(e: NoSuchElementException) ⇒ e.getMessage should be("head of empty stream")
|
|
||||||
case x ⇒ fail("expected NoSuchElementException, got " + x)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
@ -86,23 +74,14 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
"yield the first error" in assertAllStagesStopped {
|
"yield the first error" in assertAllStagesStopped {
|
||||||
val p = TestPublisher.manualProbe[Int]()
|
|
||||||
val f = Source(p).runWith(Sink.head)
|
|
||||||
val proc = p.expectSubscription()
|
|
||||||
proc.expectRequest()
|
|
||||||
val ex = new RuntimeException("ex")
|
val ex = new RuntimeException("ex")
|
||||||
proc.sendError(ex)
|
intercept[RuntimeException] {
|
||||||
Await.ready(f, 100.millis)
|
Await.result(Source.failed[Int](ex).runWith(Sink.head), 1.second)
|
||||||
f.value.get should be(Failure(ex))
|
} should be theSameInstanceAs (ex)
|
||||||
}
|
}
|
||||||
|
|
||||||
"yield None for empty stream" in assertAllStagesStopped {
|
"yield None for empty stream" in assertAllStagesStopped {
|
||||||
val p = TestPublisher.manualProbe[Int]()
|
Await.result(Source.empty[Int].runWith(Sink.headOption), 1.second) should be(None)
|
||||||
val f = Source(p).runWith(Sink.headOption)
|
|
||||||
val proc = p.expectSubscription()
|
|
||||||
proc.expectRequest()
|
|
||||||
proc.sendComplete()
|
|
||||||
Await.result(f, 100.millis) should be(None)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,63 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.stream.scaladsl
|
||||||
|
|
||||||
|
import org.reactivestreams.Subscriber
|
||||||
|
|
||||||
|
import scala.concurrent.Await
|
||||||
|
import scala.concurrent.Future
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
import scala.util.Failure
|
||||||
|
|
||||||
|
import akka.stream.ActorMaterializer
|
||||||
|
import akka.stream.ActorMaterializerSettings
|
||||||
|
import akka.stream.testkit._
|
||||||
|
import akka.stream.testkit.Utils._
|
||||||
|
|
||||||
|
class LastSinkSpec extends AkkaSpec with ScriptedTest {
|
||||||
|
|
||||||
|
val settings = ActorMaterializerSettings(system)
|
||||||
|
|
||||||
|
implicit val materializer = ActorMaterializer(settings)
|
||||||
|
|
||||||
|
"A Flow with Sink.last" must {
|
||||||
|
|
||||||
|
"yield the last value" in assertAllStagesStopped {
|
||||||
|
Await.result(Source(1 to 42).map(identity).runWith(Sink.last), 1.second) should be(42)
|
||||||
|
}
|
||||||
|
|
||||||
|
"yield the first error" in assertAllStagesStopped {
|
||||||
|
val ex = new RuntimeException("ex")
|
||||||
|
intercept[RuntimeException] {
|
||||||
|
Await.result(Source.failed[Int](ex).runWith(Sink.last), 1.second)
|
||||||
|
} should be theSameInstanceAs (ex)
|
||||||
|
}
|
||||||
|
|
||||||
|
"yield NoSuchElementException for empty stream" in assertAllStagesStopped {
|
||||||
|
intercept[NoSuchElementException] {
|
||||||
|
Await.result(Source.empty[Int].runWith(Sink.last), 1.second)
|
||||||
|
}.getMessage should be("last of empty stream")
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
"A Flow with Sink.lastOption" must {
|
||||||
|
|
||||||
|
"yield the last value" in assertAllStagesStopped {
|
||||||
|
Await.result(Source(1 to 42).map(identity).runWith(Sink.lastOption), 1.second) should be(Some(42))
|
||||||
|
}
|
||||||
|
|
||||||
|
"yield the first error" in assertAllStagesStopped {
|
||||||
|
val ex = new RuntimeException("ex")
|
||||||
|
intercept[RuntimeException] {
|
||||||
|
Await.result(Source.failed[Int](ex).runWith(Sink.lastOption), 1.second)
|
||||||
|
} should be theSameInstanceAs (ex)
|
||||||
|
}
|
||||||
|
|
||||||
|
"yield None for empty stream" in assertAllStagesStopped {
|
||||||
|
Await.result(Source.empty[Int].runWith(Sink.lastOption), 1.second) should be(None)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -1,50 +0,0 @@
|
||||||
/**
|
|
||||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
|
||||||
*/
|
|
||||||
package akka.stream.impl
|
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
|
||||||
import scala.concurrent.Promise
|
|
||||||
import org.reactivestreams.{ Subscriber, Subscription }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* INTERNAL API
|
|
||||||
*/
|
|
||||||
|
|
||||||
private[akka] class BlackholeSubscriber[T](highWatermark: Int, onComplete: Promise[Unit]) extends Subscriber[T] {
|
|
||||||
|
|
||||||
private val lowWatermark: Int = Math.max(1, highWatermark / 2)
|
|
||||||
private var requested = 0L
|
|
||||||
private var subscription: Subscription = null
|
|
||||||
|
|
||||||
override def onSubscribe(sub: Subscription): Unit = {
|
|
||||||
ReactiveStreamsCompliance.requireNonNullSubscription(sub)
|
|
||||||
if (subscription ne null) sub.cancel()
|
|
||||||
else {
|
|
||||||
subscription = sub
|
|
||||||
requestMore()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onError(cause: Throwable): Unit = {
|
|
||||||
ReactiveStreamsCompliance.requireNonNullException(cause)
|
|
||||||
onComplete.tryFailure(cause)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onComplete(): Unit = {
|
|
||||||
onComplete.trySuccess(())
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onNext(element: T): Unit = {
|
|
||||||
ReactiveStreamsCompliance.requireNonNullElement(element)
|
|
||||||
requested -= 1
|
|
||||||
requestMore()
|
|
||||||
}
|
|
||||||
|
|
||||||
protected def requestMore(): Unit =
|
|
||||||
if (requested < lowWatermark) {
|
|
||||||
val amount = highWatermark - requested
|
|
||||||
requested += amount
|
|
||||||
subscription.request(amount)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -0,0 +1,34 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||||
|
*/
|
||||||
|
package akka.stream.impl
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
import scala.concurrent.Promise
|
||||||
|
import org.reactivestreams.{ Subscriber, Subscription }
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
|
||||||
|
private[akka] final class SinkholeSubscriber[T](whenComplete: Promise[Unit]) extends Subscriber[T] {
|
||||||
|
private[this] var running: Boolean = false
|
||||||
|
|
||||||
|
override def onSubscribe(sub: Subscription): Unit = {
|
||||||
|
ReactiveStreamsCompliance.requireNonNullSubscription(sub)
|
||||||
|
if (running) sub.cancel()
|
||||||
|
else {
|
||||||
|
running = true
|
||||||
|
sub.request(Long.MaxValue)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onError(cause: Throwable): Unit = {
|
||||||
|
ReactiveStreamsCompliance.requireNonNullException(cause)
|
||||||
|
whenComplete.tryFailure(cause)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onComplete(): Unit = whenComplete.trySuccess(())
|
||||||
|
|
||||||
|
override def onNext(element: T): Unit = ReactiveStreamsCompliance.requireNonNullElement(element)
|
||||||
|
}
|
||||||
|
|
@ -3,18 +3,17 @@
|
||||||
*/
|
*/
|
||||||
package akka.stream.impl
|
package akka.stream.impl
|
||||||
|
|
||||||
import akka.actor.{ Deploy, ActorRef, Props }
|
import akka.actor.{ ActorRef, Props }
|
||||||
import akka.dispatch.ExecutionContexts
|
|
||||||
import akka.stream.actor.ActorPublisherMessage.Request
|
import akka.stream.actor.ActorPublisherMessage.Request
|
||||||
import akka.stream.impl.StreamLayout.Module
|
import akka.stream.impl.StreamLayout.Module
|
||||||
import akka.stream._
|
import akka.stream._
|
||||||
|
import akka.stream.stage.{ InHandler, GraphStageLogic, SinkStage }
|
||||||
import akka.util.Timeout
|
import akka.util.Timeout
|
||||||
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
|
import org.reactivestreams.{ Publisher, Subscriber }
|
||||||
import scala.annotation.unchecked.uncheckedVariance
|
import scala.annotation.unchecked.uncheckedVariance
|
||||||
import scala.concurrent.duration.{ FiniteDuration, _ }
|
import scala.concurrent.duration.{ FiniteDuration, _ }
|
||||||
import scala.concurrent.{ Future, Promise }
|
import scala.concurrent.{ Future, Promise }
|
||||||
import scala.language.postfixOps
|
import scala.language.postfixOps
|
||||||
import scala.util.Try
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -87,75 +86,22 @@ private[akka] final class FanoutPublisherSink[In](
|
||||||
new FanoutPublisherSink[In](attr, amendShape(attr))
|
new FanoutPublisherSink[In](attr, amendShape(attr))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* INTERNAL API
|
|
||||||
*/
|
|
||||||
private[akka] object HeadSink {
|
|
||||||
final class HeadOptionSinkSubscriber[In] extends Subscriber[In] {
|
|
||||||
private[this] var subscription: Subscription = null
|
|
||||||
private[this] val promise: Promise[Option[In]] = Promise[Option[In]]()
|
|
||||||
def future: Future[Option[In]] = promise.future
|
|
||||||
override def onSubscribe(s: Subscription): Unit = {
|
|
||||||
ReactiveStreamsCompliance.requireNonNullSubscription(s)
|
|
||||||
if (subscription ne null) s.cancel()
|
|
||||||
else {
|
|
||||||
subscription = s
|
|
||||||
s.request(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onNext(elem: In): Unit = {
|
|
||||||
ReactiveStreamsCompliance.requireNonNullElement(elem)
|
|
||||||
promise.trySuccess(Some(elem))
|
|
||||||
subscription.cancel()
|
|
||||||
subscription = null
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onError(t: Throwable): Unit = {
|
|
||||||
ReactiveStreamsCompliance.requireNonNullException(t)
|
|
||||||
promise.tryFailure(t)
|
|
||||||
}
|
|
||||||
|
|
||||||
override def onComplete(): Unit =
|
|
||||||
promise.trySuccess(None)
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* INTERNAL API
|
|
||||||
* Holds a [[scala.concurrent.Future]] that will be fulfilled with the first
|
|
||||||
* element that is signaled to this stream (wrapped in a [[Some]]),
|
|
||||||
* which can be either an element (after which the upstream subscription is canceled),
|
|
||||||
* an error condition (putting the Future into the corresponding failed state) or
|
|
||||||
* the end-of-stream (yielding [[None]]).
|
|
||||||
*/
|
|
||||||
private[akka] final class HeadOptionSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Future[Option[In]]](shape) {
|
|
||||||
override def create(context: MaterializationContext) = {
|
|
||||||
val sub = new HeadSink.HeadOptionSinkSubscriber[In]
|
|
||||||
(sub, sub.future)
|
|
||||||
}
|
|
||||||
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[Option[In]]] = new HeadOptionSink[In](attributes, shape)
|
|
||||||
override def withAttributes(attr: Attributes): Module = new HeadOptionSink[In](attr, amendShape(attr))
|
|
||||||
override def toString: String = "HeadOptionSink"
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
* Attaches a subscriber to this stream which will just discard all received
|
* Attaches a subscriber to this stream which will just discard all received
|
||||||
* elements.
|
* elements.
|
||||||
*/
|
*/
|
||||||
private[akka] final class BlackholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Unit]](shape) {
|
private[akka] final class SinkholeSink(val attributes: Attributes, shape: SinkShape[Any]) extends SinkModule[Any, Future[Unit]](shape) {
|
||||||
|
|
||||||
override def create(context: MaterializationContext) = {
|
override def create(context: MaterializationContext) = {
|
||||||
val effectiveSettings = ActorMaterializer.downcast(context.materializer).effectiveSettings(context.effectiveAttributes)
|
val effectiveSettings = ActorMaterializer.downcast(context.materializer).effectiveSettings(context.effectiveAttributes)
|
||||||
val p = Promise[Unit]()
|
val p = Promise[Unit]()
|
||||||
(new BlackholeSubscriber[Any](effectiveSettings.maxInputBufferSize, p), p.future)
|
(new SinkholeSubscriber[Any](p), p.future)
|
||||||
}
|
}
|
||||||
|
|
||||||
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Unit]] = new BlackholeSink(attributes, shape)
|
override protected def newInstance(shape: SinkShape[Any]): SinkModule[Any, Future[Unit]] = new SinkholeSink(attributes, shape)
|
||||||
override def withAttributes(attr: Attributes): Module = new BlackholeSink(attr, amendShape(attr))
|
override def withAttributes(attr: Attributes): Module = new SinkholeSink(attr, amendShape(attr))
|
||||||
override def toString: String = "BlackholeSink"
|
override def toString: String = "SinkholeSink"
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -246,3 +192,58 @@ private[akka] final class AcknowledgeSink[In](bufferSize: Int, val attributes: A
|
||||||
new AcknowledgeSink[In](bufferSize, attr, amendShape(attr), timeout)
|
new AcknowledgeSink[In](bufferSize, attr, amendShape(attr), timeout)
|
||||||
override def toString: String = "AcknowledgeSink"
|
override def toString: String = "AcknowledgeSink"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private[akka] final class LastOptionStage[T] extends SinkStage[T, Future[Option[T]]]("lastOption") {
|
||||||
|
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
||||||
|
val p: Promise[Option[T]] = Promise()
|
||||||
|
(new GraphStageLogic(shape) {
|
||||||
|
override def preStart(): Unit = pull(in)
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
private[this] var prev: T = null.asInstanceOf[T]
|
||||||
|
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
prev = grab(in)
|
||||||
|
pull(in)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onUpstreamFinish(): Unit = {
|
||||||
|
val head = prev
|
||||||
|
prev = null.asInstanceOf[T]
|
||||||
|
p.trySuccess(Option(head))
|
||||||
|
completeStage()
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||||
|
prev = null.asInstanceOf[T]
|
||||||
|
p.tryFailure(ex)
|
||||||
|
failStage(ex)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}, p.future)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private[akka] final class HeadOptionStage[T] extends SinkStage[T, Future[Option[T]]]("headOption") {
|
||||||
|
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
|
||||||
|
val p: Promise[Option[T]] = Promise()
|
||||||
|
(new GraphStageLogic(shape) {
|
||||||
|
override def preStart(): Unit = pull(in)
|
||||||
|
setHandler(in, new InHandler {
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
p.trySuccess(Option(grab(in)))
|
||||||
|
completeStage()
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onUpstreamFinish(): Unit = {
|
||||||
|
p.trySuccess(None)
|
||||||
|
completeStage()
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||||
|
p.tryFailure(ex)
|
||||||
|
failStage(ex)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}, p.future)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -25,8 +25,6 @@ private[stream] object Stages {
|
||||||
object DefaultAttributes {
|
object DefaultAttributes {
|
||||||
val IODispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher")
|
val IODispatcher = ActorAttributes.Dispatcher("akka.stream.default-blocking-io-dispatcher")
|
||||||
|
|
||||||
val timerTransform = name("timerTransform")
|
|
||||||
val stageFactory = name("stageFactory")
|
|
||||||
val fused = name("fused")
|
val fused = name("fused")
|
||||||
val map = name("map")
|
val map = name("map")
|
||||||
val log = name("log")
|
val log = name("log")
|
||||||
|
|
@ -63,9 +61,6 @@ private[stream] object Stages {
|
||||||
val zip = name("zip")
|
val zip = name("zip")
|
||||||
val unzip = name("unzip")
|
val unzip = name("unzip")
|
||||||
val concat = name("concat")
|
val concat = name("concat")
|
||||||
val flexiMerge = name("flexiMerge")
|
|
||||||
val flexiRoute = name("flexiRoute")
|
|
||||||
val identityJunction = name("identityJunction")
|
|
||||||
val repeat = name("repeat")
|
val repeat = name("repeat")
|
||||||
|
|
||||||
val publisherSource = name("publisherSource")
|
val publisherSource = name("publisherSource")
|
||||||
|
|
@ -90,6 +85,8 @@ private[stream] object Stages {
|
||||||
val cancelledSink = name("cancelledSink")
|
val cancelledSink = name("cancelledSink")
|
||||||
val headSink = name("headSink") and inputBuffer(initial = 1, max = 1)
|
val headSink = name("headSink") and inputBuffer(initial = 1, max = 1)
|
||||||
val headOptionSink = name("headOptionSink") and inputBuffer(initial = 1, max = 1)
|
val headOptionSink = name("headOptionSink") and inputBuffer(initial = 1, max = 1)
|
||||||
|
val lastSink = name("lastSink")
|
||||||
|
val lastOptionSink = name("lastOptionSink")
|
||||||
val publisherSink = name("publisherSink")
|
val publisherSink = name("publisherSink")
|
||||||
val fanoutPublisherSink = name("fanoutPublisherSink")
|
val fanoutPublisherSink = name("fanoutPublisherSink")
|
||||||
val ignoreSink = name("ignoreSink")
|
val ignoreSink = name("ignoreSink")
|
||||||
|
|
|
||||||
|
|
@ -113,6 +113,27 @@ object Sink {
|
||||||
new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue(
|
new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue(
|
||||||
_.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext)))
|
_.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext)))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A `Sink` that materializes into a `Future` of the last value received.
|
||||||
|
* If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]].
|
||||||
|
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
|
||||||
|
*
|
||||||
|
* See also [[lastOption]].
|
||||||
|
*/
|
||||||
|
def last[In](): Sink[In, Future[In]] =
|
||||||
|
new Sink(scaladsl.Sink.last[In])
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A `Sink` that materializes into a `Future` of the optional last value received.
|
||||||
|
* If the stream completes before signaling at least a single element, the value of the Future will be an empty [[akka.japi.Option]].
|
||||||
|
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
|
||||||
|
*
|
||||||
|
* See also [[head]].
|
||||||
|
*/
|
||||||
|
def lastOption[In](): Sink[In, Future[akka.japi.Option[In]]] =
|
||||||
|
new Sink(scaladsl.Sink.lastOption[In].mapMaterializedValue(
|
||||||
|
_.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext)))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends the elements of the stream to the given `ActorRef`.
|
* Sends the elements of the stream to the given `ActorRef`.
|
||||||
* If the target actor terminates the stream will be canceled.
|
* If the target actor terminates the stream will be canceled.
|
||||||
|
|
|
||||||
|
|
@ -85,7 +85,8 @@ object Sink {
|
||||||
*
|
*
|
||||||
* See also [[headOption]].
|
* See also [[headOption]].
|
||||||
*/
|
*/
|
||||||
def head[T]: Sink[T, Future[T]] = new Sink[T, Future[Option[T]]](new HeadOptionSink[T](DefaultAttributes.headSink, shape("HeadSink")))
|
def head[T]: Sink[T, Future[T]] =
|
||||||
|
Sink.fromGraph(new HeadOptionStage[T]).withAttributes(DefaultAttributes.headSink)
|
||||||
.mapMaterializedValue(e ⇒ e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.sameThreadExecutionContext))
|
.mapMaterializedValue(e ⇒ e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.sameThreadExecutionContext))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -95,7 +96,27 @@ object Sink {
|
||||||
*
|
*
|
||||||
* See also [[head]].
|
* See also [[head]].
|
||||||
*/
|
*/
|
||||||
def headOption[T]: Sink[T, Future[Option[T]]] = new Sink(new HeadOptionSink[T](DefaultAttributes.headSink, shape("HeadOptionSink")))
|
def headOption[T]: Sink[T, Future[Option[T]]] =
|
||||||
|
Sink.fromGraph(new HeadOptionStage[T]).withAttributes(DefaultAttributes.headOptionSink)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A `Sink` that materializes into a `Future` of the last value received.
|
||||||
|
* If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]].
|
||||||
|
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
|
||||||
|
*
|
||||||
|
* See also [[lastOption]].
|
||||||
|
*/
|
||||||
|
def last[T]: Sink[T, Future[T]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastSink)
|
||||||
|
.mapMaterializedValue(e ⇒ e.map(_.getOrElse(throw new NoSuchElementException("last of empty stream")))(ExecutionContexts.sameThreadExecutionContext))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A `Sink` that materializes into a `Future` of the optional last value received.
|
||||||
|
* If the stream completes before signaling at least a single element, the value of the Future will be [[None]].
|
||||||
|
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
|
||||||
|
*
|
||||||
|
* See also [[last]].
|
||||||
|
*/
|
||||||
|
def lastOption[T]: Sink[T, Future[Option[T]]] = Sink.fromGraph(new LastOptionStage[T]).withAttributes(DefaultAttributes.lastOptionSink)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]].
|
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]].
|
||||||
|
|
@ -117,7 +138,7 @@ object Sink {
|
||||||
* A `Sink` that will consume the stream and discard the elements.
|
* A `Sink` that will consume the stream and discard the elements.
|
||||||
*/
|
*/
|
||||||
def ignore: Sink[Any, Future[Unit]] =
|
def ignore: Sink[Any, Future[Unit]] =
|
||||||
new Sink(new BlackholeSink(DefaultAttributes.ignoreSink, shape("BlackholeSink")))
|
new Sink(new SinkholeSink(DefaultAttributes.ignoreSink, shape("SinkholeSink")))
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
|
* A `Sink` that will invoke the given procedure for each received element. The sink is materialized
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue