=str - Adds minor touchups to Unfold & UnfoldAsync

* Makes the classes final
* Sprinkles pixiedust
* Switches to ScalaFutures in the test
This commit is contained in:
Viktor Klang 2015-12-14 12:28:49 +01:00 committed by Roland Kuhn
parent ddceadff7b
commit 0f3d3c21e1
2 changed files with 46 additions and 51 deletions

View file

@ -3,18 +3,20 @@
*/
package akka.stream.scaladsl
import scala.concurrent.Await
import akka.testkit.DefaultTimeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{ Span, Millis }
import scala.concurrent.{ Future, Await }
import scala.concurrent.duration._
import scala.util.{ Success, Failure }
import scala.util.Failure
import scala.util.control.NoStackTrace
import akka.stream.{ SourceShape, ActorMaterializer }
import akka.stream.testkit._
import akka.stream.impl.{ PublisherSource, ReactiveStreamsCompliance }
import scala.concurrent.Future
class SourceSpec extends AkkaSpec {
class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures {
implicit val materializer = ActorMaterializer()
implicit val config = PatienceConfig(timeout = Span(timeout.duration.toMillis, Millis))
"Single Source" must {
"produce element" in {
@ -213,10 +215,9 @@ class SourceSpec extends AkkaSpec {
"Repeat Source" must {
"repeat as long as it takes" in {
import GraphDSL.Implicits._
val result = Await.result(Source.repeat(42).grouped(10000).runWith(Sink.head), 1.second)
result.size should ===(10000)
result.toSet should ===(Set(42))
val f = Source.repeat(42).grouped(1000).runWith(Sink.head)
f.futureValue.size should ===(1000)
f.futureValue.toSet should ===(Set(42))
}
}
@ -224,36 +225,46 @@ class SourceSpec extends AkkaSpec {
val expected = List(9227465, 5702887, 3524578, 2178309, 1346269, 832040, 514229, 317811, 196418, 121393, 75025, 46368, 28657, 17711, 10946, 6765, 4181, 2584, 1597, 987, 610, 377, 233, 144, 89, 55, 34, 21, 13, 8, 5, 3, 2, 1, 1, 0)
"generate a finite fibonacci sequence" in {
val source = Source.unfold((0, 1)) {
Source.unfold((0, 1)) {
case (a, _) if a > 10000000 None
case (a, b) Some((b, a + b) a)
}.runFold(List.empty[Int]) { case (xs, x) x :: xs }
.futureValue should ===(expected)
}
"terminate with a failure if there is an exception thrown" in {
val t = new RuntimeException("expected")
whenReady(
Source.unfold((0, 1)) {
case (a, _) if a > 10000000 throw t
case (a, b) Some((b, a + b) a)
}.runFold(List.empty[Int]) { case (xs, x) x :: xs }.failed) {
_ should be theSameInstanceAs (t)
}
val result = Await.result(source.runFold(List.empty[Int]) { case (xs, x) x :: xs }, 1.second)
result should ===(expected)
}
"generate a finite fibonacci sequence asynchronously" in {
val source = Source.unfoldAsync((0, 1)) {
Source.unfoldAsync((0, 1)) {
case (a, _) if a > 10000000 Future.successful(None)
case (a, b) Future.successful(Some((b, a + b) a))
}
val result = Await.result(source.runFold(List.empty[Int]) { case (xs, x) x :: xs }, 1.second)
result should ===(expected)
case (a, b) Future(Some((b, a + b) a))(system.dispatcher)
}.runFold(List.empty[Int]) { case (xs, x) x :: xs }
.futureValue should ===(expected)
}
"generate an infinite fibonacci sequence" in {
val source = Source.unfoldInf((0, 1)) {
case (a, b) (b, a + b) a
}
val result = Await.result(source.take(36).runFold(List.empty[Int]) { case (xs, x) x :: xs }, 1.second)
result should ===(expected)
"generate an unbounded fibonacci sequence" in {
Source.unfoldInf((0, 1))({ case (a, b) (b, a + b) a })
.take(36)
.runFold(List.empty[Int]) { case (xs, x) x :: xs }
.futureValue should ===(expected)
}
}
"Iterator Source" must {
"properly iterate" in {
val result = Await.result(Source.fromIterator(() Iterator.iterate(false)(!_)).grouped(10).runWith(Sink.head), 1.second)
result should ===(Seq(false, true, false, true, false, true, false, true, false, true))
Source.fromIterator(() Iterator.iterate(false)(!_))
.grouped(10)
.runWith(Sink.head)
.futureValue should ===(Seq(false, true, false, true, false, true, false, true, false, true))
}
}

View file

@ -10,19 +10,13 @@ import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
/**
* Unfold `GraphStage` class
* @param s initial state
* @param f unfold function
* @tparam S state
* @tparam E element
* INTERNAL API
*/
private[akka] class Unfold[S, E](s: S, f: S Option[(S, E)]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("Unfold")
private[akka] final class Unfold[S, E](s: S, f: S Option[(S, E)]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("Unfold.out")
override val shape: SourceShape[E] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private[this] var state = s
@ -37,36 +31,27 @@ private[akka] class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphSta
})
}
}
}
/**
* UnfoldAsync `GraphStage` class
* @param s initial state
* @param f unfold function
* @tparam S state
* @tparam E element
* INTERNAL API
*/
private[akka] class UnfoldAsync[S, E](s: S, f: S Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldAsync")
private[akka] final class UnfoldAsync[S, E](s: S, f: S Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldAsync.out")
override val shape: SourceShape[E] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
private[this] var state = s
private[this] var asyncHandler: Function1[Try[Option[(S, E)]], Unit] = _
override def preStart() = {
val ac = getAsyncCallback[Try[Option[(S, E)]]] {
case Failure(ex) fail(out, ex)
case Success(None) complete(out)
case Success(Some((newS, elem))) {
case Success(Some((newS, elem)))
push(out, elem)
state = newS
}
}
asyncHandler = ac.invoke
}
@ -76,4 +61,3 @@ private[akka] class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) ext
})
}
}
}