diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 21a8bdac02..ce3576b635 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -3,18 +3,20 @@ */ package akka.stream.scaladsl -import scala.concurrent.Await +import akka.testkit.DefaultTimeout +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.time.{ Span, Millis } +import scala.concurrent.{ Future, Await } import scala.concurrent.duration._ -import scala.util.{ Success, Failure } +import scala.util.Failure import scala.util.control.NoStackTrace import akka.stream.{ SourceShape, ActorMaterializer } import akka.stream.testkit._ -import akka.stream.impl.{ PublisherSource, ReactiveStreamsCompliance } -import scala.concurrent.Future -class SourceSpec extends AkkaSpec { +class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { implicit val materializer = ActorMaterializer() + implicit val config = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis)) "Single Source" must { "produce element" in { @@ -213,10 +215,9 @@ class SourceSpec extends AkkaSpec { "Repeat Source" must { "repeat as long as it takes" in { - import GraphDSL.Implicits._ - val result = Await.result(Source.repeat(42).grouped(10000).runWith(Sink.head), 1.second) - result.size should ===(10000) - result.toSet should ===(Set(42)) + val f = Source.repeat(42).grouped(1000).runWith(Sink.head) + f.futureValue.size should ===(1000) + f.futureValue.toSet should ===(Set(42)) } } @@ -224,36 +225,46 @@ class SourceSpec extends AkkaSpec { val expected = List(9227465, 5702887, 3524578, 2178309, 1346269, 832040, 514229, 317811, 196418, 121393, 75025, 46368, 28657, 17711, 10946, 6765, 4181, 2584, 1597, 987, 610, 377, 233, 144, 89, 55, 34, 21, 13, 8, 5, 3, 2, 1, 1, 0) "generate a finite fibonacci sequence" in { - val source = Source.unfold((0, 1)) { + Source.unfold((0, 1)) { case (a, _) if a > 10000000 ⇒ None case (a, b) ⇒ Some((b, a + b) → a) - } - val result = Await.result(source.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }, 1.second) - result should ===(expected) + }.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs } + .futureValue should ===(expected) + } + + "terminate with a failure if there is an exception thrown" in { + val t = new RuntimeException("expected") + whenReady( + Source.unfold((0, 1)) { + case (a, _) if a > 10000000 ⇒ throw t + case (a, b) ⇒ Some((b, a + b) → a) + }.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }.failed) { + _ should be theSameInstanceAs (t) + } } "generate a finite fibonacci sequence asynchronously" in { - val source = Source.unfoldAsync((0, 1)) { + Source.unfoldAsync((0, 1)) { case (a, _) if a > 10000000 ⇒ Future.successful(None) - case (a, b) ⇒ Future.successful(Some((b, a + b) → a)) - } - val result = Await.result(source.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }, 1.second) - result should ===(expected) + case (a, b) ⇒ Future(Some((b, a + b) → a))(system.dispatcher) + }.runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs } + .futureValue should ===(expected) } - "generate an infinite fibonacci sequence" in { - val source = Source.unfoldInf((0, 1)) { - case (a, b) ⇒ (b, a + b) → a - } - val result = Await.result(source.take(36).runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs }, 1.second) - result should ===(expected) + "generate an unbounded fibonacci sequence" in { + Source.unfoldInf((0, 1))({ case (a, b) ⇒ (b, a + b) → a }) + .take(36) + .runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs } + .futureValue should ===(expected) } } "Iterator Source" must { "properly iterate" in { - val result = Await.result(Source.fromIterator(() ⇒ Iterator.iterate(false)(!_)).grouped(10).runWith(Sink.head), 1.second) - result should ===(Seq(false, true, false, true, false, true, false, true, false, true)) + Source.fromIterator(() ⇒ Iterator.iterate(false)(!_)) + .grouped(10) + .runWith(Sink.head) + .futureValue should ===(Seq(false, true, false, true, false, true, false, true, false, true)) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala index 0a8d9217e7..5a873fe54f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala @@ -10,19 +10,13 @@ import scala.concurrent.{ ExecutionContext, Future } import scala.util.{ Failure, Success, Try } /** - * Unfold `GraphStage` class - * @param s initial state - * @param f unfold function - * @tparam S state - * @tparam E element + * INTERNAL API */ -private[akka] class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[SourceShape[E]] { - - val out: Outlet[E] = Outlet("Unfold") - +private[akka] final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[SourceShape[E]] { + val out: Outlet[E] = Outlet("Unfold.out") override val shape: SourceShape[E] = SourceShape(out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { private[this] var state = s @@ -36,36 +30,27 @@ private[akka] class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphSta } }) } - } } /** - * UnfoldAsync `GraphStage` class - * @param s initial state - * @param f unfold function - * @tparam S state - * @tparam E element + * INTERNAL API */ -private[akka] class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] { - - val out: Outlet[E] = Outlet("UnfoldAsync") - +private[akka] final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] { + val out: Outlet[E] = Outlet("UnfoldAsync.out") override val shape: SourceShape[E] = SourceShape(out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { private[this] var state = s - private[this] var asyncHandler: Function1[Try[Option[(S, E)]], Unit] = _ override def preStart() = { val ac = getAsyncCallback[Try[Option[(S, E)]]] { case Failure(ex) ⇒ fail(out, ex) case Success(None) ⇒ complete(out) - case Success(Some((newS, elem))) ⇒ { + case Success(Some((newS, elem))) ⇒ push(out, elem) state = newS - } } asyncHandler = ac.invoke } @@ -75,5 +60,4 @@ private[akka] class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) ext f(state).onComplete(asyncHandler)(akka.dispatch.ExecutionContexts.sameThreadExecutionContext) }) } - } }