diff --git a/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/RouteTestResultComponent.scala b/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/RouteTestResultComponent.scala index 609e6f8236..6d12b01f9c 100644 --- a/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/RouteTestResultComponent.scala +++ b/akka-http-testkit/src/main/scala/akka/http/scaladsl/testkit/RouteTestResultComponent.scala @@ -82,26 +82,19 @@ trait RouteTestResultComponent { case s: HttpEntity.Strict ⇒ () ⇒ s case HttpEntity.Default(contentType, contentLength, data) ⇒ - val dataChunks = awaitAllElements(data); - { () ⇒ HttpEntity.Default(contentType, contentLength, Source(dataChunks)) } + val dataChunks = awaitAllElements(data); { () ⇒ HttpEntity.Default(contentType, contentLength, Source(dataChunks)) } case HttpEntity.CloseDelimited(contentType, data) ⇒ - val dataChunks = awaitAllElements(data); - { () ⇒ HttpEntity.CloseDelimited(contentType, Source(dataChunks)) } + val dataChunks = awaitAllElements(data); { () ⇒ HttpEntity.CloseDelimited(contentType, Source(dataChunks)) } - case HttpEntity.Chunked(contentType, chunks) ⇒ - val dataChunks = awaitAllElements(chunks); - { () ⇒ HttpEntity.Chunked(contentType, Source(dataChunks)) } + case HttpEntity.Chunked(contentType, data) ⇒ + val dataChunks = awaitAllElements(data); { () ⇒ HttpEntity.Chunked(contentType, Source(dataChunks)) } } private def failNeitherCompletedNorRejected(): Nothing = failTest("Request was neither completed nor rejected within " + timeout) - private def awaitAllElements[T](data: Source[T, _]): immutable.Seq[T] = { - data.grouped(100000).runWith(Sink.head).recover({ - case e: NoSuchElementException ⇒ Nil - })(ExecutionContexts.sameThreadExecutionContext) - .awaitResult(timeout) - } + private def awaitAllElements[T](data: Source[T, _]): immutable.Seq[T] = + data.grouped(100000).runWith(Sink.headOption).awaitResult(timeout).getOrElse(Nil) } } \ No newline at end of file diff --git a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala index ca8ce7d645..c5b5c19f45 100644 --- a/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala +++ b/akka-http-tests/src/test/scala/akka/http/scaladsl/server/StreamingResponseSpecs.scala @@ -20,8 +20,8 @@ class StreamingResponseSpecs extends RoutingSpec { val route = complete(response) Get() ~> route ~> check { - status should ===(OK) - responseAs[String] shouldEqual "" + status should ===(StatusCodes.OK) + responseAs[String] should === ("") } } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/HeadOptionSinkSubscriberTest.scala similarity index 64% rename from akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala rename to akka-stream-tck/src/test/scala/akka/stream/tck/HeadOptionSinkSubscriberTest.scala index d56527cfb3..ceaca5f123 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/HeadSinkSubscriberTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/HeadOptionSinkSubscriberTest.scala @@ -9,10 +9,10 @@ import org.reactivestreams.Subscriber import scala.concurrent.Promise -class HeadSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { +class HeadOptionSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] { import HeadSink._ - override def createSubscriber(): Subscriber[Int] = new HeadSinkSubscriber[Int] + override def createSubscriber(): Subscriber[Int] = new HeadOptionSinkSubscriber[Int] override def createElement(element: Int): Int = element } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala index 52c3516b57..1d7df4564e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/HeadSinkSpec.scala @@ -67,11 +67,44 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest { proc.sendComplete() Await.ready(f, 100.millis) f.value.get match { - case Failure(e: NoSuchElementException) ⇒ e.getMessage should be("empty stream") + case Failure(e: NoSuchElementException) ⇒ e.getMessage should be("head of empty stream") case x ⇒ fail("expected NoSuchElementException, got " + x) } } } + "A Flow with Sink.headOption" must { + + "yield the first value" in assertAllStagesStopped { + val p = TestPublisher.manualProbe[Int]() + val f: Future[Option[Int]] = Source(p).map(identity).runWith(Sink.headOption) + val proc = p.expectSubscription() + proc.expectRequest() + proc.sendNext(42) + Await.result(f, 100.millis) should be(Some(42)) + proc.expectCancellation() + } + + "yield the first error" in assertAllStagesStopped { + val p = TestPublisher.manualProbe[Int]() + val f = Source(p).runWith(Sink.head) + val proc = p.expectSubscription() + proc.expectRequest() + val ex = new RuntimeException("ex") + proc.sendError(ex) + Await.ready(f, 100.millis) + f.value.get should be(Failure(ex)) + } + + "yield None for empty stream" in assertAllStagesStopped { + val p = TestPublisher.manualProbe[Int]() + val f = Source(p).runWith(Sink.headOption) + val proc = p.expectSubscription() + proc.expectRequest() + proc.sendComplete() + Await.result(f, 100.millis) should be(None) + } + + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index 8cf2d6c34f..31ee8fa994 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -4,6 +4,7 @@ package akka.stream.impl import akka.actor.{ Deploy, ActorRef, Props } +import akka.dispatch.ExecutionContexts import akka.stream.actor.ActorPublisherMessage.Request import akka.stream.impl.StreamLayout.Module import akka.stream._ @@ -90,10 +91,10 @@ private[akka] final class FanoutPublisherSink[In]( * INTERNAL API */ private[akka] object HeadSink { - final class HeadSinkSubscriber[In] extends Subscriber[In] { + final class HeadOptionSinkSubscriber[In] extends Subscriber[In] { private[this] var subscription: Subscription = null - private[this] val promise: Promise[In] = Promise[In]() - def future: Future[In] = promise.future + private[this] val promise: Promise[Option[In]] = Promise[Option[In]]() + def future: Future[Option[In]] = promise.future override def onSubscribe(s: Subscription): Unit = { ReactiveStreamsCompliance.requireNonNullSubscription(s) if (subscription ne null) s.cancel() @@ -105,7 +106,7 @@ private[akka] object HeadSink { override def onNext(elem: In): Unit = { ReactiveStreamsCompliance.requireNonNullElement(elem) - promise.trySuccess(elem) + promise.trySuccess(Some(elem)) subscription.cancel() subscription = null } @@ -116,7 +117,7 @@ private[akka] object HeadSink { } override def onComplete(): Unit = - promise.tryFailure(new NoSuchElementException("empty stream")) + promise.trySuccess(None) } } @@ -124,19 +125,19 @@ private[akka] object HeadSink { /** * INTERNAL API * Holds a [[scala.concurrent.Future]] that will be fulfilled with the first - * thing that is signaled to this stream, which can be either an element (after - * which the upstream subscription is canceled), an error condition (putting - * the Future into the corresponding failed state) or the end-of-stream - * (failing the Future with a NoSuchElementException). + * element that is signaled to this stream (wrapped in a [[Some]]), + * which can be either an element (after which the upstream subscription is canceled), + * an error condition (putting the Future into the corresponding failed state) or + * the end-of-stream (yielding [[None]]). */ -private[akka] final class HeadSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Future[In]](shape) { +private[akka] final class HeadOptionSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Future[Option[In]]](shape) { override def create(context: MaterializationContext) = { - val sub = new HeadSink.HeadSinkSubscriber[In] + val sub = new HeadSink.HeadOptionSinkSubscriber[In] (sub, sub.future) } - override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[In]] = new HeadSink[In](attributes, shape) - override def withAttributes(attr: Attributes): Module = new HeadSink[In](attr, amendShape(attr)) - override def toString: String = "HeadSink" + override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[Option[In]]] = new HeadOptionSink[In](attributes, shape) + override def withAttributes(attr: Attributes): Module = new HeadOptionSink[In](attr, amendShape(attr)) + override def toString: String = "HeadOptionSink" } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 6a0e9d9fcc..e67fbc01fd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -87,6 +87,7 @@ private[stream] object Stages { val subscriberSink = name("subscriberSink") val cancelledSink = name("cancelledSink") val headSink = name("headSink") and inputBuffer(initial = 1, max = 1) + val headOptionSink = name("headOptionSink") and inputBuffer(initial = 1, max = 1) val publisherSink = name("publisherSink") val fanoutPublisherSink = name("fanoutPublisherSink") val ignoreSink = name("ignoreSink") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index 7298f87d77..257d78fc10 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -4,6 +4,7 @@ package akka.stream.javadsl import akka.actor.{ ActorRef, Props } +import akka.dispatch.ExecutionContexts import akka.japi.function import akka.stream.impl.StreamLayout import akka.stream.{ javadsl, scaladsl, _ } @@ -90,10 +91,25 @@ object Sink { /** * A `Sink` that materializes into a `Future` of the first value received. + * If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]]. + * If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception. + * + * See also [[headOption]]. */ def head[In](): Sink[In, Future[In]] = new Sink(scaladsl.Sink.head[In]) + /** + * A `Sink` that materializes into a `Future` of the optional first value received. + * If the stream completes before signaling at least a single element, the value of the Future will be an empty [[akka.japi.Option]]. + * If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception. + * + * See also [[head]]. + */ + def headOption[In](): Sink[In, Future[akka.japi.Option[In]]] = + new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue( + _.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext))) + /** * Sends the elements of the stream to the given `ActorRef`. * If the target actor terminates the stream will be canceled. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index ef8a82e4b6..22f7993056 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -4,6 +4,7 @@ package akka.stream.scaladsl import akka.actor.{ ActorRef, Props } +import akka.dispatch.ExecutionContexts import akka.stream.actor.ActorSubscriber import akka.stream.impl.Stages.DefaultAttributes import akka.stream.impl.StreamLayout.Module @@ -75,8 +76,22 @@ object Sink { /** * A `Sink` that materializes into a `Future` of the first value received. + * If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]]. + * If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception. + * + * See also [[headOption]]. */ - def head[T]: Sink[T, Future[T]] = new Sink(new HeadSink[T](DefaultAttributes.headSink, shape("HeadSink"))) + def head[T]: Sink[T, Future[T]] = new Sink[T, Future[Option[T]]](new HeadOptionSink[T](DefaultAttributes.headSink, shape("HeadSink"))) + .mapMaterializedValue(e ⇒ e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.sameThreadExecutionContext)) + + /** + * A `Sink` that materializes into a `Future` of the optional first value received. + * If the stream completes before signaling at least a single element, the value of the Future will be [[None]]. + * If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception. + * + * See also [[head]]. + */ + def headOption[T]: Sink[T, Future[Option[T]]] = new Sink(new HeadOptionSink[T](DefaultAttributes.headSink, shape("HeadOptionSink"))) /** * A `Sink` that materializes into a [[org.reactivestreams.Publisher]].