diff --git a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala index 646b7f2ab4..abf3f267f6 100644 --- a/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala +++ b/akka-http-core/src/main/scala/akka/http/util/StreamUtils.scala @@ -152,7 +152,7 @@ private[http] object StreamUtils { } else ByteString.empty } - Props(new IteratorPublisherImpl(iterator, materializer.settings)).withDispatcher(materializer.settings.fileIODispatcher) + IteratorPublisher.props(iterator, materializer.settings).withDispatcher(materializer.settings.fileIODispatcher) } new AtomicBoolean(false) with SimpleActorFlowSource[ByteString] { diff --git a/akka-http-tests/src/test/scala/akka/http/server/directives/CodingDirectivesSpec.scala b/akka-http-tests/src/test/scala/akka/http/server/directives/CodingDirectivesSpec.scala index 2f6576ec30..c3f9ad5ff6 100644 --- a/akka-http-tests/src/test/scala/akka/http/server/directives/CodingDirectivesSpec.scala +++ b/akka-http-tests/src/test/scala/akka/http/server/directives/CodingDirectivesSpec.scala @@ -158,7 +158,7 @@ class CodingDirectivesSpec extends RoutingSpec { "correctly encode the chunk stream produced by a chunked response" in { val text = "This is a somewhat lengthy text that is being chunked by the autochunk directive!" val textChunks = - text.grouped(8).map { chars ⇒ + () ⇒ text.grouped(8).map { chars ⇒ Chunk(chars.mkString): ChunkStreamPart } val chunkedTextEntity = HttpEntity.Chunked(MediaTypes.`text/plain`, Source(textChunks)) diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala index c7e942e068..295d5b8986 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaIdentityProcessorVerification.scala @@ -3,7 +3,9 @@ */ package akka.stream.tck -import scala.collection.immutable +import akka.event.Logging + +import scala.collection.{ mutable, immutable } import akka.actor.ActorSystem import akka.stream.FlowMaterializer import akka.stream.scaladsl.Sink @@ -24,14 +26,14 @@ abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env system.eventStream.publish(TestEvent.Mute(EventFilter[RuntimeException]("Test exception"))) /** Readable way to ignore TCK specs; Return this for `createErrorStatePublisher` to skip tests including it */ - final def ignored: Publisher[T] = null + final def ignored: mutable.Publisher[T] = null def this(system: ActorSystem, printlnDebug: Boolean) { this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug), Timeouts.publisherShutdownTimeoutMillis) } def this(printlnDebug: Boolean) { - this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug) + this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug) } def this() { diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala index 4c0a11b528..3b1e82aeba 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaPublisherVerification.scala @@ -3,6 +3,8 @@ */ package akka.stream.tck +import akka.event.Logging + import scala.concurrent.duration._ import akka.actor.ActorSystem @@ -27,7 +29,7 @@ abstract class AkkaPublisherVerification[T](val system: ActorSystem, env: TestEn } def this(printlnDebug: Boolean) { - this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug) + this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug) } def this() { diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala index 88f824ead7..5fb4e009e7 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/AkkaSubscriberVerification.scala @@ -3,6 +3,8 @@ */ package akka.stream.tck +import akka.event.Logging + import scala.collection.immutable import scala.concurrent.duration._ import akka.actor.ActorSystem @@ -27,7 +29,7 @@ abstract class AkkaSubscriberBlackboxVerification[T](val system: ActorSystem, en } def this(printlnDebug: Boolean) { - this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug) + this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug) } def this() { @@ -44,7 +46,7 @@ abstract class AkkaSubscriberWhiteboxVerification[T](val system: ActorSystem, en } def this(printlnDebug: Boolean) { - this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug) + this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug) } def this() { diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala index 71e5984464..69e526c120 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/FusableProcessorTest.scala @@ -22,7 +22,7 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] { val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet() val processor = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode( - Ast.Fusable(Vector(akka.stream.impl.fusing.Map[Int, Int](identity)), "identity"), flowName, 1) + Ast.Fused(List(akka.stream.impl.fusing.Map[Int, Int](identity)), "identity"), flowName, 1) processor.asInstanceOf[Processor[Int, Int]] } diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala index 7d39ee4549..c170b59bfb 100644 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala +++ b/akka-stream-tck/src/test/scala/akka/stream/tck/IterablePublisherTest.scala @@ -19,5 +19,4 @@ class IterablePublisherTest extends AkkaPublisherVerification[Int] { Source(iterable).runWith(Sink.publisher) } - } \ No newline at end of file diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/IteratorPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/IteratorPublisherTest.scala deleted file mode 100644 index 1d47443dfc..0000000000 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/IteratorPublisherTest.scala +++ /dev/null @@ -1,21 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.tck - -import scala.collection.immutable -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import org.reactivestreams.Publisher - -class IteratorPublisherTest extends AkkaPublisherVerification[Int](true) { - - def createPublisher(elements: Long): Publisher[Int] = { - val iterable: immutable.Iterable[Int] = - if (elements == 0) new immutable.Iterable[Int] { override def iterator = Iterator from 0 } - else 0 until elements.toInt - - Source(iterable).runWith(Sink.publisher) - } - -} \ No newline at end of file diff --git a/akka-stream-tck/src/test/scala/akka/stream/tck/SimpleCallbackPublisherTest.scala b/akka-stream-tck/src/test/scala/akka/stream/tck/SimpleCallbackPublisherTest.scala deleted file mode 100644 index 5e6066e468..0000000000 --- a/akka-stream-tck/src/test/scala/akka/stream/tck/SimpleCallbackPublisherTest.scala +++ /dev/null @@ -1,18 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.tck - -import akka.stream.scaladsl.Sink -import akka.stream.scaladsl.Source -import org.reactivestreams._ - -class SimpleCallbackPublisherTest extends AkkaPublisherVerification[Int] { - - def createPublisher(elements: Long): Publisher[Int] = { - val iter = Iterator from 0 - val iter2 = if (elements > 0) iter take elements.toInt else iter - Source(() ⇒ if (iter2.hasNext) Some(iter2.next()) else None).runWith(Sink.publisher) - } - -} \ No newline at end of file diff --git a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala index 1213d9a2e2..f950ce414a 100644 --- a/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala +++ b/akka-stream-testkit/src/test/scala/akka/stream/testkit/TwoStreamsSetup.scala @@ -3,6 +3,7 @@ package akka.stream.testkit import akka.stream.MaterializerSettings import akka.stream.scaladsl._ import org.reactivestreams.Publisher +import scala.collection.immutable import scala.util.control.NoStackTrace import akka.stream.FlowMaterializer @@ -40,7 +41,7 @@ abstract class TwoStreamsSetup extends AkkaSpec { def completedPublisher[T]: Publisher[T] = StreamTestKit.emptyPublisher[T] - def nonemptyPublisher[T](elems: Iterator[T]): Publisher[T] = Source(elems).runWith(Sink.publisher) + def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher) def soonToFailPublisher[T]: Publisher[T] = StreamTestKit.lazyErrorPublisher[T](TestException) diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java index de5da1c7ec..84cef63ceb 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java @@ -61,7 +61,7 @@ public class ActorSubscriberTest extends StreamTest { final JavaTestKit probe = new JavaTestKit(system); final ActorRef ref = system.actorOf(Props.create(TestSubscriber.class, probe.getRef()).withDispatcher("akka.test.stream-dispatcher")); final Subscriber subscriber = UntypedActorSubscriber.create(ref); - final java.util.Iterator input = Arrays.asList(1, 2, 3).iterator(); + final java.lang.Iterable input = Arrays.asList(1, 2, 3); Source.from(input).runWith(Sink.create(subscriber), materializer); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index de8b10fcfd..2c2804845e 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -43,7 +43,7 @@ public class FlowTest extends StreamTest { public void mustBeAbleToUseSimpleOperators() { final JavaTestKit probe = new JavaTestKit(system); final String[] lookup = { "a", "b", "c", "d", "e", "f" }; - final java.util.Iterator input = Arrays.asList(0, 1, 2, 3, 4, 5).iterator(); + final java.lang.Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5); final Source ints = Source.from(input); ints.drop(2).take(3).takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS)).map(new Function() { @@ -79,7 +79,7 @@ public class FlowTest extends StreamTest { @Test public void mustBeAbleToUseVoidTypeInForeach() { final JavaTestKit probe = new JavaTestKit(system); - final java.util.Iterator input = Arrays.asList("a", "b", "c").iterator(); + final java.lang.Iterable input = Arrays.asList("a", "b", "c"); Source ints = Source.from(input); Future completion = ints.foreach(new Procedure() { @@ -414,17 +414,11 @@ public class FlowTest extends StreamTest { @Test public void mustBeAbleToUseCallableInput() { final JavaTestKit probe = new JavaTestKit(system); - final akka.stream.javadsl.japi.Creator> input = new akka.stream.javadsl.japi.Creator>() { - int countdown = 5; - + final Iterable input1 = Arrays.asList(4,3,2,1,0); + final akka.stream.javadsl.japi.Creator> input = new akka.stream.javadsl.japi.Creator>() { @Override - public akka.japi.Option create() { - if (countdown == 0) { - return akka.japi.Option.none(); - } else { - countdown -= 1; - return akka.japi.Option.option(countdown); - } + public Iterator create() { + return input1.iterator(); } }; Source.from(input).foreach(new Procedure() { diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/SynchronousPublisherFromIterableSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/SynchronousPublisherFromIterableSpec.scala index 04b1883367..e9064beccd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/SynchronousPublisherFromIterableSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/SynchronousPublisherFromIterableSpec.scala @@ -29,14 +29,15 @@ class SynchronousPublisherFromIterableSpec extends AkkaSpec { "complete empty" in { val p = SynchronousPublisherFromIterable(List.empty[Int]) - val c = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c) - c.expectComplete() - c.expectNoMsg(100.millis) + def verifyNewSubscriber(i: Int): Unit = { + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + c.expectSubscription() + c.expectComplete() + c.expectNoMsg(100.millis) + } - val c2 = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c2) - c2.expectComplete() + 1 to 10 foreach verifyNewSubscriber } "produce elements with multiple subscribers" in { @@ -171,8 +172,8 @@ class SynchronousPublisherFromIterableSpec extends AkkaSpec { probe.expectMsg("complete") } - "have nice toString" in { - SynchronousPublisherFromIterable(List(1, 2, 3)).toString should be("SynchronousPublisherFromIterable(1, 2, 3)") + "have a toString that doesn't OOME" in { + SynchronousPublisherFromIterable(List(1, 2, 3)).toString should be(classOf[SynchronousPublisherFromIterable[_]].getSimpleName) } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala index db796d4d52..effe9503e4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/TcpFlowSpec.scala @@ -6,6 +6,7 @@ package akka.stream.io import akka.stream.scaladsl.Flow import akka.stream.testkit.AkkaSpec import akka.util.ByteString +import scala.collection.immutable import scala.concurrent.Await import scala.concurrent.duration._ import akka.stream.scaladsl.Source @@ -44,7 +45,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val server = new Server() val (tcpProcessor, serverConnection) = connect(server) - val testInput = Iterator.range(0, 256).map(ByteString(_)) + val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) serverConnection.read(256) @@ -59,7 +60,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val server = new Server() val (tcpProcessor, serverConnection) = connect(server) - val testInput = Iterator.range(0, 256).map(ByteString(_)) + val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) for (in ← testInput) serverConnection.write(in) @@ -155,7 +156,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val server = echoServer(serverAddress) val conn = connect(serverAddress) - val testInput = Iterator.range(0, 256).map(ByteString(_)) + val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) Source(testInput).runWith(Sink(conn.outputStream)) @@ -175,7 +176,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper { val conn2 = connect(serverAddress) val conn3 = connect(serverAddress) - val testInput = Iterator.range(0, 256).map(ByteString(_)) + val testInput = (0 to 255).map(ByteString(_)) val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte])) Source(testInput).runWith(Sink(conn1.outputStream)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala index 4c3031aba9..e9db3ed95a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowBufferSpec.scala @@ -24,20 +24,20 @@ class FlowBufferSpec extends AkkaSpec { "Buffer" must { "pass elements through normally in backpressured mode" in { - val future: Future[Seq[Int]] = Source((1 to 1000).iterator).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). + val future: Future[Seq[Int]] = Source(1 to 1000).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). runWith(Sink.head) Await.result(future, 3.seconds) should be(1 to 1000) } "pass elements through normally in backpressured mode with buffer size one" in { val futureSink = Sink.head[Seq[Int]] - val future = Source((1 to 1000).iterator).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). + val future = Source(1 to 1000).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001). runWith(Sink.head) Await.result(future, 3.seconds) should be(1 to 1000) } "pass elements through a chain of backpressured buffers of different size" in { - val future = Source((1 to 1000).iterator) + val future = Source(1 to 1000) .buffer(1, overflowStrategy = OverflowStrategy.backpressure) .buffer(10, overflowStrategy = OverflowStrategy.backpressure) .buffer(256, overflowStrategy = OverflowStrategy.backpressure) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala index 9678152922..edd87f1e5a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala @@ -23,11 +23,11 @@ class FlowConcatAllSpec extends AkkaSpec { val testException = new Exception("test") with NoStackTrace "work in the happy case" in { - val s1 = Source((1 to 2).iterator) + val s1 = Source(1 to 2) val s2 = Source(List.empty[Int]) val s3 = Source(List(3)) - val s4 = Source((4 to 6).iterator) - val s5 = Source((7 to 10).iterator) + val s4 = Source(4 to 6) + val s5 = Source(7 to 10) val main = Source(List(s1, s2, s3, s4, s5)) @@ -42,7 +42,7 @@ class FlowConcatAllSpec extends AkkaSpec { "work together with SplitWhen" in { val subscriber = StreamTestKit.SubscriberProbe[Int]() - Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).runWith(Sink(subscriber)) + Source(1 to 10).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).runWith(Sink(subscriber)) val subscription = subscriber.expectSubscription() subscription.request(10) subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_))) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala index dc0eb56c1c..4574c7c4d4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConflateSpec.scala @@ -57,7 +57,7 @@ class FlowConflateSpec extends AkkaSpec { } "work on a variable rate chain" in { - val future = Source((1 to 1000).iterator) + val future = Source(1 to 1000) .conflate(seed = i ⇒ i)(aggregate = (sum, i) ⇒ sum + i) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } .fold(0)(_ + _) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala index ea2ea2fee0..c9d6837dad 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowExpandSpec.scala @@ -67,7 +67,7 @@ class FlowExpandSpec extends AkkaSpec { } "work on a variable rate chain" in { - val future = Source((1 to 100).iterator) + val future = Source(1 to 100) .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } .expand(seed = i ⇒ i)(extrapolate = i ⇒ (i, i)) .fold(Set.empty[Int])(_ + _) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala index 435a1e08ea..2ccd933e64 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFilterSpec.scala @@ -30,8 +30,7 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest { implicit val materializer = FlowMaterializer(settings) val probe = StreamTestKit.SubscriberProbe[Int]() - Source(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0). - runWith(Sink(probe)) + Source(List.fill(1000)(0) ::: List(1)).filter(_ != 0).runWith(Sink(probe)) val subscription = probe.expectSubscription() for (_ ← 1 to 10000) { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala index 4d82228f49..c5d7be6650 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphCompileSpec.scala @@ -24,6 +24,8 @@ class FlowGraphCompileSpec extends AkkaSpec { } } + val apples = () ⇒ Iterator.continually(new Apple) + val f1 = Flow[String].transform("f1", op[String, String]) val f2 = Flow[String].transform("f2", op[String, String]) val f3 = Flow[String].transform("f3", op[String, String]) @@ -314,8 +316,8 @@ class FlowGraphCompileSpec extends AkkaSpec { FlowGraph { b ⇒ val merge = Merge[Fruit] b. - addEdge(Source[Fruit](() ⇒ Some(new Apple)), Flow[Fruit], merge). - addEdge(Source[Apple](() ⇒ Some(new Apple)), Flow[Apple], merge). + addEdge(Source[Fruit](apples), Flow[Fruit], merge). + addEdge(Source[Apple](apples), Flow[Apple], merge). addEdge(merge, Flow[Fruit].map(identity), out) } } @@ -330,8 +332,8 @@ class FlowGraphCompileSpec extends AkkaSpec { val unzip = Unzip[Int, String] val whatever = Sink.publisher[Any] import FlowGraphImplicits._ - Source[Fruit](() ⇒ Some(new Apple)) ~> merge - Source[Apple](() ⇒ Some(new Apple)) ~> merge + Source[Fruit](apples) ~> merge + Source[Apple](apples) ~> merge inA ~> merge inB ~> merge inA ~> Flow[Fruit].map(identity) ~> merge @@ -341,9 +343,9 @@ class FlowGraphCompileSpec extends AkkaSpec { UndefinedSource[Apple] ~> Flow[Apple].map(identity) ~> merge merge ~> Flow[Fruit].map(identity) ~> outA - Source[Apple](() ⇒ Some(new Apple)) ~> Broadcast[Apple] ~> merge - Source[Apple](() ⇒ Some(new Apple)) ~> Broadcast[Apple] ~> outB - Source[Apple](() ⇒ Some(new Apple)) ~> Broadcast[Apple] ~> UndefinedSink[Fruit] + Source[Apple](apples) ~> Broadcast[Apple] ~> merge + Source[Apple](apples) ~> Broadcast[Apple] ~> outB + Source[Apple](apples) ~> Broadcast[Apple] ~> UndefinedSink[Fruit] inB ~> Broadcast[Apple] ~> merge Source(List(1 -> "a", 2 -> "b", 3 -> "c")) ~> unzip.in diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala index d1811c736a..3f149407ac 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGraphInitSpec.scala @@ -45,7 +45,7 @@ class FlowGraphInitSpec extends AkkaSpec { val s = Source(1 to 5) val b = Broadcast[Int] - val sink: KeyedSink[Int] = Sink.foreach[Int](i ⇒ i) + val sink: KeyedSink[Int] = Sink.foreach[Int](_ ⇒ ()) val otherSink: KeyedSink[Int] = Sink.foreach[Int](i ⇒ 2 * i) FlowGraph { implicit builder ⇒ diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index d4152b0ac3..90115b17f4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -33,7 +33,7 @@ class FlowGroupBySpec extends AkkaSpec { } class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) { - val source = Source((1 to elementCount).iterator).runWith(Sink.publisher) + val source = Source(1 to elementCount).runWith(Sink.publisher) val groupStream = Source(source).groupBy(_ % groupCount).runWith(Sink.publisher) val masterSubscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedSpec.scala index 525dadbb5f..4ae9575b2f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupedSpec.scala @@ -18,15 +18,19 @@ class FlowGroupedSpec extends AkkaSpec with ScriptedTest { "A Grouped" must { + def randomSeq(n: Int) = immutable.Seq.fill(n)(random.nextInt()) + def randomTest(n: Int) = { val s = randomSeq(n); s -> immutable.Seq(s) } + "group evenly" in { - def script = Script(TestConfig.RandomTestRange map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }: _*) - TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.grouped(3))) + val testLen = random.nextInt(1, 16) + def script = Script(TestConfig.RandomTestRange map { _ ⇒ randomTest(testLen) }: _*) + TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.grouped(testLen))) } "group with rest" in { - def script = Script((TestConfig.RandomTestRange.map { _ ⇒ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) } - :+ { val x = random.nextInt(); Seq(x) -> Seq(immutable.Seq(x)) }): _*) - TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.grouped(3))) + val testLen = random.nextInt(1, 16) + def script = Script((TestConfig.RandomTestRange.map { _ ⇒ randomTest(testLen) } :+ randomTest(1)): _*) + TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.grouped(testLen))) } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIterableSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIterableSpec.scala index 1c98a448ff..508de463a0 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIterableSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIterableSpec.scala @@ -20,7 +20,7 @@ class FlowIterableSpec extends AkkaSpec { "A Flow based on an iterable" must { "produce elements" in { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher) + val p = Source(1 to 3).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -37,16 +37,18 @@ class FlowIterableSpec extends AkkaSpec { val p = Source(List.empty[Int]).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) + c.expectSubscription() c.expectComplete() c.expectNoMsg(100.millis) val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c2) + c2.expectSubscription() c2.expectComplete() } "produce elements with multiple subscribers" in { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher) + val p = Source(1 to 3).runWith(Sink.publisher) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -70,7 +72,7 @@ class FlowIterableSpec extends AkkaSpec { } "produce elements to later subscriber" in { - val p = Source(List(1, 2, 3)).runWith(Sink.publisher) + val p = Source(1 to 3).runWith(Sink.publisher) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -96,7 +98,7 @@ class FlowIterableSpec extends AkkaSpec { } "produce elements with one transformation step" in { - val p = Source(List(1, 2, 3)).map(_ * 2).runWith(Sink.publisher) + val p = Source(1 to 3).map(_ * 2).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -117,23 +119,5 @@ class FlowIterableSpec extends AkkaSpec { c.expectNext(8) c.expectComplete() } - - "allow cancel before receiving all elements" in { - val count = 100000 - val p = Source(1 to count).runWith(Sink.publisher) - val c = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c) - val sub = c.expectSubscription() - sub.request(count) - c.expectNext(1) - sub.cancel() - val got = c.probe.receiveWhile(3.seconds) { - case _: OnNext[_] ⇒ - case OnComplete ⇒ fail("Cancel expected before OnComplete") - case OnError(e) ⇒ fail(e) - } - got.size should be < (count - 1) - } - } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala index 56888c0c7b..2c9b73ec17 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIteratorSpec.scala @@ -21,9 +21,9 @@ class FlowIteratorSpec extends AkkaSpec { implicit val materializer = FlowMaterializer(settings) - "A Flow based on an iterator" must { + "A Flow based on an iterator producing function" must { "produce elements" in { - val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher) + val p = Source(() ⇒ (1 to 3).iterator).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -37,7 +37,7 @@ class FlowIteratorSpec extends AkkaSpec { } "complete empty" in { - val p = Source[Int](Iterator.empty).runWith(Sink.publisher) + val p = Source[Int](() ⇒ Iterator.empty).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) c.expectSubscription() @@ -46,11 +46,12 @@ class FlowIteratorSpec extends AkkaSpec { val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c2) + c2.expectSubscription() c2.expectComplete() } "produce elements with multiple subscribers" in { - val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher) + val p = Source(() ⇒ (1 to 3).iterator).runWith(Sink.publisher) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -74,7 +75,7 @@ class FlowIteratorSpec extends AkkaSpec { } "produce elements to later subscriber" in { - val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher) + val p = Source(() ⇒ (1 to 3).iterator).runWith(Sink.publisher) val c1 = StreamTestKit.SubscriberProbe[Int]() val c2 = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c1) @@ -86,7 +87,7 @@ class FlowIteratorSpec extends AkkaSpec { p.subscribe(c2) val sub2 = c2.expectSubscription() sub2.request(3) - // element 1 is already gone + c2.expectNext(1) c2.expectNext(2) c2.expectNext(3) c2.expectComplete() @@ -97,7 +98,7 @@ class FlowIteratorSpec extends AkkaSpec { } "produce elements with one transformation step" in { - val p = Source(List(1, 2, 3).iterator).map(_ * 2).runWith(Sink.publisher) + val p = Source(() ⇒ (1 to 3).iterator).map(_ * 2).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -109,7 +110,7 @@ class FlowIteratorSpec extends AkkaSpec { } "produce elements with two transformation steps" in { - val p = Source(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher) + val p = Source(() ⇒ (1 to 4).iterator).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher) val c = StreamTestKit.SubscriberProbe[Int]() p.subscribe(c) val sub = c.expectSubscription() @@ -118,23 +119,5 @@ class FlowIteratorSpec extends AkkaSpec { c.expectNext(8) c.expectComplete() } - - "allow cancel before receiving all elements" in { - val count = 100000 - val p = Source((1 to count).iterator).runWith(Sink.publisher) - val c = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c) - val sub = c.expectSubscription() - sub.request(count) - c.expectNext(1) - sub.cancel() - val got = c.probe.receiveWhile(3.seconds) { - case _: OnNext[_] ⇒ - case OnComplete ⇒ fail("Cancel expected before OnComplete") - case OnError(e) ⇒ fail(e) - } - got.size should be < (count - 1) - } - } } \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala index 982d84a14a..bcdf64b00a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapSpec.scala @@ -27,7 +27,7 @@ class FlowMapSpec extends AkkaSpec with ScriptedTest { "not blow up with high request counts" in { val probe = StreamTestKit.SubscriberProbe[Int]() - Source(List(1).iterator). + Source(List(1)). map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1). runWith(Sink.publisher).subscribe(probe) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 12981b4444..c68ba442cd 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -49,7 +49,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { "work on longer inputs" in { val futureSink = newHeadSink - val fut = Source((1 to 10).iterator).prefixAndTail(5).runWith(futureSink) + val fut = Source(1 to 10).prefixAndTail(5).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(1 to 5) @@ -60,7 +60,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { "handle zero take count" in { val futureSink = newHeadSink - val fut = Source((1 to 10).iterator).prefixAndTail(0).runWith(futureSink) + val fut = Source(1 to 10).prefixAndTail(0).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(Nil) @@ -71,7 +71,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { "handle negative take count" in { val futureSink = newHeadSink - val fut = Source((1 to 10).iterator).prefixAndTail(-1).runWith(futureSink) + val fut = Source(1 to 10).prefixAndTail(-1).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(Nil) @@ -82,7 +82,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { "work if size of take is equal to stream size" in { val futureSink = newHeadSink - val fut = Source((1 to 10).iterator).prefixAndTail(10).runWith(futureSink) + val fut = Source(1 to 10).prefixAndTail(10).runWith(futureSink) val (takes, tail) = Await.result(fut, 3.seconds) takes should be(1 to 10) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala new file mode 100644 index 0000000000..51c5f2ead5 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowScanSpec.scala @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } + +import scala.collection.immutable + +import akka.stream.FlowMaterializer +import akka.stream.MaterializerSettings +import akka.stream.testkit.AkkaSpec + +class FlowScanSpec extends AkkaSpec { + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + .withFanOutBuffer(initialSize = 1, maxSize = 16) + + implicit val materializer = FlowMaterializer(settings) + + "A Scan" must { + + def scan(s: Source[Int], duration: Duration = 5.seconds): immutable.Seq[Int] = + Await.result(s.scan(0)(_ + _).fold(immutable.Seq.empty[Int])(_ :+ _), duration) + + "Scan" in { + val v = Vector.fill(random.nextInt(100, 1000))(random.nextInt()) + scan(Source(v)) should be(v.scan(0)(_ + _)) + } + + "Scan empty failed" in { + val e = new Exception("fail!") + intercept[Exception](scan(Source.failed[Int](e))) should be theSameInstanceAs (e) + } + + "Scan empty" in { + val v = Vector.empty[Int] + scan(Source(v)) should be(v.scan(0)(_ + _)) + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index 78c9ccc403..071b39b340 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -13,8 +13,8 @@ import scala.concurrent.duration._ import akka.actor.{ Props, ActorRefFactory, ActorRef } import akka.stream.{ TransformerLike, MaterializerSettings } import akka.stream.FlowMaterializer -import akka.stream.impl.{ ActorProcessorFactory, TransformProcessorImpl, StreamSupervisor, ActorBasedFlowMaterializer } -import akka.stream.impl.Ast.{ Transform, Fusable, AstNode } +import akka.stream.impl._ +import akka.stream.impl.Ast._ import akka.stream.testkit.{ StreamTestKit, AkkaSpec } import akka.stream.testkit.ChainSetup import akka.testkit._ @@ -25,6 +25,7 @@ import org.reactivestreams.{ Processor, Subscriber, Publisher } object FlowSpec { class Fruit class Apple extends Fruit + val apples = () ⇒ Iterator.continually(new Apple) val flowNameCounter = new AtomicLong(0) @@ -68,13 +69,24 @@ object FlowSpec { supervisor: ActorRef, flowNameCounter: AtomicLong, namePrefix: String, - brokenMessage: Any) extends ActorBasedFlowMaterializer(settings, supervisor, flowNameCounter, namePrefix) { + optimizations: Optimizations, + brokenMessage: Any) extends ActorBasedFlowMaterializer(settings, supervisor, flowNameCounter, namePrefix, optimizations) { - override def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { + override def processorForNode[In, Out](op: AstNode, flowName: String, n: Int): Processor[In, Out] = { val props = op match { - case t: Transform ⇒ Props(new BrokenTransformProcessorImpl(settings, t.mkTransformer(), brokenMessage)) - case f: Fusable ⇒ Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)).withDispatcher(settings.dispatcher) - case o ⇒ ActorProcessorFactory.props(this, o) + case t: Transform ⇒ Props(new BrokenTransformProcessorImpl(settings, t.mkTransformer(), brokenMessage)) + case f: Fused ⇒ Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)).withDispatcher(settings.dispatcher) + case Map(f) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Map(f)), brokenMessage)) + case Filter(p) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Filter(p)), brokenMessage)) + case Drop(n) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Drop(n)), brokenMessage)) + case Take(n) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Take(n)), brokenMessage)) + case Collect(pf) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Collect(pf)), brokenMessage)) + case Scan(z, f) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Scan(z, f)), brokenMessage)) + case Expand(s, f) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Expand(s, f)), brokenMessage)) + case Conflate(s, f) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Conflate(s, f)), brokenMessage)) + case Buffer(n, s) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.Buffer(n, s)), brokenMessage)) + case MapConcat(f) ⇒ Props(new BrokenActorInterpreter(settings, List(fusing.MapConcat(f)), brokenMessage)) + case o ⇒ ActorProcessorFactory.props(this, o) } val impl = actorOf(props, s"$flowName-$n-${op.name}") ActorProcessorFactory(impl) @@ -87,6 +99,7 @@ object FlowSpec { context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)), flowNameCounter, "brokenflow", + Optimizations.none, brokenMessage) } } @@ -316,11 +329,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "be covariant" in { - val f1: Source[Fruit] = Source[Fruit](() ⇒ Some(new Apple)) - val p1: Publisher[Fruit] = Source[Fruit](() ⇒ Some(new Apple)).runWith(Sink.publisher) - val f2: Source[Source[Fruit]] = Source[Fruit](() ⇒ Some(new Apple)).splitWhen(_ ⇒ true) - val f3: Source[(Boolean, Source[Fruit])] = Source[Fruit](() ⇒ Some(new Apple)).groupBy(_ ⇒ true) - val f4: Source[(immutable.Seq[Fruit], Source[Fruit])] = Source[Fruit](() ⇒ Some(new Apple)).prefixAndTail(1) + val f1: Source[Fruit] = Source[Fruit](apples) + val p1: Publisher[Fruit] = Source[Fruit](apples).runWith(Sink.publisher) + val f2: Source[Source[Fruit]] = Source[Fruit](apples).splitWhen(_ ⇒ true) + val f3: Source[(Boolean, Source[Fruit])] = Source[Fruit](apples).groupBy(_ ⇒ true) + val f4: Source[(immutable.Seq[Fruit], Source[Fruit])] = Source[Fruit](apples).prefixAndTail(1) val d1: Flow[String, Source[Fruit]] = Flow[String].map(_ ⇒ new Apple).splitWhen(_ ⇒ true) val d2: Flow[String, (Boolean, Source[Fruit])] = Flow[String].map(_ ⇒ new Apple).groupBy(_ ⇒ true) val d3: Flow[String, (immutable.Seq[Apple], Source[Fruit])] = Flow[String].map(_ ⇒ new Apple).prefixAndTail(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala index 99368be6d0..8b2e348e6d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSplitWhenSpec.scala @@ -32,7 +32,7 @@ class FlowSplitWhenSpec extends AkkaSpec { } class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) { - val source = Source((1 to elementCount).iterator) + val source = Source(1 to elementCount) val groupStream = source.splitWhen(_ == splitWhen).runWith(Sink.publisher) val masterSubscriber = StreamTestKit.SubscriberProbe[Source[Int]]() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThunkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThunkSpec.scala deleted file mode 100644 index 2c9d4f2ab4..0000000000 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowThunkSpec.scala +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.scaladsl - -import scala.concurrent.duration._ - -import akka.stream.FlowMaterializer -import akka.stream.testkit.AkkaSpec -import akka.stream.testkit.StreamTestKit -import akka.stream.testkit.StreamTestKit.OnComplete -import akka.stream.testkit.StreamTestKit.OnError -import akka.stream.testkit.StreamTestKit.OnNext - -class FlowThunkSpec extends AkkaSpec { - - implicit val materializer = FlowMaterializer() - - "A Flow based on a thunk generator" must { - "produce elements" in { - - val iter = List(1, 2, 3).iterator - val p = Source(() ⇒ if (iter.hasNext) Some(iter.next()) else None).map(_ + 10).runWith(Sink.publisher) - val c = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c) - val sub = c.expectSubscription() - sub.request(1) - c.expectNext(11) - c.expectNoMsg(100.millis) - sub.request(3) - c.expectNext(12) - c.expectNext(13) - c.expectComplete() - } - - "complete empty" in { - val p = Source(() ⇒ None).runWith(Sink.publisher) - val c = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c) - val sub = c.expectSubscription() - sub.request(1) - c.expectComplete() - c.expectNoMsg(100.millis) - } - - "allow cancel before receiving all elements" in { - val count = 100000 - val iter = (1 to count).iterator - val p = Source(() ⇒ if (iter.hasNext) Some(iter.next()) else None).runWith(Sink.publisher) - val c = StreamTestKit.SubscriberProbe[Int]() - p.subscribe(c) - val sub = c.expectSubscription() - sub.request(count) - c.expectNext(1) - sub.cancel() - val got = c.probe.receiveWhile(3.seconds) { - case _: OnNext[_] ⇒ - case OnComplete ⇒ fail("Cancel expected before OnComplete") - case OnError(e) ⇒ fail(e) - } - got.size should be < (count - 1) - } - - } -} \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTransformRecoverSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTransformRecoverSpec.scala index dc9d52b89c..bd65eb64b6 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTransformRecoverSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTransformRecoverSpec.scala @@ -41,7 +41,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "A Flow with transformRecover operations" must { "produce one-to-one transformation as expected" in { - val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher) + val p = Source(1 to 3).runWith(Sink.publisher) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -69,7 +69,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "produce one-to-several transformation as expected" in { - val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher) + val p = Source(1 to 3).runWith(Sink.publisher) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -100,7 +100,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "produce dropping transformation as expected" in { - val p = Source(List(1, 2, 3, 4).iterator).runWith(Sink.publisher) + val p = Source(1 to 4).runWith(Sink.publisher) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { var tot = 0 @@ -128,7 +128,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "produce multi-step transformation as expected" in { - val p = Source(List("a", "bc", "def").iterator).runWith(Sink.publisher) + val p = Source(List("a", "bc", "def")).runWith(Sink.publisher) val p2 = Source(p). transform("transform", () ⇒ new TryRecoveryTransformer[String, Int] { var concat = "" @@ -171,7 +171,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "invoke onComplete when done" in { - val p = Source(List("a").iterator).runWith(Sink.publisher) + val p = Source(List("a")).runWith(Sink.publisher) val p2 = Source(p). transform("transform", () ⇒ new TryRecoveryTransformer[String, String] { var s = "" @@ -241,7 +241,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "report error when exception is thrown" in { - val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher) + val p = Source(1 to 3).runWith(Sink.publisher) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = { @@ -267,7 +267,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "report error after emitted elements" in { EventFilter[IllegalArgumentException]("two not allowed") intercept { - val p2 = Source(List(1, 2, 3).iterator). + val p2 = Source(1 to 3). mapConcat { elem ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else (1 to 5).map(elem * 100 + _) @@ -367,7 +367,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { } "support cancel as expected" in { - val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher) + val p = Source(1 to 3).runWith(Sink.publisher) val p2 = Source(p). transform("transform", () ⇒ new Transformer[Int, Int] { override def onNext(elem: Int) = List(elem, elem) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala index aa7c4f3543..b9ca0100a4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphConcatSpec.scala @@ -27,11 +27,11 @@ class GraphConcatSpec extends TwoStreamsSetup { val concat1 = Concat[Int]("concat1") val concat2 = Concat[Int]("concat2") - Source(List.empty[Int].iterator) ~> concat1.first - Source((1 to 4).iterator) ~> concat1.second + Source(List.empty[Int]) ~> concat1.first + Source(1 to 4) ~> concat1.second concat1.out ~> concat2.first - Source((5 to 10).iterator) ~> concat2.second + Source(5 to 10) ~> concat2.second concat2.out ~> Sink(probe) }.run() @@ -49,7 +49,7 @@ class GraphConcatSpec extends TwoStreamsSetup { commonTests() "work with one immediately completed and one nonempty publisher" in { - val subscriber1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) val subscription1 = subscriber1.expectSubscription() subscription1.request(5) subscriber1.expectNext(1) @@ -58,7 +58,7 @@ class GraphConcatSpec extends TwoStreamsSetup { subscriber1.expectNext(4) subscriber1.expectComplete() - val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher) + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) val subscription2 = subscriber2.expectSubscription() subscription2.request(5) subscriber2.expectNext(1) @@ -69,7 +69,7 @@ class GraphConcatSpec extends TwoStreamsSetup { } "work with one delayed completed and one nonempty publisher" in { - val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator)) + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) val subscription1 = subscriber1.expectSubscription() subscription1.request(5) subscriber1.expectNext(1) @@ -78,7 +78,7 @@ class GraphConcatSpec extends TwoStreamsSetup { subscriber1.expectNext(4) subscriber1.expectComplete() - val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher) + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) val subscription2 = subscriber2.expectSubscription() subscription2.request(5) subscriber2.expectNext(1) @@ -89,18 +89,18 @@ class GraphConcatSpec extends TwoStreamsSetup { } "work with one immediately failed and one nonempty publisher" in { - val subscriber1 = setup(failedPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) subscriber1.expectErrorOrSubscriptionFollowedByError(TestException) - val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher) + val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) } "work with one delayed failed and one nonempty publisher" in { - val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) subscriber1.expectErrorOrSubscriptionFollowedByError(TestException) - val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher) + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala index 4eb55fd436..e438113a0a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMergeSpec.scala @@ -20,9 +20,9 @@ class GraphMergeSpec extends TwoStreamsSetup { "work in the happy case" in { // Different input sizes (4 and 6) - val source1 = Source((0 to 3).iterator) - val source2 = Source((4 to 9).iterator) - val source3 = Source(List.empty[Int].iterator) + val source1 = Source(0 to 3) + val source2 = Source(4 to 9) + val source3 = Source(List[Int]()) val probe = StreamTestKit.SubscriberProbe[Int]() FlowGraph { implicit b ⇒ @@ -54,7 +54,7 @@ class GraphMergeSpec extends TwoStreamsSetup { val source3 = Source(List(3)) val source4 = Source(List(4)) val source5 = Source(List(5)) - val source6 = Source(List.empty[Int]) + val source6 = Source(List[Int]()) val probe = StreamTestKit.SubscriberProbe[Int]() @@ -85,7 +85,7 @@ class GraphMergeSpec extends TwoStreamsSetup { commonTests() "work with one immediately completed and one nonempty publisher" in { - val subscriber1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) val subscription1 = subscriber1.expectSubscription() subscription1.request(4) subscriber1.expectNext(1) @@ -94,7 +94,7 @@ class GraphMergeSpec extends TwoStreamsSetup { subscriber1.expectNext(4) subscriber1.expectComplete() - val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher) + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) val subscription2 = subscriber2.expectSubscription() subscription2.request(4) subscriber2.expectNext(1) @@ -105,7 +105,7 @@ class GraphMergeSpec extends TwoStreamsSetup { } "work with one delayed completed and one nonempty publisher" in { - val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator)) + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) val subscription1 = subscriber1.expectSubscription() subscription1.request(4) subscriber1.expectNext(1) @@ -114,7 +114,7 @@ class GraphMergeSpec extends TwoStreamsSetup { subscriber1.expectNext(4) subscriber1.expectComplete() - val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher) + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) val subscription2 = subscriber2.expectSubscription() subscription2.request(4) subscriber2.expectNext(1) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala index ef4b80e09d..2501f7bb86 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipSpec.scala @@ -42,34 +42,34 @@ class GraphZipSpec extends TwoStreamsSetup { commonTests() "work with one immediately completed and one nonempty publisher" in { - val subscriber1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4)) subscriber1.expectCompletedOrSubscriptionFollowedByComplete() - val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher) + val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher) subscriber2.expectCompletedOrSubscriptionFollowedByComplete() } "work with one delayed completed and one nonempty publisher" in { - val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator)) + val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4)) subscriber1.expectCompletedOrSubscriptionFollowedByComplete() - val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher) + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher) subscriber2.expectCompletedOrSubscriptionFollowedByComplete() } "work with one immediately failed and one nonempty publisher" in { - val subscriber1 = setup(failedPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4)) subscriber1.expectErrorOrSubscriptionFollowedByError(TestException) - val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher) + val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher) subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) } "work with one delayed failed and one nonempty publisher" in { - val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator)) + val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4)) subscriber1.expectErrorOrSubscriptionFollowedByError(TestException) - val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher) + val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher) val subscription2 = subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/OptimizingActorBasedFlowMaterializerSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/OptimizingActorBasedFlowMaterializerSpec.scala new file mode 100644 index 0000000000..1621b2bd79 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/OptimizingActorBasedFlowMaterializerSpec.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.{ FlowMaterializer, MaterializerSettings } +import akka.stream.impl.{ Optimizations, ActorBasedFlowMaterializer } +import akka.stream.testkit.AkkaSpec +import akka.testkit._ + +import scala.concurrent.duration._ +import scala.concurrent.Await + +class OptimizingActorBasedFlowMaterializerSpec extends AkkaSpec with ImplicitSender { + + "ActorBasedFlowMaterializer" must { + //FIXME Add more and meaningful tests to verify that optimizations occur and have the same semantics as the non-optimized code + "optimize filter + map" in { + implicit val mat = FlowMaterializer().asInstanceOf[ActorBasedFlowMaterializer].copy(optimizations = Optimizations.all) + val f = Source(1 to 100). + drop(4). + drop(5). + transform("identity", () ⇒ FlowOps.identityTransformer). + filter(_ % 2 == 0). + map(_ * 2). + map(identity). + take(20). + take(10). + drop(5). + fold(0)(_ + _) + + val expected = (1 to 100). + drop(9). + filter(_ % 2 == 0). + map(_ * 2). + take(10). + drop(5). + fold(0)(_ + _) + + Await.result(f, 5.seconds) should be(expected) + } + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 8d2735beda..af65f9d2c1 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -6,10 +6,7 @@ package akka.stream import java.util.Locale import java.util.concurrent.TimeUnit -import akka.stream.impl.ActorBasedFlowMaterializer -import akka.stream.impl.Ast -import akka.stream.impl.FlowNameCounter -import akka.stream.impl.StreamSupervisor +import akka.stream.impl._ import scala.collection.immutable @@ -63,7 +60,8 @@ object FlowMaterializer { materializerSettings, context.actorOf(StreamSupervisor.props(materializerSettings).withDispatcher(materializerSettings.dispatcher)), FlowNameCounter(system).counter, - namePrefix) + namePrefix, + optimizations = Optimizations.none) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 4247c5525c..2d47ae8c67 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -5,6 +5,7 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicLong +import akka.event.Logging import akka.stream.impl.fusing.{ ActorInterpreter, Op } import scala.annotation.tailrec @@ -35,37 +36,74 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber } * INTERNAL API */ private[akka] object Ast { - sealed trait AstNode { + sealed abstract class AstNode { def name: String } + // FIXME Replace with Operate + final case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode + // FIXME Replace with Operate + final case class TimerTransform(name: String, mkTransformer: () ⇒ TimerTransformer[Any, Any]) extends AstNode - case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode - - case class TimerTransform(name: String, mkTransformer: () ⇒ TimerTransformer[Any, Any]) extends AstNode - - case class Fusable(ops: immutable.Seq[Op[_, _, _, _, _]], name: String) extends AstNode - - case class MapAsync(f: Any ⇒ Future[Any]) extends AstNode { - override def name = "mapAsync" + final case class Operate(mkOp: () ⇒ fusing.Op[_, _, _, _, _]) extends AstNode { + override def name = "operate" } - case class MapAsyncUnordered(f: Any ⇒ Future[Any]) extends AstNode { - override def name = "mapAsyncUnordered" + object Fused { + def apply(ops: immutable.Seq[Op[_, _, _, _, _]]): Fused = + Fused(ops, ops.map(x ⇒ Logging.simpleName(x).toLowerCase).mkString("+")) //FIXME change to something more performant for name + } + final case class Fused(ops: immutable.Seq[Op[_, _, _, _, _]], override val name: String) extends AstNode + + final case class Map(f: Any ⇒ Any) extends AstNode { override def name = "map" } + + final case class Filter(p: Any ⇒ Boolean) extends AstNode { override def name = "filter" } + + final case class Collect(pf: PartialFunction[Any, Any]) extends AstNode { override def name = "collect" } + + // FIXME Replace with OperateAsync + final case class MapAsync(f: Any ⇒ Future[Any]) extends AstNode { override def name = "mapAsync" } + + //FIXME Should be OperateUnorderedAsync + final case class MapAsyncUnordered(f: Any ⇒ Future[Any]) extends AstNode { override def name = "mapAsyncUnordered" } + + final case class Grouped(n: Int) extends AstNode { + require(n > 0, "n must be greater than 0") + override def name = "grouped" } - case class GroupBy(f: Any ⇒ Any) extends AstNode { - override def name = "groupBy" + //FIXME should be `n: Long` + final case class Take(n: Int) extends AstNode { + override def name = "take" } - case class PrefixAndTail(n: Int) extends AstNode { - override def name = "prefixAndTail" + //FIXME should be `n: Long` + final case class Drop(n: Int) extends AstNode { + override def name = "drop" } - case class SplitWhen(p: Any ⇒ Boolean) extends AstNode { - override def name = "splitWhen" + final case class Scan(zero: Any, f: (Any, Any) ⇒ Any) extends AstNode { override def name = "scan" } + + final case class Buffer(size: Int, overflowStrategy: OverflowStrategy) extends AstNode { + require(size > 0, s"Buffer size must be larger than zero but was [$size]") + override def name = "buffer" + } + final case class Conflate(seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends AstNode { + override def name = "conflate" + } + final case class Expand(seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends AstNode { + override def name = "expand" + } + final case class MapConcat(f: Any ⇒ immutable.Seq[Any]) extends AstNode { + override def name = "mapConcat" } - case object ConcatAll extends AstNode { + final case class GroupBy(f: Any ⇒ Any) extends AstNode { override def name = "groupBy" } + + final case class PrefixAndTail(n: Int) extends AstNode { override def name = "prefixAndTail" } + + final case class SplitWhen(p: Any ⇒ Boolean) extends AstNode { override def name = "splitWhen" } + + final case object ConcatAll extends AstNode { override def name = "concatFlatten" } @@ -77,6 +115,7 @@ private[akka] object Ast { sealed trait FanInAstNode extends JunctionAstNode sealed trait FanOutAstNode extends JunctionAstNode + // FIXME Why do we need this? case object IdentityAstNode extends JunctionAstNode { override def name = "identity" } @@ -119,54 +158,164 @@ private[akka] object Ast { } +/** + * INTERNAL API + */ +final object Optimizations { + val none: Optimizations = Optimizations(collapsing = false, elision = false, simplification = false, fusion = false) + val all: Optimizations = Optimizations(collapsing = true, elision = true, simplification = true, fusion = true) +} +/** + * INTERNAL API + */ +final case class Optimizations(collapsing: Boolean, elision: Boolean, simplification: Boolean, fusion: Boolean) { + def isEnabled: Boolean = collapsing || elision || simplification || fusion +} + /** * INTERNAL API */ case class ActorBasedFlowMaterializer(override val settings: MaterializerSettings, supervisor: ActorRef, flowNameCounter: AtomicLong, - namePrefix: String) + namePrefix: String, + optimizations: Optimizations) extends FlowMaterializer(settings) { import Ast.AstNode def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name) - private def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() + private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet() - private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}" + private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}" - @tailrec private def processorChain(topSubscriber: Subscriber[_], ops: immutable.Seq[AstNode], - flowName: String, n: Int): Subscriber[_] = { + @tailrec private[this] def processorChain(topProcessor: Processor[_, _], + ops: List[AstNode], + flowName: String, + n: Int): Processor[_, _] = ops match { case op :: tail ⇒ - val opProcessor: Processor[Any, Any] = processorForNode(op, flowName, n) - opProcessor.subscribe(topSubscriber.asInstanceOf[Subscriber[Any]]) + val opProcessor = processorForNode[Any, Any](op, flowName, n) + opProcessor.subscribe(topProcessor.asInstanceOf[Subscriber[Any]]) processorChain(opProcessor, tail, flowName, n - 1) - case _ ⇒ topSubscriber + case Nil ⇒ + topProcessor } + + //FIXME Optimize the implementation of the optimizer (no joke) + // AstNodes are in reverse order, Fusable Ops are in order + private[this] final def optimize(ops: List[Ast.AstNode]): (List[Ast.AstNode], Int) = { + @tailrec def analyze(rest: List[Ast.AstNode], optimized: List[Ast.AstNode], fuseCandidates: List[fusing.Op[_, _, _, _, _]]): (List[Ast.AstNode], Int) = { + + //The `verify` phase + def verify(rest: List[Ast.AstNode], orig: List[Ast.AstNode]): List[Ast.AstNode] = + rest match { + case (f: Ast.Fused) :: _ ⇒ throw new IllegalStateException("Fused AST nodes not allowed to be present in the input to the optimizer: " + f) + //TODO Ast.Take(-Long.MaxValue..0) == stream doesn't do anything. Perhaps output warning for that? + case noMatch ⇒ noMatch + } + + // The `elide` phase + // TODO / FIXME : This phase could be pulled out to be executed incrementally when building the Ast + def elide(rest: List[Ast.AstNode], orig: List[Ast.AstNode]): List[Ast.AstNode] = + rest match { + case noMatch if !optimizations.elision || (noMatch ne orig) ⇒ orig + //Collapses consecutive Take's into one + case (t1 @ Ast.Take(t1n)) :: (t2 @ Ast.Take(t2n)) :: rest ⇒ (if (t1n < t2n) t1 else t2) :: rest + + //Collapses consecutive Drop's into one + case (d1 @ Ast.Drop(d1n)) :: (d2 @ Ast.Drop(d2n)) :: rest ⇒ new Ast.Drop(d1n + d2n) :: rest + + case Ast.Drop(n) :: rest if n < 1 ⇒ rest // a 0 or negative drop is a NoOp + + case noMatch ⇒ noMatch + } + // The `simplify` phase + def simplify(rest: List[Ast.AstNode], orig: List[Ast.AstNode]): List[Ast.AstNode] = + rest match { + case noMatch if !optimizations.simplification || (noMatch ne orig) ⇒ orig + + // Two consecutive maps is equivalent to one pipelined map + case Ast.Map(second) :: Ast.Map(first) :: rest ⇒ Ast.Map(first compose second) :: rest + + case noMatch ⇒ noMatch + } + + // the `Collapse` phase + def collapse(rest: List[Ast.AstNode], orig: List[Ast.AstNode]): List[Ast.AstNode] = + rest match { + case noMatch if !optimizations.collapsing || (noMatch ne orig) ⇒ orig + + // Collapses a filter and a map into a collect + case Ast.Map(f) :: Ast.Filter(p) :: rest ⇒ Ast.Collect({ case i if p(i) ⇒ f(i) }) :: rest + + case noMatch ⇒ noMatch + } + + // Tries to squeeze AstNode into a single fused pipeline + def ast2op(head: Ast.AstNode, prev: List[fusing.Op[_, _, _, _, _]]): List[fusing.Op[_, _, _, _, _]] = + head match { + // Always-on below + case Ast.Operate(mkOp) ⇒ mkOp() :: prev + + // Optimizations below + case noMatch if !optimizations.fusion ⇒ prev + + case Ast.Take(n) ⇒ fusing.Take(n) :: prev + case Ast.Drop(n) ⇒ fusing.Drop(n) :: prev + case Ast.Filter(p) ⇒ fusing.Filter(p) :: prev + case Ast.Map(f) ⇒ fusing.Map(f) :: prev + case Ast.Collect(pf) ⇒ fusing.Collect(pf) :: prev + //FIXME Add more fusion goodies here + case _ ⇒ prev + } + + // First verify, then try to elide, then try to simplify, then try to fuse + collapse(rest, simplify(rest, elide(rest, verify(rest, rest)))) match { + + case Nil ⇒ + if (fuseCandidates.isEmpty) (optimized.reverse, optimized.length) // End of optimization run without fusion going on, wrap up + else ((Ast.Fused(fuseCandidates) :: optimized).reverse, optimized.length + 1) // End of optimization run with fusion going on, so add it to the optimized stack + + // If the Ast was changed this pass simply recur + case modified if modified ne rest ⇒ analyze(modified, optimized, fuseCandidates) + + // No changes to the Ast, lets try to see if we can squeeze the current head Ast node into a fusion pipeline + case head :: rest ⇒ + ast2op(head, fuseCandidates) match { + case Nil ⇒ analyze(rest, head :: optimized, Nil) + case `fuseCandidates` ⇒ analyze(rest, head :: Ast.Fused(fuseCandidates) :: optimized, Nil) + case newFuseCandidates ⇒ analyze(rest, optimized, newFuseCandidates) + } + } + } + val result = analyze(ops, Nil, Nil) + //println(s"before: $ops") + //println(s"after: ${result._1}") + result } // Ops come in reverse order - override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedMap = { - val flowName = createFlowName() + override def materialize[In, Out](source: Source[In], sink: Sink[Out], rawOps: List[Ast.AstNode]): MaterializedMap = { + val flowName = createFlowName() //FIXME: Creates Id even when it is not used in all branches below - def throwUnknownType(typeName: String, s: Any): Nothing = - throw new MaterializationException(s"unknown $typeName type " + s.getClass) + def throwUnknownType(typeName: String, s: AnyRef): Nothing = + throw new MaterializationException(s"unknown $typeName type ${s.getClass}") - def attachSink(pub: Publisher[Out]) = sink match { + def attachSink(pub: Publisher[Out], flowName: String) = sink match { case s: ActorFlowSink[Out] ⇒ s.attach(pub, this, flowName) case s ⇒ throwUnknownType("Sink", s) } - def attachSource(sub: Subscriber[In]) = source match { + def attachSource(sub: Subscriber[In], flowName: String) = source match { case s: ActorFlowSource[In] ⇒ s.attach(sub, this, flowName) case s ⇒ throwUnknownType("Source", s) } - def createSink() = sink match { + def createSink(flowName: String) = sink match { case s: ActorFlowSink[In] ⇒ s.create(this, flowName) case s ⇒ throwUnknownType("Sink", s) } - def createSource() = source match { + def createSource(flowName: String) = source match { case s: ActorFlowSource[Out] ⇒ s.create(this, flowName) case s ⇒ throwUnknownType("Source", s) } @@ -178,72 +327,60 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting } val (sourceValue, sinkValue) = - if (ops.isEmpty) { + if (rawOps.isEmpty) { if (isActive(sink)) { - val (sub, value) = createSink() - (attachSource(sub), value) + val (sub, value) = createSink(flowName) + (attachSource(sub, flowName), value) } else if (isActive(source)) { - val (pub, value) = createSource() - (value, attachSink(pub)) + val (pub, value) = createSource(flowName) + (value, attachSink(pub, flowName)) } else { - val id: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]] - (attachSource(id), attachSink(id)) + val id = processorForNode[In, Out](identityTransform, flowName, 1) + (attachSource(id, flowName), attachSink(id, flowName)) } } else { - val opsSize = ops.size - val last = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[Any, Out]] + val (ops, opsSize) = if (optimizations.isEnabled) optimize(rawOps) else (rawOps, rawOps.length) + val last = processorForNode[Any, Out](ops.head, flowName, opsSize) val first = processorChain(last, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Any]] - (attachSource(first), attachSink(last)) + (attachSource(first, flowName), attachSink(last, flowName)) } new MaterializedPipe(source, sourceValue, sink, sinkValue) } - - private val identityTransform = Ast.Transform("identity", () ⇒ - new Transformer[Any, Any] { - override def onNext(element: Any) = List(element) - }) + //FIXME Should this be a dedicated AstNode? + private[this] val identityTransform = Ast.Transform("identity", () ⇒ FlowOps.identityTransformer[Any]) /** * INTERNAL API */ - private[akka] def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = { - val impl = actorOf(ActorProcessorFactory.props(this, op), s"$flowName-$n-${op.name}") - ActorProcessorFactory(impl) - } + private[akka] def processorForNode[In, Out](op: AstNode, flowName: String, n: Int): Processor[In, Out] = + ActorProcessorFactory[In, Out](actorOf(ActorProcessorFactory.props(this, op), s"$flowName-$n-${op.name}")) def actorOf(props: Props, name: String): ActorRef = supervisor match { case ref: LocalActorRef ⇒ - ref.underlying.attachChild(props, name, systemService = false) + ref.underlying.attachChild(props.withDispatcher(settings.dispatcher), name, systemService = false) case ref: RepointableActorRef ⇒ if (ref.isStarted) - ref.underlying.asInstanceOf[ActorCell].attachChild(props, name, systemService = false) + ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(settings.dispatcher), name, systemService = false) else { implicit val timeout = ref.system.settings.CreationTimeout - val f = (supervisor ? StreamSupervisor.Materialize(props, name)).mapTo[ActorRef] + val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(settings.dispatcher), name)).mapTo[ActorRef] Await.result(f, timeout.duration) } - case _ ⇒ - throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]") + case unknown ⇒ + throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]") } - + // FIXME Investigate possibility of using `enableOperationsFusion` in `materializeJunction` override def materializeJunction[In, Out](op: Ast.JunctionAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) = { - val flowName = createFlowName() - val actorName = s"$flowName-${op.name}" + val actorName = s"${createFlowName()}-${op.name}" op match { case fanin: Ast.FanInAstNode ⇒ val impl = fanin match { - case Ast.Merge ⇒ - actorOf(FairMerge.props(settings, inputCount).withDispatcher(settings.dispatcher), actorName) - case Ast.MergePreferred ⇒ - actorOf(UnfairMerge.props(settings, inputCount).withDispatcher(settings.dispatcher), actorName) - case zip: Ast.Zip ⇒ - actorOf(Zip.props(settings, zip.as).withDispatcher(settings.dispatcher), actorName) - case Ast.Concat ⇒ - actorOf(Concat.props(settings).withDispatcher(settings.dispatcher), actorName) - case Ast.FlexiMergeNode(merger) ⇒ - actorOf(FlexiMergeImpl.props(settings, inputCount, merger.createMergeLogic()). - withDispatcher(settings.dispatcher), actorName) + case Ast.Merge ⇒ actorOf(FairMerge.props(settings, inputCount), actorName) + case Ast.MergePreferred ⇒ actorOf(UnfairMerge.props(settings, inputCount), actorName) + case zip: Ast.Zip ⇒ actorOf(Zip.props(settings, zip.as), actorName) + case Ast.Concat ⇒ actorOf(Concat.props(settings), actorName) + case Ast.FlexiMergeNode(merger) ⇒ actorOf(FlexiMergeImpl.props(settings, inputCount, merger.createMergeLogic()), actorName) } val publisher = new ActorPublisher[Out](impl) @@ -253,15 +390,10 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting case fanout: Ast.FanOutAstNode ⇒ val impl = fanout match { - case Ast.Broadcast ⇒ - actorOf(Broadcast.props(settings, outputCount).withDispatcher(settings.dispatcher), actorName) - case Ast.Balance(waitForAllDownstreams) ⇒ - actorOf(Balance.props(settings, outputCount, waitForAllDownstreams).withDispatcher(settings.dispatcher), actorName) - case Ast.Unzip ⇒ - actorOf(Unzip.props(settings).withDispatcher(settings.dispatcher), actorName) - case Ast.FlexiRouteNode(route) ⇒ - actorOf(FlexiRouteImpl.props(settings, outputCount, route.createRouteLogic()). - withDispatcher(settings.dispatcher), actorName) + case Ast.Broadcast ⇒ actorOf(Broadcast.props(settings, outputCount), actorName) + case Ast.Balance(waitForAllDownstreams) ⇒ actorOf(Balance.props(settings, outputCount, waitForAllDownstreams), actorName) + case Ast.Unzip ⇒ actorOf(Unzip.props(settings), actorName) + case Ast.FlexiRouteNode(route) ⇒ actorOf(FlexiRouteImpl.props(settings, outputCount, route.createRouteLogic()), actorName) } val publishers = Vector.tabulate(outputCount)(id ⇒ new ActorPublisher[Out](impl) { @@ -271,9 +403,9 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting val subscriber = ActorSubscriber[In](impl) (List(subscriber), publishers) - case identity @ Ast.IdentityAstNode ⇒ - val id: Processor[In, Out] = processorForNode(identityTransform, identity.name, 1).asInstanceOf[Processor[In, Out]] - (List(id), List(id)) + case identity @ Ast.IdentityAstNode ⇒ // FIXME Why is IdentityAstNode a JunctionAStNode? + val id = List(processorForNode[In, Out](identityTransform, identity.name, 1)) // FIXME is `identity.name` appropriate/unique here? + (id, id) } } @@ -324,17 +456,33 @@ private[akka] object ActorProcessorFactory { import Ast._ def props(materializer: FlowMaterializer, op: AstNode): Props = { - val settings = materializer.settings + val settings = materializer.settings // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW (op match { - case Fusable(ops, _) ⇒ Props(new ActorInterpreter(materializer.settings, ops)) - case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) - case t: TimerTransform ⇒ Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer())) - case m: MapAsync ⇒ Props(new MapAsyncProcessorImpl(settings, m.f)) - case m: MapAsyncUnordered ⇒ Props(new MapAsyncUnorderedProcessorImpl(settings, m.f)) - case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) - case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n)) - case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) - case ConcatAll ⇒ Props(new ConcatAllImpl(materializer)) + case Fused(ops, _) ⇒ Props(new ActorInterpreter(settings, ops)) + case Map(f) ⇒ Props(new ActorInterpreter(settings, List(fusing.Map(f)))) + case Filter(p) ⇒ Props(new ActorInterpreter(settings, List(fusing.Filter(p)))) + case Drop(n) ⇒ Props(new ActorInterpreter(settings, List(fusing.Drop(n)))) + case Take(n) ⇒ Props(new ActorInterpreter(settings, List(fusing.Take(n)))) + case Collect(pf) ⇒ Props(new ActorInterpreter(settings, List(fusing.Collect(pf)))) + case Scan(z, f) ⇒ Props(new ActorInterpreter(settings, List(fusing.Scan(z, f)))) + case Expand(s, f) ⇒ Props(new ActorInterpreter(settings, List(fusing.Expand(s, f)))) + case Conflate(s, f) ⇒ Props(new ActorInterpreter(settings, List(fusing.Conflate(s, f)))) + case Buffer(n, s) ⇒ Props(new ActorInterpreter(settings, List(fusing.Buffer(n, s)))) + case MapConcat(f) ⇒ Props(new ActorInterpreter(settings, List(fusing.MapConcat(f)))) + case Operate(mkOp) ⇒ Props(new ActorInterpreter(settings, List(mkOp()))) + case MapAsync(f) ⇒ Props(new MapAsyncProcessorImpl(settings, f)) + case MapAsyncUnordered(f) ⇒ Props(new MapAsyncUnorderedProcessorImpl(settings, f)) + case Grouped(n) ⇒ Props(new ActorInterpreter(settings, List(fusing.Grouped(n)))) + case GroupBy(f) ⇒ Props(new GroupByProcessorImpl(settings, f)) + case PrefixAndTail(n) ⇒ Props(new PrefixAndTailImpl(settings, n)) + case SplitWhen(p) ⇒ Props(new SplitWhenProcessorImpl(settings, p)) + case ConcatAll ⇒ Props(new ConcatAllImpl(materializer)) //FIXME closes over the materializer, is this good? + case t: Transform ⇒ + val tr = t.mkTransformer() + Props(new TransformProcessorImpl(settings, tr)) + case t: TimerTransform ⇒ + val tr = t.mkTransformer() + Props(new TimerTransformerProcessorsImpl(settings, tr)) }).withDispatcher(settings.dispatcher) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 97ff8ccfad..fddfd9da77 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -13,13 +13,6 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings } import org.reactivestreams.{ Publisher, Subscriber } -/** - * INTERNAL API - */ -private[akka] object SimpleCallbackPublisher { - def props[T](f: () ⇒ T, settings: MaterializerSettings): Props = IteratorPublisher.props(Iterator.continually(f()), settings) -} - /** * INTERNAL API */ @@ -98,7 +91,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] { */ private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends SubscriptionWithCursor[T] { override def request(elements: Long): Unit = - if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) + if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) else impl ! RequestMore(this, elements) override def cancel(): Unit = impl ! Cancel(this) } @@ -124,20 +117,23 @@ private[akka] trait SoftShutdown { this: Actor ⇒ /** * INTERNAL API */ -private[akka] object IteratorPublisherImpl { - case object Flush +private[akka] object IteratorPublisher { + private[IteratorPublisher] case object Flush + + def props[T](iterator: Iterator[T], settings: MaterializerSettings): Props = + Props(new IteratorPublisher(iterator, settings)) } /** * INTERNAL API */ -private[akka] class IteratorPublisherImpl[T](iterator: Iterator[T], settings: MaterializerSettings) +private[akka] class IteratorPublisher[T](iterator: Iterator[T], settings: MaterializerSettings) extends Actor with ActorLogging with SubscriberManagement[T] with SoftShutdown { - import IteratorPublisherImpl.Flush + import IteratorPublisher.Flush type S = ActorSubscription[T] private var demand = 0L diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index fcedcab827..deef5342e4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -44,6 +44,7 @@ private[akka] object FuturePublisher { /** * INTERNAL API */ +//FIXME why do we need to have an actor to drive a Future? private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerSettings) extends Actor with SoftShutdown { import akka.stream.impl.FuturePublisher.FutureSubscription import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel diff --git a/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala deleted file mode 100644 index 9ca213e351..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/IterablePublisher.scala +++ /dev/null @@ -1,175 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import scala.annotation.tailrec -import scala.collection.immutable -import scala.util.control.NonFatal - -import akka.actor.{ Actor, ActorRef, Props, SupervisorStrategy, Terminated } -import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants } -import org.reactivestreams.{ Subscriber, Subscription } - -/** - * INTERNAL API - */ -private[akka] object IterablePublisher { - def props(iterable: immutable.Iterable[Any], settings: MaterializerSettings): Props = - Props(new IterablePublisher(iterable, settings)).withDispatcher(settings.dispatcher) - - object BasicActorSubscription { - case object Cancel - case class RequestMore(elements: Long) - } - - class BasicActorSubscription(ref: ActorRef) - extends Subscription { - import akka.stream.impl.IterablePublisher.BasicActorSubscription._ - def cancel(): Unit = ref ! Cancel - def request(elements: Long): Unit = - if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) - else ref ! RequestMore(elements) - override def toString = "BasicActorSubscription" - } -} - -/** - * INTERNAL API - * - * Elements are produced from the iterator of the iterable. Each subscriber - * makes use of its own iterable, i.e. each subscriber will receive the elements from the - * beginning of the iterable and it can consume the elements in its own pace. - */ -private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], settings: MaterializerSettings) extends Actor with SoftShutdown { - import akka.stream.impl.IterablePublisher.BasicActorSubscription - - require(iterable.nonEmpty, "Use EmptyPublisher for empty iterable") - - var exposedPublisher: ActorPublisher[Any] = _ - var subscribers = Set.empty[Subscriber[Any]] - var workers = Map.empty[ActorRef, Subscriber[Any]] - - override val supervisorStrategy = SupervisorStrategy.stoppingStrategy - - def receive = { - case ExposedPublisher(publisher) ⇒ - exposedPublisher = publisher - context.become(waitingForFirstSubscriber) - case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") - } - - def waitingForFirstSubscriber: Receive = { - case SubscribePending ⇒ - exposedPublisher.takePendingSubscribers() foreach registerSubscriber - context.become(active) - } - - def active: Receive = { - case SubscribePending ⇒ - exposedPublisher.takePendingSubscribers() foreach registerSubscriber - - case Terminated(worker) ⇒ - workerFinished(worker) - - case IterablePublisherWorker.Finished ⇒ - context.unwatch(sender) - workerFinished(sender) - } - - private def workerFinished(worker: ActorRef): Unit = { - val subscriber = workers(worker) - workers -= worker - subscribers -= subscriber - if (subscribers.isEmpty) { - exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) - softShutdown() - } - } - - def registerSubscriber(subscriber: Subscriber[Any]): Unit = { - if (subscribers(subscriber)) - subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}")) - else { - val iterator = iterable.iterator - val worker = context.watch(context.actorOf(IterablePublisherWorker.props(iterator, subscriber, - settings.maxInputBufferSize).withDispatcher(context.props.dispatcher))) - val subscription = new BasicActorSubscription(worker) - subscribers += subscriber - workers = workers.updated(worker, subscriber) - subscriber.onSubscribe(subscription) - } - } - - override def postStop(): Unit = { - if (exposedPublisher ne null) - exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) - } - -} - -/** - * INTERNAL API - */ -private[akka] object IterablePublisherWorker { - def props(iterator: Iterator[Any], subscriber: Subscriber[Any], maxPush: Int): Props = - Props(new IterablePublisherWorker(iterator, subscriber, maxPush)) - - private object PushMore - case object Finished -} - -/** - * INTERNAL API - * - * Each subscriber is served by this worker actor. It pushes elements to the - * subscriber immediately when it receives demand, but to allow cancel before - * pushing everything it sends a PushMore to itself after a batch of elements. - */ -private[akka] class IterablePublisherWorker(iterator: Iterator[Any], subscriber: Subscriber[Any], maxPush: Int) - extends Actor with SoftShutdown { - import akka.stream.impl.IterablePublisher.BasicActorSubscription._ - import akka.stream.impl.IterablePublisherWorker._ - - require(iterator.hasNext, "Iterator must not be empty") - - var pendingDemand: Long = 0L - - def receive = { - case RequestMore(elements) ⇒ - pendingDemand += elements - push() - case PushMore ⇒ - push() - case Cancel ⇒ - context.parent ! Finished - softShutdown() - } - - private def push(): Unit = { - @tailrec def doPush(n: Int): Unit = - if (pendingDemand > 0) { - pendingDemand -= 1 - val hasNext = { - subscriber.onNext(iterator.next()) - iterator.hasNext - } - if (!hasNext) { - subscriber.onComplete() - context.parent ! Finished - softShutdown() - } else if (n == 0 && pendingDemand > 0) - self ! PushMore - else - doPush(n - 1) - } - - try doPush(maxPush) catch { - case NonFatal(e) ⇒ - subscriber.onError(e) - context.parent ! Finished - softShutdown() - } - } -} - diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala deleted file mode 100644 index d3ec1f32d9..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/IteratorPublisher.scala +++ /dev/null @@ -1,16 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import akka.actor.Props -import akka.stream.MaterializerSettings - -/** - * INTERNAL API - */ -private[akka] object IteratorPublisher { - def props[T](iterator: Iterator[T], settings: MaterializerSettings): Props = - Props(new IteratorPublisherImpl(iterator, settings)).withDispatcher(settings.dispatcher) - -} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala b/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala index c6bb590c42..728e6fad09 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SynchronousPublisherFromIterable.scala @@ -3,6 +3,7 @@ */ package akka.stream.impl +import akka.stream.ReactiveStreamsConstants import org.reactivestreams.{ Publisher, Subscriber, Subscription } import scala.annotation.tailrec @@ -13,25 +14,43 @@ import scala.util.control.NonFatal * INTERNAL API */ private[akka] object SynchronousPublisherFromIterable { - def apply[T](iterable: immutable.Iterable[T]): Publisher[T] = - if (iterable.isEmpty) EmptyPublisher[T] - else new SynchronousPublisherFromIterable(iterable) + def apply[T](iterable: immutable.Iterable[T]): Publisher[T] = new SynchronousPublisherFromIterable(iterable) - private class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription { + object IteratorSubscription { + def apply[T](subscriber: Subscriber[T], iterator: Iterator[T]): Unit = + new IteratorSubscription[T](subscriber, iterator).init() + } + + private[this] final class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription { var done = false var pendingDemand = 0L var pushing = false + def init(): Unit = try { + if (!iterator.hasNext) { + cancel() + subscriber.onSubscribe(this) + subscriber.onComplete() + } else { + subscriber.onSubscribe(this) + } + } catch { + case NonFatal(e) ⇒ + cancel() + subscriber.onError(e) + } + override def cancel(): Unit = done = true override def request(elements: Long): Unit = { + if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg) @tailrec def pushNext(): Unit = { if (!done) if (iterator.isEmpty) { - done = true + cancel() subscriber.onComplete() - } else if (pendingDemand != 0) { + } else if (pendingDemand > 0) { pendingDemand -= 1 subscriber.onNext(iterator.next()) pushNext() @@ -39,7 +58,7 @@ private[akka] object SynchronousPublisherFromIterable { } if (pushing) - pendingDemand += elements // reentrant call to requestMore from onNext + pendingDemand += elements // reentrant call to requestMore from onNext // FIXME This severely lacks overflow checks else { try { pushing = true @@ -47,7 +66,7 @@ private[akka] object SynchronousPublisherFromIterable { pushNext() } catch { case NonFatal(e) ⇒ - done = true + cancel() subscriber.onError(e) } finally { pushing = false } } @@ -71,10 +90,9 @@ private[akka] object SynchronousPublisherFromIterable { */ private[akka] class SynchronousPublisherFromIterable[T](private val iterable: immutable.Iterable[T]) extends Publisher[T] { - import akka.stream.impl.SynchronousPublisherFromIterable.IteratorSubscription + import SynchronousPublisherFromIterable.IteratorSubscription - override def subscribe(subscriber: Subscriber[_ >: T]): Unit = - subscriber.onSubscribe(new IteratorSubscription(subscriber, iterable.iterator)) + override def subscribe(subscriber: Subscriber[_ >: T]): Unit = IteratorSubscription(subscriber, iterable.iterator) //FIXME what if .iterator throws? - override def toString: String = s"SynchronousPublisherFromIterable(${iterable.mkString(", ")})" + override def toString: String = getClass.getSimpleName } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala index 3e8eed3d38..e55eff8757 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/ActorInterpreter.scala @@ -245,7 +245,7 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryOp /** * INTERNAL API */ -private[akka] class ActorInterpreter(val settings: MaterializerSettings, val ops: Seq[Op[_, _, _, _, _]]) +private[akka] class ActorInterpreter(settings: MaterializerSettings, ops: Seq[Op[_, _, _, _, _]]) extends Actor { private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index f4a4b5d9c7..d3069151eb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -11,23 +11,39 @@ import scala.collection.immutable /** * INTERNAL API */ -private[akka] case class Map[In, Out](f: In ⇒ Out) extends TransitivePullOp[In, Out] { +private[akka] final case class Map[In, Out](f: In ⇒ Out) extends TransitivePullOp[In, Out] { override def onPush(elem: In, ctxt: Context[Out]): Directive = ctxt.push(f(elem)) } /** * INTERNAL API */ -private[akka] case class Filter[T](p: T ⇒ Boolean) extends TransitivePullOp[T, T] { +private[akka] final case class Filter[T](p: T ⇒ Boolean) extends TransitivePullOp[T, T] { override def onPush(elem: T, ctxt: Context[T]): Directive = if (p(elem)) ctxt.push(elem) else ctxt.pull() } +private[akka] final object Collect { + // Cached function that can be used with PartialFunction.applyOrElse to ensure that A) the guard is only applied once, + // and the caller can check the returned value with Collect.notApplied to query whether the PF was applied or not. + // Prior art: https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458 + final val NotApplied: Any ⇒ Any = _ ⇒ Collect.NotApplied +} + +private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends TransitivePullOp[In, Out] { + import Collect.NotApplied + override def onPush(elem: In, ctxt: Context[Out]): Directive = + pf.applyOrElse(elem, NotApplied) match { + case NotApplied ⇒ ctxt.pull() + case result: Out ⇒ ctxt.push(result) + } +} + /** * INTERNAL API */ -private[akka] case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) extends DeterministicOp[In, Out] { +private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) extends DeterministicOp[In, Out] { private var currentIterator: Iterator[Out] = Iterator.empty override def onPush(elem: In, ctxt: Context[Out]): Directive = { @@ -44,20 +60,21 @@ private[akka] case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) extend /** * INTERNAL API */ -private[akka] case class Take[T](count: Int) extends TransitivePullOp[T, T] { +private[akka] final case class Take[T](count: Int) extends TransitivePullOp[T, T] { private var left: Int = count override def onPush(elem: T, ctxt: Context[T]): Directive = { left -= 1 - if (left == 0) ctxt.pushAndFinish(elem) - else ctxt.push(elem) + if (left > 0) ctxt.push(elem) + else if (left == 0) ctxt.pushAndFinish(elem) + else ctxt.finish() //Handle negative take counts } } /** * INTERNAL API */ -private[akka] case class Drop[T](count: Int) extends TransitivePullOp[T, T] { +private[akka] final case class Drop[T](count: Int) extends TransitivePullOp[T, T] { private var left: Int = count override def onPush(elem: T, ctxt: Context[T]): Directive = if (left > 0) { @@ -69,7 +86,26 @@ private[akka] case class Drop[T](count: Int) extends TransitivePullOp[T, T] { /** * INTERNAL API */ -private[akka] case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends DeterministicOp[In, Out] { +private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends DeterministicOp[In, Out] { + private var aggregator = zero + + override def onPush(elem: In, ctxt: Context[Out]): Directive = { + val old = aggregator + aggregator = f(old, elem) + ctxt.push(old) + } + + override def onPull(ctxt: Context[Out]): Directive = + if (isFinishing) ctxt.pushAndFinish(aggregator) + else ctxt.pull() + + override def onUpstreamFinish(ctxt: Context[Out]): TerminationDirective = ctxt.absorbTermination() +} + +/** + * INTERNAL API + */ +private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends DeterministicOp[In, Out] { private var aggregator = zero override def onPush(elem: In, ctxt: Context[Out]): Directive = { @@ -87,31 +123,42 @@ private[akka] case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends /** * INTERNAL API */ -private[akka] case class Grouped[T](n: Int) extends DeterministicOp[T, immutable.Seq[T]] { - private var buf: Vector[T] = Vector.empty +private[akka] final case class Grouped[T](n: Int) extends DeterministicOp[T, immutable.Seq[T]] { + private val buf = { + val b = Vector.newBuilder[T] + b.sizeHint(n) + b + } + private var left = n override def onPush(elem: T, ctxt: Context[immutable.Seq[T]]): Directive = { - buf :+= elem - if (buf.size == n) { - val emit = buf - buf = Vector.empty + buf += elem + left -= 1 + if (left == 0) { + val emit = buf.result() + buf.clear() + left = n ctxt.push(emit) } else ctxt.pull() } override def onPull(ctxt: Context[immutable.Seq[T]]): Directive = - if (isFinishing) ctxt.pushAndFinish(buf) - else ctxt.pull() + if (isFinishing) { + val elem = buf.result() + buf.clear() //FIXME null out the reference to the `buf`? + left = n + ctxt.pushAndFinish(elem) + } else ctxt.pull() override def onUpstreamFinish(ctxt: Context[immutable.Seq[T]]): TerminationDirective = - if (buf.isEmpty) ctxt.finish() + if (left == n) ctxt.finish() else ctxt.absorbTermination() } /** * INTERNAL API */ -private[akka] case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedOp[T, T] { +private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedOp[T, T] { import OverflowStrategy._ private val buffer = FixedSizeBuffer(size) @@ -170,7 +217,7 @@ private[akka] case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy /** * INTERNAL API */ -private[akka] case class Completed[T]() extends DeterministicOp[T, T] { +private[akka] final case class Completed[T]() extends DeterministicOp[T, T] { override def onPush(elem: T, ctxt: Context[T]): Directive = ctxt.finish() override def onPull(ctxt: Context[T]): Directive = ctxt.finish() } @@ -178,14 +225,15 @@ private[akka] case class Completed[T]() extends DeterministicOp[T, T] { /** * INTERNAL API */ -private[akka] case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends DetachedOp[In, Out] { +private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends DetachedOp[In, Out] { private var agg: Any = null override def onPush(elem: In, ctxt: DetachedContext[Out]): UpstreamDirective = { - if (agg == null) agg = seed(elem) - else agg = aggregate(agg.asInstanceOf[Out], elem) + agg = if (agg == null) seed(elem) + else aggregate(agg.asInstanceOf[Out], elem) - if (!isHolding) ctxt.pull() else { + if (!isHolding) ctxt.pull() + else { val result = agg.asInstanceOf[Out] agg = null ctxt.pushAndPull(result) @@ -214,7 +262,7 @@ private[akka] case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In /** * INTERNAL API */ -private[akka] case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapolate: Seed ⇒ (Out, Seed)) extends DetachedOp[In, Out] { +private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapolate: Seed ⇒ (Out, Seed)) extends DetachedOp[In, Out] { private var s: Any = null override def onPush(elem: In, ctxt: DetachedContext[Out]): UpstreamDirective = { @@ -231,9 +279,8 @@ private[akka] case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapolate: S else { val (emit, newS) = extrapolate(s.asInstanceOf[Seed]) s = newS - if (isHolding) { - ctxt.pushAndPull(emit) - } else ctxt.push(emit) + if (isHolding) ctxt.pushAndPull(emit) + else ctxt.push(emit) } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f8b3af2689..36cc22c090 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -178,6 +178,15 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) { def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance]] = new Flow(delegate.grouped(n).map(_.asJava)) // FIXME optimize to one step + /** + * Similar to `fold` but is not a terminal operation, + * emits its current value which starts at `zero` and then + * applies the current and next value to the given function `f`, + * emitting the next current value. + */ + def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Flow[In, T] = + new Flow(delegate.scan(zero)(f.apply)) + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the given number of elements, whatever happens first. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 02276e6b96..b551faed02 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -65,8 +65,8 @@ object Source { * in accordance with the demand coming from the downstream transformation * steps. */ - def from[O](iterator: java.util.Iterator[O]): javadsl.Source[O] = - new Source(scaladsl.Source(iterator.asScala)) + def from[O](f: japi.Creator[java.util.Iterator[O]]): javadsl.Source[O] = + new Source(scaladsl.Source(() ⇒ f.create().asScala)) /** * Helper to create [[Source]] from `Iterable`. @@ -87,14 +87,6 @@ object Source { def from[O](iterable: java.lang.Iterable[O]): javadsl.Source[O] = new Source(scaladsl.Source(akka.stream.javadsl.japi.Util.immutableIterable(iterable))) - /** - * Define the sequence of elements to be produced by the given closure. - * The stream ends normally when evaluation of the closure returns a `None`. - * The stream ends exceptionally when an exception is thrown from the closure. - */ - def from[O](f: japi.Creator[akka.japi.Option[O]]): javadsl.Source[O] = - new Source(scaladsl.Source(() ⇒ f.create().asScala)) - /** * Start a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which @@ -293,6 +285,15 @@ class Source[+Out](delegate: scaladsl.Source[Out]) { def grouped(n: Int): javadsl.Source[java.util.List[Out @uncheckedVariance]] = new Source(delegate.grouped(n).map(_.asJava)) + /** + * Similar to `fold` but is not a terminal operation, + * emits its current value which starts at `zero` and then + * applies the current and next value to the given function `f`, + * yielding the next current value. + */ + def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Source[T] = + new Source(delegate.scan(zero)(f.apply)) + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the given number of elements, whatever happens first. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala index 02cc7658e6..872ba25d89 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/ActorFlowSource.scala @@ -14,6 +14,7 @@ import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration +import scala.util.control.NonFatal import scala.util.{ Success, Failure } sealed trait ActorFlowSource[+Out] extends Source[Out] { @@ -51,14 +52,12 @@ sealed trait ActorFlowSource[+Out] extends Source[Out] { override type Repr[+O] = SourcePipe[O] - private def sourcePipe = Pipe.empty[Out].withSource(this) + override def via[T](flow: Flow[Out, T]): Source[T] = Pipe.empty[Out].withSource(this).via(flow) - override def via[T](flow: Flow[Out, T]): Source[T] = sourcePipe.via(flow) - - override def to(sink: Sink[Out]): RunnableFlow = sourcePipe.to(sink) + override def to(sink: Sink[Out]): RunnableFlow = Pipe.empty[Out].withSource(this).to(sink) /** INTERNAL API */ - override private[scaladsl] def andThen[U](op: AstNode) = SourcePipe(this, List(op)) + override private[scaladsl] def andThen[U](op: AstNode) = SourcePipe(this, List(op)) //FIXME raw addition of AstNodes } /** @@ -100,22 +99,6 @@ final case class PublisherSource[Out](p: Publisher[Out]) extends SimpleActorFlow override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = (p, ()) } -/** - * Start a new `Source` from the given Iterator. The produced stream of elements - * will continue until the iterator runs empty or fails during evaluation of - * the `next()` method. Elements are pulled out of the iterator - * in accordance with the demand coming from the downstream transformation - * steps. - */ -final case class IteratorSource[Out](iterator: Iterator[Out]) extends SimpleActorFlowSource[Out] { - override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) = - create(materializer, flowName)._1.subscribe(flowSubscriber) - override def isActive: Boolean = true - override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = - (ActorPublisher[Out](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings), - name = s"$flowName-0-iterator")), ()) -} - /** * Starts a new `Source` from the given `Iterable`. This is like starting from an * Iterator, but every Subscriber directly attached to the Publisher of this @@ -127,36 +110,14 @@ final case class IterableSource[Out](iterable: immutable.Iterable[Out]) extends create(materializer, flowName)._1.subscribe(flowSubscriber) override def isActive: Boolean = true override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = - if (iterable.isEmpty) (EmptyPublisher[Out], ()) - else (ActorPublisher[Out](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings), - name = s"$flowName-0-iterable")), ()) + (SynchronousPublisherFromIterable(iterable), ()) //FIXME This should probably be an AsynchronousPublisherFromIterable } -final class ThunkIterator[Out](thunk: () ⇒ Option[Out]) extends Iterator[Out] { - require(thunk ne null, "thunk is not allowed to be null") - private[this] var value: Option[Out] = null - - private[this] def advance(): Unit = - value = thunk() match { - case null ⇒ throw new NullPointerException("Thunk is not allowed to return null") - case option ⇒ option - } - - @tailrec override final def hasNext: Boolean = value match { - case null ⇒ - advance(); hasNext - case option ⇒ option.isDefined +//FIXME SerialVersionUID? +final class FuncIterable[Out](f: () ⇒ Iterator[Out]) extends immutable.Iterable[Out] { + override def iterator: Iterator[Out] = try f() catch { + case NonFatal(e) ⇒ Iterator.continually(throw e) //FIXME not rock-solid, is the least one can say } - - @tailrec override final def next(): Out = value match { - case null ⇒ - advance(); next() - case Some(next) ⇒ - advance(); next - case None ⇒ Iterator.empty.next() - } - - override def toString: String = "ThunkIterator" } /** @@ -172,13 +133,12 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = future.value match { case Some(Success(element)) ⇒ - (ActorPublisher[Out](materializer.actorOf(IterablePublisher.props(List(element), materializer.settings), - name = s"$flowName-0-future")), ()) + (SynchronousPublisherFromIterable(List(element)), ()) // Option is not Iterable. sigh case Some(Failure(t)) ⇒ (ErrorPublisher(t).asInstanceOf[Publisher[Out]], ()) case None ⇒ (ActorPublisher[Out](materializer.actorOf(FuturePublisher.props(future, materializer.settings), - name = s"$flowName-0-future")), ()) + name = s"$flowName-0-future")), ()) // FIXME optimize } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 8bf3810a9b..04f7ec3f3d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -4,7 +4,6 @@ package akka.stream.scaladsl import akka.stream.impl.Ast._ -import akka.stream.impl.fusing import akka.stream.{ TimerTransformer, Transformer, OverflowStrategy } import akka.util.Collections.EmptyImmutableSeq import scala.collection.immutable @@ -102,19 +101,18 @@ trait RunnableFlow { trait FlowOps[+Out] { import FlowOps._ type Repr[+O] - import akka.stream.impl.fusing /** * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. */ - def map[T](f: Out ⇒ T): Repr[T] = andThen(Fusable(Vector(fusing.Map(f)), "map")) + def map[T](f: Out ⇒ T): Repr[T] = andThen(Map(f.asInstanceOf[Any ⇒ Any])) /** * Transform each input element into a sequence of output elements that is * then flattened into the output stream. */ - def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = andThen(Fusable(Vector(fusing.MapConcat(f)), "mapConcat")) + def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = andThen(MapConcat(f.asInstanceOf[Any ⇒ immutable.Seq[Any]])) /** * Transform this stream by applying the given function to each of the elements @@ -144,16 +142,14 @@ trait FlowOps[+Out] { /** * Only pass on those elements that satisfy the given predicate. */ - def filter(p: Out ⇒ Boolean): Repr[Out] = andThen(Fusable(Vector(fusing.Filter(p)), "filter")) + def filter(p: Out ⇒ Boolean): Repr[Out] = andThen(Filter(p.asInstanceOf[Any ⇒ Boolean])) /** * Transform this stream by applying the given partial function to each of the elements * on which the function is defined as they pass through this processing step. * Non-matching elements are filtered out. */ - def collect[T](pf: PartialFunction[Out, T]): Repr[T] = andThen(Fusable(Vector( - fusing.Filter(pf.isDefinedAt), - fusing.Map(pf.apply)), "filter")) + def collect[T](pf: PartialFunction[Out, T]): Repr[T] = andThen(Collect(pf.asInstanceOf[PartialFunction[Any, Any]])) /** * Chunk up this stream into groups of the given size, with the last group @@ -161,10 +157,15 @@ trait FlowOps[+Out] { * * `n` must be positive, otherwise IllegalArgumentException is thrown. */ - def grouped(n: Int): Repr[immutable.Seq[Out]] = { - require(n > 0, "n must be greater than 0") - andThen(Fusable(Vector(fusing.Grouped(n)), "grouped")) - } + def grouped(n: Int): Repr[immutable.Seq[Out]] = andThen(Grouped(n)) + + /** + * Similar to `fold` but is not a terminal operation, + * emits its current value which starts at `zero` and then + * applies the current and next value to the given function `f`, + * emitting the next current value. + */ + def scan[T](zero: T)(f: (T, Out) ⇒ T): Repr[T] = andThen(Scan(zero, f.asInstanceOf[(Any, Any) ⇒ Any])) /** * Chunk up this stream into groups of elements received within a time window, @@ -207,9 +208,7 @@ trait FlowOps[+Out] { * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. */ - def drop(n: Int): Repr[Out] = - if (n <= 0) andThen(Fusable(Vector.empty, "drop")) - else andThen(Fusable(Vector(fusing.Drop(n)), "drop")) + def drop(n: Int): Repr[Out] = andThen(Drop(n)) /** * Discard the elements received within the given duration at beginning of the stream. @@ -225,7 +224,7 @@ trait FlowOps[+Out] { def onNext(in: Out) = delegate.onNext(in) def onTimer(timerKey: Any) = { - delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]] + delegate = FlowOps.identityTransformer[Out] Nil } }) @@ -239,9 +238,7 @@ trait FlowOps[+Out] { * The stream will be completed without producing any elements if `n` is zero * or negative. */ - def take(n: Int): Repr[Out] = - if (n <= 0) andThen(Fusable(Vector(fusing.Completed()), "take")) - else andThen(Fusable(Vector(fusing.Take(n)), "take")) + def take(n: Int): Repr[Out] = andThen(Take(n)) /** * Terminate processing (and cancel the upstream publisher) after the given @@ -256,12 +253,12 @@ trait FlowOps[+Out] { timerTransform("takeWithin", () ⇒ new TimerTransformer[Out, Out] { scheduleOnce(TakeWithinTimerKey, d) - var delegate: Transformer[Out, Out] = identityTransformer.asInstanceOf[Transformer[Out, Out]] + var delegate: Transformer[Out, Out] = FlowOps.identityTransformer[Out] - def onNext(in: Out) = delegate.onNext(in) + override def onNext(in: Out) = delegate.onNext(in) override def isComplete = delegate.isComplete - def onTimer(timerKey: Any) = { - delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]] + override def onTimer(timerKey: Any) = { + delegate = FlowOps.completedTransformer[Out] Nil } }) @@ -278,7 +275,7 @@ trait FlowOps[+Out] { * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate */ def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] = - andThen(Fusable(Vector(fusing.Conflate(seed, aggregate)), "conflate")) + andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any])) /** * Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older @@ -294,7 +291,7 @@ trait FlowOps[+Out] { * state. */ def expand[S, U](seed: Out ⇒ S)(extrapolate: S ⇒ (U, S)): Repr[U] = - andThen(Fusable(Vector(fusing.Expand(seed, extrapolate)), "expand")) + andThen(Expand(seed.asInstanceOf[Any ⇒ Any], extrapolate.asInstanceOf[Any ⇒ (Any, Any)])) /** * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. @@ -304,10 +301,8 @@ trait FlowOps[+Out] { * @param size The size of the buffer in element count * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = { - require(size > 0, s"Buffer size must be larger than zero but was [$size]") - andThen(Fusable(Vector(fusing.Buffer(size, overflowStrategy)), "buffer")) - } + def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = + andThen(Buffer(size, overflowStrategy)) /** * Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]] @@ -418,18 +413,21 @@ trait FlowOps[+Out] { /** * INTERNAL API */ -private[scaladsl] object FlowOps { +private[stream] object FlowOps { private case object TakeWithinTimerKey private case object DropWithinTimerKey private case object GroupedWithinTimerKey - private val takeCompletedTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { + private[this] final case object CompletedTransformer extends Transformer[Any, Any] { override def onNext(elem: Any) = Nil override def isComplete = true } - private val identityTransformer: Transformer[Any, Any] = new Transformer[Any, Any] { + private[this] final case object IdentityTransformer extends Transformer[Any, Any] { override def onNext(elem: Any) = List(elem) } + + def completedTransformer[T]: Transformer[T, T] = CompletedTransformer.asInstanceOf[Transformer[T, T]] + def identityTransformer[T]: Transformer[T, T] = IdentityTransformer.asInstanceOf[Transformer[T, T]] } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala index 36080f13f7..700f116cf0 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Pipe.scala @@ -19,14 +19,14 @@ private[stream] object Pipe { private[stream] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flow[In, Out] { override type Repr[+O] = Pipe[In @uncheckedVariance, O] - override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops) + override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops) // FIXME raw addition of AstNodes private[stream] def withSink(out: Sink[Out]): SinkPipe[In] = SinkPipe(out, ops) private[stream] def withSource(in: Source[In]): SourcePipe[Out] = SourcePipe(in, ops) override def via[T](flow: Flow[Out, T]): Flow[In, T] = flow match { - case p: Pipe[T, In] ⇒ Pipe(p.ops ++: ops) + case p: Pipe[Out, T] ⇒ this.appendPipe(p) case gf: GraphFlow[Out, _, _, T] ⇒ gf.prepend(this) case x ⇒ FlowGraphInternal.throwUnsupportedValue(x) } @@ -37,7 +37,7 @@ private[stream] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flo case d: Sink[Out] ⇒ this.withSink(d) } - private[stream] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ++: ops) + private[stream] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ++: ops) // FIXME raw addition of AstNodes } /** @@ -47,7 +47,7 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod private[stream] def withSource(in: Source[In]): RunnablePipe = RunnablePipe(in, output, ops) - private[stream] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops) + private[stream] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops) // FIXME raw addition of AstNodes } @@ -57,11 +57,11 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[AstNode]) extends Source[Out] { override type Repr[+O] = SourcePipe[O] - override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops) + override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops) // FIXME raw addition of AstNodes private[stream] def withSink(out: Sink[Out]): RunnablePipe = RunnablePipe(input, out, ops) - private[stream] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops) + private[stream] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops) // FIXME raw addition of AstNodes override def via[T](flow: Flow[Out, T]): Source[T] = flow match { case p: Pipe[Out, T] ⇒ appendPipe(p) @@ -70,7 +70,7 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As } override def to(sink: Sink[Out]): RunnableFlow = sink match { - case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ++: ops) + case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ++: ops) // FIXME raw addition of AstNodes case g: GraphSink[Out, _] ⇒ g.prepend(this) case d: Sink[Out] ⇒ this.withSink(d) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index afb225a3d1..65939b4ee7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -3,13 +3,14 @@ */ package akka.stream.scaladsl +import scala.language.higherKinds + import akka.actor.Props import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, SynchronousPublisherFromIterable } import org.reactivestreams.Publisher import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.concurrent.Future -import scala.language.higherKinds import akka.stream.FlowMaterializer /** @@ -35,8 +36,7 @@ trait Source[+Out] extends FlowOps[Out] { * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]]. */ - def runWith(sink: Sink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType = - to(sink).run().get(sink) + def runWith(sink: Sink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType = to(sink).run().get(sink) /** * Shortcut for running this `Source` with a fold function. @@ -46,8 +46,7 @@ trait Source[+Out] extends FlowOps[Out] { * function evaluation when the input stream ends, or completed with `Failure` * if there is an error is signaled in the stream. */ - def fold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: FlowMaterializer): Future[U] = - runWith(FoldSink(zero)(f)) + def fold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: FlowMaterializer): Future[U] = runWith(FoldSink(zero)(f)) // FIXME why is fold always an end step? /** * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked @@ -56,8 +55,7 @@ trait Source[+Out] extends FlowOps[Out] { * normal end of the stream, or completed with `Failure` if there is an error is signaled in * the stream. */ - def foreach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] = - runWith(ForeachSink(f)) + def foreach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] = runWith(ForeachSink(f)) /** * Concatenates a second source so that the first element @@ -90,15 +88,15 @@ object Source { /** * Helper to create [[Source]] from `Iterator`. - * Example usage: `Source(Seq(1,2,3).iterator)` + * Example usage: `Source(() => Iterator.from(0))` * - * Start a new `Source` from the given Iterator. The produced stream of elements - * will continue until the iterator runs empty or fails during evaluation of - * the `next()` method. Elements are pulled out of the iterator - * in accordance with the demand coming from the downstream transformation - * steps. + * Start a new `Source` from the given function that produces anIterator. + * The produced stream of elements will continue until the iterator runs empty + * or fails during evaluation of the `next()` method. + * Elements are pulled out of the iterator in accordance with the demand coming + * from the downstream transformation steps. */ - def apply[T](iterator: Iterator[T]): Source[T] = IteratorSource(iterator) + def apply[T](f: () ⇒ Iterator[T]): Source[T] = apply(new FuncIterable(f)) /** * Helper to create [[Source]] from `Iterable`. @@ -111,13 +109,6 @@ object Source { */ def apply[T](iterable: immutable.Iterable[T]): Source[T] = IterableSource(iterable) - /** - * Define the sequence of elements to be produced by the given closure. - * The stream ends normally when evaluation of the closure returns a `None`. - * The stream ends exceptionally when an exception is thrown from the closure. - */ - def apply[T](f: () ⇒ Option[T]): Source[T] = IteratorSource(new ThunkIterator(f)) - /** * Start a new `Source` from the given `Future`. The stream will consist of * one element when the `Future` is completed with a successful value, which @@ -133,7 +124,7 @@ object Source { * element is produced it will not receive that tick element later. It will * receive new tick elements as soon as it has requested more elements. */ - def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): Source[T] = + def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): Source[T] = // FIXME why is tick () => T and not T? TickSource(initialDelay, interval, tick) /** @@ -166,7 +157,7 @@ object Source { * Create a `Source` with one element. * Every connected `Sink` of this stream will see an individual stream consisting of one element. */ - def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element))) + def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element))) // FIXME optimize /** * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.