+str #18906 add Sink.headOption

This commit is contained in:
Konrad Malawski 2015-11-10 15:15:59 +01:00
parent 985e3b1c2f
commit 8650d0c473
8 changed files with 92 additions and 33 deletions

View file

@ -82,26 +82,19 @@ trait RouteTestResultComponent {
case s: HttpEntity.Strict () s case s: HttpEntity.Strict () s
case HttpEntity.Default(contentType, contentLength, data) case HttpEntity.Default(contentType, contentLength, data)
val dataChunks = awaitAllElements(data); val dataChunks = awaitAllElements(data); { () HttpEntity.Default(contentType, contentLength, Source(dataChunks)) }
{ () HttpEntity.Default(contentType, contentLength, Source(dataChunks)) }
case HttpEntity.CloseDelimited(contentType, data) case HttpEntity.CloseDelimited(contentType, data)
val dataChunks = awaitAllElements(data); val dataChunks = awaitAllElements(data); { () HttpEntity.CloseDelimited(contentType, Source(dataChunks)) }
{ () HttpEntity.CloseDelimited(contentType, Source(dataChunks)) }
case HttpEntity.Chunked(contentType, chunks) case HttpEntity.Chunked(contentType, data)
val dataChunks = awaitAllElements(chunks); val dataChunks = awaitAllElements(data); { () HttpEntity.Chunked(contentType, Source(dataChunks)) }
{ () HttpEntity.Chunked(contentType, Source(dataChunks)) }
} }
private def failNeitherCompletedNorRejected(): Nothing = private def failNeitherCompletedNorRejected(): Nothing =
failTest("Request was neither completed nor rejected within " + timeout) failTest("Request was neither completed nor rejected within " + timeout)
private def awaitAllElements[T](data: Source[T, _]): immutable.Seq[T] = { private def awaitAllElements[T](data: Source[T, _]): immutable.Seq[T] =
data.grouped(100000).runWith(Sink.head).recover({ data.grouped(100000).runWith(Sink.headOption).awaitResult(timeout).getOrElse(Nil)
case e: NoSuchElementException Nil
})(ExecutionContexts.sameThreadExecutionContext)
.awaitResult(timeout)
}
} }
} }

View file

@ -20,8 +20,8 @@ class StreamingResponseSpecs extends RoutingSpec {
val route = complete(response) val route = complete(response)
Get() ~> route ~> check { Get() ~> route ~> check {
status should ===(OK) status should ===(StatusCodes.OK)
responseAs[String] shouldEqual "" responseAs[String] should === ("")
} }
} }

View file

@ -9,10 +9,10 @@ import org.reactivestreams.Subscriber
import scala.concurrent.Promise import scala.concurrent.Promise
class HeadSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { class HeadOptionSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
import HeadSink._ import HeadSink._
override def createSubscriber(): Subscriber[Int] = new HeadSinkSubscriber[Int] override def createSubscriber(): Subscriber[Int] = new HeadOptionSinkSubscriber[Int]
override def createElement(element: Int): Int = element override def createElement(element: Int): Int = element
} }

View file

@ -67,11 +67,44 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest {
proc.sendComplete() proc.sendComplete()
Await.ready(f, 100.millis) Await.ready(f, 100.millis)
f.value.get match { f.value.get match {
case Failure(e: NoSuchElementException) e.getMessage should be("empty stream") case Failure(e: NoSuchElementException) e.getMessage should be("head of empty stream")
case x fail("expected NoSuchElementException, got " + x) case x fail("expected NoSuchElementException, got " + x)
} }
} }
} }
"A Flow with Sink.headOption" must {
"yield the first value" in assertAllStagesStopped {
val p = TestPublisher.manualProbe[Int]()
val f: Future[Option[Int]] = Source(p).map(identity).runWith(Sink.headOption)
val proc = p.expectSubscription()
proc.expectRequest()
proc.sendNext(42)
Await.result(f, 100.millis) should be(Some(42))
proc.expectCancellation()
}
"yield the first error" in assertAllStagesStopped {
val p = TestPublisher.manualProbe[Int]()
val f = Source(p).runWith(Sink.head)
val proc = p.expectSubscription()
proc.expectRequest()
val ex = new RuntimeException("ex")
proc.sendError(ex)
Await.ready(f, 100.millis)
f.value.get should be(Failure(ex))
}
"yield None for empty stream" in assertAllStagesStopped {
val p = TestPublisher.manualProbe[Int]()
val f = Source(p).runWith(Sink.headOption)
val proc = p.expectSubscription()
proc.expectRequest()
proc.sendComplete()
Await.result(f, 100.millis) should be(None)
}
}
} }

