-str - Removes Source.unfoldInf

Motivation: Since it can be easily implemented inline on top of unfold
This commit is contained in:
Viktor Klang 2016-01-11 17:15:44 +01:00
parent 22ccbcea39
commit d65efc35c7
6 changed files with 18 additions and 37 deletions

View file

@ -10,7 +10,7 @@ import org.scalactic.ConversionCheckedTripleEquals
import akka.stream.Attributes._ import akka.stream.Attributes._
import akka.stream.Fusing.FusedGraph import akka.stream.Fusing.FusedGraph
import scala.annotation.tailrec import scala.annotation.tailrec
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.{ CopiedModule, Module }
import org.scalatest.concurrent.ScalaFutures import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._ import scala.concurrent.duration._
import akka.stream.impl.fusing.GraphInterpreter import akka.stream.impl.fusing.GraphInterpreter
@ -23,7 +23,7 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple
implicit val patience = PatienceConfig(1.second) implicit val patience = PatienceConfig(1.second)
def graph(async: Boolean) = def graph(async: Boolean) =
Source.unfoldInf(1)(x (x, x)).filter(_ % 2 == 1) Source.unfold(1)(x Some(x -> x)).filter(_ % 2 == 1)
.alsoTo(Flow[Int].fold(0)(_ + _).to(Sink.head.named("otherSink")).addAttributes(if (async) Attributes.asyncBoundary else Attributes.none)) .alsoTo(Flow[Int].fold(0)(_ + _).to(Sink.head.named("otherSink")).addAttributes(if (async) Attributes.asyncBoundary else Attributes.none))
.via(Flow[Int].fold(1)(_ + _).named("mainSink")) .via(Flow[Int].fold(1)(_ + _).named("mainSink"))
@ -36,13 +36,13 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple
@tailrec def rec(curr: Module): Unit = { @tailrec def rec(curr: Module): Unit = {
if (Debug) println(extractName(curr, "unknown")) if (Debug) println(extractName(curr, "unknown"))
if (curr.attributes.contains(to)) () // done curr match {
else { case CopiedModule(_, attributes, copyOf) if (attributes and copyOf.attributes).contains(to) ()
case other if other.attributes.contains(to) ()
case _
val outs = curr.inPorts.map(ups) val outs = curr.inPorts.map(ups)
outs.size should ===(1) outs.size should ===(1)
val out = outs.head rec(owner(outs.head))
val next = owner(out)
rec(next)
} }
} }
@ -57,8 +57,8 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple
module.downstreams.size should ===(modules - 1) module.downstreams.size should ===(modules - 1)
module.info.downstreams.size should be >= downstreams module.info.downstreams.size should be >= downstreams
module.info.upstreams.size should be >= downstreams module.info.upstreams.size should be >= downstreams
singlePath(fused, Attributes.Name("mainSink"), Attributes.Name("unfoldInf")) singlePath(fused, Attributes.Name("mainSink"), Attributes.Name("unfold"))
singlePath(fused, Attributes.Name("otherSink"), Attributes.Name("unfoldInf")) singlePath(fused, Attributes.Name("otherSink"), Attributes.Name("unfold"))
} }
"fuse a moderately complex graph" in { "fuse a moderately complex graph" in {

View file

@ -252,7 +252,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures {
} }
"generate an unbounded fibonacci sequence" in { "generate an unbounded fibonacci sequence" in {
Source.unfoldInf((0, 1))({ case (a, b) (b, a + b) a }) Source.unfold((0, 1))({ case (a, b) Some((b, a + b) a) })
.take(36) .take(36)
.runFold(List.empty[Int]) { case (xs, x) x :: xs } .runFold(List.empty[Int]) { case (xs, x) x :: xs }
.futureValue should ===(expected) .futureValue should ===(expected)

View file

@ -65,7 +65,6 @@ private[stream] object Stages {
val repeat = name("repeat") val repeat = name("repeat")
val unfold = name("unfold") val unfold = name("unfold")
val unfoldAsync = name("unfoldAsync") val unfoldAsync = name("unfoldAsync")
val unfoldInf = name("unfoldInf")
val publisherSource = name("publisherSource") val publisherSource = name("publisherSource")
val iterableSource = name("iterableSource") val iterableSource = name("iterableSource")

View file

@ -3,10 +3,11 @@
*/ */
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.stage.{ OutHandler, GraphStageLogic, GraphStage } import akka.stream.stage.{ OutHandler, GraphStageLogic, GraphStage }
import akka.stream._ import akka.stream._
import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.Future
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
/** /**
@ -15,7 +16,7 @@ import scala.util.{ Failure, Success, Try }
private[akka] final class Unfold[S, E](s: S, f: S Option[(S, E)]) extends GraphStage[SourceShape[E]] { private[akka] final class Unfold[S, E](s: S, f: S Option[(S, E)]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("Unfold.out") val out: Outlet[E] = Outlet("Unfold.out")
override val shape: SourceShape[E] = SourceShape(out) override val shape: SourceShape[E] = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.unfold
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) { new GraphStageLogic(shape) {
private[this] var state = s private[this] var state = s
@ -38,7 +39,7 @@ private[akka] final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends Gr
private[akka] final class UnfoldAsync[S, E](s: S, f: S Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] { private[akka] final class UnfoldAsync[S, E](s: S, f: S Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] {
val out: Outlet[E] = Outlet("UnfoldAsync.out") val out: Outlet[E] = Outlet("UnfoldAsync.out")
override val shape: SourceShape[E] = SourceShape(out) override val shape: SourceShape[E] = SourceShape(out)
override def initialAttributes: Attributes = DefaultAttributes.unfoldAsync
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) { new GraphStageLogic(shape) {
private[this] var state = s private[this] var state = s

View file

@ -184,13 +184,6 @@ object Source {
def unfoldAsync[S, E](s: S, f: function.Function[S, Future[Option[(S, E)]]]): Source[E, Unit] = def unfoldAsync[S, E](s: S, f: function.Function[S, Future[Option[(S, E)]]]): Source[E, Unit] =
new Source(scaladsl.Source.unfoldAsync(s)((s: S) f.apply(s))) new Source(scaladsl.Source.unfoldAsync(s)((s: S) f.apply(s)))
/**
* Simpler [[unfold]], for infinite sequences.
*/
def unfoldInf[S, E](s: S, f: function.Function[S, (S, E)]): Source[E, Unit] = {
new Source(scaladsl.Source.unfoldInf(s)((s: S) f.apply(s)))
}
/** /**
* Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`. * Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`.
*/ */

View file

@ -260,7 +260,7 @@ object Source {
* }}} * }}}
*/ */
def unfold[S, E](s: S)(f: S Option[(S, E)]): Source[E, Unit] = def unfold[S, E](s: S)(f: S Option[(S, E)]): Source[E, Unit] =
Source.fromGraph(new Unfold(s, f)).withAttributes(DefaultAttributes.unfold) Source.fromGraph(new Unfold(s, f))
/** /**
* Same as [[unfold]], but uses an async function to generate the next state-element tuple. * Same as [[unfold]], but uses an async function to generate the next state-element tuple.
@ -278,19 +278,7 @@ object Source {
* }}} * }}}
*/ */
def unfoldAsync[S, E](s: S)(f: S Future[Option[(S, E)]]): Source[E, Unit] = def unfoldAsync[S, E](s: S)(f: S Future[Option[(S, E)]]): Source[E, Unit] =
Source.fromGraph(new UnfoldAsync(s, f)).withAttributes(DefaultAttributes.unfoldAsync) Source.fromGraph(new UnfoldAsync(s, f))
/**
* Simpler [[unfold]], for infinite sequences.
*
* {{{
* Source.unfoldInf(0 1) {
* case (a, b) (b (a + b)) a
* }
* }}}
*/
def unfoldInf[S, E](s: S)(f: S (S, E)): Source[E, Unit] =
unfold(s)(s Some(f(s))).withAttributes(DefaultAttributes.unfoldInf)
/** /**
* A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`. * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.