diff --git a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala index a997dab019..a5843afa2d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/FusingSpec.scala @@ -10,7 +10,7 @@ import org.scalactic.ConversionCheckedTripleEquals import akka.stream.Attributes._ import akka.stream.Fusing.FusedGraph import scala.annotation.tailrec -import akka.stream.impl.StreamLayout.Module +import akka.stream.impl.StreamLayout.{ CopiedModule, Module } import org.scalatest.concurrent.ScalaFutures import scala.concurrent.duration._ import akka.stream.impl.fusing.GraphInterpreter @@ -23,7 +23,7 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple implicit val patience = PatienceConfig(1.second) def graph(async: Boolean) = - Source.unfoldInf(1)(x ⇒ (x, x)).filter(_ % 2 == 1) + Source.unfold(1)(x ⇒ Some(x -> x)).filter(_ % 2 == 1) .alsoTo(Flow[Int].fold(0)(_ + _).to(Sink.head.named("otherSink")).addAttributes(if (async) Attributes.asyncBoundary else Attributes.none)) .via(Flow[Int].fold(1)(_ + _).named("mainSink")) @@ -36,13 +36,13 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple @tailrec def rec(curr: Module): Unit = { if (Debug) println(extractName(curr, "unknown")) - if (curr.attributes.contains(to)) () // done - else { - val outs = curr.inPorts.map(ups) - outs.size should ===(1) - val out = outs.head - val next = owner(out) - rec(next) + curr match { + case CopiedModule(_, attributes, copyOf) if (attributes and copyOf.attributes).contains(to) ⇒ () + case other if other.attributes.contains(to) ⇒ () + case _ ⇒ + val outs = curr.inPorts.map(ups) + outs.size should ===(1) + rec(owner(outs.head)) } } @@ -57,8 +57,8 @@ class FusingSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTriple module.downstreams.size should ===(modules - 1) module.info.downstreams.size should be >= downstreams module.info.upstreams.size should be >= downstreams - singlePath(fused, Attributes.Name("mainSink"), Attributes.Name("unfoldInf")) - singlePath(fused, Attributes.Name("otherSink"), Attributes.Name("unfoldInf")) + singlePath(fused, Attributes.Name("mainSink"), Attributes.Name("unfold")) + singlePath(fused, Attributes.Name("otherSink"), Attributes.Name("unfold")) } "fuse a moderately complex graph" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 6bcd64aa69..21c1f52239 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -252,7 +252,7 @@ class SourceSpec extends AkkaSpec with DefaultTimeout with ScalaFutures { } "generate an unbounded fibonacci sequence" in { - Source.unfoldInf((0, 1))({ case (a, b) ⇒ (b, a + b) → a }) + Source.unfold((0, 1))({ case (a, b) ⇒ Some((b, a + b) → a) }) .take(36) .runFold(List.empty[Int]) { case (xs, x) ⇒ x :: xs } .futureValue should ===(expected) diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 088ed69981..0fa12df2ea 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -65,7 +65,6 @@ private[stream] object Stages { val repeat = name("repeat") val unfold = name("unfold") val unfoldAsync = name("unfoldAsync") - val unfoldInf = name("unfoldInf") val publisherSource = name("publisherSource") val iterableSource = name("iterableSource") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala index 5a873fe54f..db34a84088 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Unfold.scala @@ -3,10 +3,11 @@ */ package akka.stream.scaladsl +import akka.stream.impl.Stages.DefaultAttributes import akka.stream.stage.{ OutHandler, GraphStageLogic, GraphStage } import akka.stream._ -import scala.concurrent.{ ExecutionContext, Future } +import scala.concurrent.Future import scala.util.{ Failure, Success, Try } /** @@ -15,7 +16,7 @@ import scala.util.{ Failure, Success, Try } private[akka] final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends GraphStage[SourceShape[E]] { val out: Outlet[E] = Outlet("Unfold.out") override val shape: SourceShape[E] = SourceShape(out) - + override def initialAttributes: Attributes = DefaultAttributes.unfold override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { private[this] var state = s @@ -38,7 +39,7 @@ private[akka] final class Unfold[S, E](s: S, f: S ⇒ Option[(S, E)]) extends Gr private[akka] final class UnfoldAsync[S, E](s: S, f: S ⇒ Future[Option[(S, E)]]) extends GraphStage[SourceShape[E]] { val out: Outlet[E] = Outlet("UnfoldAsync.out") override val shape: SourceShape[E] = SourceShape(out) - + override def initialAttributes: Attributes = DefaultAttributes.unfoldAsync override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { private[this] var state = s diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index e2867f67c4..583a68ef38 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -184,13 +184,6 @@ object Source { def unfoldAsync[S, E](s: S, f: function.Function[S, Future[Option[(S, E)]]]): Source[E, Unit] = new Source(scaladsl.Source.unfoldAsync(s)((s: S) ⇒ f.apply(s))) - /** - * Simpler [[unfold]], for infinite sequences. - */ - def unfoldInf[S, E](s: S, f: function.Function[S, (S, E)]): Source[E, Unit] = { - new Source(scaladsl.Source.unfoldInf(s)((s: S) ⇒ f.apply(s))) - } - /** * Create a `Source` that immediately ends the stream with the `cause` failure to every connected `Sink`. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index d64e6b5bb4..55bd1c21a4 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -260,7 +260,7 @@ object Source { * }}} */ def unfold[S, E](s: S)(f: S ⇒ Option[(S, E)]): Source[E, Unit] = - Source.fromGraph(new Unfold(s, f)).withAttributes(DefaultAttributes.unfold) + Source.fromGraph(new Unfold(s, f)) /** * Same as [[unfold]], but uses an async function to generate the next state-element tuple. @@ -278,19 +278,7 @@ object Source { * }}} */ def unfoldAsync[S, E](s: S)(f: S ⇒ Future[Option[(S, E)]]): Source[E, Unit] = - Source.fromGraph(new UnfoldAsync(s, f)).withAttributes(DefaultAttributes.unfoldAsync) - - /** - * Simpler [[unfold]], for infinite sequences. - * - * {{{ - * Source.unfoldInf(0 → 1) { - * case (a, b) ⇒ (b → (a + b)) → a - * } - * }}} - */ - def unfoldInf[S, E](s: S)(f: S ⇒ (S, E)): Source[E, Unit] = - unfold(s)(s ⇒ Some(f(s))).withAttributes(DefaultAttributes.unfoldInf) + Source.fromGraph(new UnfoldAsync(s, f)) /** * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.