diff --git a/akka-stream/src/main/scala/akka/stream/ProcessorGenerator.scala b/akka-stream/src/main/scala/akka/stream/ProcessorGenerator.scala new file mode 100644 index 0000000000..6a0178fe07 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/ProcessorGenerator.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration.FiniteDuration +import akka.actor.ActorRefFactory +import akka.stream.impl.ActorBasedProcessorGenerator +import akka.stream.impl.Ast +import org.reactivestreams.api.Producer +import scala.concurrent.duration._ + +// FIXME is Processor the right naming here? +object ProcessorGenerator { + def apply(settings: GeneratorSettings)(implicit context: ActorRefFactory): ProcessorGenerator = + new ActorBasedProcessorGenerator(settings, context) +} + +trait ProcessorGenerator { + /** + * INTERNAL API + * ops are stored in reverse order + */ + private[akka] def toProducer[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Producer[O] + /** + * INTERNAL API + */ + private[akka] def consume[I](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Unit + /** + * INTERNAL API + */ + private[akka] def produce[T](f: () ⇒ T): Producer[T] +} + +// FIXME default values? Should we have an extension that reads from config? +case class GeneratorSettings( + initialFanOutBufferSize: Int = 4, + maxFanOutBufferSize: Int = 16, + initialInputBufferSize: Int = 4, + maximumInputBufferSize: Int = 16, + upstreamSubscriptionTimeout: FiniteDuration = 3.seconds, + downstreamSubscriptionTimeout: FiniteDuration = 3.seconds) { + + private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 + require(initialFanOutBufferSize > 0, "initialFanOutBufferSize must be > 0") + require(maxFanOutBufferSize > 0, "maxFanOutBufferSize must be > 0") + require(initialFanOutBufferSize <= maxFanOutBufferSize, + s"initialFanOutBufferSize($initialFanOutBufferSize) must be <= maxFanOutBufferSize($maxFanOutBufferSize)") + + require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") + require(isPowerOfTwo(initialInputBufferSize), "initialInputBufferSize must be a power of two") + require(maximumInputBufferSize > 0, "maximumInputBufferSize must be > 0") + require(isPowerOfTwo(maximumInputBufferSize), "initialInputBufferSize must be a power of two") + require(initialInputBufferSize <= maximumInputBufferSize, + s"initialInputBufferSize($initialInputBufferSize) must be <= maximumInputBufferSize($maximumInputBufferSize)") +} + diff --git a/akka-stream/src/main/scala/akka/stream/Stream.scala b/akka-stream/src/main/scala/akka/stream/Stream.scala deleted file mode 100644 index f48568b933..0000000000 --- a/akka-stream/src/main/scala/akka/stream/Stream.scala +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream - -import scala.collection.immutable -import org.reactivestreams.api.{ Consumer, Producer } -import scala.util.control.NonFatal -import akka.stream.impl._ -import akka.actor.ActorRefFactory -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.util.Try -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace -import akka.stream.impl.Ast.IteratorProducerNode -import akka.stream.impl.Ast.IterableProducerNode -import akka.stream.impl.Ast.ExistingProducer -import scala.annotation.unchecked.uncheckedVariance - -object Stream { - def apply[T](producer: Producer[T]): Stream[T] = StreamImpl(ExistingProducer(producer), Nil) - def apply[T](iterator: Iterator[T]): Stream[T] = StreamImpl(IteratorProducerNode(iterator), Nil) - def apply[T](iterable: immutable.Iterable[T]): Stream[T] = StreamImpl(IterableProducerNode(iterable), Nil) - - def apply[T](gen: ProcessorGenerator, f: () ⇒ T): Stream[T] = apply(gen.produce(f)) - - object Stop extends RuntimeException with NoStackTrace -} - -trait Stream[+T] { - def map[U](f: T ⇒ U): Stream[U] - def filter(p: T ⇒ Boolean): Stream[T] - def foreach(c: T ⇒ Unit): Stream[Unit] - def fold[U](zero: U)(f: (U, T) ⇒ U): Stream[U] - def drop(n: Int): Stream[T] - def take(n: Int): Stream[T] - def grouped(n: Int): Stream[immutable.Seq[T]] - def mapConcat[U](f: T ⇒ immutable.Seq[U]): Stream[U] - def transform[S, U](zero: S)( - f: (S, T) ⇒ (S, immutable.Seq[U]), - onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, - isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] - def transformRecover[S, U](zero: S)( - f: (S, Try[T]) ⇒ (S, immutable.Seq[U]), - onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, - isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] - - def groupBy[K](f: T ⇒ K): Stream[(K, Producer[T @uncheckedVariance])] - def splitWhen(p: T ⇒ Boolean): Stream[Producer[T @uncheckedVariance]] - - def merge[U >: T](other: Producer[U]): Stream[U] - def zip[U](other: Producer[U]): Stream[(T, U)] - def concat[U >: T](next: Producer[U]): Stream[U] - - def toFuture(generator: ProcessorGenerator): Future[T] - def consume(generator: ProcessorGenerator): Unit - def toProducer(generator: ProcessorGenerator): Producer[T @uncheckedVariance] -} - -// FIXME is Processor the right naming here? -object ProcessorGenerator { - def apply(settings: GeneratorSettings)(implicit context: ActorRefFactory): ProcessorGenerator = - new ActorBasedProcessorGenerator(settings, context) -} - -trait ProcessorGenerator { - /** - * INTERNAL API - * ops are stored in reverse order - */ - private[akka] def toProducer[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Producer[O] - /** - * INTERNAL API - */ - private[akka] def consume[I](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Unit - /** - * INTERNAL API - */ - private[akka] def produce[T](f: () ⇒ T): Producer[T] -} - -// FIXME default values? Should we have an extension that reads from config? -case class GeneratorSettings( - initialFanOutBufferSize: Int = 4, - maxFanOutBufferSize: Int = 16, - initialInputBufferSize: Int = 4, - maximumInputBufferSize: Int = 16, - upstreamSubscriptionTimeout: FiniteDuration = 3.seconds, - downstreamSubscriptionTimeout: FiniteDuration = 3.seconds) { - - private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 - require(initialFanOutBufferSize > 0, "initialFanOutBufferSize must be > 0") - require(maxFanOutBufferSize > 0, "maxFanOutBufferSize must be > 0") - require(initialFanOutBufferSize <= maxFanOutBufferSize, - s"initialFanOutBufferSize($initialFanOutBufferSize) must be <= maxFanOutBufferSize($maxFanOutBufferSize)") - - require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") - require(isPowerOfTwo(initialInputBufferSize), "initialInputBufferSize must be a power of two") - require(maximumInputBufferSize > 0, "maximumInputBufferSize must be > 0") - require(isPowerOfTwo(maximumInputBufferSize), "initialInputBufferSize must be a power of two") - require(initialInputBufferSize <= maximumInputBufferSize, - s"initialInputBufferSize($initialInputBufferSize) must be <= maximumInputBufferSize($maximumInputBufferSize)") -} - diff --git a/akka-stream/src/main/scala/akka/stream/Support.scala b/akka-stream/src/main/scala/akka/stream/Support.scala new file mode 100644 index 0000000000..0014a25e96 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/Support.scala @@ -0,0 +1,8 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.util.control.NoStackTrace + +case object Stop extends RuntimeException("Stop this flow") with NoStackTrace \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala index 82cbc6c218..8df5f79fa8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala @@ -16,7 +16,7 @@ import scala.concurrent.duration.Duration import scala.util.control.NonFatal import akka.actor.Props import scala.util.control.NoStackTrace -import akka.stream.Stream +import akka.stream.Stop /** * INTERNAL API @@ -123,7 +123,6 @@ private[akka] object ActorProducerImpl { * INTERNAL API */ private[akka] class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) extends Actor with ActorLogging with SubscriberManagement[T] { - import Stream._ import ActorProducerImpl._ type S = ActorSubscription[T] diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala new file mode 100644 index 0000000000..1591c5f618 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import scala.collection.immutable +import scala.concurrent.{ Future, Promise } +import scala.util.Try + +import org.reactivestreams.api.Producer + +import Ast.{ AstNode, Recover, Transform } +import akka.stream.ProcessorGenerator +import akka.stream.scala_api.Flow + +/** + * INTERNAL API + */ +private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Flow[O] { + import Ast._ + // Storing ops in reverse order + private def andThen[U](op: AstNode): Flow[U] = this.copy(ops = op :: ops) + + def map[U](f: O ⇒ U): Flow[U] = transform(())((_, in) ⇒ ((), List(f(in)))) + + def filter(p: O ⇒ Boolean): Flow[O] = transform(())((_, in) ⇒ if (p(in)) ((), List(in)) else ((), Nil)) + + def foreach(c: O ⇒ Unit): Flow[Unit] = transform(())((_, in) ⇒ c(in) -> Nil, _ ⇒ List(())) + + def fold[U](zero: U)(f: (U, O) ⇒ U): Flow[U] = transform(zero)((z, in) ⇒ f(z, in) -> Nil, z ⇒ List(z)) + + def drop(n: Int): Flow[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> List(in) else (x - 1) -> Nil) + + def take(n: Int): Flow[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> Nil else (x - 1) -> List(in), isComplete = _ == 0) + + def grouped(n: Int): Flow[immutable.Seq[O]] = + transform(immutable.Seq.empty[O])((buf, in) ⇒ { + val group = buf :+ in + if (group.size == n) (Nil, List(group)) + else (group, Nil) + }, x ⇒ if (x.isEmpty) Nil else List(x)) + + def mapConcat[U](f: O ⇒ immutable.Seq[U]): Flow[U] = transform(())((_, in) ⇒ ((), f(in))) + + def transform[S, U](zero: S)( + f: (S, O) ⇒ (S, immutable.Seq[U]), + onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, + isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U] = + andThen(Transform( + zero, + f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])], + onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]], + isComplete.asInstanceOf[Any ⇒ Boolean])) + + def transformRecover[S, U](zero: S)( + f: (S, Try[O]) ⇒ (S, immutable.Seq[U]), + onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, + isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U] = + andThen(Recover(Transform( + zero, + f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])], + onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]], + isComplete.asInstanceOf[Any ⇒ Boolean]))) + + override def zip[O2](other: Producer[O2]): Flow[(O, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]])) + + override def concat[U >: O](next: Producer[U]): Flow[U] = andThen(Concat(next.asInstanceOf[Producer[Any]])) + + override def merge[U >: O](other: Producer[U]): Flow[U] = andThen(Merge(other.asInstanceOf[Producer[Any]])) + + override def splitWhen(p: (O) ⇒ Boolean): Flow[Producer[O]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) + + override def groupBy[K](f: (O) ⇒ K): Flow[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) + + def toFuture(generator: ProcessorGenerator): Future[O] = { + val p = Promise[O]() + transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil }, isComplete = _ == 1).consume(generator) + p.future + } + + def consume(generator: ProcessorGenerator): Unit = generator.consume(producerNode, ops) + + def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producerNode, ops) +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala index 5a6d8a978d..047e607c3d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -5,7 +5,7 @@ package akka.stream.impl import org.reactivestreams.spi.Subscription import akka.actor.{ Terminated, Props, ActorRef } -import akka.stream.{ Stream, GeneratorSettings } +import akka.stream.GeneratorSettings import akka.stream.impl._ /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala index 967037e671..ff9389f3b0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala @@ -5,7 +5,7 @@ package akka.stream.impl import akka.actor.Props import akka.stream.GeneratorSettings -import akka.stream.Stream +import akka.stream.Stop /** * INTERNAL API @@ -13,7 +13,7 @@ import akka.stream.Stream private[akka] object IteratorProducer { def props(iterator: Iterator[Any], settings: GeneratorSettings): Props = { def f(): Any = { - if (!iterator.hasNext) throw Stream.Stop + if (!iterator.hasNext) throw Stop iterator.next() } ActorProducer.props(settings, f) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala index 698df96370..1c634b62be 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -4,7 +4,7 @@ package akka.stream.impl import scala.collection.immutable -import scala.util.{Failure, Success} +import scala.util.{ Failure, Success } import akka.actor.Props import akka.stream.GeneratorSettings diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala index 41005eb1b3..4165fd22f6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -6,7 +6,7 @@ package akka.stream.impl import akka.actor.{ Props, ActorRef } import org.reactivestreams.spi.Subscription import akka.stream.impl._ -import akka.stream.{ Stream, GeneratorSettings } +import akka.stream.GeneratorSettings import akka.actor.Terminated /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala deleted file mode 100644 index cf5ef50082..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import scala.collection.immutable -import scala.concurrent.{ Future, Promise } -import scala.util.Try - -import org.reactivestreams.api.Producer - -import Ast.{ AstNode, Recover, Transform } -import akka.stream.{ ProcessorGenerator, Stream } - -/** - * INTERNAL API - */ -private[akka] case class StreamImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Stream[O] { - import Ast._ - // Storing ops in reverse order - private def andThen[U](op: AstNode): Stream[U] = this.copy(ops = op :: ops) - - def map[U](f: O ⇒ U): Stream[U] = transform(())((_, in) ⇒ ((), List(f(in)))) - - def filter(p: O ⇒ Boolean): Stream[O] = transform(())((_, in) ⇒ if (p(in)) ((), List(in)) else ((), Nil)) - - def foreach(c: O ⇒ Unit): Stream[Unit] = transform(())((_, in) ⇒ c(in) -> Nil, _ ⇒ List(())) - - def fold[U](zero: U)(f: (U, O) ⇒ U): Stream[U] = transform(zero)((z, in) ⇒ f(z, in) -> Nil, z ⇒ List(z)) - - def drop(n: Int): Stream[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> List(in) else (x - 1) -> Nil) - - def take(n: Int): Stream[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> Nil else (x - 1) -> List(in), isComplete = _ == 0) - - def grouped(n: Int): Stream[immutable.Seq[O]] = - transform(immutable.Seq.empty[O])((buf, in) ⇒ { - val group = buf :+ in - if (group.size == n) (Nil, List(group)) - else (group, Nil) - }, x ⇒ if (x.isEmpty) Nil else List(x)) - - def mapConcat[U](f: O ⇒ immutable.Seq[U]): Stream[U] = transform(())((_, in) ⇒ ((), f(in))) - - def transform[S, U](zero: S)( - f: (S, O) ⇒ (S, immutable.Seq[U]), - onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, - isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] = - andThen(Transform( - zero, - f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])], - onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]], - isComplete.asInstanceOf[Any ⇒ Boolean])) - - def transformRecover[S, U](zero: S)( - f: (S, Try[O]) ⇒ (S, immutable.Seq[U]), - onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, - isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] = - andThen(Recover(Transform( - zero, - f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])], - onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]], - isComplete.asInstanceOf[Any ⇒ Boolean]))) - - override def zip[O2](other: Producer[O2]): Stream[(O, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]])) - - override def concat[U >: O](next: Producer[U]): Stream[U] = andThen(Concat(next.asInstanceOf[Producer[Any]])) - - override def merge[U >: O](other: Producer[U]): Stream[U] = andThen(Merge(other.asInstanceOf[Producer[Any]])) - - override def splitWhen(p: (O) ⇒ Boolean): Stream[Producer[O]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) - - override def groupBy[K](f: (O) ⇒ K): Stream[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) - - def toFuture(generator: ProcessorGenerator): Future[O] = { - val p = Promise[O]() - transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil }, isComplete = _ == 1).consume(generator) - p.future - } - - def consume(generator: ProcessorGenerator): Unit = generator.consume(producerNode, ops) - - def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producerNode, ops) -} - diff --git a/akka-stream/src/main/scala/akka/stream/scala_api/Flow.scala b/akka-stream/src/main/scala/akka/stream/scala_api/Flow.scala new file mode 100644 index 0000000000..c395a7f268 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scala_api/Flow.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scala_api + +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable +import scala.concurrent.Future +import scala.util.Try +import scala.util.control.NoStackTrace + +import org.reactivestreams.api.Producer + +import akka.stream.ProcessorGenerator +import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode } +import akka.stream.impl.FlowImpl + +object Flow { + def apply[T](producer: Producer[T]): Flow[T] = FlowImpl(ExistingProducer(producer), Nil) + def apply[T](iterator: Iterator[T]): Flow[T] = FlowImpl(IteratorProducerNode(iterator), Nil) + def apply[T](iterable: immutable.Iterable[T]): Flow[T] = FlowImpl(IterableProducerNode(iterable), Nil) + + def apply[T](gen: ProcessorGenerator, f: () ⇒ T): Flow[T] = apply(gen.produce(f)) +} + +trait Flow[+T] { + def map[U](f: T ⇒ U): Flow[U] + def filter(p: T ⇒ Boolean): Flow[T] + def foreach(c: T ⇒ Unit): Flow[Unit] + def fold[U](zero: U)(f: (U, T) ⇒ U): Flow[U] + def drop(n: Int): Flow[T] + def take(n: Int): Flow[T] + def grouped(n: Int): Flow[immutable.Seq[T]] + def mapConcat[U](f: T ⇒ immutable.Seq[U]): Flow[U] + def transform[S, U](zero: S)( + f: (S, T) ⇒ (S, immutable.Seq[U]), + onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, + isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U] + def transformRecover[S, U](zero: S)( + f: (S, Try[T]) ⇒ (S, immutable.Seq[U]), + onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, + isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U] + + def groupBy[K](f: T ⇒ K): Flow[(K, Producer[T @uncheckedVariance])] + def splitWhen(p: T ⇒ Boolean): Flow[Producer[T @uncheckedVariance]] + + def merge[U >: T](other: Producer[U]): Flow[U] + def zip[U](other: Producer[U]): Flow[(T, U)] + def concat[U >: T](next: Producer[U]): Flow[U] + + def toFuture(generator: ProcessorGenerator): Future[T] + def consume(generator: ProcessorGenerator): Unit + def toProducer(generator: ProcessorGenerator): Producer[T @uncheckedVariance] +} + diff --git a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala index 2cb6469130..dee8bd3b9f 100644 --- a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala @@ -8,6 +8,7 @@ import org.reactivestreams.spi.Publisher import org.reactivestreams.tck.PublisherVerification import akka.stream.impl.ActorBasedProcessorGenerator import org.reactivestreams.api.Producer +import akka.stream.scala_api.Flow class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { import system.dispatcher @@ -17,14 +18,14 @@ class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem private def createProducer(elements: Int): Producer[Int] = { val iter = Iterator from 1000 val iter2 = if (elements > 0) iter take elements else iter - Stream(factory, () ⇒ if (iter2.hasNext) iter2.next() else throw Stream.Stop).toProducer(factory) + Flow(factory, () ⇒ if (iter2.hasNext) iter2.next() else throw Stop).toProducer(factory) } def createPublisher(elements: Int): Publisher[Int] = createProducer(elements).getPublisher override def createCompletedStatePublisher(): Publisher[Int] = { val pub = createProducer(1) - Stream(pub).consume(factory) + Flow(pub).consume(factory) Thread.sleep(100) pub.getPublisher } diff --git a/akka-stream/src/test/scala/akka/stream/StreamConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala similarity index 69% rename from akka-stream/src/test/scala/akka/stream/StreamConcatSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala index 618b46ff6f..f84c3cea26 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala @@ -6,8 +6,9 @@ package akka.stream import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec +import akka.stream.scala_api.Flow -class StreamConcatSpec extends AkkaSpec { +class FlowConcatSpec extends AkkaSpec { val gen = new ActorBasedProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -18,10 +19,10 @@ class StreamConcatSpec extends AkkaSpec { "Concat" must { "work in the happy case" in { - val source0 = Stream(List.empty[Int].iterator).toProducer(gen) - val source1 = Stream((1 to 4).iterator).toProducer(gen) - val source2 = Stream((5 to 10).iterator).toProducer(gen) - val p = Stream(source0).concat(source1).concat(source2).toProducer(gen) + val source0 = Flow(List.empty[Int].iterator).toProducer(gen) + val source1 = Flow((1 to 4).iterator).toProducer(gen) + val source2 = Flow((5 to 10).iterator).toProducer(gen) + val p = Flow(source0).concat(source1).concat(source2).toProducer(gen) val probe = StreamTestKit.consumerProbe[Int] p.produceTo(probe) diff --git a/akka-stream/src/test/scala/akka/stream/StreamDropSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamDropSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamFilterSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamFilterSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamFoldSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamFoldSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamForeachTest.scala b/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala similarity index 92% rename from akka-stream/src/test/scala/akka/stream/StreamForeachTest.scala rename to akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala index 3831695899..13788f1bf7 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamForeachTest.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala @@ -7,7 +7,7 @@ import akka.testkit.AkkaSpec import akka.stream.testkit.ScriptedTest import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } -class StreamForeachSpec extends AkkaSpec with ScriptedTest { +class FlowForeachSpec extends AkkaSpec with ScriptedTest { val genSettings = GeneratorSettings( initialInputBufferSize = 2, diff --git a/akka-stream/src/test/scala/akka/stream/StreamGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala similarity index 96% rename from akka-stream/src/test/scala/akka/stream/StreamGroupBySpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index 4dcd1a093b..9d7d88aef8 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -8,6 +8,7 @@ import akka.stream.testkit._ import akka.testkit.AkkaSpec import org.reactivestreams.api.Producer import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class StreamGroupBySpec extends AkkaSpec { @@ -31,8 +32,8 @@ class StreamGroupBySpec extends AkkaSpec { } class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) { - val source = Stream((1 to elementCount).iterator).toProducer(gen) - val groupStream = Stream(source).groupBy(_ % groupCount).toProducer(gen) + val source = Flow((1 to elementCount).iterator).toProducer(gen) + val groupStream = Flow(source).groupBy(_ % groupCount).toProducer(gen) val masterConsumer = StreamTestKit.consumerProbe[(Int, Producer[Int])] groupStream.produceTo(masterConsumer) diff --git a/akka-stream/src/test/scala/akka/stream/StreamGroupedSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamGroupedSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamIterableSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala similarity index 87% rename from akka-stream/src/test/scala/akka/stream/StreamIterableSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala index 0a1b13cf65..98859645a1 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamIterableSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala @@ -11,16 +11,17 @@ import akka.dispatch.OnComplete import akka.stream.testkit.OnComplete import akka.stream.testkit.OnError import akka.stream.testkit.OnSubscribe +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamIterableSpec extends AkkaSpec { +class FlowIterableSpec extends AkkaSpec { val gen = ProcessorGenerator(GeneratorSettings( maximumInputBufferSize = 512)) - "A Stream based on an iterable" must { + "A Flow based on an iterable" must { "produce elements" in { - val p = Stream(List(1, 2, 3)).toProducer(gen) + val p = Flow(List(1, 2, 3)).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -34,7 +35,7 @@ class StreamIterableSpec extends AkkaSpec { } "complete empty" in { - val p = Stream(List.empty[Int]).toProducer(gen) + val p = Flow(List.empty[Int]).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) c.expectComplete() @@ -46,7 +47,7 @@ class StreamIterableSpec extends AkkaSpec { } "produce elements with multiple subscribers" in { - val p = Stream(List(1, 2, 3)).toProducer(gen) + val p = Flow(List(1, 2, 3)).toProducer(gen) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -70,7 +71,7 @@ class StreamIterableSpec extends AkkaSpec { } "produce elements to later subscriber" in { - val p = Stream(List(1, 2, 3)).toProducer(gen) + val p = Flow(List(1, 2, 3)).toProducer(gen) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -96,7 +97,7 @@ class StreamIterableSpec extends AkkaSpec { } "produce elements with one transformation step" in { - val p = Stream(List(1, 2, 3)).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3)).map(_ * 2).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -108,7 +109,7 @@ class StreamIterableSpec extends AkkaSpec { } "produce elements with two transformation steps" in { - val p = Stream(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -120,7 +121,7 @@ class StreamIterableSpec extends AkkaSpec { "allow cancel before receiving all elements" in { val count = 100000 - val p = Stream(1 to count).toProducer(gen) + val p = Flow(1 to count).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/StreamIteratorSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala similarity index 85% rename from akka-stream/src/test/scala/akka/stream/StreamIteratorSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala index 814a4c2c20..3ceb680280 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamIteratorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala @@ -9,9 +9,10 @@ import akka.testkit.AkkaSpec import akka.stream.testkit.OnNext import akka.stream.testkit.OnComplete import akka.stream.testkit.OnError +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamIteratorSpec extends AkkaSpec { +class FlowIteratorSpec extends AkkaSpec { val gen = ProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -19,9 +20,9 @@ class StreamIteratorSpec extends AkkaSpec { initialFanOutBufferSize = 4, maxFanOutBufferSize = 4)) - "A Stream based on an iterator" must { + "A Flow based on an iterator" must { "produce elements" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -35,7 +36,7 @@ class StreamIteratorSpec extends AkkaSpec { } "complete empty" in { - val p = Stream(List.empty[Int].iterator).toProducer(gen) + val p = Flow(List.empty[Int].iterator).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) c.expectComplete() @@ -47,7 +48,7 @@ class StreamIteratorSpec extends AkkaSpec { } "produce elements with multiple subscribers" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -71,7 +72,7 @@ class StreamIteratorSpec extends AkkaSpec { } "produce elements to later subscriber" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -94,7 +95,7 @@ class StreamIteratorSpec extends AkkaSpec { } "produce elements with one transformation step" in { - val p = Stream(List(1, 2, 3).iterator).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).map(_ * 2).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -106,7 +107,7 @@ class StreamIteratorSpec extends AkkaSpec { } "produce elements with two transformation steps" in { - val p = Stream(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -118,7 +119,7 @@ class StreamIteratorSpec extends AkkaSpec { "allow cancel before receiving all elements" in { val count = 100000 - val p = Stream((1 to count).iterator).toProducer(gen) + val p = Flow((1 to count).iterator).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/StreamMapConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala similarity index 91% rename from akka-stream/src/test/scala/akka/stream/StreamMapConcatSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala index ba6b31ba42..f0b8270b0b 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamMapConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala @@ -6,7 +6,7 @@ package akka.stream import akka.testkit.AkkaSpec import akka.stream.testkit.ScriptedTest -class StreamMapConcatSpec extends AkkaSpec with ScriptedTest { +class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { val genSettings = GeneratorSettings( initialInputBufferSize = 2, diff --git a/akka-stream/src/test/scala/akka/stream/StreamMapSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamMapSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamMergeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala similarity index 74% rename from akka-stream/src/test/scala/akka/stream/StreamMergeSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala index 61ca547d46..c52dbe6be8 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamMergeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala @@ -8,8 +8,9 @@ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import org.reactivestreams.api.Producer import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.scala_api.Flow -class StreamMergeSpec extends AkkaSpec { +class FlowMergeSpec extends AkkaSpec { val gen = new ActorBasedProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -21,10 +22,10 @@ class StreamMergeSpec extends AkkaSpec { "work in the happy case" in { // Different input sizes (4 and 6) - val source1 = Stream((1 to 4).iterator).toProducer(gen) - val source2 = Stream((5 to 10).iterator).toProducer(gen) - val source3 = Stream(List.empty[Int].iterator).toProducer(gen) - val p = Stream(source1).merge(source2).merge(source3).toProducer(gen) + val source1 = Flow((1 to 4).iterator).toProducer(gen) + val source2 = Flow((5 to 10).iterator).toProducer(gen) + val source3 = Flow(List.empty[Int].iterator).toProducer(gen) + val p = Flow(source1).merge(source2).merge(source3).toProducer(gen) val probe = StreamTestKit.consumerProbe[Int] p.produceTo(probe) diff --git a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala similarity index 97% rename from akka-stream/src/test/scala/akka/stream/StreamSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowSpec.scala index da1f62f67d..af480166a9 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala @@ -9,9 +9,10 @@ import akka.testkit._ import org.reactivestreams.api.Producer import org.scalatest.FreeSpecLike import com.typesafe.config.ConfigFactory +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { +class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { import system.dispatcher @@ -21,10 +22,10 @@ class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.re initialFanOutBufferSize = 1, maxFanOutBufferSize = 16) - val identity: Stream[Any] ⇒ Stream[Any] = in ⇒ in.map(e ⇒ e) - val identity2: Stream[Any] ⇒ Stream[Any] = in ⇒ identity(in) + val identity: Flow[Any] ⇒ Flow[Any] = in ⇒ in.map(e ⇒ e) + val identity2: Flow[Any] ⇒ Flow[Any] = in ⇒ identity(in) - "A Stream" must { + "A Flow" must { for ((name, op) ← List("identity" -> identity, "identity2" -> identity2); n ← List(1, 2, 4)) { s"requests initial elements from upstream ($name, $n)" in { @@ -99,7 +100,7 @@ class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.re } - "A Stream with multiple subscribers (FanOutBox)" must { + "A Flow with multiple subscribers (FanOutBox)" must { "adapt speed to the currently slowest consumer" in { new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { val downstream2 = StreamTestKit.consumerProbe[Any]() diff --git a/akka-stream/src/test/scala/akka/stream/StreamSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala similarity index 94% rename from akka-stream/src/test/scala/akka/stream/StreamSplitWhenSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala index f459731440..1a6b6f10f8 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamSplitWhenSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala @@ -8,6 +8,7 @@ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import org.reactivestreams.api.Producer import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class StreamSplitWhenSpec extends AkkaSpec { @@ -31,8 +32,8 @@ class StreamSplitWhenSpec extends AkkaSpec { } class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) { - val source = Stream((1 to elementCount).iterator).toProducer(gen) - val groupStream = Stream(source).splitWhen(_ == splitWhen).toProducer(gen) + val source = Flow((1 to elementCount).iterator).toProducer(gen) + val groupStream = Flow(source).splitWhen(_ == splitWhen).toProducer(gen) val masterConsumer = StreamTestKit.consumerProbe[Producer[Int]] groupStream.produceTo(masterConsumer) diff --git a/akka-stream/src/test/scala/akka/stream/StreamTakeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamTakeSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamToFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala similarity index 84% rename from akka-stream/src/test/scala/akka/stream/StreamToFutureSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala index bdc77d71aa..4c05d12f62 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamToFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala @@ -10,8 +10,9 @@ import akka.stream.testkit.StreamTestKit import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.Failure +import akka.stream.scala_api.Flow -class StreamToFutureSpec extends AkkaSpec with ScriptedTest { +class FlowToFutureSpec extends AkkaSpec with ScriptedTest { val gen = ProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -19,11 +20,11 @@ class StreamToFutureSpec extends AkkaSpec with ScriptedTest { initialFanOutBufferSize = 1, maxFanOutBufferSize = 16)) - "A Stream with toFuture" must { + "A Flow with toFuture" must { "yield the first value" in { val p = StreamTestKit.producerProbe[Int] - val f = Stream(p).toFuture(gen) + val f = Flow(p).toFuture(gen) val proc = p.expectSubscription proc.expectRequestMore() proc.sendNext(42) @@ -33,7 +34,7 @@ class StreamToFutureSpec extends AkkaSpec with ScriptedTest { "yield the first error" in { val p = StreamTestKit.producerProbe[Int] - val f = Stream(p).toFuture(gen) + val f = Flow(p).toFuture(gen) val proc = p.expectSubscription proc.expectRequestMore() val ex = new RuntimeException("ex") diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala similarity index 85% rename from akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala index d8e9c10ef2..715c6df593 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala @@ -9,9 +9,10 @@ import akka.testkit.AkkaSpec import akka.testkit.EventFilter import scala.util.Failure import scala.util.control.NoStackTrace +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamTransformRecoverSpec extends AkkaSpec { +class FlowTransformRecoverSpec extends AkkaSpec { val gen = ProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -19,10 +20,10 @@ class StreamTransformRecoverSpec extends AkkaSpec { initialFanOutBufferSize = 2, maxFanOutBufferSize = 2)) - "A Stream with transformRecover operations" must { + "A Flow with transformRecover operations" must { "produce one-to-one transformation as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, List(tot + elem.get))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -38,8 +39,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "produce one-to-several transformation as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, Vector.fill(elem.get)(tot + elem.get))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -58,8 +59,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "produce dropping transformation as expected" in { - val p = Stream(List(1, 2, 3, 4).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3, 4).iterator).toProducer(gen) + val p2 = Flow(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, if (elem.get % 2 == 0) Nil else List(tot + elem.get))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -75,8 +76,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "produce multi-step transformation as expected" in { - val p = Stream(List("a", "bc", "def").iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List("a", "bc", "def").iterator).toProducer(gen) + val p2 = Flow(p). transformRecover("") { (str, elem) ⇒ val concat = str + elem (concat, List(concat.length)) @@ -105,8 +106,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "invoke onComplete when done" in { - val p = Stream(List("a").iterator).toProducer(gen) - val p2 = Stream(p).transformRecover("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) + val p = Flow(List("a").iterator).toProducer(gen) + val p2 = Flow(p).transformRecover("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) val c = StreamTestKit.consumerProbe[String] p2.produceTo(c) val s = c.expectSubscription() @@ -117,7 +118,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { "allow cancellation using isComplete" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transformRecover("")((s, in) ⇒ (s + in, List(in.get)), isComplete = _ == "Success(1)").toProducer(gen) + val p2 = Flow(p).transformRecover("")((s, in) ⇒ (s + in, List(in.get)), isComplete = _ == "Success(1)").toProducer(gen) val proc = p.expectSubscription val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -132,7 +133,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { "call onComplete after isComplete signaled completion" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transformRecover("")( + val p2 = Flow(p).transformRecover("")( (s, in) ⇒ (s + in, List(in.get)), onComplete = x ⇒ List(x.size + 10), isComplete = _ == "Success(1)") @@ -151,8 +152,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "report error when exception is thrown" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transformRecover(0) { (_, elem) ⇒ if (elem.get == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem.get, elem.get)) }. @@ -173,7 +174,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { "transform errors when received" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transformRecover("")( + val p2 = Flow(p).transformRecover("")( { case (s, Failure(ex)) ⇒ (s + ex.getMessage, List(ex)) }, onComplete = x ⇒ List(TE(x.size + "10"))) .toProducer(gen) @@ -190,7 +191,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { "forward errors when received and thrown" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transformRecover("")((_, in) ⇒ "" -> List(in.get)).toProducer(gen) + val p2 = Flow(p).transformRecover("")((_, in) ⇒ "" -> List(in.get)).toProducer(gen) val proc = p.expectSubscription() val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -203,8 +204,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "support cancel as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transformRecover(0) { (_, elem) ⇒ (0, List(elem.get, elem.get)) }. toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala similarity index 81% rename from akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index e5647394b0..ba0dc27350 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -8,9 +8,10 @@ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import akka.testkit.EventFilter import com.typesafe.config.ConfigFactory +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { +class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { import system.dispatcher @@ -20,10 +21,10 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor initialFanOutBufferSize = 2, maxFanOutBufferSize = 2)) - "A Stream with transform operations" must { + "A Flow with transform operations" must { "produce one-to-one transformation as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transform(0)((tot, elem) ⇒ (tot + elem, List(tot + elem))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -39,8 +40,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "produce one-to-several transformation as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transform(0)((tot, elem) ⇒ (tot + elem, Vector.fill(elem)(tot + elem))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -59,8 +60,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "produce dropping transformation as expected" in { - val p = Stream(List(1, 2, 3, 4).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3, 4).iterator).toProducer(gen) + val p2 = Flow(p). transform(0)((tot, elem) ⇒ (tot + elem, if (elem % 2 == 0) Nil else List(tot + elem))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -76,8 +77,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "produce multi-step transformation as expected" in { - val p = Stream(List("a", "bc", "def").iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List("a", "bc", "def").iterator).toProducer(gen) + val p2 = Flow(p). transform("") { (str, elem) ⇒ val concat = str + elem (concat, List(concat.length)) @@ -106,8 +107,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "invoke onComplete when done" in { - val p = Stream(List("a").iterator).toProducer(gen) - val p2 = Stream(p).transform("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) + val p = Flow(List("a").iterator).toProducer(gen) + val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) val c = StreamTestKit.consumerProbe[String] p2.produceTo(c) val s = c.expectSubscription() @@ -118,7 +119,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor "allow cancellation using isComplete" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transform("")((s, in) ⇒ (s + in, List(in)), isComplete = _ == "1").toProducer(gen) + val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, List(in)), isComplete = _ == "1").toProducer(gen) val proc = p.expectSubscription val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -133,7 +134,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor "call onComplete after isComplete signaled completion" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transform("")((s, in) ⇒ (s + in, List(in)), onComplete = x ⇒ List(x.size + 10), isComplete = _ == "1").toProducer(gen) + val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, List(in)), onComplete = x ⇒ List(x.size + 10), isComplete = _ == "1").toProducer(gen) val proc = p.expectSubscription val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -148,8 +149,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "report error when exception is thrown" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transform(0) { (_, elem) ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem, elem)) }. @@ -167,8 +168,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "support cancel as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transform(0) { (_, elem) ⇒ (0, List(elem, elem)) }. toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -184,8 +185,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "support producing elements from empty inputs" in { - val p = Stream(List.empty[Int].iterator).toProducer(gen) - val p2 = Stream(p).transform(List(1, 2, 3))((s, _) ⇒ (s, Nil), onComplete = s ⇒ s). + val p = Flow(List.empty[Int].iterator).toProducer(gen) + val p2 = Flow(p).transform(List(1, 2, 3))((s, _) ⇒ (s, Nil), onComplete = s ⇒ s). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) diff --git a/akka-stream/src/test/scala/akka/stream/StreamZipSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala similarity index 77% rename from akka-stream/src/test/scala/akka/stream/StreamZipSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala index 9da57bd7c7..44eeaff89f 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamZipSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala @@ -6,8 +6,9 @@ package akka.stream import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec +import akka.stream.scala_api.Flow -class StreamZipSpec extends AkkaSpec { +class FlowZipSpec extends AkkaSpec { val gen = new ActorBasedProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -19,9 +20,9 @@ class StreamZipSpec extends AkkaSpec { "work in the happy case" in { // Different input sizes (4 and 6) - val source1 = Stream((1 to 4).iterator).toProducer(gen) - val source2 = Stream(List("A", "B", "C", "D", "E", "F").iterator).toProducer(gen) - val p = Stream(source1).zip(source2).toProducer(gen) + val source1 = Flow((1 to 4).iterator).toProducer(gen) + val source2 = Flow(List("A", "B", "C", "D", "E", "F").iterator).toProducer(gen) + val p = Flow(source1).zip(source2).toProducer(gen) val probe = StreamTestKit.consumerProbe[(Int, String)] p.produceTo(probe) diff --git a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala index 292c2a5ce8..1bbd567896 100644 --- a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala @@ -14,6 +14,7 @@ import akka.stream.impl.Ast import akka.testkit.TestEvent import akka.testkit.EventFilter import akka.stream.impl.ActorBasedProcessorGenerator +import akka.stream.scala_api.Flow class IdentityProcessorTest extends IdentityProcessorVerification[Int] with WithActorSystem with TestNGSuiteLike { @@ -40,7 +41,7 @@ class IdentityProcessorTest extends IdentityProcessorVerification[Int] with With val gen = ProcessorGenerator(GeneratorSettings( maximumInputBufferSize = 512))(system) val iter = Iterator from 1000 - Stream(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher + Flow(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher } } diff --git a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala index 8cd0cb5606..3e905c11d4 100644 --- a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala @@ -7,6 +7,7 @@ import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher import org.reactivestreams.tck.PublisherVerification import scala.collection.immutable +import akka.stream.scala_api.Flow class IterableProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { @@ -19,10 +20,10 @@ class IterableProducerTest extends PublisherVerification[Int] with WithActorSyst new immutable.Iterable[Int] { override def iterator = Iterator from 0 } else 0 until elements - Stream(iterable).toProducer(gen).getPublisher + Flow(iterable).toProducer(gen).getPublisher } override def createCompletedStatePublisher(): Publisher[Int] = - Stream[Int](Nil).toProducer(gen).getPublisher + Flow[Int](Nil).toProducer(gen).getPublisher } \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala index 3874383a84..6d5a116637 100644 --- a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala @@ -6,6 +6,7 @@ package akka.stream import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher import org.reactivestreams.tck.PublisherVerification +import akka.stream.scala_api.Flow class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { @@ -18,10 +19,10 @@ class IteratorProducerTest extends PublisherVerification[Int] with WithActorSyst Iterator from 0 else (Iterator from 0).take(elements) - Stream(iter).toProducer(gen).getPublisher + Flow(iter).toProducer(gen).getPublisher } override def createCompletedStatePublisher(): Publisher[Int] = - Stream(List.empty[Int].iterator).toProducer(gen).getPublisher + Flow(List.empty[Int].iterator).toProducer(gen).getPublisher } \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala b/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala index e011a2d4ca..c64ad218e5 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala @@ -3,14 +3,15 @@ */ package akka.stream.testkit -import akka.stream.{ GeneratorSettings, Stream, ProcessorGenerator } +import akka.stream.{ GeneratorSettings, ProcessorGenerator } import akka.actor.ActorSystem +import akka.stream.scala_api.Flow -class ChainSetup[I, O](stream: Stream[I] ⇒ Stream[O], val settings: GeneratorSettings)(implicit val system: ActorSystem) { +class ChainSetup[I, O](stream: Flow[I] ⇒ Flow[O], val settings: GeneratorSettings)(implicit val system: ActorSystem) { val upstream = StreamTestKit.producerProbe[I]() val downstream = StreamTestKit.consumerProbe[O]() - private val s = stream(Stream(upstream)) + private val s = stream(Flow(upstream)) val producer = s.toProducer(ProcessorGenerator(settings)) val upstreamSubscription = upstream.expectSubscription() producer.produceTo(downstream) diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala index 18f1b0fd6e..45f91819b5 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -10,7 +10,8 @@ import scala.collection.immutable import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.duration._ import scala.util.control.NonFatal -import akka.stream._ +import akka.stream.scala_api.Flow +import akka.stream.GeneratorSettings trait ScriptedTest extends ShouldMatchers { @@ -76,7 +77,7 @@ trait ScriptedTest extends ShouldMatchers { } class ScriptRunner[In, Out]( - op: Stream[In] ⇒ Stream[Out], + op: Flow[In] ⇒ Flow[Out], gen: GeneratorSettings, script: Script[In, Out], maximumOverrun: Int, @@ -186,7 +187,7 @@ trait ScriptedTest extends ShouldMatchers { } def runScript[In, Out](script: Script[In, Out], gen: GeneratorSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)( - op: Stream[In] ⇒ Stream[Out])(implicit system: ActorSystem): Unit = { + op: Flow[In] ⇒ Flow[Out])(implicit system: ActorSystem): Unit = { new ScriptRunner(op, gen, script, maximumOverrun, maximumRequest, maximumBuffer).run() }