=str - Various Flow and FlowMaterializer improvements

- Switches from using size-1 and size-2 Vectors to using Lists
- Fixes an issue where processorForNode wouldn't use the dispatcher from the settings
- Adds a dedicated Collect fusion op
- Adds various simplifications to ActorBasedFlowMaterializer
- Adds FIXMEs where appropriate
- Switches `grouped` to use a VectorBuilder
- Adds support for `scan`
- ActorBasedFlowMaterializer now uses an Iterator instead of head+tail decomposition on Seqs
- Identity and Completed Transformers are now cached
- Adds dedicated AstNodes for all combinators
- Adds a hook-in point for fusion in `ActorBasedFlowMaterializer` (a sketch of the idea follows the commit header)
- Adds support for `Operate`, an AstNode carrying a function that creates a fusing.Op
- Adds an experimental (and slow) optimizer as a proof of concept
- Adds verification that Ast.Fused does not appear in the optimizer's input
Author: Viktor Klang
Date:   2014-11-09 21:09:50 +01:00
Parent: efe9331b69
Commit: db4e5c4a29
51 changed files with 678 additions and 732 deletions
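For orientation before the diffs: the fusion hook mentioned above collects adjacent simple ops into one Fused node and runs them back-to-back inside a single stage, instead of materializing one actor per combinator. The following is a minimal, hedged sketch of that idea in plain Scala; all names are illustrative, not akka-stream's actual ActorInterpreter internals.

```scala
// Hedged sketch of the fusion idea, not akka-stream's ActorInterpreter:
// adjacent simple ops are collected into one Fused node and run back-to-back
// inside a single stage instead of one actor per combinator.
object FusedStageSketch extends App {
  sealed trait Op { def apply(in: Iterator[Int]): Iterator[Int] }
  final case class MapOp(f: Int => Int) extends Op { def apply(in: Iterator[Int]) = in.map(f) }
  final case class FilterOp(p: Int => Boolean) extends Op { def apply(in: Iterator[Int]) = in.filter(p) }
  final case class TakeOp(n: Int) extends Op { def apply(in: Iterator[Int]) = in.take(n) }

  // A Fused node is just its ops threaded through one another in order.
  final case class Fused(ops: List[Op]) {
    def run(in: Iterator[Int]): Iterator[Int] = ops.foldLeft(in)((acc, op) => op(acc))
  }

  val fused = Fused(List(FilterOp(_ % 2 == 0), MapOp(_ * 2), TakeOp(3)))
  println(fused.run(Iterator.range(1, 100)).toList) // List(4, 8, 12)
}
```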

View file

@ -152,7 +152,7 @@ private[http] object StreamUtils {
} else ByteString.empty
}
Props(new IteratorPublisherImpl(iterator, materializer.settings)).withDispatcher(materializer.settings.fileIODispatcher)
IteratorPublisher.props(iterator, materializer.settings).withDispatcher(materializer.settings.fileIODispatcher)
}
new AtomicBoolean(false) with SimpleActorFlowSource[ByteString] {

View file

@ -158,7 +158,7 @@ class CodingDirectivesSpec extends RoutingSpec {
"correctly encode the chunk stream produced by a chunked response" in {
val text = "This is a somewhat lengthy text that is being chunked by the autochunk directive!"
val textChunks =
text.grouped(8).map { chars ⇒
() ⇒ text.grouped(8).map { chars ⇒
Chunk(chars.mkString): ChunkStreamPart
}
val chunkedTextEntity = HttpEntity.Chunked(MediaTypes.`text/plain`, Source(textChunks))

View file

@ -3,7 +3,9 @@
*/
package akka.stream.tck
import scala.collection.immutable
import akka.event.Logging
import scala.collection.{ mutable, immutable }
import akka.actor.ActorSystem
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Sink
@ -24,14 +26,14 @@ abstract class AkkaIdentityProcessorVerification[T](val system: ActorSystem, env
system.eventStream.publish(TestEvent.Mute(EventFilter[RuntimeException]("Test exception")))
/** Readable way to ignore TCK specs; Return this for `createErrorStatePublisher` to skip tests including it */
final def ignored: Publisher[T] = null
final def ignored: mutable.Publisher[T] = null
def this(system: ActorSystem, printlnDebug: Boolean) {
this(system, new TestEnvironment(Timeouts.defaultTimeoutMillis(system), printlnDebug), Timeouts.publisherShutdownTimeoutMillis)
}
def this(printlnDebug: Boolean) {
this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug)
this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug)
}
def this() {

View file

@ -3,6 +3,8 @@
*/
package akka.stream.tck
import akka.event.Logging
import scala.concurrent.duration._
import akka.actor.ActorSystem
@ -27,7 +29,7 @@ abstract class AkkaPublisherVerification[T](val system: ActorSystem, env: TestEn
}
def this(printlnDebug: Boolean) {
this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug)
this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug)
}
def this() {

View file

@ -3,6 +3,8 @@
*/
package akka.stream.tck
import akka.event.Logging
import scala.collection.immutable
import scala.concurrent.duration._
import akka.actor.ActorSystem
@ -27,7 +29,7 @@ abstract class AkkaSubscriberBlackboxVerification[T](val system: ActorSystem, en
}
def this(printlnDebug: Boolean) {
this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug)
this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug)
}
def this() {
@ -44,7 +46,7 @@ abstract class AkkaSubscriberWhiteboxVerification[T](val system: ActorSystem, en
}
def this(printlnDebug: Boolean) {
this(ActorSystem(classOf[IterablePublisherTest].getSimpleName, AkkaSpec.testConf), printlnDebug)
this(ActorSystem(Logging.simpleName(classOf[IterablePublisherTest]), AkkaSpec.testConf), printlnDebug)
}
def this() {

View file

@ -22,7 +22,7 @@ class FusableProcessorTest extends AkkaIdentityProcessorVerification[Int] {
val flowName = getClass.getSimpleName + "-" + processorCounter.incrementAndGet()
val processor = materializer.asInstanceOf[ActorBasedFlowMaterializer].processorForNode(
Ast.Fusable(Vector(akka.stream.impl.fusing.Map[Int, Int](identity)), "identity"), flowName, 1)
Ast.Fused(List(akka.stream.impl.fusing.Map[Int, Int](identity)), "identity"), flowName, 1)
processor.asInstanceOf[Processor[Int, Int]]
}
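Here the test passes an explicit name ("identity"); when a name is omitted, the Fused companion added later in this commit derives one by joining the lowercased op class names. A trivially small illustration of that scheme:

```scala
object FusedNameSketch extends App {
  // Mirrors Ast.Fused.apply's naming: simple class names, lowercased, joined with '+'.
  val opNames = List("Map", "Filter", "Take")
  println(opNames.map(_.toLowerCase).mkString("+")) // map+filter+take
}
```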

View file

@ -19,5 +19,4 @@ class IterablePublisherTest extends AkkaPublisherVerification[Int] {
Source(iterable).runWith(Sink.publisher)
}
}

View file

@ -1,21 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import scala.collection.immutable
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams.Publisher
class IteratorPublisherTest extends AkkaPublisherVerification[Int](true) {
def createPublisher(elements: Long): Publisher[Int] = {
val iterable: immutable.Iterable[Int] =
if (elements == 0) new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
else 0 until elements.toInt
Source(iterable).runWith(Sink.publisher)
}
}

View file

@ -1,18 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.tck
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import org.reactivestreams._
class SimpleCallbackPublisherTest extends AkkaPublisherVerification[Int] {
def createPublisher(elements: Long): Publisher[Int] = {
val iter = Iterator from 0
val iter2 = if (elements > 0) iter take elements.toInt else iter
Source(() if (iter2.hasNext) Some(iter2.next()) else None).runWith(Sink.publisher)
}
}

View file

@ -3,6 +3,7 @@ package akka.stream.testkit
import akka.stream.MaterializerSettings
import akka.stream.scaladsl._
import org.reactivestreams.Publisher
import scala.collection.immutable
import scala.util.control.NoStackTrace
import akka.stream.FlowMaterializer
@ -40,7 +41,7 @@ abstract class TwoStreamsSetup extends AkkaSpec {
def completedPublisher[T]: Publisher[T] = StreamTestKit.emptyPublisher[T]
def nonemptyPublisher[T](elems: Iterator[T]): Publisher[T] = Source(elems).runWith(Sink.publisher)
def nonemptyPublisher[T](elems: immutable.Iterable[T]): Publisher[T] = Source(elems).runWith(Sink.publisher)
def soonToFailPublisher[T]: Publisher[T] = StreamTestKit.lazyErrorPublisher[T](TestException)

View file

@ -61,7 +61,7 @@ public class ActorSubscriberTest extends StreamTest {
final JavaTestKit probe = new JavaTestKit(system);
final ActorRef ref = system.actorOf(Props.create(TestSubscriber.class, probe.getRef()).withDispatcher("akka.test.stream-dispatcher"));
final Subscriber<Integer> subscriber = UntypedActorSubscriber.create(ref);
final java.util.Iterator<Integer> input = Arrays.asList(1, 2, 3).iterator();
final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3);
Source.from(input).runWith(Sink.create(subscriber), materializer);

View file

@ -43,7 +43,7 @@ public class FlowTest extends StreamTest {
public void mustBeAbleToUseSimpleOperators() {
final JavaTestKit probe = new JavaTestKit(system);
final String[] lookup = { "a", "b", "c", "d", "e", "f" };
final java.util.Iterator<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5).iterator();
final java.lang.Iterable<Integer> input = Arrays.asList(0, 1, 2, 3, 4, 5);
final Source<Integer> ints = Source.from(input);
ints.drop(2).take(3).takeWithin(FiniteDuration.create(10, TimeUnit.SECONDS)).map(new Function<Integer, String>() {
@ -79,7 +79,7 @@ public class FlowTest extends StreamTest {
@Test
public void mustBeAbleToUseVoidTypeInForeach() {
final JavaTestKit probe = new JavaTestKit(system);
final java.util.Iterator<String> input = Arrays.asList("a", "b", "c").iterator();
final java.lang.Iterable<String> input = Arrays.asList("a", "b", "c");
Source<String> ints = Source.from(input);
Future<BoxedUnit> completion = ints.foreach(new Procedure<String>() {
@ -414,17 +414,11 @@ public class FlowTest extends StreamTest {
@Test
public void mustBeAbleToUseCallableInput() {
final JavaTestKit probe = new JavaTestKit(system);
final akka.stream.javadsl.japi.Creator<akka.japi.Option<Integer>> input = new akka.stream.javadsl.japi.Creator<akka.japi.Option<Integer>>() {
int countdown = 5;
final Iterable<Integer> input1 = Arrays.asList(4,3,2,1,0);
final akka.stream.javadsl.japi.Creator<Iterator<Integer>> input = new akka.stream.javadsl.japi.Creator<Iterator<Integer>>() {
@Override
public akka.japi.Option<Integer> create() {
if (countdown == 0) {
return akka.japi.Option.none();
} else {
countdown -= 1;
return akka.japi.Option.option(countdown);
}
public Iterator<Integer> create() {
return input1.iterator();
}
};
Source.from(input).foreach(new Procedure<Integer>() {

View file

@ -29,14 +29,15 @@ class SynchronousPublisherFromIterableSpec extends AkkaSpec {
"complete empty" in {
val p = SynchronousPublisherFromIterable(List.empty[Int])
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectComplete()
c.expectNoMsg(100.millis)
def verifyNewSubscriber(i: Int): Unit = {
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectSubscription()
c.expectComplete()
c.expectNoMsg(100.millis)
}
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c2)
c2.expectComplete()
1 to 10 foreach verifyNewSubscriber
}
"produce elements with multiple subscribers" in {
@ -171,8 +172,8 @@ class SynchronousPublisherFromIterableSpec extends AkkaSpec {
probe.expectMsg("complete")
}
"have nice toString" in {
SynchronousPublisherFromIterable(List(1, 2, 3)).toString should be("SynchronousPublisherFromIterable(1, 2, 3)")
"have a toString that doesn't OOME" in {
SynchronousPublisherFromIterable(List(1, 2, 3)).toString should be(classOf[SynchronousPublisherFromIterable[_]].getSimpleName)
}
}
}
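The renamed test documents why the toString changed: the wrapped Iterable may be unbounded, so rendering its elements into the string can never terminate or can exhaust the heap. A hedged illustration with a hypothetical unbounded iterable:

```scala
object ToStringSketch extends App {
  // Hypothetical unbounded iterable, like the `Iterator from 0` sources in these specs.
  val unbounded: Iterable[Int] = new Iterable[Int] { def iterator = Iterator.from(0) }

  // The old element-rendering style would never return on this input:
  // s"SynchronousPublisherFromIterable(${unbounded.mkString(", ")})"

  // The safe variant after this commit: just the simple class name.
  println(unbounded.getClass.getSimpleName)
}
```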

View file

@ -6,6 +6,7 @@ package akka.stream.io
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.util.ByteString
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.scaladsl.Source
@ -44,7 +45,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val server = new Server()
val (tcpProcessor, serverConnection) = connect(server)
val testInput = Iterator.range(0, 256).map(ByteString(_))
val testInput = (0 to 255).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
serverConnection.read(256)
@ -59,7 +60,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val server = new Server()
val (tcpProcessor, serverConnection) = connect(server)
val testInput = Iterator.range(0, 256).map(ByteString(_))
val testInput = (0 to 255).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
for (in testInput) serverConnection.write(in)
@ -155,7 +156,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val server = echoServer(serverAddress)
val conn = connect(serverAddress)
val testInput = Iterator.range(0, 256).map(ByteString(_))
val testInput = (0 to 255).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).runWith(Sink(conn.outputStream))
@ -175,7 +176,7 @@ class TcpFlowSpec extends AkkaSpec with TcpHelper {
val conn2 = connect(serverAddress)
val conn3 = connect(serverAddress)
val testInput = Iterator.range(0, 256).map(ByteString(_))
val testInput = (0 to 255).map(ByteString(_))
val expectedOutput = ByteString(Array.tabulate(256)(_.asInstanceOf[Byte]))
Source(testInput).runWith(Sink(conn1.outputStream))
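These testInput changes (here and in several specs below) share one motivation: an Iterator is single-shot, while a strict range can be traversed repeatedly. A small standalone demonstration:

```scala
object SingleShotDemo extends App {
  val iter = Iterator.range(0, 3)
  println(iter.toList) // List(0, 1, 2)
  println(iter.toList) // List() -- the iterator is already exhausted

  val strict = 0 until 3
  println(strict.toList) // List(0, 1, 2)
  println(strict.toList) // List(0, 1, 2) -- safe to traverse again
}
```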

View file

@ -24,20 +24,20 @@ class FlowBufferSpec extends AkkaSpec {
"Buffer" must {
"pass elements through normally in backpressured mode" in {
val future: Future[Seq[Int]] = Source((1 to 1000).iterator).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
val future: Future[Seq[Int]] = Source(1 to 1000).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
runWith(Sink.head)
Await.result(future, 3.seconds) should be(1 to 1000)
}
"pass elements through normally in backpressured mode with buffer size one" in {
val futureSink = Sink.head[Seq[Int]]
val future = Source((1 to 1000).iterator).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
val future = Source(1 to 1000).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).
runWith(Sink.head)
Await.result(future, 3.seconds) should be(1 to 1000)
}
"pass elements through a chain of backpressured buffers of different size" in {
val future = Source((1 to 1000).iterator)
val future = Source(1 to 1000)
.buffer(1, overflowStrategy = OverflowStrategy.backpressure)
.buffer(10, overflowStrategy = OverflowStrategy.backpressure)
.buffer(256, overflowStrategy = OverflowStrategy.backpressure)

View file

@ -23,11 +23,11 @@ class FlowConcatAllSpec extends AkkaSpec {
val testException = new Exception("test") with NoStackTrace
"work in the happy case" in {
val s1 = Source((1 to 2).iterator)
val s1 = Source(1 to 2)
val s2 = Source(List.empty[Int])
val s3 = Source(List(3))
val s4 = Source((4 to 6).iterator)
val s5 = Source((7 to 10).iterator)
val s4 = Source(4 to 6)
val s5 = Source(7 to 10)
val main = Source(List(s1, s2, s3, s4, s5))
@ -42,7 +42,7 @@ class FlowConcatAllSpec extends AkkaSpec {
"work together with SplitWhen" in {
val subscriber = StreamTestKit.SubscriberProbe[Int]()
Source((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).runWith(Sink(subscriber))
Source(1 to 10).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).runWith(Sink(subscriber))
val subscription = subscriber.expectSubscription()
subscription.request(10)
subscriber.probe.receiveN(10) should be((1 to 10).map(StreamTestKit.OnNext(_)))

View file

@ -57,7 +57,7 @@ class FlowConflateSpec extends AkkaSpec {
}
"work on a variable rate chain" in {
val future = Source((1 to 1000).iterator)
val future = Source(1 to 1000)
.conflate(seed = i i)(aggregate = (sum, i) sum + i)
.map { i if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i }
.fold(0)(_ + _)

View file

@ -67,7 +67,7 @@ class FlowExpandSpec extends AkkaSpec {
}
"work on a variable rate chain" in {
val future = Source((1 to 100).iterator)
val future = Source(1 to 100)
.map { i if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i }
.expand(seed = i i)(extrapolate = i (i, i))
.fold(Set.empty[Int])(_ + _)

View file

@ -30,8 +30,7 @@ class FlowFilterSpec extends AkkaSpec with ScriptedTest {
implicit val materializer = FlowMaterializer(settings)
val probe = StreamTestKit.SubscriberProbe[Int]()
Source(Iterator.fill(1000)(0) ++ List(1)).filter(_ != 0).
runWith(Sink(probe))
Source(List.fill(1000)(0) ::: List(1)).filter(_ != 0).runWith(Sink(probe))
val subscription = probe.expectSubscription()
for (_ 1 to 10000) {

View file

@ -24,6 +24,8 @@ class FlowGraphCompileSpec extends AkkaSpec {
}
}
val apples = () Iterator.continually(new Apple)
val f1 = Flow[String].transform("f1", op[String, String])
val f2 = Flow[String].transform("f2", op[String, String])
val f3 = Flow[String].transform("f3", op[String, String])
@ -314,8 +316,8 @@ class FlowGraphCompileSpec extends AkkaSpec {
FlowGraph { b
val merge = Merge[Fruit]
b.
addEdge(Source[Fruit](() Some(new Apple)), Flow[Fruit], merge).
addEdge(Source[Apple](() Some(new Apple)), Flow[Apple], merge).
addEdge(Source[Fruit](apples), Flow[Fruit], merge).
addEdge(Source[Apple](apples), Flow[Apple], merge).
addEdge(merge, Flow[Fruit].map(identity), out)
}
}
@ -330,8 +332,8 @@ class FlowGraphCompileSpec extends AkkaSpec {
val unzip = Unzip[Int, String]
val whatever = Sink.publisher[Any]
import FlowGraphImplicits._
Source[Fruit](() Some(new Apple)) ~> merge
Source[Apple](() Some(new Apple)) ~> merge
Source[Fruit](apples) ~> merge
Source[Apple](apples) ~> merge
inA ~> merge
inB ~> merge
inA ~> Flow[Fruit].map(identity) ~> merge
@ -341,9 +343,9 @@ class FlowGraphCompileSpec extends AkkaSpec {
UndefinedSource[Apple] ~> Flow[Apple].map(identity) ~> merge
merge ~> Flow[Fruit].map(identity) ~> outA
Source[Apple](() Some(new Apple)) ~> Broadcast[Apple] ~> merge
Source[Apple](() Some(new Apple)) ~> Broadcast[Apple] ~> outB
Source[Apple](() Some(new Apple)) ~> Broadcast[Apple] ~> UndefinedSink[Fruit]
Source[Apple](apples) ~> Broadcast[Apple] ~> merge
Source[Apple](apples) ~> Broadcast[Apple] ~> outB
Source[Apple](apples) ~> Broadcast[Apple] ~> UndefinedSink[Fruit]
inB ~> Broadcast[Apple] ~> merge
Source(List(1 -> "a", 2 -> "b", 3 -> "c")) ~> unzip.in
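The shared `apples` thunk relies on two facts worth spelling out: `Iterator.continually` lazily yields an unbounded stream of freshly created values, and Scala's Iterator is covariant, so an `Iterator[Apple]` serves wherever an `Iterator[Fruit]` is needed. A plain-Scala sketch:

```scala
object ApplesSketch extends App {
  class Fruit
  class Apple extends Fruit

  val apples: () => Iterator[Apple] = () => Iterator.continually(new Apple)

  // Covariance: Iterator[Apple] <: Iterator[Fruit], so this assignment compiles.
  val fruit: Iterator[Fruit] = apples()
  println(fruit.take(3).length) // 3 -- unbounded, so consume lazily
}
```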

View file

@ -45,7 +45,7 @@ class FlowGraphInitSpec extends AkkaSpec {
val s = Source(1 to 5)
val b = Broadcast[Int]
val sink: KeyedSink[Int] = Sink.foreach[Int](i i)
val sink: KeyedSink[Int] = Sink.foreach[Int](_ ())
val otherSink: KeyedSink[Int] = Sink.foreach[Int](i 2 * i)
FlowGraph { implicit builder

View file

@ -33,7 +33,7 @@ class FlowGroupBySpec extends AkkaSpec {
}
class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) {
val source = Source((1 to elementCount).iterator).runWith(Sink.publisher)
val source = Source(1 to elementCount).runWith(Sink.publisher)
val groupStream = Source(source).groupBy(_ % groupCount).runWith(Sink.publisher)
val masterSubscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]()

View file

@ -18,15 +18,19 @@ class FlowGroupedSpec extends AkkaSpec with ScriptedTest {
"A Grouped" must {
def randomSeq(n: Int) = immutable.Seq.fill(n)(random.nextInt())
def randomTest(n: Int) = { val s = randomSeq(n); s -> immutable.Seq(s) }
"group evenly" in {
def script = Script(TestConfig.RandomTestRange map { _ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }: _*)
TestConfig.RandomTestRange foreach (_ runScript(script, settings)(_.grouped(3)))
val testLen = random.nextInt(1, 16)
def script = Script(TestConfig.RandomTestRange map { _ randomTest(testLen) }: _*)
TestConfig.RandomTestRange foreach (_ runScript(script, settings)(_.grouped(testLen)))
}
"group with rest" in {
def script = Script((TestConfig.RandomTestRange.map { _ val x, y, z = random.nextInt(); Seq(x, y, z) -> Seq(immutable.Seq(x, y, z)) }
:+ { val x = random.nextInt(); Seq(x) -> Seq(immutable.Seq(x)) }): _*)
TestConfig.RandomTestRange foreach (_ runScript(script, settings)(_.grouped(3)))
val testLen = random.nextInt(1, 16)
def script = Script((TestConfig.RandomTestRange.map { _ randomTest(testLen) } :+ randomTest(1)): _*)
TestConfig.RandomTestRange foreach (_ runScript(script, settings)(_.grouped(testLen)))
}
}
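The commit message's "Switches `grouped` to use a VectorBuilder" can be pictured with the following hedged, standalone buffer, which mirrors the buffering strategy but is not the actual akka-stream op:

```scala
import scala.collection.immutable
import scala.collection.immutable.VectorBuilder

// Illustrative grouped(n) buffer built on a VectorBuilder; not the real akka op.
final class GroupedBuffer(n: Int) {
  require(n > 0, "n must be greater than 0")
  private var builder = new VectorBuilder[Int]
  private var count = 0

  private def emit(): immutable.Seq[Int] = {
    val group = builder.result()
    builder = new VectorBuilder[Int]
    count = 0
    group
  }

  /** Buffer one element; return the finished group once it reaches size n. */
  def offer(elem: Int): Option[immutable.Seq[Int]] = {
    builder += elem
    count += 1
    if (count == n) Some(emit()) else None
  }

  /** On upstream completion, flush any partial group. */
  def flush(): Option[immutable.Seq[Int]] =
    if (count > 0) Some(emit()) else None
}

object GroupedBufferDemo extends App {
  val buf = new GroupedBuffer(3)
  val groups = (1 to 7).flatMap(buf.offer) ++ buf.flush()
  println(groups) // Vector(Vector(1, 2, 3), Vector(4, 5, 6), Vector(7))
}
```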

View file

@ -20,7 +20,7 @@ class FlowIterableSpec extends AkkaSpec {
"A Flow based on an iterable" must {
"produce elements" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p = Source(1 to 3).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -37,16 +37,18 @@ class FlowIterableSpec extends AkkaSpec {
val p = Source(List.empty[Int]).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectSubscription()
c.expectComplete()
c.expectNoMsg(100.millis)
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c2)
c2.expectSubscription()
c2.expectComplete()
}
"produce elements with multiple subscribers" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p = Source(1 to 3).runWith(Sink.publisher)
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -70,7 +72,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"produce elements to later subscriber" in {
val p = Source(List(1, 2, 3)).runWith(Sink.publisher)
val p = Source(1 to 3).runWith(Sink.publisher)
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -96,7 +98,7 @@ class FlowIterableSpec extends AkkaSpec {
}
"produce elements with one transformation step" in {
val p = Source(List(1, 2, 3)).map(_ * 2).runWith(Sink.publisher)
val p = Source(1 to 3).map(_ * 2).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -117,23 +119,5 @@ class FlowIterableSpec extends AkkaSpec {
c.expectNext(8)
c.expectComplete()
}
"allow cancel before receiving all elements" in {
val count = 100000
val p = Source(1 to count).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(count)
c.expectNext(1)
sub.cancel()
val got = c.probe.receiveWhile(3.seconds) {
case _: OnNext[_]
case OnComplete fail("Cancel expected before OnComplete")
case OnError(e) fail(e)
}
got.size should be < (count - 1)
}
}
}

View file

@ -21,9 +21,9 @@ class FlowIteratorSpec extends AkkaSpec {
implicit val materializer = FlowMaterializer(settings)
"A Flow based on an iterator" must {
"A Flow based on an iterator producing function" must {
"produce elements" in {
val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher)
val p = Source(() (1 to 3).iterator).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -37,7 +37,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"complete empty" in {
val p = Source[Int](Iterator.empty).runWith(Sink.publisher)
val p = Source[Int](() Iterator.empty).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectSubscription()
@ -46,11 +46,12 @@ class FlowIteratorSpec extends AkkaSpec {
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c2)
c2.expectSubscription()
c2.expectComplete()
}
"produce elements with multiple subscribers" in {
val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher)
val p = Source(() (1 to 3).iterator).runWith(Sink.publisher)
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -74,7 +75,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"produce elements to later subscriber" in {
val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher)
val p = Source(() (1 to 3).iterator).runWith(Sink.publisher)
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
@ -86,7 +87,7 @@ class FlowIteratorSpec extends AkkaSpec {
p.subscribe(c2)
val sub2 = c2.expectSubscription()
sub2.request(3)
// element 1 is already gone
c2.expectNext(1)
c2.expectNext(2)
c2.expectNext(3)
c2.expectComplete()
@ -97,7 +98,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"produce elements with one transformation step" in {
val p = Source(List(1, 2, 3).iterator).map(_ * 2).runWith(Sink.publisher)
val p = Source(() (1 to 3).iterator).map(_ * 2).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -109,7 +110,7 @@ class FlowIteratorSpec extends AkkaSpec {
}
"produce elements with two transformation steps" in {
val p = Source(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher)
val p = Source(() (1 to 4).iterator).filter(_ % 2 == 0).map(_ * 2).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
@ -118,23 +119,5 @@ class FlowIteratorSpec extends AkkaSpec {
c.expectNext(8)
c.expectComplete()
}
"allow cancel before receiving all elements" in {
val count = 100000
val p = Source((1 to count).iterator).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(count)
c.expectNext(1)
sub.cancel()
val got = c.probe.receiveWhile(3.seconds) {
case _: OnNext[_]
case OnComplete fail("Cancel expected before OnComplete")
case OnError(e) fail(e)
}
got.size should be < (count - 1)
}
}
}
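The switch from `Source(iterator)` to `Source(() => iterator)` is what lets the second subscriber above see element 1 again: each materialization invokes the thunk and gets a fresh iterator. In plain Scala:

```scala
object FreshIteratorDemo extends App {
  val mkIterator = () => (1 to 3).iterator

  // Two independent "materializations": each call yields a fresh iterator,
  // so both observers see the complete sequence.
  println(mkIterator().toList) // List(1, 2, 3)
  println(mkIterator().toList) // List(1, 2, 3)
}
```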

View file

@ -27,7 +27,7 @@ class FlowMapSpec extends AkkaSpec with ScriptedTest {
"not blow up with high request counts" in {
val probe = StreamTestKit.SubscriberProbe[Int]()
Source(List(1).iterator).
Source(List(1)).
map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).map(_ + 1).
runWith(Sink.publisher).subscribe(probe)

View file

@ -49,7 +49,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
"work on longer inputs" in {
val futureSink = newHeadSink
val fut = Source((1 to 10).iterator).prefixAndTail(5).runWith(futureSink)
val fut = Source(1 to 10).prefixAndTail(5).runWith(futureSink)
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(1 to 5)
@ -60,7 +60,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
"handle zero take count" in {
val futureSink = newHeadSink
val fut = Source((1 to 10).iterator).prefixAndTail(0).runWith(futureSink)
val fut = Source(1 to 10).prefixAndTail(0).runWith(futureSink)
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(Nil)
@ -71,7 +71,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
"handle negative take count" in {
val futureSink = newHeadSink
val fut = Source((1 to 10).iterator).prefixAndTail(-1).runWith(futureSink)
val fut = Source(1 to 10).prefixAndTail(-1).runWith(futureSink)
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(Nil)
@ -82,7 +82,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
"work if size of take is equal to stream size" in {
val futureSink = newHeadSink
val fut = Source((1 to 10).iterator).prefixAndTail(10).runWith(futureSink)
val fut = Source(1 to 10).prefixAndTail(10).runWith(futureSink)
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(1 to 10)
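As these cases show, `prefixAndTail(n)` yields the first n elements as a strict sequence plus a Source of whatever remains, with n <= 0 giving an empty prefix. The collection analogue is `splitAt`:

```scala
object PrefixAndTailAnalogy extends App {
  // Collection analogue of Source(1 to 10).prefixAndTail(5):
  val (prefix, tail) = (1 to 10).splitAt(5)
  println(prefix.toList) // List(1, 2, 3, 4, 5) -- the strict prefix
  println(tail.toList)   // List(6, 7, 8, 9, 10) -- streamed as the tail Source
}
```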

View file

@ -0,0 +1,44 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
import scala.collection.immutable
import akka.stream.FlowMaterializer
import akka.stream.MaterializerSettings
import akka.stream.testkit.AkkaSpec
class FlowScanSpec extends AkkaSpec {
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
.withFanOutBuffer(initialSize = 1, maxSize = 16)
implicit val materializer = FlowMaterializer(settings)
"A Scan" must {
def scan(s: Source[Int], duration: Duration = 5.seconds): immutable.Seq[Int] =
Await.result(s.scan(0)(_ + _).fold(immutable.Seq.empty[Int])(_ :+ _), duration)
"Scan" in {
val v = Vector.fill(random.nextInt(100, 1000))(random.nextInt())
scan(Source(v)) should be(v.scan(0)(_ + _))
}
"Scan empty failed" in {
val e = new Exception("fail!")
intercept[Exception](scan(Source.failed[Int](e))) should be theSameInstanceAs (e)
}
"Scan empty" in {
val v = Vector.empty[Int]
scan(Source(v)) should be(v.scan(0)(_ + _))
}
}
}
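For reference, scan's contract as exercised above matches the standard library's: the zero element is emitted first, followed by every intermediate fold result.

```scala
object ScanDemo extends App {
  val v = Vector(1, 2, 3)
  // What Source(v).scan(0)(_ + _) emits, per the spec above:
  println(v.scan(0)(_ + _)) // Vector(0, 1, 3, 6)
}
```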

View file

@ -13,8 +13,8 @@ import scala.concurrent.duration._
import akka.actor.{ Props, ActorRefFactory, ActorRef }
import akka.stream.{ TransformerLike, MaterializerSettings }
import akka.stream.FlowMaterializer
import akka.stream.impl.{ ActorProcessorFactory, TransformProcessorImpl, StreamSupervisor, ActorBasedFlowMaterializer }
import akka.stream.impl.Ast.{ Transform, Fusable, AstNode }
import akka.stream.impl._
import akka.stream.impl.Ast._
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
import akka.stream.testkit.ChainSetup
import akka.testkit._
@ -25,6 +25,7 @@ import org.reactivestreams.{ Processor, Subscriber, Publisher }
object FlowSpec {
class Fruit
class Apple extends Fruit
val apples = () Iterator.continually(new Apple)
val flowNameCounter = new AtomicLong(0)
@ -68,13 +69,24 @@ object FlowSpec {
supervisor: ActorRef,
flowNameCounter: AtomicLong,
namePrefix: String,
brokenMessage: Any) extends ActorBasedFlowMaterializer(settings, supervisor, flowNameCounter, namePrefix) {
optimizations: Optimizations,
brokenMessage: Any) extends ActorBasedFlowMaterializer(settings, supervisor, flowNameCounter, namePrefix, optimizations) {
override def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
override def processorForNode[In, Out](op: AstNode, flowName: String, n: Int): Processor[In, Out] = {
val props = op match {
case t: Transform Props(new BrokenTransformProcessorImpl(settings, t.mkTransformer(), brokenMessage))
case f: Fusable Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)).withDispatcher(settings.dispatcher)
case o ActorProcessorFactory.props(this, o)
case t: Transform Props(new BrokenTransformProcessorImpl(settings, t.mkTransformer(), brokenMessage))
case f: Fused Props(new BrokenActorInterpreter(settings, f.ops, brokenMessage)).withDispatcher(settings.dispatcher)
case Map(f) Props(new BrokenActorInterpreter(settings, List(fusing.Map(f)), brokenMessage))
case Filter(p) Props(new BrokenActorInterpreter(settings, List(fusing.Filter(p)), brokenMessage))
case Drop(n) Props(new BrokenActorInterpreter(settings, List(fusing.Drop(n)), brokenMessage))
case Take(n) Props(new BrokenActorInterpreter(settings, List(fusing.Take(n)), brokenMessage))
case Collect(pf) Props(new BrokenActorInterpreter(settings, List(fusing.Collect(pf)), brokenMessage))
case Scan(z, f) Props(new BrokenActorInterpreter(settings, List(fusing.Scan(z, f)), brokenMessage))
case Expand(s, f) Props(new BrokenActorInterpreter(settings, List(fusing.Expand(s, f)), brokenMessage))
case Conflate(s, f) Props(new BrokenActorInterpreter(settings, List(fusing.Conflate(s, f)), brokenMessage))
case Buffer(n, s) Props(new BrokenActorInterpreter(settings, List(fusing.Buffer(n, s)), brokenMessage))
case MapConcat(f) Props(new BrokenActorInterpreter(settings, List(fusing.MapConcat(f)), brokenMessage))
case o ActorProcessorFactory.props(this, o)
}
val impl = actorOf(props, s"$flowName-$n-${op.name}")
ActorProcessorFactory(impl)
@ -87,6 +99,7 @@ object FlowSpec {
context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)),
flowNameCounter,
"brokenflow",
Optimizations.none,
brokenMessage)
}
}
@ -316,11 +329,11 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
"be covariant" in {
val f1: Source[Fruit] = Source[Fruit](() Some(new Apple))
val p1: Publisher[Fruit] = Source[Fruit](() Some(new Apple)).runWith(Sink.publisher)
val f2: Source[Source[Fruit]] = Source[Fruit](() Some(new Apple)).splitWhen(_ true)
val f3: Source[(Boolean, Source[Fruit])] = Source[Fruit](() Some(new Apple)).groupBy(_ true)
val f4: Source[(immutable.Seq[Fruit], Source[Fruit])] = Source[Fruit](() Some(new Apple)).prefixAndTail(1)
val f1: Source[Fruit] = Source[Fruit](apples)
val p1: Publisher[Fruit] = Source[Fruit](apples).runWith(Sink.publisher)
val f2: Source[Source[Fruit]] = Source[Fruit](apples).splitWhen(_ true)
val f3: Source[(Boolean, Source[Fruit])] = Source[Fruit](apples).groupBy(_ true)
val f4: Source[(immutable.Seq[Fruit], Source[Fruit])] = Source[Fruit](apples).prefixAndTail(1)
val d1: Flow[String, Source[Fruit]] = Flow[String].map(_ new Apple).splitWhen(_ true)
val d2: Flow[String, (Boolean, Source[Fruit])] = Flow[String].map(_ new Apple).groupBy(_ true)
val d3: Flow[String, (immutable.Seq[Apple], Source[Fruit])] = Flow[String].map(_ new Apple).prefixAndTail(1)

View file

@ -32,7 +32,7 @@ class FlowSplitWhenSpec extends AkkaSpec {
}
class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) {
val source = Source((1 to elementCount).iterator)
val source = Source(1 to elementCount)
val groupStream = source.splitWhen(_ == splitWhen).runWith(Sink.publisher)
val masterSubscriber = StreamTestKit.SubscriberProbe[Source[Int]]()

View file

@ -1,65 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import scala.concurrent.duration._
import akka.stream.FlowMaterializer
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.OnComplete
import akka.stream.testkit.StreamTestKit.OnError
import akka.stream.testkit.StreamTestKit.OnNext
class FlowThunkSpec extends AkkaSpec {
implicit val materializer = FlowMaterializer()
"A Flow based on a thunk generator" must {
"produce elements" in {
val iter = List(1, 2, 3).iterator
val p = Source(() if (iter.hasNext) Some(iter.next()) else None).map(_ + 10).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(1)
c.expectNext(11)
c.expectNoMsg(100.millis)
sub.request(3)
c.expectNext(12)
c.expectNext(13)
c.expectComplete()
}
"complete empty" in {
val p = Source(() None).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(1)
c.expectComplete()
c.expectNoMsg(100.millis)
}
"allow cancel before receiving all elements" in {
val count = 100000
val iter = (1 to count).iterator
val p = Source(() if (iter.hasNext) Some(iter.next()) else None).runWith(Sink.publisher)
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(count)
c.expectNext(1)
sub.cancel()
val got = c.probe.receiveWhile(3.seconds) {
case _: OnNext[_]
case OnComplete fail("Cancel expected before OnComplete")
case OnError(e) fail(e)
}
got.size should be < (count - 1)
}
}
}

View file

@ -41,7 +41,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"A Flow with transformRecover operations" must {
"produce one-to-one transformation as expected" in {
val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher)
val p = Source(1 to 3).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
var tot = 0
@ -69,7 +69,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"produce one-to-several transformation as expected" in {
val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher)
val p = Source(1 to 3).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
var tot = 0
@ -100,7 +100,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"produce dropping transformation as expected" in {
val p = Source(List(1, 2, 3, 4).iterator).runWith(Sink.publisher)
val p = Source(1 to 4).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
var tot = 0
@ -128,7 +128,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"produce multi-step transformation as expected" in {
val p = Source(List("a", "bc", "def").iterator).runWith(Sink.publisher)
val p = Source(List("a", "bc", "def")).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new TryRecoveryTransformer[String, Int] {
var concat = ""
@ -171,7 +171,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"invoke onComplete when done" in {
val p = Source(List("a").iterator).runWith(Sink.publisher)
val p = Source(List("a")).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new TryRecoveryTransformer[String, String] {
var s = ""
@ -241,7 +241,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"report error when exception is thrown" in {
val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher)
val p = Source(1 to 3).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = {
@ -267,7 +267,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
"report error after emitted elements" in {
EventFilter[IllegalArgumentException]("two not allowed") intercept {
val p2 = Source(List(1, 2, 3).iterator).
val p2 = Source(1 to 3).
mapConcat { elem
if (elem == 2) throw new IllegalArgumentException("two not allowed")
else (1 to 5).map(elem * 100 + _)
@ -367,7 +367,7 @@ class FlowTransformRecoverSpec extends AkkaSpec {
}
"support cancel as expected" in {
val p = Source(List(1, 2, 3).iterator).runWith(Sink.publisher)
val p = Source(1 to 3).runWith(Sink.publisher)
val p2 = Source(p).
transform("transform", () new Transformer[Int, Int] {
override def onNext(elem: Int) = List(elem, elem)

View file

@ -27,11 +27,11 @@ class GraphConcatSpec extends TwoStreamsSetup {
val concat1 = Concat[Int]("concat1")
val concat2 = Concat[Int]("concat2")
Source(List.empty[Int].iterator) ~> concat1.first
Source((1 to 4).iterator) ~> concat1.second
Source(List.empty[Int]) ~> concat1.first
Source(1 to 4) ~> concat1.second
concat1.out ~> concat2.first
Source((5 to 10).iterator) ~> concat2.second
Source(5 to 10) ~> concat2.second
concat2.out ~> Sink(probe)
}.run()
@ -49,7 +49,7 @@ class GraphConcatSpec extends TwoStreamsSetup {
commonTests()
"work with one immediately completed and one nonempty publisher" in {
val subscriber1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator))
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(5)
subscriber1.expectNext(1)
@ -58,7 +58,7 @@ class GraphConcatSpec extends TwoStreamsSetup {
subscriber1.expectNext(4)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher)
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(5)
subscriber2.expectNext(1)
@ -69,7 +69,7 @@ class GraphConcatSpec extends TwoStreamsSetup {
}
"work with one delayed completed and one nonempty publisher" in {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator))
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(5)
subscriber1.expectNext(1)
@ -78,7 +78,7 @@ class GraphConcatSpec extends TwoStreamsSetup {
subscriber1.expectNext(4)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(5)
subscriber2.expectNext(1)
@ -89,18 +89,18 @@ class GraphConcatSpec extends TwoStreamsSetup {
}
"work with one immediately failed and one nonempty publisher" in {
val subscriber1 = setup(failedPublisher, nonemptyPublisher((1 to 4).iterator))
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
}
"work with one delayed failed and one nonempty publisher" in {
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator))
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher)
subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
}

View file

@ -20,9 +20,9 @@ class GraphMergeSpec extends TwoStreamsSetup {
"work in the happy case" in {
// Different input sizes (4 and 6)
val source1 = Source((0 to 3).iterator)
val source2 = Source((4 to 9).iterator)
val source3 = Source(List.empty[Int].iterator)
val source1 = Source(0 to 3)
val source2 = Source(4 to 9)
val source3 = Source(List[Int]())
val probe = StreamTestKit.SubscriberProbe[Int]()
FlowGraph { implicit b
@ -54,7 +54,7 @@ class GraphMergeSpec extends TwoStreamsSetup {
val source3 = Source(List(3))
val source4 = Source(List(4))
val source5 = Source(List(5))
val source6 = Source(List.empty[Int])
val source6 = Source(List[Int]())
val probe = StreamTestKit.SubscriberProbe[Int]()
@ -85,7 +85,7 @@ class GraphMergeSpec extends TwoStreamsSetup {
commonTests()
"work with one immediately completed and one nonempty publisher" in {
val subscriber1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator))
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(4)
subscriber1.expectNext(1)
@ -94,7 +94,7 @@ class GraphMergeSpec extends TwoStreamsSetup {
subscriber1.expectNext(4)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher)
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(4)
subscriber2.expectNext(1)
@ -105,7 +105,7 @@ class GraphMergeSpec extends TwoStreamsSetup {
}
"work with one delayed completed and one nonempty publisher" in {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator))
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
val subscription1 = subscriber1.expectSubscription()
subscription1.request(4)
subscriber1.expectNext(1)
@ -114,7 +114,7 @@ class GraphMergeSpec extends TwoStreamsSetup {
subscriber1.expectNext(4)
subscriber1.expectComplete()
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
val subscription2 = subscriber2.expectSubscription()
subscription2.request(4)
subscriber2.expectNext(1)

View file

@ -42,34 +42,34 @@ class GraphZipSpec extends TwoStreamsSetup {
commonTests()
"work with one immediately completed and one nonempty publisher" in {
val subscriber1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator))
val subscriber1 = setup(completedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectCompletedOrSubscriptionFollowedByComplete()
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher)
val subscriber2 = setup(nonemptyPublisher(1 to 4), completedPublisher)
subscriber2.expectCompletedOrSubscriptionFollowedByComplete()
}
"work with one delayed completed and one nonempty publisher" in {
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator))
val subscriber1 = setup(soonToCompletePublisher, nonemptyPublisher(1 to 4))
subscriber1.expectCompletedOrSubscriptionFollowedByComplete()
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToCompletePublisher)
subscriber2.expectCompletedOrSubscriptionFollowedByComplete()
}
"work with one immediately failed and one nonempty publisher" in {
val subscriber1 = setup(failedPublisher, nonemptyPublisher((1 to 4).iterator))
val subscriber1 = setup(failedPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher)
val subscriber2 = setup(nonemptyPublisher(1 to 4), failedPublisher)
subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
}
"work with one delayed failed and one nonempty publisher" in {
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator))
val subscriber1 = setup(soonToFailPublisher, nonemptyPublisher(1 to 4))
subscriber1.expectErrorOrSubscriptionFollowedByError(TestException)
val subscriber2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher)
val subscriber2 = setup(nonemptyPublisher(1 to 4), soonToFailPublisher)
val subscription2 = subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
}

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.impl.{ Optimizations, ActorBasedFlowMaterializer }
import akka.stream.testkit.AkkaSpec
import akka.testkit._
import scala.concurrent.duration._
import scala.concurrent.Await
class OptimizingActorBasedFlowMaterializerSpec extends AkkaSpec with ImplicitSender {
"ActorBasedFlowMaterializer" must {
//FIXME Add more and meaningful tests to verify that optimizations occur and have the same semantics as the non-optimized code
"optimize filter + map" in {
implicit val mat = FlowMaterializer().asInstanceOf[ActorBasedFlowMaterializer].copy(optimizations = Optimizations.all)
val f = Source(1 to 100).
drop(4).
drop(5).
transform("identity", () FlowOps.identityTransformer).
filter(_ % 2 == 0).
map(_ * 2).
map(identity).
take(20).
take(10).
drop(5).
fold(0)(_ + _)
val expected = (1 to 100).
drop(9).
filter(_ % 2 == 0).
map(_ * 2).
take(10).
drop(5).
fold(0)(_ + _)
Await.result(f, 5.seconds) should be(expected)
}
}
}
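The expected value in this spec leans on the optimizer's elide rules: consecutive takes keep the smaller count, consecutive drops sum, and non-positive drops vanish. A standalone sketch of those rewrites over a toy AST (the real rules live in ActorBasedFlowMaterializer.optimize and operate on the reversed op list):

```scala
object ElideSketch extends App {
  sealed trait Node
  final case class Take(n: Int) extends Node
  final case class Drop(n: Int) extends Node
  final case class Other(name: String) extends Node

  def elide(ops: List[Node]): List[Node] = ops match {
    case Take(a) :: Take(b) :: rest => elide(Take(math.min(a, b)) :: rest)
    case Drop(a) :: Drop(b) :: rest => elide(Drop(a + b) :: rest)
    case Drop(n) :: rest if n < 1   => elide(rest)
    case head :: rest               => head :: elide(rest)
    case Nil                        => Nil
  }

  // drop(4).drop(5) ... take(20).take(10) collapse just like the spec expects:
  println(elide(List(Drop(4), Drop(5), Other("filter"), Take(20), Take(10))))
  // List(Drop(9), Other(filter), Take(10))
}
```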

View file

@ -6,10 +6,7 @@ package akka.stream
import java.util.Locale
import java.util.concurrent.TimeUnit
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.impl.Ast
import akka.stream.impl.FlowNameCounter
import akka.stream.impl.StreamSupervisor
import akka.stream.impl._
import scala.collection.immutable
@ -63,7 +60,8 @@ object FlowMaterializer {
materializerSettings,
context.actorOf(StreamSupervisor.props(materializerSettings).withDispatcher(materializerSettings.dispatcher)),
FlowNameCounter(system).counter,
namePrefix)
namePrefix,
optimizations = Optimizations.none)
}
/**

View file

@ -5,6 +5,7 @@ package akka.stream.impl
import java.util.concurrent.atomic.AtomicLong
import akka.event.Logging
import akka.stream.impl.fusing.{ ActorInterpreter, Op }
import scala.annotation.tailrec
@ -35,37 +36,74 @@ import org.reactivestreams.{ Processor, Publisher, Subscriber }
* INTERNAL API
*/
private[akka] object Ast {
sealed trait AstNode {
sealed abstract class AstNode {
def name: String
}
// FIXME Replace with Operate
final case class Transform(name: String, mkTransformer: () Transformer[Any, Any]) extends AstNode
// FIXME Replace with Operate
final case class TimerTransform(name: String, mkTransformer: () TimerTransformer[Any, Any]) extends AstNode
case class Transform(name: String, mkTransformer: () Transformer[Any, Any]) extends AstNode
case class TimerTransform(name: String, mkTransformer: () TimerTransformer[Any, Any]) extends AstNode
case class Fusable(ops: immutable.Seq[Op[_, _, _, _, _]], name: String) extends AstNode
case class MapAsync(f: Any Future[Any]) extends AstNode {
override def name = "mapAsync"
final case class Operate(mkOp: () fusing.Op[_, _, _, _, _]) extends AstNode {
override def name = "operate"
}
case class MapAsyncUnordered(f: Any Future[Any]) extends AstNode {
override def name = "mapAsyncUnordered"
object Fused {
def apply(ops: immutable.Seq[Op[_, _, _, _, _]]): Fused =
Fused(ops, ops.map(x Logging.simpleName(x).toLowerCase).mkString("+")) //FIXME change to something more performant for name
}
final case class Fused(ops: immutable.Seq[Op[_, _, _, _, _]], override val name: String) extends AstNode
final case class Map(f: Any Any) extends AstNode { override def name = "map" }
final case class Filter(p: Any Boolean) extends AstNode { override def name = "filter" }
final case class Collect(pf: PartialFunction[Any, Any]) extends AstNode { override def name = "collect" }
// FIXME Replace with OperateAsync
final case class MapAsync(f: Any Future[Any]) extends AstNode { override def name = "mapAsync" }
//FIXME Should be OperateUnorderedAsync
final case class MapAsyncUnordered(f: Any Future[Any]) extends AstNode { override def name = "mapAsyncUnordered" }
final case class Grouped(n: Int) extends AstNode {
require(n > 0, "n must be greater than 0")
override def name = "grouped"
}
case class GroupBy(f: Any Any) extends AstNode {
override def name = "groupBy"
//FIXME should be `n: Long`
final case class Take(n: Int) extends AstNode {
override def name = "take"
}
case class PrefixAndTail(n: Int) extends AstNode {
override def name = "prefixAndTail"
//FIXME should be `n: Long`
final case class Drop(n: Int) extends AstNode {
override def name = "drop"
}
case class SplitWhen(p: Any Boolean) extends AstNode {
override def name = "splitWhen"
final case class Scan(zero: Any, f: (Any, Any) Any) extends AstNode { override def name = "scan" }
final case class Buffer(size: Int, overflowStrategy: OverflowStrategy) extends AstNode {
require(size > 0, s"Buffer size must be larger than zero but was [$size]")
override def name = "buffer"
}
final case class Conflate(seed: Any Any, aggregate: (Any, Any) Any) extends AstNode {
override def name = "conflate"
}
final case class Expand(seed: Any Any, extrapolate: Any (Any, Any)) extends AstNode {
override def name = "expand"
}
final case class MapConcat(f: Any immutable.Seq[Any]) extends AstNode {
override def name = "mapConcat"
}
case object ConcatAll extends AstNode {
final case class GroupBy(f: Any Any) extends AstNode { override def name = "groupBy" }
final case class PrefixAndTail(n: Int) extends AstNode { override def name = "prefixAndTail" }
final case class SplitWhen(p: Any Boolean) extends AstNode { override def name = "splitWhen" }
final case object ConcatAll extends AstNode {
override def name = "concatFlatten"
}
@ -77,6 +115,7 @@ private[akka] object Ast {
sealed trait FanInAstNode extends JunctionAstNode
sealed trait FanOutAstNode extends JunctionAstNode
// FIXME Why do we need this?
case object IdentityAstNode extends JunctionAstNode {
override def name = "identity"
}
@ -119,54 +158,164 @@ private[akka] object Ast {
}
/**
* INTERNAL API
*/
final object Optimizations {
val none: Optimizations = Optimizations(collapsing = false, elision = false, simplification = false, fusion = false)
val all: Optimizations = Optimizations(collapsing = true, elision = true, simplification = true, fusion = true)
}
/**
* INTERNAL API
*/
final case class Optimizations(collapsing: Boolean, elision: Boolean, simplification: Boolean, fusion: Boolean) {
def isEnabled: Boolean = collapsing || elision || simplification || fusion
}
/**
* INTERNAL API
*/
case class ActorBasedFlowMaterializer(override val settings: MaterializerSettings,
supervisor: ActorRef,
flowNameCounter: AtomicLong,
namePrefix: String)
namePrefix: String,
optimizations: Optimizations)
extends FlowMaterializer(settings) {
import Ast.AstNode
def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)
private def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
private[this] def nextFlowNameCount(): Long = flowNameCounter.incrementAndGet()
private def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
private[this] def createFlowName(): String = s"$namePrefix-${nextFlowNameCount()}"
@tailrec private def processorChain(topSubscriber: Subscriber[_], ops: immutable.Seq[AstNode],
flowName: String, n: Int): Subscriber[_] = {
@tailrec private[this] def processorChain(topProcessor: Processor[_, _],
ops: List[AstNode],
flowName: String,
n: Int): Processor[_, _] =
ops match {
case op :: tail
val opProcessor: Processor[Any, Any] = processorForNode(op, flowName, n)
opProcessor.subscribe(topSubscriber.asInstanceOf[Subscriber[Any]])
val opProcessor = processorForNode[Any, Any](op, flowName, n)
opProcessor.subscribe(topProcessor.asInstanceOf[Subscriber[Any]])
processorChain(opProcessor, tail, flowName, n - 1)
case _ topSubscriber
case Nil
topProcessor
}
//FIXME Optimize the implementation of the optimizer (no joke)
// AstNodes are in reverse order, Fusable Ops are in order
private[this] final def optimize(ops: List[Ast.AstNode]): (List[Ast.AstNode], Int) = {
@tailrec def analyze(rest: List[Ast.AstNode], optimized: List[Ast.AstNode], fuseCandidates: List[fusing.Op[_, _, _, _, _]]): (List[Ast.AstNode], Int) = {
//The `verify` phase
def verify(rest: List[Ast.AstNode], orig: List[Ast.AstNode]): List[Ast.AstNode] =
rest match {
case (f: Ast.Fused) :: _ throw new IllegalStateException("Fused AST nodes not allowed to be present in the input to the optimizer: " + f)
//TODO Ast.Take(-Long.MaxValue..0) == stream doesn't do anything. Perhaps output warning for that?
case noMatch noMatch
}
// The `elide` phase
// TODO / FIXME : This phase could be pulled out to be executed incrementally when building the Ast
def elide(rest: List[Ast.AstNode], orig: List[Ast.AstNode]): List[Ast.AstNode] =
rest match {
case noMatch if !optimizations.elision || (noMatch ne orig) orig
//Collapses consecutive Take's into one
case (t1 @ Ast.Take(t1n)) :: (t2 @ Ast.Take(t2n)) :: rest (if (t1n < t2n) t1 else t2) :: rest
//Collapses consecutive Drop's into one
case (d1 @ Ast.Drop(d1n)) :: (d2 @ Ast.Drop(d2n)) :: rest new Ast.Drop(d1n + d2n) :: rest
case Ast.Drop(n) :: rest if n < 1 rest // a 0 or negative drop is a NoOp
case noMatch noMatch
}
// The `simplify` phase
def simplify(rest: List[Ast.AstNode], orig: List[Ast.AstNode]): List[Ast.AstNode] =
rest match {
case noMatch if !optimizations.simplification || (noMatch ne orig) orig
// Two consecutive maps is equivalent to one pipelined map
case Ast.Map(second) :: Ast.Map(first) :: rest Ast.Map(first compose second) :: rest
case noMatch noMatch
}
// the `Collapse` phase
def collapse(rest: List[Ast.AstNode], orig: List[Ast.AstNode]): List[Ast.AstNode] =
rest match {
case noMatch if !optimizations.collapsing || (noMatch ne orig) orig
// Collapses a filter and a map into a collect
case Ast.Map(f) :: Ast.Filter(p) :: rest Ast.Collect({ case i if p(i) f(i) }) :: rest
case noMatch noMatch
}
// Tries to squeeze AstNode into a single fused pipeline
def ast2op(head: Ast.AstNode, prev: List[fusing.Op[_, _, _, _, _]]): List[fusing.Op[_, _, _, _, _]] =
head match {
// Always-on below
case Ast.Operate(mkOp) mkOp() :: prev
// Optimizations below
case noMatch if !optimizations.fusion prev
case Ast.Take(n) fusing.Take(n) :: prev
case Ast.Drop(n) fusing.Drop(n) :: prev
case Ast.Filter(p) fusing.Filter(p) :: prev
case Ast.Map(f) fusing.Map(f) :: prev
case Ast.Collect(pf) fusing.Collect(pf) :: prev
//FIXME Add more fusion goodies here
case _ prev
}
// First verify, then try to elide, then try to simplify, then try to fuse
collapse(rest, simplify(rest, elide(rest, verify(rest, rest)))) match {
case Nil
if (fuseCandidates.isEmpty) (optimized.reverse, optimized.length) // End of optimization run without fusion going on, wrap up
else ((Ast.Fused(fuseCandidates) :: optimized).reverse, optimized.length + 1) // End of optimization run with fusion going on, so add it to the optimized stack
// If the Ast was changed this pass simply recur
case modified if modified ne rest analyze(modified, optimized, fuseCandidates)
// No changes to the Ast, lets try to see if we can squeeze the current head Ast node into a fusion pipeline
case head :: rest
ast2op(head, fuseCandidates) match {
case Nil analyze(rest, head :: optimized, Nil)
case `fuseCandidates` analyze(rest, head :: Ast.Fused(fuseCandidates) :: optimized, Nil)
case newFuseCandidates analyze(rest, optimized, newFuseCandidates)
}
}
}
val result = analyze(ops, Nil, Nil)
//println(s"before: $ops")
//println(s"after: ${result._1}")
result
}
// Ops come in reverse order
override def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedMap = {
val flowName = createFlowName()
override def materialize[In, Out](source: Source[In], sink: Sink[Out], rawOps: List[Ast.AstNode]): MaterializedMap = {
val flowName = createFlowName() //FIXME: Creates Id even when it is not used in all branches below
def throwUnknownType(typeName: String, s: Any): Nothing =
throw new MaterializationException(s"unknown $typeName type " + s.getClass)
def throwUnknownType(typeName: String, s: AnyRef): Nothing =
throw new MaterializationException(s"unknown $typeName type ${s.getClass}")
def attachSink(pub: Publisher[Out]) = sink match {
def attachSink(pub: Publisher[Out], flowName: String) = sink match {
case s: ActorFlowSink[Out] s.attach(pub, this, flowName)
case s throwUnknownType("Sink", s)
}
def attachSource(sub: Subscriber[In]) = source match {
def attachSource(sub: Subscriber[In], flowName: String) = source match {
case s: ActorFlowSource[In] s.attach(sub, this, flowName)
case s throwUnknownType("Source", s)
}
def createSink() = sink match {
def createSink(flowName: String) = sink match {
case s: ActorFlowSink[In] s.create(this, flowName)
case s throwUnknownType("Sink", s)
}
def createSource() = source match {
def createSource(flowName: String) = source match {
case s: ActorFlowSource[Out] s.create(this, flowName)
case s throwUnknownType("Source", s)
}
@ -178,72 +327,60 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
}
val (sourceValue, sinkValue) =
if (ops.isEmpty) {
if (rawOps.isEmpty) {
if (isActive(sink)) {
val (sub, value) = createSink()
(attachSource(sub), value)
val (sub, value) = createSink(flowName)
(attachSource(sub, flowName), value)
} else if (isActive(source)) {
val (pub, value) = createSource()
(value, attachSink(pub))
val (pub, value) = createSource(flowName)
(value, attachSink(pub, flowName))
} else {
val id: Processor[In, Out] = processorForNode(identityTransform, flowName, 1).asInstanceOf[Processor[In, Out]]
(attachSource(id), attachSink(id))
val id = processorForNode[In, Out](identityTransform, flowName, 1)
(attachSource(id, flowName), attachSink(id, flowName))
}
} else {
val opsSize = ops.size
val last = processorForNode(ops.head, flowName, opsSize).asInstanceOf[Processor[Any, Out]]
val (ops, opsSize) = if (optimizations.isEnabled) optimize(rawOps) else (rawOps, rawOps.length)
val last = processorForNode[Any, Out](ops.head, flowName, opsSize)
val first = processorChain(last, ops.tail, flowName, opsSize - 1).asInstanceOf[Processor[In, Any]]
(attachSource(first), attachSink(last))
(attachSource(first, flowName), attachSink(last, flowName))
}
new MaterializedPipe(source, sourceValue, sink, sinkValue)
}
private val identityTransform = Ast.Transform("identity", () ⇒
new Transformer[Any, Any] {
override def onNext(element: Any) = List(element)
})
//FIXME Should this be a dedicated AstNode?
private[this] val identityTransform = Ast.Transform("identity", () ⇒ FlowOps.identityTransformer[Any])
/**
* INTERNAL API
*/
private[akka] def processorForNode(op: AstNode, flowName: String, n: Int): Processor[Any, Any] = {
val impl = actorOf(ActorProcessorFactory.props(this, op), s"$flowName-$n-${op.name}")
ActorProcessorFactory(impl)
}
private[akka] def processorForNode[In, Out](op: AstNode, flowName: String, n: Int): Processor[In, Out] =
ActorProcessorFactory[In, Out](actorOf(ActorProcessorFactory.props(this, op), s"$flowName-$n-${op.name}"))
def actorOf(props: Props, name: String): ActorRef = supervisor match {
case ref: LocalActorRef ⇒
ref.underlying.attachChild(props, name, systemService = false)
ref.underlying.attachChild(props.withDispatcher(settings.dispatcher), name, systemService = false)
case ref: RepointableActorRef ⇒
if (ref.isStarted)
ref.underlying.asInstanceOf[ActorCell].attachChild(props, name, systemService = false)
ref.underlying.asInstanceOf[ActorCell].attachChild(props.withDispatcher(settings.dispatcher), name, systemService = false)
else {
implicit val timeout = ref.system.settings.CreationTimeout
val f = (supervisor ? StreamSupervisor.Materialize(props, name)).mapTo[ActorRef]
val f = (supervisor ? StreamSupervisor.Materialize(props.withDispatcher(settings.dispatcher), name)).mapTo[ActorRef]
Await.result(f, timeout.duration)
}
case _ ⇒
throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]")
case unknown ⇒
throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
}
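The essence of the dispatcher fix above, as a hedged sketch: the stage Props are decorated with the dispatcher taken from the materializer settings before the child actor is created, so stages no longer silently fall back to the default dispatcher ("my-stream-dispatcher" is a hypothetical dispatcher id from application.conf):

import akka.actor.Props

// Sketch: every stage's Props gets the configured dispatcher applied up front.
def stageProps(raw: Props, settingsDispatcher: String): Props =
  raw.withDispatcher(settingsDispatcher) // e.g. "my-stream-dispatcher"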
// FIXME Investigate possibility of using `enableOperationsFusion` in `materializeJunction`
override def materializeJunction[In, Out](op: Ast.JunctionAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) = {
val flowName = createFlowName()
val actorName = s"$flowName-${op.name}"
val actorName = s"${createFlowName()}-${op.name}"
op match {
case fanin: Ast.FanInAstNode ⇒
val impl = fanin match {
case Ast.Merge ⇒
actorOf(FairMerge.props(settings, inputCount).withDispatcher(settings.dispatcher), actorName)
case Ast.MergePreferred ⇒
actorOf(UnfairMerge.props(settings, inputCount).withDispatcher(settings.dispatcher), actorName)
case zip: Ast.Zip ⇒
actorOf(Zip.props(settings, zip.as).withDispatcher(settings.dispatcher), actorName)
case Ast.Concat ⇒
actorOf(Concat.props(settings).withDispatcher(settings.dispatcher), actorName)
case Ast.FlexiMergeNode(merger) ⇒
actorOf(FlexiMergeImpl.props(settings, inputCount, merger.createMergeLogic()).
withDispatcher(settings.dispatcher), actorName)
case Ast.Merge ⇒ actorOf(FairMerge.props(settings, inputCount), actorName)
case Ast.MergePreferred ⇒ actorOf(UnfairMerge.props(settings, inputCount), actorName)
case zip: Ast.Zip ⇒ actorOf(Zip.props(settings, zip.as), actorName)
case Ast.Concat ⇒ actorOf(Concat.props(settings), actorName)
case Ast.FlexiMergeNode(merger) ⇒ actorOf(FlexiMergeImpl.props(settings, inputCount, merger.createMergeLogic()), actorName)
}
val publisher = new ActorPublisher[Out](impl)
@ -253,15 +390,10 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
case fanout: Ast.FanOutAstNode ⇒
val impl = fanout match {
case Ast.Broadcast ⇒
actorOf(Broadcast.props(settings, outputCount).withDispatcher(settings.dispatcher), actorName)
case Ast.Balance(waitForAllDownstreams) ⇒
actorOf(Balance.props(settings, outputCount, waitForAllDownstreams).withDispatcher(settings.dispatcher), actorName)
case Ast.Unzip ⇒
actorOf(Unzip.props(settings).withDispatcher(settings.dispatcher), actorName)
case Ast.FlexiRouteNode(route) ⇒
actorOf(FlexiRouteImpl.props(settings, outputCount, route.createRouteLogic()).
withDispatcher(settings.dispatcher), actorName)
case Ast.Broadcast ⇒ actorOf(Broadcast.props(settings, outputCount), actorName)
case Ast.Balance(waitForAllDownstreams) ⇒ actorOf(Balance.props(settings, outputCount, waitForAllDownstreams), actorName)
case Ast.Unzip ⇒ actorOf(Unzip.props(settings), actorName)
case Ast.FlexiRouteNode(route) ⇒ actorOf(FlexiRouteImpl.props(settings, outputCount, route.createRouteLogic()), actorName)
}
val publishers = Vector.tabulate(outputCount)(id ⇒ new ActorPublisher[Out](impl) {
@ -271,9 +403,9 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
val subscriber = ActorSubscriber[In](impl)
(List(subscriber), publishers)
case identity @ Ast.IdentityAstNode ⇒
val id: Processor[In, Out] = processorForNode(identityTransform, identity.name, 1).asInstanceOf[Processor[In, Out]]
(List(id), List(id))
case identity @ Ast.IdentityAstNode ⇒ // FIXME Why is IdentityAstNode a JunctionAstNode?
val id = List(processorForNode[In, Out](identityTransform, identity.name, 1)) // FIXME is `identity.name` appropriate/unique here?
(id, id)
}
}
@ -324,17 +456,33 @@ private[akka] object ActorProcessorFactory {
import Ast._
def props(materializer: FlowMaterializer, op: AstNode): Props = {
val settings = materializer.settings
val settings = materializer.settings // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW
(op match {
case Fusable(ops, _) ⇒ Props(new ActorInterpreter(materializer.settings, ops))
case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer()))
case t: TimerTransform ⇒ Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer()))
case m: MapAsync ⇒ Props(new MapAsyncProcessorImpl(settings, m.f))
case m: MapAsyncUnordered ⇒ Props(new MapAsyncUnorderedProcessorImpl(settings, m.f))
case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f))
case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n))
case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p))
case ConcatAll ⇒ Props(new ConcatAllImpl(materializer))
case Fused(ops, _) ⇒ Props(new ActorInterpreter(settings, ops))
case Map(f) ⇒ Props(new ActorInterpreter(settings, List(fusing.Map(f))))
case Filter(p) ⇒ Props(new ActorInterpreter(settings, List(fusing.Filter(p))))
case Drop(n) ⇒ Props(new ActorInterpreter(settings, List(fusing.Drop(n))))
case Take(n) ⇒ Props(new ActorInterpreter(settings, List(fusing.Take(n))))
case Collect(pf) ⇒ Props(new ActorInterpreter(settings, List(fusing.Collect(pf))))
case Scan(z, f) ⇒ Props(new ActorInterpreter(settings, List(fusing.Scan(z, f))))
case Expand(s, f) ⇒ Props(new ActorInterpreter(settings, List(fusing.Expand(s, f))))
case Conflate(s, f) ⇒ Props(new ActorInterpreter(settings, List(fusing.Conflate(s, f))))
case Buffer(n, s) ⇒ Props(new ActorInterpreter(settings, List(fusing.Buffer(n, s))))
case MapConcat(f) ⇒ Props(new ActorInterpreter(settings, List(fusing.MapConcat(f))))
case Operate(mkOp) ⇒ Props(new ActorInterpreter(settings, List(mkOp())))
case MapAsync(f) ⇒ Props(new MapAsyncProcessorImpl(settings, f))
case MapAsyncUnordered(f) ⇒ Props(new MapAsyncUnorderedProcessorImpl(settings, f))
case Grouped(n) ⇒ Props(new ActorInterpreter(settings, List(fusing.Grouped(n))))
case GroupBy(f) ⇒ Props(new GroupByProcessorImpl(settings, f))
case PrefixAndTail(n) ⇒ Props(new PrefixAndTailImpl(settings, n))
case SplitWhen(p) ⇒ Props(new SplitWhenProcessorImpl(settings, p))
case ConcatAll ⇒ Props(new ConcatAllImpl(materializer)) //FIXME closes over the materializer, is this good?
case t: Transform ⇒
val tr = t.mkTransformer()
Props(new TransformProcessorImpl(settings, tr))
case t: TimerTransform ⇒
val tr = t.mkTransformer()
Props(new TimerTransformerProcessorsImpl(settings, tr))
}).withDispatcher(settings.dispatcher)
}
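A sketch of the dispatch scheme above: a single ActorInterpreter actor runs a whole list of fusing ops, and a lone combinator is simply the one-element case of the same machinery (the names below are hypothetical stand-ins for fusing.Op, not the real classes):

sealed trait OpSketch
final case class MapOp(f: Any ⇒ Any) extends OpSketch
final case class FilterOp(p: Any ⇒ Boolean) extends OpSketch

// One interpreter per ops list: Fused nodes pass several ops,
// a plain Map or Filter passes a one-element list.
final class InterpreterSketch(ops: List[OpSketch]) {
  def run(in: List[Any]): List[Any] =
    ops.foldLeft(in) {
      case (xs, MapOp(f))    ⇒ xs.map(f)
      case (xs, FilterOp(p)) ⇒ xs.filter(p)
    }
}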
View file
@ -13,13 +13,6 @@ import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings }
import org.reactivestreams.{ Publisher, Subscriber }
/**
* INTERNAL API
*/
private[akka] object SimpleCallbackPublisher {
def props[T](f: () ⇒ T, settings: MaterializerSettings): Props = IteratorPublisher.props(Iterator.continually(f()), settings)
}
/**
* INTERNAL API
*/
@ -98,7 +91,7 @@ private[akka] class ActorPublisher[T](val impl: ActorRef) extends Publisher[T] {
*/
private[akka] class ActorSubscription[T]( final val impl: ActorRef, final val subscriber: Subscriber[_ >: T]) extends SubscriptionWithCursor[T] {
override def request(elements: Long): Unit =
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
else impl ! RequestMore(this, elements)
override def cancel(): Unit = impl ! Cancel(this)
}
@ -124,20 +117,23 @@ private[akka] trait SoftShutdown { this: Actor ⇒
/**
* INTERNAL API
*/
private[akka] object IteratorPublisherImpl {
case object Flush
private[akka] object IteratorPublisher {
private[IteratorPublisher] case object Flush
def props[T](iterator: Iterator[T], settings: MaterializerSettings): Props =
Props(new IteratorPublisher(iterator, settings))
}
/**
* INTERNAL API
*/
private[akka] class IteratorPublisherImpl[T](iterator: Iterator[T], settings: MaterializerSettings)
private[akka] class IteratorPublisher[T](iterator: Iterator[T], settings: MaterializerSettings)
extends Actor
with ActorLogging
with SubscriberManagement[T]
with SoftShutdown {
import IteratorPublisherImpl.Flush
import IteratorPublisher.Flush
type S = ActorSubscription[T]
private var demand = 0L
View file
@ -44,6 +44,7 @@ private[akka] object FuturePublisher {
/**
* INTERNAL API
*/
//FIXME why do we need to have an actor to drive a Future?
private[akka] class FuturePublisher(future: Future[Any], settings: MaterializerSettings) extends Actor with SoftShutdown {
import akka.stream.impl.FuturePublisher.FutureSubscription
import akka.stream.impl.FuturePublisher.FutureSubscription.Cancel
View file
@ -1,175 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.annotation.tailrec
import scala.collection.immutable
import scala.util.control.NonFatal
import akka.actor.{ Actor, ActorRef, Props, SupervisorStrategy, Terminated }
import akka.stream.{ MaterializerSettings, ReactiveStreamsConstants }
import org.reactivestreams.{ Subscriber, Subscription }
/**
* INTERNAL API
*/
private[akka] object IterablePublisher {
def props(iterable: immutable.Iterable[Any], settings: MaterializerSettings): Props =
Props(new IterablePublisher(iterable, settings)).withDispatcher(settings.dispatcher)
object BasicActorSubscription {
case object Cancel
case class RequestMore(elements: Long)
}
class BasicActorSubscription(ref: ActorRef)
extends Subscription {
import akka.stream.impl.IterablePublisher.BasicActorSubscription._
def cancel(): Unit = ref ! Cancel
def request(elements: Long): Unit =
if (elements <= 0) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
else ref ! RequestMore(elements)
override def toString = "BasicActorSubscription"
}
}
/**
* INTERNAL API
*
* Elements are produced from the iterator of the iterable. Each subscriber
* makes use of its own iterable, i.e. each subscriber will receive the elements from the
* beginning of the iterable and it can consume the elements in its own pace.
*/
private[akka] class IterablePublisher(iterable: immutable.Iterable[Any], settings: MaterializerSettings) extends Actor with SoftShutdown {
import akka.stream.impl.IterablePublisher.BasicActorSubscription
require(iterable.nonEmpty, "Use EmptyPublisher for empty iterable")
var exposedPublisher: ActorPublisher[Any] = _
var subscribers = Set.empty[Subscriber[Any]]
var workers = Map.empty[ActorRef, Subscriber[Any]]
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
def receive = {
case ExposedPublisher(publisher) ⇒
exposedPublisher = publisher
context.become(waitingForFirstSubscriber)
case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher")
}
def waitingForFirstSubscriber: Receive = {
case SubscribePending ⇒
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
context.become(active)
}
def active: Receive = {
case SubscribePending ⇒
exposedPublisher.takePendingSubscribers() foreach registerSubscriber
case Terminated(worker) ⇒
workerFinished(worker)
case IterablePublisherWorker.Finished ⇒
context.unwatch(sender)
workerFinished(sender)
}
private def workerFinished(worker: ActorRef): Unit = {
val subscriber = workers(worker)
workers -= worker
subscribers -= subscriber
if (subscribers.isEmpty) {
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
softShutdown()
}
}
def registerSubscriber(subscriber: Subscriber[Any]): Unit = {
if (subscribers(subscriber))
subscriber.onError(new IllegalStateException(s"${getClass.getSimpleName} [$self, sub: $subscriber] ${ReactiveStreamsConstants.CanNotSubscribeTheSameSubscriberMultipleTimes}"))
else {
val iterator = iterable.iterator
val worker = context.watch(context.actorOf(IterablePublisherWorker.props(iterator, subscriber,
settings.maxInputBufferSize).withDispatcher(context.props.dispatcher)))
val subscription = new BasicActorSubscription(worker)
subscribers += subscriber
workers = workers.updated(worker, subscriber)
subscriber.onSubscribe(subscription)
}
}
override def postStop(): Unit = {
if (exposedPublisher ne null)
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
}
}
/**
* INTERNAL API
*/
private[akka] object IterablePublisherWorker {
def props(iterator: Iterator[Any], subscriber: Subscriber[Any], maxPush: Int): Props =
Props(new IterablePublisherWorker(iterator, subscriber, maxPush))
private object PushMore
case object Finished
}
/**
* INTERNAL API
*
* Each subscriber is served by this worker actor. It pushes elements to the
* subscriber immediately when it receives demand, but to allow cancel before
* pushing everything it sends a PushMore to itself after a batch of elements.
*/
private[akka] class IterablePublisherWorker(iterator: Iterator[Any], subscriber: Subscriber[Any], maxPush: Int)
extends Actor with SoftShutdown {
import akka.stream.impl.IterablePublisher.BasicActorSubscription._
import akka.stream.impl.IterablePublisherWorker._
require(iterator.hasNext, "Iterator must not be empty")
var pendingDemand: Long = 0L
def receive = {
case RequestMore(elements) ⇒
pendingDemand += elements
push()
case PushMore ⇒
push()
case Cancel ⇒
context.parent ! Finished
softShutdown()
}
private def push(): Unit = {
@tailrec def doPush(n: Int): Unit =
if (pendingDemand > 0) {
pendingDemand -= 1
val hasNext = {
subscriber.onNext(iterator.next())
iterator.hasNext
}
if (!hasNext) {
subscriber.onComplete()
context.parent ! Finished
softShutdown()
} else if (n == 0 && pendingDemand > 0)
self ! PushMore
else
doPush(n - 1)
}
try doPush(maxPush) catch {
case NonFatal(e) ⇒
subscriber.onError(e)
context.parent ! Finished
softShutdown()
}
}
}
View file
@ -1,16 +0,0 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.actor.Props
import akka.stream.MaterializerSettings
/**
* INTERNAL API
*/
private[akka] object IteratorPublisher {
def props[T](iterator: Iterator[T], settings: MaterializerSettings): Props =
Props(new IteratorPublisherImpl(iterator, settings)).withDispatcher(settings.dispatcher)
}
View file
@ -3,6 +3,7 @@
*/
package akka.stream.impl
import akka.stream.ReactiveStreamsConstants
import org.reactivestreams.{ Publisher, Subscriber, Subscription }
import scala.annotation.tailrec
@ -13,25 +14,43 @@ import scala.util.control.NonFatal
* INTERNAL API
*/
private[akka] object SynchronousPublisherFromIterable {
def apply[T](iterable: immutable.Iterable[T]): Publisher[T] =
if (iterable.isEmpty) EmptyPublisher[T]
else new SynchronousPublisherFromIterable(iterable)
def apply[T](iterable: immutable.Iterable[T]): Publisher[T] = new SynchronousPublisherFromIterable(iterable)
private class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription {
object IteratorSubscription {
def apply[T](subscriber: Subscriber[T], iterator: Iterator[T]): Unit =
new IteratorSubscription[T](subscriber, iterator).init()
}
private[this] final class IteratorSubscription[T](subscriber: Subscriber[T], iterator: Iterator[T]) extends Subscription {
var done = false
var pendingDemand = 0L
var pushing = false
def init(): Unit = try {
if (!iterator.hasNext) {
cancel()
subscriber.onSubscribe(this)
subscriber.onComplete()
} else {
subscriber.onSubscribe(this)
}
} catch {
case NonFatal(e) ⇒
cancel()
subscriber.onError(e)
}
override def cancel(): Unit =
done = true
override def request(elements: Long): Unit = {
if (elements < 1) throw new IllegalArgumentException(ReactiveStreamsConstants.NumberOfElementsInRequestMustBePositiveMsg)
@tailrec def pushNext(): Unit = {
if (!done)
if (iterator.isEmpty) {
done = true
cancel()
subscriber.onComplete()
} else if (pendingDemand != 0) {
} else if (pendingDemand > 0) {
pendingDemand -= 1
subscriber.onNext(iterator.next())
pushNext()
@ -39,7 +58,7 @@ private[akka] object SynchronousPublisherFromIterable {
}
if (pushing)
pendingDemand += elements // reentrant call to requestMore from onNext
pendingDemand += elements // reentrant call to requestMore from onNext // FIXME This severely lacks overflow checks
else {
try {
pushing = true
@ -47,7 +66,7 @@ private[akka] object SynchronousPublisherFromIterable {
pushNext()
} catch {
case NonFatal(e) ⇒
done = true
cancel()
subscriber.onError(e)
} finally { pushing = false }
}
@ -71,10 +90,9 @@ private[akka] object SynchronousPublisherFromIterable {
*/
private[akka] class SynchronousPublisherFromIterable[T](private val iterable: immutable.Iterable[T]) extends Publisher[T] {
import akka.stream.impl.SynchronousPublisherFromIterable.IteratorSubscription
import SynchronousPublisherFromIterable.IteratorSubscription
override def subscribe(subscriber: Subscriber[_ >: T]): Unit =
subscriber.onSubscribe(new IteratorSubscription(subscriber, iterable.iterator))
override def subscribe(subscriber: Subscriber[_ >: T]): Unit = IteratorSubscription(subscriber, iterable.iterator) //FIXME what if .iterator throws?
override def toString: String = s"SynchronousPublisherFromIterable(${iterable.mkString(", ")})"
override def toString: String = getClass.getSimpleName
}
View file
@ -245,7 +245,7 @@ private[akka] class ActorOutputBoundary(val actor: ActorRef) extends BoundaryOp
/**
* INTERNAL API
*/
private[akka] class ActorInterpreter(val settings: MaterializerSettings, val ops: Seq[Op[_, _, _, _, _]])
private[akka] class ActorInterpreter(settings: MaterializerSettings, ops: Seq[Op[_, _, _, _, _]])
extends Actor {
private val upstream = new BatchingActorInputBoundary(settings.initialInputBufferSize)
View file
@ -11,23 +11,39 @@ import scala.collection.immutable
/**
* INTERNAL API
*/
private[akka] case class Map[In, Out](f: In ⇒ Out) extends TransitivePullOp[In, Out] {
private[akka] final case class Map[In, Out](f: In ⇒ Out) extends TransitivePullOp[In, Out] {
override def onPush(elem: In, ctxt: Context[Out]): Directive = ctxt.push(f(elem))
}
/**
* INTERNAL API
*/
private[akka] case class Filter[T](p: T ⇒ Boolean) extends TransitivePullOp[T, T] {
private[akka] final case class Filter[T](p: T ⇒ Boolean) extends TransitivePullOp[T, T] {
override def onPush(elem: T, ctxt: Context[T]): Directive =
if (p(elem)) ctxt.push(elem)
else ctxt.pull()
}
private[akka] final object Collect {
// Cached function that can be used with PartialFunction.applyOrElse to ensure that the guard is applied only once,
// and so that the caller can compare the returned value with Collect.NotApplied to query whether the PF was applied or not.
// Prior art: https://github.com/scala/scala/blob/v2.11.4/src/library/scala/collection/immutable/List.scala#L458
final val NotApplied: Any ⇒ Any = _ ⇒ Collect.NotApplied
}
private[akka] final case class Collect[In, Out](pf: PartialFunction[In, Out]) extends TransitivePullOp[In, Out] {
import Collect.NotApplied
override def onPush(elem: In, ctxt: Context[Out]): Directive =
pf.applyOrElse(elem, NotApplied) match {
case NotApplied ⇒ ctxt.pull()
case result: Out ⇒ ctxt.push(result)
}
}
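A self-contained sketch of the NotApplied trick used above: `applyOrElse` evaluates the partial function's guard once and yields the cached sentinel when the PF is not defined, instead of the usual isDefinedAt-then-apply double evaluation:

object NotAppliedSketch extends App {
  val NotApplied: Any ⇒ Any = _ ⇒ NotApplied

  val pf: PartialFunction[Int, String] = { case i if i % 2 == 0 ⇒ s"even $i" }

  def collectOne(i: Int): Option[String] =
    pf.applyOrElse(i, NotApplied) match {
      case NotApplied     ⇒ None         // guard rejected the element
      case result: String ⇒ Some(result) // guard ran exactly once
    }

  assert(collectOne(2) == Some("even 2"))
  assert(collectOne(3) == None)
}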
/**
* INTERNAL API
*/
private[akka] case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) extends DeterministicOp[In, Out] {
private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) extends DeterministicOp[In, Out] {
private var currentIterator: Iterator[Out] = Iterator.empty
override def onPush(elem: In, ctxt: Context[Out]): Directive = {
@ -44,20 +60,21 @@ private[akka] case class MapConcat[In, Out](f: In ⇒ immutable.Seq[Out]) extend
/**
* INTERNAL API
*/
private[akka] case class Take[T](count: Int) extends TransitivePullOp[T, T] {
private[akka] final case class Take[T](count: Int) extends TransitivePullOp[T, T] {
private var left: Int = count
override def onPush(elem: T, ctxt: Context[T]): Directive = {
left -= 1
if (left == 0) ctxt.pushAndFinish(elem)
else ctxt.push(elem)
if (left > 0) ctxt.push(elem)
else if (left == 0) ctxt.pushAndFinish(elem)
else ctxt.finish() //Handle negative take counts
}
}
/**
* INTERNAL API
*/
private[akka] case class Drop[T](count: Int) extends TransitivePullOp[T, T] {
private[akka] final case class Drop[T](count: Int) extends TransitivePullOp[T, T] {
private var left: Int = count
override def onPush(elem: T, ctxt: Context[T]): Directive =
if (left > 0) {
@ -69,7 +86,26 @@ private[akka] case class Drop[T](count: Int) extends TransitivePullOp[T, T] {
/**
* INTERNAL API
*/
private[akka] case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends DeterministicOp[In, Out] {
private[akka] final case class Scan[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends DeterministicOp[In, Out] {
private var aggregator = zero
override def onPush(elem: In, ctxt: Context[Out]): Directive = {
val old = aggregator
aggregator = f(old, elem)
ctxt.push(old)
}
override def onPull(ctxt: Context[Out]): Directive =
if (isFinishing) ctxt.pushAndFinish(aggregator)
else ctxt.pull()
override def onUpstreamFinish(ctxt: Context[Out]): TerminationDirective = ctxt.absorbTermination()
}
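The Scan op mirrors scanLeft on plain collections: it first emits `zero`, then every intermediate aggregate, and absorbs upstream termination so the final value is still flushed. As a quick reference:

// Source(List(1, 2, 3)).scan(0)(_ + _) is expected to emit the same elements:
assert(List(1, 2, 3).scanLeft(0)(_ + _) == List(0, 1, 3, 6))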
/**
* INTERNAL API
*/
private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends DeterministicOp[In, Out] {
private var aggregator = zero
override def onPush(elem: In, ctxt: Context[Out]): Directive = {
@ -87,31 +123,42 @@ private[akka] case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out) extends
/**
* INTERNAL API
*/
private[akka] case class Grouped[T](n: Int) extends DeterministicOp[T, immutable.Seq[T]] {
private var buf: Vector[T] = Vector.empty
private[akka] final case class Grouped[T](n: Int) extends DeterministicOp[T, immutable.Seq[T]] {
private val buf = {
val b = Vector.newBuilder[T]
b.sizeHint(n)
b
}
private var left = n
override def onPush(elem: T, ctxt: Context[immutable.Seq[T]]): Directive = {
buf :+= elem
if (buf.size == n) {
val emit = buf
buf = Vector.empty
buf += elem
left -= 1
if (left == 0) {
val emit = buf.result()
buf.clear()
left = n
ctxt.push(emit)
} else ctxt.pull()
}
override def onPull(ctxt: Context[immutable.Seq[T]]): Directive =
if (isFinishing) ctxt.pushAndFinish(buf)
else ctxt.pull()
if (isFinishing) {
val elem = buf.result()
buf.clear() //FIXME null out the reference to the `buf`?
left = n
ctxt.pushAndFinish(elem)
} else ctxt.pull()
override def onUpstreamFinish(ctxt: Context[immutable.Seq[T]]): TerminationDirective =
if (buf.isEmpty) ctxt.finish()
if (left == n) ctxt.finish()
else ctxt.absorbTermination()
}
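A standalone sketch of the builder-based grouping above: one reusable VectorBuilder with a size hint replaces repeated `:+` on an immutable Vector, with a countdown in place of the size check:

import scala.collection.immutable

def groupedSketch[T](in: Iterator[T], n: Int): Iterator[immutable.Seq[T]] =
  new Iterator[immutable.Seq[T]] {
    private val buf = { val b = Vector.newBuilder[T]; b.sizeHint(n); b }
    override def hasNext = in.hasNext
    override def next() = {
      var left = n
      while (left > 0 && in.hasNext) { buf += in.next(); left -= 1 }
      val group = buf.result(); buf.clear(); group
    }
  }

// groupedSketch(Iterator(1, 2, 3, 4, 5), 2).toList == List(Vector(1, 2), Vector(3, 4), Vector(5))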
/**
* INTERNAL API
*/
private[akka] case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedOp[T, T] {
private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedOp[T, T] {
import OverflowStrategy._
private val buffer = FixedSizeBuffer(size)
@ -170,7 +217,7 @@ private[akka] case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy
/**
* INTERNAL API
*/
private[akka] case class Completed[T]() extends DeterministicOp[T, T] {
private[akka] final case class Completed[T]() extends DeterministicOp[T, T] {
override def onPush(elem: T, ctxt: Context[T]): Directive = ctxt.finish()
override def onPull(ctxt: Context[T]): Directive = ctxt.finish()
}
@ -178,14 +225,15 @@ private[akka] case class Completed[T]() extends DeterministicOp[T, T] {
/**
* INTERNAL API
*/
private[akka] case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends DetachedOp[In, Out] {
private[akka] final case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In) ⇒ Out) extends DetachedOp[In, Out] {
private var agg: Any = null
override def onPush(elem: In, ctxt: DetachedContext[Out]): UpstreamDirective = {
if (agg == null) agg = seed(elem)
else agg = aggregate(agg.asInstanceOf[Out], elem)
agg = if (agg == null) seed(elem)
else aggregate(agg.asInstanceOf[Out], elem)
if (!isHolding) ctxt.pull() else {
if (!isHolding) ctxt.pull()
else {
val result = agg.asInstanceOf[Out]
agg = null
ctxt.pushAndPull(result)
@ -214,7 +262,7 @@ private[akka] case class Conflate[In, Out](seed: In ⇒ Out, aggregate: (Out, In
/**
* INTERNAL API
*/
private[akka] case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapolate: Seed ⇒ (Out, Seed)) extends DetachedOp[In, Out] {
private[akka] final case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapolate: Seed ⇒ (Out, Seed)) extends DetachedOp[In, Out] {
private var s: Any = null
override def onPush(elem: In, ctxt: DetachedContext[Out]): UpstreamDirective = {
@ -231,9 +279,8 @@ private[akka] case class Expand[In, Out, Seed](seed: In ⇒ Seed, extrapolate: S
else {
val (emit, newS) = extrapolate(s.asInstanceOf[Seed])
s = newS
if (isHolding) {
ctxt.pushAndPull(emit)
} else ctxt.push(emit)
if (isHolding) ctxt.pushAndPull(emit)
else ctxt.push(emit)
}
}
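To make the two detached ops concrete: Conflate summarizes a fast upstream while the downstream is slow, and Expand extrapolates from the last seed while the upstream is slow. A collection-level sketch of both behaviours (not the DetachedOp machinery):

// Conflate: fold all pending upstream elements into one summary value.
def conflateSketch[A, S](pending: List[A])(seed: A ⇒ S)(aggregate: (S, A) ⇒ S): Option[S] =
  pending match {
    case Nil     ⇒ None
    case x :: xs ⇒ Some(xs.foldLeft(seed(x))(aggregate))
  }

// Expand: keep producing from the current seed until new input arrives,
// here simulated by satisfying `demand` extrapolated elements.
def expandSketch[S, U](s: S, demand: Int)(extrapolate: S ⇒ (U, S)): List[U] =
  if (demand <= 0) Nil
  else { val (u, next) = extrapolate(s); u :: expandSketch(next, demand - 1)(extrapolate) }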
View file
@ -178,6 +178,15 @@ class Flow[-In, +Out](delegate: scaladsl.Flow[In, Out]) {
def grouped(n: Int): javadsl.Flow[In, java.util.List[Out @uncheckedVariance]] =
new Flow(delegate.grouped(n).map(_.asJava)) // FIXME optimize to one step
/**
* Similar to `fold` but not a terminal operation: emits its current value,
* which starts at `zero`, and then applies the current value and the next
* element to the given function `f`, emitting the next current value.
*/
def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Flow[In, T] =
new Flow(delegate.scan(zero)(f.apply))
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the given number of elements, whatever happens first.
View file
@ -65,8 +65,8 @@ object Source {
* in accordance with the demand coming from the downstream transformation
* steps.
*/
def from[O](iterator: java.util.Iterator[O]): javadsl.Source[O] =
new Source(scaladsl.Source(iterator.asScala))
def from[O](f: japi.Creator[java.util.Iterator[O]]): javadsl.Source[O] =
new Source(scaladsl.Source(() ⇒ f.create().asScala))
/**
* Helper to create [[Source]] from `Iterable`.
@ -87,14 +87,6 @@ object Source {
def from[O](iterable: java.lang.Iterable[O]): javadsl.Source[O] =
new Source(scaladsl.Source(akka.stream.javadsl.japi.Util.immutableIterable(iterable)))
/**
* Define the sequence of elements to be produced by the given closure.
* The stream ends normally when evaluation of the closure returns a `None`.
* The stream ends exceptionally when an exception is thrown from the closure.
*/
def from[O](f: japi.Creator[akka.japi.Option[O]]): javadsl.Source[O] =
new Source(scaladsl.Source(() ⇒ f.create().asScala))
/**
* Start a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
@ -293,6 +285,15 @@ class Source[+Out](delegate: scaladsl.Source[Out]) {
def grouped(n: Int): javadsl.Source[java.util.List[Out @uncheckedVariance]] =
new Source(delegate.grouped(n).map(_.asJava))
/**
* Similar to `fold` but not a terminal operation: emits its current value,
* which starts at `zero`, and then applies the current value and the next
* element to the given function `f`, emitting the next current value.
*/
def scan[T](zero: T)(f: japi.Function2[T, Out, T]): javadsl.Source[T] =
new Source(delegate.scan(zero)(f.apply))
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the given number of elements, whatever happens first.
View file
@ -14,6 +14,7 @@ import scala.annotation.tailrec
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import scala.util.{ Success, Failure }
sealed trait ActorFlowSource[+Out] extends Source[Out] {
@ -51,14 +52,12 @@ sealed trait ActorFlowSource[+Out] extends Source[Out] {
override type Repr[+O] = SourcePipe[O]
private def sourcePipe = Pipe.empty[Out].withSource(this)
override def via[T](flow: Flow[Out, T]): Source[T] = Pipe.empty[Out].withSource(this).via(flow)
override def via[T](flow: Flow[Out, T]): Source[T] = sourcePipe.via(flow)
override def to(sink: Sink[Out]): RunnableFlow = sourcePipe.to(sink)
override def to(sink: Sink[Out]): RunnableFlow = Pipe.empty[Out].withSource(this).to(sink)
/** INTERNAL API */
override private[scaladsl] def andThen[U](op: AstNode) = SourcePipe(this, List(op))
override private[scaladsl] def andThen[U](op: AstNode) = SourcePipe(this, List(op)) //FIXME raw addition of AstNodes
}
/**
@ -100,22 +99,6 @@ final case class PublisherSource[Out](p: Publisher[Out]) extends SimpleActorFlow
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) = (p, ())
}
/**
* Start a new `Source` from the given Iterator. The produced stream of elements
* will continue until the iterator runs empty or fails during evaluation of
* the `next()` method. Elements are pulled out of the iterator
* in accordance with the demand coming from the downstream transformation
* steps.
*/
final case class IteratorSource[Out](iterator: Iterator[Out]) extends SimpleActorFlowSource[Out] {
override def attach(flowSubscriber: Subscriber[Out], materializer: ActorBasedFlowMaterializer, flowName: String) =
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) =
(ActorPublisher[Out](materializer.actorOf(IteratorPublisher.props(iterator, materializer.settings),
name = s"$flowName-0-iterator")), ())
}
/**
* Starts a new `Source` from the given `Iterable`. This is like starting from an
* Iterator, but every Subscriber directly attached to the Publisher of this
@ -127,36 +110,14 @@ final case class IterableSource[Out](iterable: immutable.Iterable[Out]) extends
create(materializer, flowName)._1.subscribe(flowSubscriber)
override def isActive: Boolean = true
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) =
if (iterable.isEmpty) (EmptyPublisher[Out], ())
else (ActorPublisher[Out](materializer.actorOf(IterablePublisher.props(iterable, materializer.settings),
name = s"$flowName-0-iterable")), ())
(SynchronousPublisherFromIterable(iterable), ()) //FIXME This should probably be an AsynchronousPublisherFromIterable
}
final class ThunkIterator[Out](thunk: () ⇒ Option[Out]) extends Iterator[Out] {
require(thunk ne null, "thunk is not allowed to be null")
private[this] var value: Option[Out] = null
private[this] def advance(): Unit =
value = thunk() match {
case null ⇒ throw new NullPointerException("Thunk is not allowed to return null")
case option ⇒ option
}
@tailrec override final def hasNext: Boolean = value match {
case null ⇒
advance(); hasNext
case option ⇒ option.isDefined
//FIXME SerialVersionUID?
final class FuncIterable[Out](f: () Iterator[Out]) extends immutable.Iterable[Out] {
override def iterator: Iterator[Out] = try f() catch {
case NonFatal(e) ⇒ Iterator.continually(throw e) //FIXME not rock-solid, is the least one can say
}
@tailrec override final def next(): Out = value match {
case null ⇒
advance(); next()
case Some(next) ⇒
advance(); next
case None ⇒ Iterator.empty.next()
}
override def toString: String = "ThunkIterator"
}
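Usage sketch for FuncIterable above: because every call to `iterator` invokes the factory again, each subscriber gets a fresh traversal, which the removed ThunkIterator-based source could not offer:

val fresh = new FuncIterable(() ⇒ Iterator.from(1).take(3))
assert(fresh.iterator.toList == List(1, 2, 3))
assert(fresh.iterator.toList == List(1, 2, 3)) // a second traversal starts over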
/**
@ -172,13 +133,12 @@ final case class FutureSource[Out](future: Future[Out]) extends SimpleActorFlowS
override def create(materializer: ActorBasedFlowMaterializer, flowName: String) =
future.value match {
case Some(Success(element)) ⇒
(ActorPublisher[Out](materializer.actorOf(IterablePublisher.props(List(element), materializer.settings),
name = s"$flowName-0-future")), ())
(SynchronousPublisherFromIterable(List(element)), ()) // Option is not Iterable. sigh
case Some(Failure(t)) ⇒
(ErrorPublisher(t).asInstanceOf[Publisher[Out]], ())
case None ⇒
(ActorPublisher[Out](materializer.actorOf(FuturePublisher.props(future, materializer.settings),
name = s"$flowName-0-future")), ())
name = s"$flowName-0-future")), ()) // FIXME optimize
}
}
View file
@ -4,7 +4,6 @@
package akka.stream.scaladsl
import akka.stream.impl.Ast._
import akka.stream.impl.fusing
import akka.stream.{ TimerTransformer, Transformer, OverflowStrategy }
import akka.util.Collections.EmptyImmutableSeq
import scala.collection.immutable
@ -102,19 +101,18 @@ trait RunnableFlow {
trait FlowOps[+Out] {
import FlowOps._
type Repr[+O]
import akka.stream.impl.fusing
/**
* Transform this stream by applying the given function to each of the elements
* as they pass through this processing step.
*/
def map[T](f: Out ⇒ T): Repr[T] = andThen(Fusable(Vector(fusing.Map(f)), "map"))
def map[T](f: Out ⇒ T): Repr[T] = andThen(Map(f.asInstanceOf[Any ⇒ Any]))
/**
* Transform each input element into a sequence of output elements that is
* then flattened into the output stream.
*/
def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = andThen(Fusable(Vector(fusing.MapConcat(f)), "mapConcat"))
def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[T] = andThen(MapConcat(f.asInstanceOf[Any ⇒ immutable.Seq[Any]]))
/**
* Transform this stream by applying the given function to each of the elements
@ -144,16 +142,14 @@ trait FlowOps[+Out] {
/**
* Only pass on those elements that satisfy the given predicate.
*/
def filter(p: Out ⇒ Boolean): Repr[Out] = andThen(Fusable(Vector(fusing.Filter(p)), "filter"))
def filter(p: Out ⇒ Boolean): Repr[Out] = andThen(Filter(p.asInstanceOf[Any ⇒ Boolean]))
/**
* Transform this stream by applying the given partial function to each of the elements
* on which the function is defined as they pass through this processing step.
* Non-matching elements are filtered out.
*/
def collect[T](pf: PartialFunction[Out, T]): Repr[T] = andThen(Fusable(Vector(
fusing.Filter(pf.isDefinedAt),
fusing.Map(pf.apply)), "filter"))
def collect[T](pf: PartialFunction[Out, T]): Repr[T] = andThen(Collect(pf.asInstanceOf[PartialFunction[Any, Any]]))
/**
* Chunk up this stream into groups of the given size, with the last group
@ -161,10 +157,15 @@ trait FlowOps[+Out] {
*
* `n` must be positive, otherwise IllegalArgumentException is thrown.
*/
def grouped(n: Int): Repr[immutable.Seq[Out]] = {
require(n > 0, "n must be greater than 0")
andThen(Fusable(Vector(fusing.Grouped(n)), "grouped"))
}
def grouped(n: Int): Repr[immutable.Seq[Out]] = andThen(Grouped(n))
/**
* Similar to `fold` but not a terminal operation: emits its current value,
* which starts at `zero`, and then applies the current value and the next
* element to the given function `f`, emitting the next current value.
*/
def scan[T](zero: T)(f: (T, Out) ⇒ T): Repr[T] = andThen(Scan(zero, f.asInstanceOf[(Any, Any) ⇒ Any]))
/**
* Chunk up this stream into groups of elements received within a time window,
@ -207,9 +208,7 @@ trait FlowOps[+Out] {
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
*/
def drop(n: Int): Repr[Out] =
if (n <= 0) andThen(Fusable(Vector.empty, "drop"))
else andThen(Fusable(Vector(fusing.Drop(n)), "drop"))
def drop(n: Int): Repr[Out] = andThen(Drop(n))
/**
* Discard the elements received within the given duration at beginning of the stream.
@ -225,7 +224,7 @@ trait FlowOps[+Out] {
def onNext(in: Out) = delegate.onNext(in)
def onTimer(timerKey: Any) = {
delegate = identityTransformer.asInstanceOf[Transformer[Out, Out]]
delegate = FlowOps.identityTransformer[Out]
Nil
}
})
@ -239,9 +238,7 @@ trait FlowOps[+Out] {
* The stream will be completed without producing any elements if `n` is zero
* or negative.
*/
def take(n: Int): Repr[Out] =
if (n <= 0) andThen(Fusable(Vector(fusing.Completed()), "take"))
else andThen(Fusable(Vector(fusing.Take(n)), "take"))
def take(n: Int): Repr[Out] = andThen(Take(n))
/**
* Terminate processing (and cancel the upstream publisher) after the given
@ -256,12 +253,12 @@ trait FlowOps[+Out] {
timerTransform("takeWithin", () new TimerTransformer[Out, Out] {
scheduleOnce(TakeWithinTimerKey, d)
var delegate: Transformer[Out, Out] = identityTransformer.asInstanceOf[Transformer[Out, Out]]
var delegate: Transformer[Out, Out] = FlowOps.identityTransformer[Out]
def onNext(in: Out) = delegate.onNext(in)
override def onNext(in: Out) = delegate.onNext(in)
override def isComplete = delegate.isComplete
def onTimer(timerKey: Any) = {
delegate = takeCompletedTransformer.asInstanceOf[Transformer[Out, Out]]
override def onTimer(timerKey: Any) = {
delegate = FlowOps.completedTransformer[Out]
Nil
}
})
@ -278,7 +275,7 @@ trait FlowOps[+Out] {
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*/
def conflate[S](seed: Out ⇒ S)(aggregate: (S, Out) ⇒ S): Repr[S] =
andThen(Fusable(Vector(fusing.Conflate(seed, aggregate)), "conflate"))
andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any]))
/**
* Allows a faster downstream to progress independently of a slower publisher by extrapolating elements from an older
@ -294,7 +291,7 @@ trait FlowOps[+Out] {
* state.
*/
def expand[S, U](seed: Out ⇒ S)(extrapolate: S ⇒ (U, S)): Repr[U] =
andThen(Fusable(Vector(fusing.Expand(seed, extrapolate)), "expand"))
andThen(Expand(seed.asInstanceOf[Any ⇒ Any], extrapolate.asInstanceOf[Any ⇒ (Any, Any)]))
/**
* Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full.
@ -304,10 +301,8 @@ trait FlowOps[+Out] {
* @param size The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] = {
require(size > 0, s"Buffer size must be larger than zero but was [$size]")
andThen(Fusable(Vector(fusing.Buffer(size, overflowStrategy)), "buffer"))
}
def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[Out] =
andThen(Buffer(size, overflowStrategy))
/**
* Generic transformation of a stream: for each element the [[akka.stream.Transformer#onNext]]
@ -418,18 +413,21 @@ trait FlowOps[+Out] {
/**
* INTERNAL API
*/
private[scaladsl] object FlowOps {
private[stream] object FlowOps {
private case object TakeWithinTimerKey
private case object DropWithinTimerKey
private case object GroupedWithinTimerKey
private val takeCompletedTransformer: Transformer[Any, Any] = new Transformer[Any, Any] {
private[this] final case object CompletedTransformer extends Transformer[Any, Any] {
override def onNext(elem: Any) = Nil
override def isComplete = true
}
private val identityTransformer: Transformer[Any, Any] = new Transformer[Any, Any] {
private[this] final case object IdentityTransformer extends Transformer[Any, Any] {
override def onNext(elem: Any) = List(elem)
}
def completedTransformer[T]: Transformer[T, T] = CompletedTransformer.asInstanceOf[Transformer[T, T]]
def identityTransformer[T]: Transformer[T, T] = IdentityTransformer.asInstanceOf[Transformer[T, T]]
}
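The caching uses the usual erased-singleton cast: one stateless instance is shared across all element types, which is safe because neither transformer stores elements of T. A minimal sketch of the pattern:

// One stateless, type-erased instance, cast to the requested element type.
case object IdentityFnSketch extends (Any ⇒ Any) { def apply(a: Any): Any = a }
def cachedIdentity[T]: T ⇒ T = IdentityFnSketch.asInstanceOf[T ⇒ T]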
View file
@ -19,14 +19,14 @@ private[stream] object Pipe {
private[stream] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flow[In, Out] {
override type Repr[+O] = Pipe[In @uncheckedVariance, O]
override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops)
override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = this.copy(ops = op :: ops) // FIXME raw addition of AstNodes
private[stream] def withSink(out: Sink[Out]): SinkPipe[In] = SinkPipe(out, ops)
private[stream] def withSource(in: Source[In]): SourcePipe[Out] = SourcePipe(in, ops)
override def via[T](flow: Flow[Out, T]): Flow[In, T] = flow match {
case p: Pipe[T, In] ⇒ Pipe(p.ops ++: ops)
case p: Pipe[Out, T] ⇒ this.appendPipe(p)
case gf: GraphFlow[Out, _, _, T] ⇒ gf.prepend(this)
case x ⇒ FlowGraphInternal.throwUnsupportedValue(x)
}
@ -37,7 +37,7 @@ private[stream] final case class Pipe[-In, +Out](ops: List[AstNode]) extends Flo
case d: Sink[Out] ⇒ this.withSink(d)
}
private[stream] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ++: ops)
private[stream] def appendPipe[T](pipe: Pipe[Out, T]): Pipe[In, T] = Pipe(pipe.ops ++: ops) // FIXME raw addition of AstNodes
}
/**
@ -47,7 +47,7 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod
private[stream] def withSource(in: Source[In]): RunnablePipe = RunnablePipe(in, output, ops)
private[stream] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops)
private[stream] def prependPipe[T](pipe: Pipe[T, In]): SinkPipe[T] = SinkPipe(output, ops ::: pipe.ops) // FIXME raw addition of AstNodes
}
@ -57,11 +57,11 @@ private[stream] final case class SinkPipe[-In](output: Sink[_], ops: List[AstNod
private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[AstNode]) extends Source[Out] {
override type Repr[+O] = SourcePipe[O]
override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops)
override private[scaladsl] def andThen[U](op: AstNode): Repr[U] = SourcePipe(input, op :: ops) // FIXME raw addition of AstNodes
private[stream] def withSink(out: Sink[Out]): RunnablePipe = RunnablePipe(input, out, ops)
private[stream] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops)
private[stream] def appendPipe[T](pipe: Pipe[Out, T]): SourcePipe[T] = SourcePipe(input, pipe.ops ++: ops) // FIXME raw addition of AstNodes
override def via[T](flow: Flow[Out, T]): Source[T] = flow match {
case p: Pipe[Out, T] ⇒ appendPipe(p)
@ -70,7 +70,7 @@ private[stream] final case class SourcePipe[+Out](input: Source[_], ops: List[As
}
override def to(sink: Sink[Out]): RunnableFlow = sink match {
case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ++: ops)
case sp: SinkPipe[Out] ⇒ RunnablePipe(input, sp.output, sp.ops ++: ops) // FIXME raw addition of AstNodes
case g: GraphSink[Out, _] ⇒ g.prepend(this)
case d: Sink[Out] ⇒ this.withSink(d)
}
View file
@ -3,13 +3,14 @@
*/
package akka.stream.scaladsl
import scala.language.higherKinds
import akka.actor.Props
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, SynchronousPublisherFromIterable }
import org.reactivestreams.Publisher
import scala.collection.immutable
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
import scala.language.higherKinds
import akka.stream.FlowMaterializer
/**
@ -35,8 +36,7 @@ trait Source[+Out] extends FlowOps[Out] {
* Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
* of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].
*/
def runWith(sink: Sink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType =
to(sink).run().get(sink)
def runWith(sink: Sink[Out])(implicit materializer: FlowMaterializer): sink.MaterializedType = to(sink).run().get(sink)
/**
* Shortcut for running this `Source` with a fold function.
@ -46,8 +46,7 @@ trait Source[+Out] extends FlowOps[Out] {
* function evaluation when the input stream ends, or completed with `Failure`
* if there is an error is signaled in the stream.
*/
def fold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: FlowMaterializer): Future[U] =
runWith(FoldSink(zero)(f))
def fold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: FlowMaterializer): Future[U] = runWith(FoldSink(zero)(f)) // FIXME why is fold always an end step?
/**
* Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
@ -56,8 +55,7 @@ trait Source[+Out] extends FlowOps[Out] {
* normal end of the stream, or completed with `Failure` if there is an error is signaled in
* the stream.
*/
def foreach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] =
runWith(ForeachSink(f))
def foreach(f: Out ⇒ Unit)(implicit materializer: FlowMaterializer): Future[Unit] = runWith(ForeachSink(f))
/**
* Concatenates a second source so that the first element
@ -90,15 +88,15 @@ object Source {
/**
* Helper to create [[Source]] from `Iterator`.
* Example usage: `Source(Seq(1,2,3).iterator)`
* Example usage: `Source(() => Iterator.from(0))`
*
* Start a new `Source` from the given Iterator. The produced stream of elements
* will continue until the iterator runs empty or fails during evaluation of
* the `next()` method. Elements are pulled out of the iterator
* in accordance with the demand coming from the downstream transformation
* steps.
* Start a new `Source` from the given function that produces an Iterator.
* The produced stream of elements will continue until the iterator runs empty
* or fails during evaluation of the `next()` method.
* Elements are pulled out of the iterator in accordance with the demand coming
* from the downstream transformation steps.
*/
def apply[T](iterator: Iterator[T]): Source[T] = IteratorSource(iterator)
def apply[T](f: () ⇒ Iterator[T]): Source[T] = apply(new FuncIterable(f))
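Usage sketch of the new signature, assuming this commit's API where an implicit FlowMaterializer is created from an implicit ActorSystem:

implicit val system = akka.actor.ActorSystem("demo")
implicit val materializer = FlowMaterializer()

Source(() ⇒ Iterator.from(0)).take(3).foreach(println) // prints 0, 1, 2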
/**
* Helper to create [[Source]] from `Iterable`.
@ -111,13 +109,6 @@ object Source {
*/
def apply[T](iterable: immutable.Iterable[T]): Source[T] = IterableSource(iterable)
/**
* Define the sequence of elements to be produced by the given closure.
* The stream ends normally when evaluation of the closure returns a `None`.
* The stream ends exceptionally when an exception is thrown from the closure.
*/
def apply[T](f: () ⇒ Option[T]): Source[T] = IteratorSource(new ThunkIterator(f))
/**
* Start a new `Source` from the given `Future`. The stream will consist of
* one element when the `Future` is completed with a successful value, which
@ -133,7 +124,7 @@ object Source {
* element is produced it will not receive that tick element later. It will
* receive new tick elements as soon as it has requested more elements.
*/
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): Source[T] =
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): Source[T] = // FIXME why is tick () => T and not T?
TickSource(initialDelay, interval, tick)
/**
@ -166,7 +157,7 @@ object Source {
* Create a `Source` with one element.
* Every connected `Sink` of this stream will see an individual stream consisting of one element.
*/
def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element)))
def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element))) // FIXME optimize
/**
* A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.