From e62a61fe916d676aca35cbb007bbcbff14d04e7d Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 1 Apr 2014 19:35:47 +0200 Subject: [PATCH] !str handle empty stream toFuture --- .../src/main/scala/akka/stream/impl/FlowImpl.scala | 4 +++- .../test/scala/akka/stream/FlowToFutureSpec.scala | 13 +++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 467df1ce41..80fd4c19d8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -74,7 +74,9 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: def toFuture(generator: ProcessorGenerator): Future[O] = { val p = Promise[O]() - transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil }, isComplete = _ == 1).consume(generator) + transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil }, + onComplete = _ ⇒ { p.tryFailure(new NoSuchElementException("empty stream")); Nil }, + isComplete = _ == 1).consume(generator) p.future } diff --git a/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala index fbfc8b0e90..9b6664fd48 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala @@ -43,6 +43,19 @@ class FlowToFutureSpec extends AkkaSpec with ScriptedTest { f.value.get should be(Failure(ex)) } + "yield NoSuchElementExcption for empty stream" in { + val p = StreamTestKit.producerProbe[Int] + val f = Flow(p).toFuture(gen) + val proc = p.expectSubscription + proc.expectRequestMore() + proc.sendComplete() + Await.ready(f, 100.millis) + f.value.get match { + case Failure(e: NoSuchElementException) ⇒ e.getMessage() should be("empty stream") + case x ⇒ fail("expected NoSuchElementException, got " + x) + } + } + } } \ No newline at end of file