View file

@ -4,6 +4,7 @@
package akka.stream.impl package akka.stream.impl
import akka.actor.{ Deploy, ActorRef, Props } import akka.actor.{ Deploy, ActorRef, Props }
import akka.dispatch.ExecutionContexts
import akka.stream.actor.ActorPublisherMessage.Request import akka.stream.actor.ActorPublisherMessage.Request
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream._ import akka.stream._
@ -90,10 +91,10 @@ private[akka] final class FanoutPublisherSink[In](
* INTERNAL API * INTERNAL API
*/ */
private[akka] object HeadSink { private[akka] object HeadSink {
final class HeadSinkSubscriber[In] extends Subscriber[In] { final class HeadOptionSinkSubscriber[In] extends Subscriber[In] {
private[this] var subscription: Subscription = null private[this] var subscription: Subscription = null
private[this] val promise: Promise[In] = Promise[In]() private[this] val promise: Promise[Option[In]] = Promise[Option[In]]()
def future: Future[In] = promise.future def future: Future[Option[In]] = promise.future
override def onSubscribe(s: Subscription): Unit = { override def onSubscribe(s: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(s) ReactiveStreamsCompliance.requireNonNullSubscription(s)
if (subscription ne null) s.cancel() if (subscription ne null) s.cancel()
@ -105,7 +106,7 @@ private[akka] object HeadSink {
override def onNext(elem: In): Unit = { override def onNext(elem: In): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(elem) ReactiveStreamsCompliance.requireNonNullElement(elem)
promise.trySuccess(elem) promise.trySuccess(Some(elem))
subscription.cancel() subscription.cancel()
subscription = null subscription = null
} }
@ -116,7 +117,7 @@ private[akka] object HeadSink {
} }
override def onComplete(): Unit = override def onComplete(): Unit =
promise.tryFailure(new NoSuchElementException("empty stream")) promise.trySuccess(None)
} }
} }
@ -124,19 +125,19 @@ private[akka] object HeadSink {
/** /**
* INTERNAL API * INTERNAL API
* Holds a [[scala.concurrent.Future]] that will be fulfilled with the first * Holds a [[scala.concurrent.Future]] that will be fulfilled with the first
* thing that is signaled to this stream, which can be either an element (after * element that is signaled to this stream (wrapped in a [[Some]]),
* which the upstream subscription is canceled), an error condition (putting * which can be either an element (after which the upstream subscription is canceled),
* the Future into the corresponding failed state) or the end-of-stream * an error condition (putting the Future into the corresponding failed state) or
* (failing the Future with a NoSuchElementException). * the end-of-stream (yielding [[None]]).
*/ */
private[akka] final class HeadSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Future[In]](shape) { private[akka] final class HeadOptionSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Future[Option[In]]](shape) {
override def create(context: MaterializationContext) = { override def create(context: MaterializationContext) = {
val sub = new HeadSink.HeadSinkSubscriber[In] val sub = new HeadSink.HeadOptionSinkSubscriber[In]
(sub, sub.future) (sub, sub.future)
} }
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[In]] = new HeadSink[In](attributes, shape) override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[Option[In]]] = new HeadOptionSink[In](attributes, shape)
override def withAttributes(attr: Attributes): Module = new HeadSink[In](attr, amendShape(attr)) override def withAttributes(attr: Attributes): Module = new HeadOptionSink[In](attr, amendShape(attr))
override def toString: String = "HeadSink" override def toString: String = "HeadOptionSink"
} }
/** /**

View file

@ -87,6 +87,7 @@ private[stream] object Stages {
val subscriberSink = name("subscriberSink") val subscriberSink = name("subscriberSink")
val cancelledSink = name("cancelledSink") val cancelledSink = name("cancelledSink")
val headSink = name("headSink") and inputBuffer(initial = 1, max = 1) val headSink = name("headSink") and inputBuffer(initial = 1, max = 1)
val headOptionSink = name("headOptionSink") and inputBuffer(initial = 1, max = 1)
val publisherSink = name("publisherSink") val publisherSink = name("publisherSink")
val fanoutPublisherSink = name("fanoutPublisherSink") val fanoutPublisherSink = name("fanoutPublisherSink")
val ignoreSink = name("ignoreSink") val ignoreSink = name("ignoreSink")

View file

@ -4,6 +4,7 @@
package akka.stream.javadsl package akka.stream.javadsl
import akka.actor.{ ActorRef, Props } import akka.actor.{ ActorRef, Props }
import akka.dispatch.ExecutionContexts
import akka.japi.function import akka.japi.function
import akka.stream.impl.StreamLayout import akka.stream.impl.StreamLayout
import akka.stream.{ javadsl, scaladsl, _ } import akka.stream.{ javadsl, scaladsl, _ }
@ -90,10 +91,25 @@ object Sink {
/** /**
* A `Sink` that materializes into a `Future` of the first value received. * A `Sink` that materializes into a `Future` of the first value received.
* If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]].
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
*
* See also [[headOption]].
*/ */
def head[In](): Sink[In, Future[In]] = def head[In](): Sink[In, Future[In]] =
new Sink(scaladsl.Sink.head[In]) new Sink(scaladsl.Sink.head[In])
/**
* A `Sink` that materializes into a `Future` of the optional first value received.
* If the stream completes before signaling at least a single element, the value of the Future will be an empty [[akka.japi.Option]].
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
*
* See also [[head]].
*/
def headOption[In](): Sink[In, Future[akka.japi.Option[In]]] =
new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue(
_.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext)))
/** /**
* Sends the elements of the stream to the given `ActorRef`. * Sends the elements of the stream to the given `ActorRef`.
* If the target actor terminates the stream will be canceled. * If the target actor terminates the stream will be canceled.

View file

@ -4,6 +4,7 @@
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.actor.{ ActorRef, Props } import akka.actor.{ ActorRef, Props }
import akka.dispatch.ExecutionContexts
import akka.stream.actor.ActorSubscriber import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
@ -75,8 +76,22 @@ object Sink {
/** /**
* A `Sink` that materializes into a `Future` of the first value received. * A `Sink` that materializes into a `Future` of the first value received.
* If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]].
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
*
* See also [[headOption]].
*/ */
def head[T]: Sink[T, Future[T]] = new Sink(new HeadSink[T](DefaultAttributes.headSink, shape("HeadSink"))) def head[T]: Sink[T, Future[T]] = new Sink[T, Future[Option[T]]](new HeadOptionSink[T](DefaultAttributes.headSink, shape("HeadSink")))
.mapMaterializedValue(e e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.sameThreadExecutionContext))
/**
* A `Sink` that materializes into a `Future` of the optional first value received.
* If the stream completes before signaling at least a single element, the value of the Future will be [[None]].
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
*
* See also [[head]].
*/
def headOption[T]: Sink[T, Future[Option[T]]] = new Sink(new HeadOptionSink[T](DefaultAttributes.headSink, shape("HeadOptionSink")))
/** /**
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]]. * A `Sink` that materializes into a [[org.reactivestreams.Publisher]].