Merge pull request #18905 from ktoso/wip-throws-in-tests-empty-stream-ktoso
=str #18902 in tests (due to grouped+Sink.head) empty response stream…
This commit is contained in:
commit
89461c2b2a
8 changed files with 121 additions and 27 deletions
|
|
@ -5,9 +5,11 @@
|
|||
package akka.http.scaladsl.testkit
|
||||
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import akka.dispatch.ExecutionContexts
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.{ Await, ExecutionContext }
|
||||
import akka.stream.Materializer
|
||||
import akka.stream.scaladsl._
|
||||
import akka.http.scaladsl.model.HttpEntity.ChunkStreamPart
|
||||
|
|
@ -80,22 +82,19 @@ trait RouteTestResultComponent {
|
|||
case s: HttpEntity.Strict ⇒ () ⇒ s
|
||||
|
||||
case HttpEntity.Default(contentType, contentLength, data) ⇒
|
||||
val dataChunks = awaitAllElements(data);
|
||||
{ () ⇒ HttpEntity.Default(contentType, contentLength, Source(dataChunks)) }
|
||||
val dataChunks = awaitAllElements(data); { () ⇒ HttpEntity.Default(contentType, contentLength, Source(dataChunks)) }
|
||||
|
||||
case HttpEntity.CloseDelimited(contentType, data) ⇒
|
||||
val dataChunks = awaitAllElements(data);
|
||||
{ () ⇒ HttpEntity.CloseDelimited(contentType, Source(dataChunks)) }
|
||||
val dataChunks = awaitAllElements(data); { () ⇒ HttpEntity.CloseDelimited(contentType, Source(dataChunks)) }
|
||||
|
||||
case HttpEntity.Chunked(contentType, chunks) ⇒
|
||||
val dataChunks = awaitAllElements(chunks);
|
||||
{ () ⇒ HttpEntity.Chunked(contentType, Source(dataChunks)) }
|
||||
case HttpEntity.Chunked(contentType, data) ⇒
|
||||
val dataChunks = awaitAllElements(data); { () ⇒ HttpEntity.Chunked(contentType, Source(dataChunks)) }
|
||||
}
|
||||
|
||||
private def failNeitherCompletedNorRejected(): Nothing =
|
||||
failTest("Request was neither completed nor rejected within " + timeout)
|
||||
|
||||
private def awaitAllElements[T](data: Source[T, _]): immutable.Seq[T] =
|
||||
data.grouped(100000).runWith(Sink.head).awaitResult(timeout)
|
||||
data.grouped(100000).runWith(Sink.headOption).awaitResult(timeout).getOrElse(Nil)
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
|
||||
package akka.http.scaladsl.server
|
||||
|
||||
import akka.http.scaladsl.model.{ ContentTypes, HttpEntity, HttpResponse, StatusCodes }
|
||||
import akka.stream.scaladsl.Source
|
||||
import akka.util.ByteString
|
||||
|
||||
class StreamingResponseSpecs extends RoutingSpec {
|
||||
|
||||
"streaming ByteString responses" should {
|
||||
"should render empty string if stream was empty" in {
|
||||
import StatusCodes._
|
||||
|
||||
val src = Source.empty[ByteString]
|
||||
val entity = HttpEntity.Chunked.fromData(ContentTypes.`application/json`, src)
|
||||
val response = HttpResponse(status = StatusCodes.OK, entity = entity)
|
||||
val route = complete(response)
|
||||
|
||||
Get() ~> route ~> check {
|
||||
status should ===(StatusCodes.OK)
|
||||
responseAs[String] should === ("")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -9,10 +9,10 @@ import org.reactivestreams.Subscriber
|
|||
|
||||
import scala.concurrent.Promise
|
||||
|
||||
class HeadSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
|
||||
class HeadOptionSinkSubscriberTest extends AkkaSubscriberBlackboxVerification[Int] {
|
||||
import HeadSink._
|
||||
|
||||
override def createSubscriber(): Subscriber[Int] = new HeadSinkSubscriber[Int]
|
||||
override def createSubscriber(): Subscriber[Int] = new HeadOptionSinkSubscriber[Int]
|
||||
|
||||
override def createElement(element: Int): Int = element
|
||||
}
|
||||
|
|
@ -67,11 +67,44 @@ class HeadSinkSpec extends AkkaSpec with ScriptedTest {
|
|||
proc.sendComplete()
|
||||
Await.ready(f, 100.millis)
|
||||
f.value.get match {
|
||||
case Failure(e: NoSuchElementException) ⇒ e.getMessage should be("empty stream")
|
||||
case Failure(e: NoSuchElementException) ⇒ e.getMessage should be("head of empty stream")
|
||||
case x ⇒ fail("expected NoSuchElementException, got " + x)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
"A Flow with Sink.headOption" must {
|
||||
|
||||
"yield the first value" in assertAllStagesStopped {
|
||||
val p = TestPublisher.manualProbe[Int]()
|
||||
val f: Future[Option[Int]] = Source(p).map(identity).runWith(Sink.headOption)
|
||||
val proc = p.expectSubscription()
|
||||
proc.expectRequest()
|
||||
proc.sendNext(42)
|
||||
Await.result(f, 100.millis) should be(Some(42))
|
||||
proc.expectCancellation()
|
||||
}
|
||||
|
||||
"yield the first error" in assertAllStagesStopped {
|
||||
val p = TestPublisher.manualProbe[Int]()
|
||||
val f = Source(p).runWith(Sink.head)
|
||||
val proc = p.expectSubscription()
|
||||
proc.expectRequest()
|
||||
val ex = new RuntimeException("ex")
|
||||
proc.sendError(ex)
|
||||
Await.ready(f, 100.millis)
|
||||
f.value.get should be(Failure(ex))
|
||||
}
|
||||
|
||||
"yield None for empty stream" in assertAllStagesStopped {
|
||||
val p = TestPublisher.manualProbe[Int]()
|
||||
val f = Source(p).runWith(Sink.headOption)
|
||||
val proc = p.expectSubscription()
|
||||
proc.expectRequest()
|
||||
proc.sendComplete()
|
||||
Await.result(f, 100.millis) should be(None)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package akka.stream.impl
|
||||
|
||||
import akka.actor.{ Deploy, ActorRef, Props }
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.stream.actor.ActorPublisherMessage.Request
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream._
|
||||
|
|
@ -90,10 +91,10 @@ private[akka] final class FanoutPublisherSink[In](
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object HeadSink {
|
||||
final class HeadSinkSubscriber[In] extends Subscriber[In] {
|
||||
final class HeadOptionSinkSubscriber[In] extends Subscriber[In] {
|
||||
private[this] var subscription: Subscription = null
|
||||
private[this] val promise: Promise[In] = Promise[In]()
|
||||
def future: Future[In] = promise.future
|
||||
private[this] val promise: Promise[Option[In]] = Promise[Option[In]]()
|
||||
def future: Future[Option[In]] = promise.future
|
||||
override def onSubscribe(s: Subscription): Unit = {
|
||||
ReactiveStreamsCompliance.requireNonNullSubscription(s)
|
||||
if (subscription ne null) s.cancel()
|
||||
|
|
@ -105,7 +106,7 @@ private[akka] object HeadSink {
|
|||
|
||||
override def onNext(elem: In): Unit = {
|
||||
ReactiveStreamsCompliance.requireNonNullElement(elem)
|
||||
promise.trySuccess(elem)
|
||||
promise.trySuccess(Some(elem))
|
||||
subscription.cancel()
|
||||
subscription = null
|
||||
}
|
||||
|
|
@ -116,7 +117,7 @@ private[akka] object HeadSink {
|
|||
}
|
||||
|
||||
override def onComplete(): Unit =
|
||||
promise.tryFailure(new NoSuchElementException("empty stream"))
|
||||
promise.trySuccess(None)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -124,19 +125,19 @@ private[akka] object HeadSink {
|
|||
/**
|
||||
* INTERNAL API
|
||||
* Holds a [[scala.concurrent.Future]] that will be fulfilled with the first
|
||||
* thing that is signaled to this stream, which can be either an element (after
|
||||
* which the upstream subscription is canceled), an error condition (putting
|
||||
* the Future into the corresponding failed state) or the end-of-stream
|
||||
* (failing the Future with a NoSuchElementException).
|
||||
* element that is signaled to this stream (wrapped in a [[Some]]),
|
||||
* which can be either an element (after which the upstream subscription is canceled),
|
||||
* an error condition (putting the Future into the corresponding failed state) or
|
||||
* the end-of-stream (yielding [[None]]).
|
||||
*/
|
||||
private[akka] final class HeadSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Future[In]](shape) {
|
||||
private[akka] final class HeadOptionSink[In](val attributes: Attributes, shape: SinkShape[In]) extends SinkModule[In, Future[Option[In]]](shape) {
|
||||
override def create(context: MaterializationContext) = {
|
||||
val sub = new HeadSink.HeadSinkSubscriber[In]
|
||||
val sub = new HeadSink.HeadOptionSinkSubscriber[In]
|
||||
(sub, sub.future)
|
||||
}
|
||||
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[In]] = new HeadSink[In](attributes, shape)
|
||||
override def withAttributes(attr: Attributes): Module = new HeadSink[In](attr, amendShape(attr))
|
||||
override def toString: String = "HeadSink"
|
||||
override protected def newInstance(shape: SinkShape[In]): SinkModule[In, Future[Option[In]]] = new HeadOptionSink[In](attributes, shape)
|
||||
override def withAttributes(attr: Attributes): Module = new HeadOptionSink[In](attr, amendShape(attr))
|
||||
override def toString: String = "HeadOptionSink"
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -87,6 +87,7 @@ private[stream] object Stages {
|
|||
val subscriberSink = name("subscriberSink")
|
||||
val cancelledSink = name("cancelledSink")
|
||||
val headSink = name("headSink") and inputBuffer(initial = 1, max = 1)
|
||||
val headOptionSink = name("headOptionSink") and inputBuffer(initial = 1, max = 1)
|
||||
val publisherSink = name("publisherSink")
|
||||
val fanoutPublisherSink = name("fanoutPublisherSink")
|
||||
val ignoreSink = name("ignoreSink")
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package akka.stream.javadsl
|
||||
|
||||
import akka.actor.{ ActorRef, Props }
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.japi.function
|
||||
import akka.stream.impl.StreamLayout
|
||||
import akka.stream.{ javadsl, scaladsl, _ }
|
||||
|
|
@ -90,10 +91,25 @@ object Sink {
|
|||
|
||||
/**
|
||||
* A `Sink` that materializes into a `Future` of the first value received.
|
||||
* If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]].
|
||||
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
|
||||
*
|
||||
* See also [[headOption]].
|
||||
*/
|
||||
def head[In](): Sink[In, Future[In]] =
|
||||
new Sink(scaladsl.Sink.head[In])
|
||||
|
||||
/**
|
||||
* A `Sink` that materializes into a `Future` of the optional first value received.
|
||||
* If the stream completes before signaling at least a single element, the value of the Future will be an empty [[akka.japi.Option]].
|
||||
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
|
||||
*
|
||||
* See also [[head]].
|
||||
*/
|
||||
def headOption[In](): Sink[In, Future[akka.japi.Option[In]]] =
|
||||
new Sink(scaladsl.Sink.headOption[In].mapMaterializedValue(
|
||||
_.map(akka.japi.Option.fromScalaOption)(ExecutionContexts.sameThreadExecutionContext)))
|
||||
|
||||
/**
|
||||
* Sends the elements of the stream to the given `ActorRef`.
|
||||
* If the target actor terminates the stream will be canceled.
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
package akka.stream.scaladsl
|
||||
|
||||
import akka.actor.{ ActorRef, Props }
|
||||
import akka.dispatch.ExecutionContexts
|
||||
import akka.stream.actor.ActorSubscriber
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
|
|
@ -75,8 +76,22 @@ object Sink {
|
|||
|
||||
/**
|
||||
* A `Sink` that materializes into a `Future` of the first value received.
|
||||
* If the stream completes before signaling at least a single element, the Future will be failed with a [[NoSuchElementException]].
|
||||
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
|
||||
*
|
||||
* See also [[headOption]].
|
||||
*/
|
||||
def head[T]: Sink[T, Future[T]] = new Sink(new HeadSink[T](DefaultAttributes.headSink, shape("HeadSink")))
|
||||
def head[T]: Sink[T, Future[T]] = new Sink[T, Future[Option[T]]](new HeadOptionSink[T](DefaultAttributes.headSink, shape("HeadSink")))
|
||||
.mapMaterializedValue(e ⇒ e.map(_.getOrElse(throw new NoSuchElementException("head of empty stream")))(ExecutionContexts.sameThreadExecutionContext))
|
||||
|
||||
/**
|
||||
* A `Sink` that materializes into a `Future` of the optional first value received.
|
||||
* If the stream completes before signaling at least a single element, the value of the Future will be [[None]].
|
||||
* If the stream signals an error errors before signaling at least a single element, the Future will be failed with the streams exception.
|
||||
*
|
||||
* See also [[head]].
|
||||
*/
|
||||
def headOption[T]: Sink[T, Future[Option[T]]] = new Sink(new HeadOptionSink[T](DefaultAttributes.headSink, shape("HeadOptionSink")))
|
||||
|
||||
/**
|
||||
* A `Sink` that materializes into a [[org.reactivestreams.Publisher]].
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue