From 9b78618c3aeba842902814c3436eaeced25c19fc Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 1 Apr 2014 15:19:42 +0200 Subject: [PATCH] !str The Big Rename Stream as a name is taken, so we use Flow (which does not happen to be in scope for every Scala program already). This also makes it clear that this constructs just the the pipe, which needs to be activated to become a Producer. Then, the different language bindings need to live in different packages, otherwise they would not be able to use the same name of the central abstraction. The plan is to use scala_api, java_api and java8_api, for starters. --- .../akka/stream/ProcessorGenerator.scala | 57 ++++++++++ .../src/main/scala/akka/stream/Stream.scala | 105 ------------------ .../src/main/scala/akka/stream/Support.scala | 8 ++ .../akka/stream/impl/ActorProducer.scala | 3 +- .../scala/akka/stream/impl/FlowImpl.scala | 85 ++++++++++++++ .../stream/impl/GroupByProcessorImpl.scala | 2 +- .../akka/stream/impl/IteratorProducer.scala | 4 +- .../stream/impl/SingleStreamProcessors.scala | 2 +- .../stream/impl/SplitWhenProcessorImpl.scala | 2 +- .../scala/akka/stream/impl/StreamImpl.scala | 84 -------------- .../scala/akka/stream/scala_api/Flow.scala | 55 +++++++++ .../scala/akka/stream/ActorProducerTest.scala | 5 +- ...mConcatSpec.scala => FlowConcatSpec.scala} | 11 +- ...treamDropSpec.scala => FlowDropSpec.scala} | 0 ...mFilterSpec.scala => FlowFilterSpec.scala} | 0 ...treamFoldSpec.scala => FlowFoldSpec.scala} | 0 ...oreachTest.scala => FlowForeachTest.scala} | 2 +- ...roupBySpec.scala => FlowGroupBySpec.scala} | 5 +- ...roupedSpec.scala => FlowGroupedSpec.scala} | 0 ...rableSpec.scala => FlowIterableSpec.scala} | 19 ++-- ...ratorSpec.scala => FlowIteratorSpec.scala} | 19 ++-- ...ncatSpec.scala => FlowMapConcatSpec.scala} | 2 +- ...{StreamMapSpec.scala => FlowMapSpec.scala} | 0 ...eamMergeSpec.scala => FlowMergeSpec.scala} | 11 +- .../{StreamSpec.scala => FlowSpec.scala} | 11 +- ...WhenSpec.scala => FlowSplitWhenSpec.scala} | 5 +- ...treamTakeSpec.scala => FlowTakeSpec.scala} | 0 ...utureSpec.scala => FlowToFutureSpec.scala} | 9 +- ...c.scala => FlowTransformRecoverSpec.scala} | 41 +++---- ...formSpec.scala => FlowTransformSpec.scala} | 41 +++---- ...{StreamZipSpec.scala => FlowZipSpec.scala} | 9 +- .../akka/stream/IdentityProcessorTest.scala | 3 +- .../akka/stream/IterableProducerTest.scala | 5 +- .../akka/stream/IteratorProducerTest.scala | 5 +- .../akka/stream/testkit/ChainSetup.scala | 7 +- .../akka/stream/testkit/ScriptedTest.scala | 7 +- 36 files changed, 328 insertions(+), 296 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/ProcessorGenerator.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/Stream.scala create mode 100644 akka-stream/src/main/scala/akka/stream/Support.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala create mode 100644 akka-stream/src/main/scala/akka/stream/scala_api/Flow.scala rename akka-stream/src/test/scala/akka/stream/{StreamConcatSpec.scala => FlowConcatSpec.scala} (69%) rename akka-stream/src/test/scala/akka/stream/{StreamDropSpec.scala => FlowDropSpec.scala} (100%) rename akka-stream/src/test/scala/akka/stream/{StreamFilterSpec.scala => FlowFilterSpec.scala} (100%) rename akka-stream/src/test/scala/akka/stream/{StreamFoldSpec.scala => FlowFoldSpec.scala} (100%) rename akka-stream/src/test/scala/akka/stream/{StreamForeachTest.scala => FlowForeachTest.scala} (92%) rename akka-stream/src/test/scala/akka/stream/{StreamGroupBySpec.scala => FlowGroupBySpec.scala} (96%) rename akka-stream/src/test/scala/akka/stream/{StreamGroupedSpec.scala => FlowGroupedSpec.scala} (100%) rename akka-stream/src/test/scala/akka/stream/{StreamIterableSpec.scala => FlowIterableSpec.scala} (87%) rename akka-stream/src/test/scala/akka/stream/{StreamIteratorSpec.scala => FlowIteratorSpec.scala} (85%) rename akka-stream/src/test/scala/akka/stream/{StreamMapConcatSpec.scala => FlowMapConcatSpec.scala} (91%) rename akka-stream/src/test/scala/akka/stream/{StreamMapSpec.scala => FlowMapSpec.scala} (100%) rename akka-stream/src/test/scala/akka/stream/{StreamMergeSpec.scala => FlowMergeSpec.scala} (74%) rename akka-stream/src/test/scala/akka/stream/{StreamSpec.scala => FlowSpec.scala} (97%) rename akka-stream/src/test/scala/akka/stream/{StreamSplitWhenSpec.scala => FlowSplitWhenSpec.scala} (94%) rename akka-stream/src/test/scala/akka/stream/{StreamTakeSpec.scala => FlowTakeSpec.scala} (100%) rename akka-stream/src/test/scala/akka/stream/{StreamToFutureSpec.scala => FlowToFutureSpec.scala} (84%) rename akka-stream/src/test/scala/akka/stream/{StreamTransformRecoverSpec.scala => FlowTransformRecoverSpec.scala} (85%) rename akka-stream/src/test/scala/akka/stream/{StreamTransformSpec.scala => FlowTransformSpec.scala} (81%) rename akka-stream/src/test/scala/akka/stream/{StreamZipSpec.scala => FlowZipSpec.scala} (77%) diff --git a/akka-stream/src/main/scala/akka/stream/ProcessorGenerator.scala b/akka-stream/src/main/scala/akka/stream/ProcessorGenerator.scala new file mode 100644 index 0000000000..6a0178fe07 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/ProcessorGenerator.scala @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration.FiniteDuration +import akka.actor.ActorRefFactory +import akka.stream.impl.ActorBasedProcessorGenerator +import akka.stream.impl.Ast +import org.reactivestreams.api.Producer +import scala.concurrent.duration._ + +// FIXME is Processor the right naming here? +object ProcessorGenerator { + def apply(settings: GeneratorSettings)(implicit context: ActorRefFactory): ProcessorGenerator = + new ActorBasedProcessorGenerator(settings, context) +} + +trait ProcessorGenerator { + /** + * INTERNAL API + * ops are stored in reverse order + */ + private[akka] def toProducer[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Producer[O] + /** + * INTERNAL API + */ + private[akka] def consume[I](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Unit + /** + * INTERNAL API + */ + private[akka] def produce[T](f: () ⇒ T): Producer[T] +} + +// FIXME default values? Should we have an extension that reads from config? +case class GeneratorSettings( + initialFanOutBufferSize: Int = 4, + maxFanOutBufferSize: Int = 16, + initialInputBufferSize: Int = 4, + maximumInputBufferSize: Int = 16, + upstreamSubscriptionTimeout: FiniteDuration = 3.seconds, + downstreamSubscriptionTimeout: FiniteDuration = 3.seconds) { + + private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 + require(initialFanOutBufferSize > 0, "initialFanOutBufferSize must be > 0") + require(maxFanOutBufferSize > 0, "maxFanOutBufferSize must be > 0") + require(initialFanOutBufferSize <= maxFanOutBufferSize, + s"initialFanOutBufferSize($initialFanOutBufferSize) must be <= maxFanOutBufferSize($maxFanOutBufferSize)") + + require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") + require(isPowerOfTwo(initialInputBufferSize), "initialInputBufferSize must be a power of two") + require(maximumInputBufferSize > 0, "maximumInputBufferSize must be > 0") + require(isPowerOfTwo(maximumInputBufferSize), "initialInputBufferSize must be a power of two") + require(initialInputBufferSize <= maximumInputBufferSize, + s"initialInputBufferSize($initialInputBufferSize) must be <= maximumInputBufferSize($maximumInputBufferSize)") +} + diff --git a/akka-stream/src/main/scala/akka/stream/Stream.scala b/akka-stream/src/main/scala/akka/stream/Stream.scala deleted file mode 100644 index f48568b933..0000000000 --- a/akka-stream/src/main/scala/akka/stream/Stream.scala +++ /dev/null @@ -1,105 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream - -import scala.collection.immutable -import org.reactivestreams.api.{ Consumer, Producer } -import scala.util.control.NonFatal -import akka.stream.impl._ -import akka.actor.ActorRefFactory -import scala.concurrent.ExecutionContext -import scala.concurrent.Future -import scala.util.Try -import scala.concurrent.duration._ -import scala.util.control.NoStackTrace -import akka.stream.impl.Ast.IteratorProducerNode -import akka.stream.impl.Ast.IterableProducerNode -import akka.stream.impl.Ast.ExistingProducer -import scala.annotation.unchecked.uncheckedVariance - -object Stream { - def apply[T](producer: Producer[T]): Stream[T] = StreamImpl(ExistingProducer(producer), Nil) - def apply[T](iterator: Iterator[T]): Stream[T] = StreamImpl(IteratorProducerNode(iterator), Nil) - def apply[T](iterable: immutable.Iterable[T]): Stream[T] = StreamImpl(IterableProducerNode(iterable), Nil) - - def apply[T](gen: ProcessorGenerator, f: () ⇒ T): Stream[T] = apply(gen.produce(f)) - - object Stop extends RuntimeException with NoStackTrace -} - -trait Stream[+T] { - def map[U](f: T ⇒ U): Stream[U] - def filter(p: T ⇒ Boolean): Stream[T] - def foreach(c: T ⇒ Unit): Stream[Unit] - def fold[U](zero: U)(f: (U, T) ⇒ U): Stream[U] - def drop(n: Int): Stream[T] - def take(n: Int): Stream[T] - def grouped(n: Int): Stream[immutable.Seq[T]] - def mapConcat[U](f: T ⇒ immutable.Seq[U]): Stream[U] - def transform[S, U](zero: S)( - f: (S, T) ⇒ (S, immutable.Seq[U]), - onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, - isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] - def transformRecover[S, U](zero: S)( - f: (S, Try[T]) ⇒ (S, immutable.Seq[U]), - onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, - isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] - - def groupBy[K](f: T ⇒ K): Stream[(K, Producer[T @uncheckedVariance])] - def splitWhen(p: T ⇒ Boolean): Stream[Producer[T @uncheckedVariance]] - - def merge[U >: T](other: Producer[U]): Stream[U] - def zip[U](other: Producer[U]): Stream[(T, U)] - def concat[U >: T](next: Producer[U]): Stream[U] - - def toFuture(generator: ProcessorGenerator): Future[T] - def consume(generator: ProcessorGenerator): Unit - def toProducer(generator: ProcessorGenerator): Producer[T @uncheckedVariance] -} - -// FIXME is Processor the right naming here? -object ProcessorGenerator { - def apply(settings: GeneratorSettings)(implicit context: ActorRefFactory): ProcessorGenerator = - new ActorBasedProcessorGenerator(settings, context) -} - -trait ProcessorGenerator { - /** - * INTERNAL API - * ops are stored in reverse order - */ - private[akka] def toProducer[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Producer[O] - /** - * INTERNAL API - */ - private[akka] def consume[I](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Unit - /** - * INTERNAL API - */ - private[akka] def produce[T](f: () ⇒ T): Producer[T] -} - -// FIXME default values? Should we have an extension that reads from config? -case class GeneratorSettings( - initialFanOutBufferSize: Int = 4, - maxFanOutBufferSize: Int = 16, - initialInputBufferSize: Int = 4, - maximumInputBufferSize: Int = 16, - upstreamSubscriptionTimeout: FiniteDuration = 3.seconds, - downstreamSubscriptionTimeout: FiniteDuration = 3.seconds) { - - private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 - require(initialFanOutBufferSize > 0, "initialFanOutBufferSize must be > 0") - require(maxFanOutBufferSize > 0, "maxFanOutBufferSize must be > 0") - require(initialFanOutBufferSize <= maxFanOutBufferSize, - s"initialFanOutBufferSize($initialFanOutBufferSize) must be <= maxFanOutBufferSize($maxFanOutBufferSize)") - - require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") - require(isPowerOfTwo(initialInputBufferSize), "initialInputBufferSize must be a power of two") - require(maximumInputBufferSize > 0, "maximumInputBufferSize must be > 0") - require(isPowerOfTwo(maximumInputBufferSize), "initialInputBufferSize must be a power of two") - require(initialInputBufferSize <= maximumInputBufferSize, - s"initialInputBufferSize($initialInputBufferSize) must be <= maximumInputBufferSize($maximumInputBufferSize)") -} - diff --git a/akka-stream/src/main/scala/akka/stream/Support.scala b/akka-stream/src/main/scala/akka/stream/Support.scala new file mode 100644 index 0000000000..0014a25e96 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/Support.scala @@ -0,0 +1,8 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream + +import scala.util.control.NoStackTrace + +case object Stop extends RuntimeException("Stop this flow") with NoStackTrace \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala index 82cbc6c218..8df5f79fa8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala @@ -16,7 +16,7 @@ import scala.concurrent.duration.Duration import scala.util.control.NonFatal import akka.actor.Props import scala.util.control.NoStackTrace -import akka.stream.Stream +import akka.stream.Stop /** * INTERNAL API @@ -123,7 +123,6 @@ private[akka] object ActorProducerImpl { * INTERNAL API */ private[akka] class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) extends Actor with ActorLogging with SubscriberManagement[T] { - import Stream._ import ActorProducerImpl._ type S = ActorSubscription[T] diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala new file mode 100644 index 0000000000..1591c5f618 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import scala.collection.immutable +import scala.concurrent.{ Future, Promise } +import scala.util.Try + +import org.reactivestreams.api.Producer + +import Ast.{ AstNode, Recover, Transform } +import akka.stream.ProcessorGenerator +import akka.stream.scala_api.Flow + +/** + * INTERNAL API + */ +private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Flow[O] { + import Ast._ + // Storing ops in reverse order + private def andThen[U](op: AstNode): Flow[U] = this.copy(ops = op :: ops) + + def map[U](f: O ⇒ U): Flow[U] = transform(())((_, in) ⇒ ((), List(f(in)))) + + def filter(p: O ⇒ Boolean): Flow[O] = transform(())((_, in) ⇒ if (p(in)) ((), List(in)) else ((), Nil)) + + def foreach(c: O ⇒ Unit): Flow[Unit] = transform(())((_, in) ⇒ c(in) -> Nil, _ ⇒ List(())) + + def fold[U](zero: U)(f: (U, O) ⇒ U): Flow[U] = transform(zero)((z, in) ⇒ f(z, in) -> Nil, z ⇒ List(z)) + + def drop(n: Int): Flow[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> List(in) else (x - 1) -> Nil) + + def take(n: Int): Flow[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> Nil else (x - 1) -> List(in), isComplete = _ == 0) + + def grouped(n: Int): Flow[immutable.Seq[O]] = + transform(immutable.Seq.empty[O])((buf, in) ⇒ { + val group = buf :+ in + if (group.size == n) (Nil, List(group)) + else (group, Nil) + }, x ⇒ if (x.isEmpty) Nil else List(x)) + + def mapConcat[U](f: O ⇒ immutable.Seq[U]): Flow[U] = transform(())((_, in) ⇒ ((), f(in))) + + def transform[S, U](zero: S)( + f: (S, O) ⇒ (S, immutable.Seq[U]), + onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, + isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U] = + andThen(Transform( + zero, + f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])], + onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]], + isComplete.asInstanceOf[Any ⇒ Boolean])) + + def transformRecover[S, U](zero: S)( + f: (S, Try[O]) ⇒ (S, immutable.Seq[U]), + onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, + isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U] = + andThen(Recover(Transform( + zero, + f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])], + onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]], + isComplete.asInstanceOf[Any ⇒ Boolean]))) + + override def zip[O2](other: Producer[O2]): Flow[(O, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]])) + + override def concat[U >: O](next: Producer[U]): Flow[U] = andThen(Concat(next.asInstanceOf[Producer[Any]])) + + override def merge[U >: O](other: Producer[U]): Flow[U] = andThen(Merge(other.asInstanceOf[Producer[Any]])) + + override def splitWhen(p: (O) ⇒ Boolean): Flow[Producer[O]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) + + override def groupBy[K](f: (O) ⇒ K): Flow[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) + + def toFuture(generator: ProcessorGenerator): Future[O] = { + val p = Promise[O]() + transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil }, isComplete = _ == 1).consume(generator) + p.future + } + + def consume(generator: ProcessorGenerator): Unit = generator.consume(producerNode, ops) + + def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producerNode, ops) +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala index 5a6d8a978d..047e607c3d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -5,7 +5,7 @@ package akka.stream.impl import org.reactivestreams.spi.Subscription import akka.actor.{ Terminated, Props, ActorRef } -import akka.stream.{ Stream, GeneratorSettings } +import akka.stream.GeneratorSettings import akka.stream.impl._ /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala index 967037e671..ff9389f3b0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala @@ -5,7 +5,7 @@ package akka.stream.impl import akka.actor.Props import akka.stream.GeneratorSettings -import akka.stream.Stream +import akka.stream.Stop /** * INTERNAL API @@ -13,7 +13,7 @@ import akka.stream.Stream private[akka] object IteratorProducer { def props(iterator: Iterator[Any], settings: GeneratorSettings): Props = { def f(): Any = { - if (!iterator.hasNext) throw Stream.Stop + if (!iterator.hasNext) throw Stop iterator.next() } ActorProducer.props(settings, f) diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala index 698df96370..1c634b62be 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -4,7 +4,7 @@ package akka.stream.impl import scala.collection.immutable -import scala.util.{Failure, Success} +import scala.util.{ Failure, Success } import akka.actor.Props import akka.stream.GeneratorSettings diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala index 41005eb1b3..4165fd22f6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -6,7 +6,7 @@ package akka.stream.impl import akka.actor.{ Props, ActorRef } import org.reactivestreams.spi.Subscription import akka.stream.impl._ -import akka.stream.{ Stream, GeneratorSettings } +import akka.stream.GeneratorSettings import akka.actor.Terminated /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala deleted file mode 100644 index cf5ef50082..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.impl - -import scala.collection.immutable -import scala.concurrent.{ Future, Promise } -import scala.util.Try - -import org.reactivestreams.api.Producer - -import Ast.{ AstNode, Recover, Transform } -import akka.stream.{ ProcessorGenerator, Stream } - -/** - * INTERNAL API - */ -private[akka] case class StreamImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Stream[O] { - import Ast._ - // Storing ops in reverse order - private def andThen[U](op: AstNode): Stream[U] = this.copy(ops = op :: ops) - - def map[U](f: O ⇒ U): Stream[U] = transform(())((_, in) ⇒ ((), List(f(in)))) - - def filter(p: O ⇒ Boolean): Stream[O] = transform(())((_, in) ⇒ if (p(in)) ((), List(in)) else ((), Nil)) - - def foreach(c: O ⇒ Unit): Stream[Unit] = transform(())((_, in) ⇒ c(in) -> Nil, _ ⇒ List(())) - - def fold[U](zero: U)(f: (U, O) ⇒ U): Stream[U] = transform(zero)((z, in) ⇒ f(z, in) -> Nil, z ⇒ List(z)) - - def drop(n: Int): Stream[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> List(in) else (x - 1) -> Nil) - - def take(n: Int): Stream[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> Nil else (x - 1) -> List(in), isComplete = _ == 0) - - def grouped(n: Int): Stream[immutable.Seq[O]] = - transform(immutable.Seq.empty[O])((buf, in) ⇒ { - val group = buf :+ in - if (group.size == n) (Nil, List(group)) - else (group, Nil) - }, x ⇒ if (x.isEmpty) Nil else List(x)) - - def mapConcat[U](f: O ⇒ immutable.Seq[U]): Stream[U] = transform(())((_, in) ⇒ ((), f(in))) - - def transform[S, U](zero: S)( - f: (S, O) ⇒ (S, immutable.Seq[U]), - onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, - isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] = - andThen(Transform( - zero, - f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])], - onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]], - isComplete.asInstanceOf[Any ⇒ Boolean])) - - def transformRecover[S, U](zero: S)( - f: (S, Try[O]) ⇒ (S, immutable.Seq[U]), - onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, - isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] = - andThen(Recover(Transform( - zero, - f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])], - onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]], - isComplete.asInstanceOf[Any ⇒ Boolean]))) - - override def zip[O2](other: Producer[O2]): Stream[(O, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]])) - - override def concat[U >: O](next: Producer[U]): Stream[U] = andThen(Concat(next.asInstanceOf[Producer[Any]])) - - override def merge[U >: O](other: Producer[U]): Stream[U] = andThen(Merge(other.asInstanceOf[Producer[Any]])) - - override def splitWhen(p: (O) ⇒ Boolean): Stream[Producer[O]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) - - override def groupBy[K](f: (O) ⇒ K): Stream[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) - - def toFuture(generator: ProcessorGenerator): Future[O] = { - val p = Promise[O]() - transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil }, isComplete = _ == 1).consume(generator) - p.future - } - - def consume(generator: ProcessorGenerator): Unit = generator.consume(producerNode, ops) - - def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producerNode, ops) -} - diff --git a/akka-stream/src/main/scala/akka/stream/scala_api/Flow.scala b/akka-stream/src/main/scala/akka/stream/scala_api/Flow.scala new file mode 100644 index 0000000000..c395a7f268 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scala_api/Flow.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scala_api + +import scala.annotation.unchecked.uncheckedVariance +import scala.collection.immutable +import scala.concurrent.Future +import scala.util.Try +import scala.util.control.NoStackTrace + +import org.reactivestreams.api.Producer + +import akka.stream.ProcessorGenerator +import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode } +import akka.stream.impl.FlowImpl + +object Flow { + def apply[T](producer: Producer[T]): Flow[T] = FlowImpl(ExistingProducer(producer), Nil) + def apply[T](iterator: Iterator[T]): Flow[T] = FlowImpl(IteratorProducerNode(iterator), Nil) + def apply[T](iterable: immutable.Iterable[T]): Flow[T] = FlowImpl(IterableProducerNode(iterable), Nil) + + def apply[T](gen: ProcessorGenerator, f: () ⇒ T): Flow[T] = apply(gen.produce(f)) +} + +trait Flow[+T] { + def map[U](f: T ⇒ U): Flow[U] + def filter(p: T ⇒ Boolean): Flow[T] + def foreach(c: T ⇒ Unit): Flow[Unit] + def fold[U](zero: U)(f: (U, T) ⇒ U): Flow[U] + def drop(n: Int): Flow[T] + def take(n: Int): Flow[T] + def grouped(n: Int): Flow[immutable.Seq[T]] + def mapConcat[U](f: T ⇒ immutable.Seq[U]): Flow[U] + def transform[S, U](zero: S)( + f: (S, T) ⇒ (S, immutable.Seq[U]), + onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, + isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U] + def transformRecover[S, U](zero: S)( + f: (S, Try[T]) ⇒ (S, immutable.Seq[U]), + onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, + isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U] + + def groupBy[K](f: T ⇒ K): Flow[(K, Producer[T @uncheckedVariance])] + def splitWhen(p: T ⇒ Boolean): Flow[Producer[T @uncheckedVariance]] + + def merge[U >: T](other: Producer[U]): Flow[U] + def zip[U](other: Producer[U]): Flow[(T, U)] + def concat[U >: T](next: Producer[U]): Flow[U] + + def toFuture(generator: ProcessorGenerator): Future[T] + def consume(generator: ProcessorGenerator): Unit + def toProducer(generator: ProcessorGenerator): Producer[T @uncheckedVariance] +} + diff --git a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala index 2cb6469130..dee8bd3b9f 100644 --- a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala @@ -8,6 +8,7 @@ import org.reactivestreams.spi.Publisher import org.reactivestreams.tck.PublisherVerification import akka.stream.impl.ActorBasedProcessorGenerator import org.reactivestreams.api.Producer +import akka.stream.scala_api.Flow class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { import system.dispatcher @@ -17,14 +18,14 @@ class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem private def createProducer(elements: Int): Producer[Int] = { val iter = Iterator from 1000 val iter2 = if (elements > 0) iter take elements else iter - Stream(factory, () ⇒ if (iter2.hasNext) iter2.next() else throw Stream.Stop).toProducer(factory) + Flow(factory, () ⇒ if (iter2.hasNext) iter2.next() else throw Stop).toProducer(factory) } def createPublisher(elements: Int): Publisher[Int] = createProducer(elements).getPublisher override def createCompletedStatePublisher(): Publisher[Int] = { val pub = createProducer(1) - Stream(pub).consume(factory) + Flow(pub).consume(factory) Thread.sleep(100) pub.getPublisher } diff --git a/akka-stream/src/test/scala/akka/stream/StreamConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala similarity index 69% rename from akka-stream/src/test/scala/akka/stream/StreamConcatSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala index 618b46ff6f..f84c3cea26 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala @@ -6,8 +6,9 @@ package akka.stream import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec +import akka.stream.scala_api.Flow -class StreamConcatSpec extends AkkaSpec { +class FlowConcatSpec extends AkkaSpec { val gen = new ActorBasedProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -18,10 +19,10 @@ class StreamConcatSpec extends AkkaSpec { "Concat" must { "work in the happy case" in { - val source0 = Stream(List.empty[Int].iterator).toProducer(gen) - val source1 = Stream((1 to 4).iterator).toProducer(gen) - val source2 = Stream((5 to 10).iterator).toProducer(gen) - val p = Stream(source0).concat(source1).concat(source2).toProducer(gen) + val source0 = Flow(List.empty[Int].iterator).toProducer(gen) + val source1 = Flow((1 to 4).iterator).toProducer(gen) + val source2 = Flow((5 to 10).iterator).toProducer(gen) + val p = Flow(source0).concat(source1).concat(source2).toProducer(gen) val probe = StreamTestKit.consumerProbe[Int] p.produceTo(probe) diff --git a/akka-stream/src/test/scala/akka/stream/StreamDropSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamDropSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamFilterSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamFilterSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamFoldSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamFoldSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamForeachTest.scala b/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala similarity index 92% rename from akka-stream/src/test/scala/akka/stream/StreamForeachTest.scala rename to akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala index 3831695899..13788f1bf7 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamForeachTest.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala @@ -7,7 +7,7 @@ import akka.testkit.AkkaSpec import akka.stream.testkit.ScriptedTest import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } -class StreamForeachSpec extends AkkaSpec with ScriptedTest { +class FlowForeachSpec extends AkkaSpec with ScriptedTest { val genSettings = GeneratorSettings( initialInputBufferSize = 2, diff --git a/akka-stream/src/test/scala/akka/stream/StreamGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala similarity index 96% rename from akka-stream/src/test/scala/akka/stream/StreamGroupBySpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index 4dcd1a093b..9d7d88aef8 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -8,6 +8,7 @@ import akka.stream.testkit._ import akka.testkit.AkkaSpec import org.reactivestreams.api.Producer import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class StreamGroupBySpec extends AkkaSpec { @@ -31,8 +32,8 @@ class StreamGroupBySpec extends AkkaSpec { } class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) { - val source = Stream((1 to elementCount).iterator).toProducer(gen) - val groupStream = Stream(source).groupBy(_ % groupCount).toProducer(gen) + val source = Flow((1 to elementCount).iterator).toProducer(gen) + val groupStream = Flow(source).groupBy(_ % groupCount).toProducer(gen) val masterConsumer = StreamTestKit.consumerProbe[(Int, Producer[Int])] groupStream.produceTo(masterConsumer) diff --git a/akka-stream/src/test/scala/akka/stream/StreamGroupedSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamGroupedSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamIterableSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala similarity index 87% rename from akka-stream/src/test/scala/akka/stream/StreamIterableSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala index 0a1b13cf65..98859645a1 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamIterableSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala @@ -11,16 +11,17 @@ import akka.dispatch.OnComplete import akka.stream.testkit.OnComplete import akka.stream.testkit.OnError import akka.stream.testkit.OnSubscribe +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamIterableSpec extends AkkaSpec { +class FlowIterableSpec extends AkkaSpec { val gen = ProcessorGenerator(GeneratorSettings( maximumInputBufferSize = 512)) - "A Stream based on an iterable" must { + "A Flow based on an iterable" must { "produce elements" in { - val p = Stream(List(1, 2, 3)).toProducer(gen) + val p = Flow(List(1, 2, 3)).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -34,7 +35,7 @@ class StreamIterableSpec extends AkkaSpec { } "complete empty" in { - val p = Stream(List.empty[Int]).toProducer(gen) + val p = Flow(List.empty[Int]).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) c.expectComplete() @@ -46,7 +47,7 @@ class StreamIterableSpec extends AkkaSpec { } "produce elements with multiple subscribers" in { - val p = Stream(List(1, 2, 3)).toProducer(gen) + val p = Flow(List(1, 2, 3)).toProducer(gen) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -70,7 +71,7 @@ class StreamIterableSpec extends AkkaSpec { } "produce elements to later subscriber" in { - val p = Stream(List(1, 2, 3)).toProducer(gen) + val p = Flow(List(1, 2, 3)).toProducer(gen) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -96,7 +97,7 @@ class StreamIterableSpec extends AkkaSpec { } "produce elements with one transformation step" in { - val p = Stream(List(1, 2, 3)).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3)).map(_ * 2).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -108,7 +109,7 @@ class StreamIterableSpec extends AkkaSpec { } "produce elements with two transformation steps" in { - val p = Stream(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -120,7 +121,7 @@ class StreamIterableSpec extends AkkaSpec { "allow cancel before receiving all elements" in { val count = 100000 - val p = Stream(1 to count).toProducer(gen) + val p = Flow(1 to count).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/StreamIteratorSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala similarity index 85% rename from akka-stream/src/test/scala/akka/stream/StreamIteratorSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala index 814a4c2c20..3ceb680280 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamIteratorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala @@ -9,9 +9,10 @@ import akka.testkit.AkkaSpec import akka.stream.testkit.OnNext import akka.stream.testkit.OnComplete import akka.stream.testkit.OnError +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamIteratorSpec extends AkkaSpec { +class FlowIteratorSpec extends AkkaSpec { val gen = ProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -19,9 +20,9 @@ class StreamIteratorSpec extends AkkaSpec { initialFanOutBufferSize = 4, maxFanOutBufferSize = 4)) - "A Stream based on an iterator" must { + "A Flow based on an iterator" must { "produce elements" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -35,7 +36,7 @@ class StreamIteratorSpec extends AkkaSpec { } "complete empty" in { - val p = Stream(List.empty[Int].iterator).toProducer(gen) + val p = Flow(List.empty[Int].iterator).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) c.expectComplete() @@ -47,7 +48,7 @@ class StreamIteratorSpec extends AkkaSpec { } "produce elements with multiple subscribers" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -71,7 +72,7 @@ class StreamIteratorSpec extends AkkaSpec { } "produce elements to later subscriber" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) val c1 = StreamTestKit.consumerProbe[Int] val c2 = StreamTestKit.consumerProbe[Int] p.produceTo(c1) @@ -94,7 +95,7 @@ class StreamIteratorSpec extends AkkaSpec { } "produce elements with one transformation step" in { - val p = Stream(List(1, 2, 3).iterator).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3).iterator).map(_ * 2).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -106,7 +107,7 @@ class StreamIteratorSpec extends AkkaSpec { } "produce elements with two transformation steps" in { - val p = Stream(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) + val p = Flow(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() @@ -118,7 +119,7 @@ class StreamIteratorSpec extends AkkaSpec { "allow cancel before receiving all elements" in { val count = 100000 - val p = Stream((1 to count).iterator).toProducer(gen) + val p = Flow((1 to count).iterator).toProducer(gen) val c = StreamTestKit.consumerProbe[Int] p.produceTo(c) val sub = c.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/StreamMapConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala similarity index 91% rename from akka-stream/src/test/scala/akka/stream/StreamMapConcatSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala index ba6b31ba42..f0b8270b0b 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamMapConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala @@ -6,7 +6,7 @@ package akka.stream import akka.testkit.AkkaSpec import akka.stream.testkit.ScriptedTest -class StreamMapConcatSpec extends AkkaSpec with ScriptedTest { +class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { val genSettings = GeneratorSettings( initialInputBufferSize = 2, diff --git a/akka-stream/src/test/scala/akka/stream/StreamMapSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamMapSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamMergeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala similarity index 74% rename from akka-stream/src/test/scala/akka/stream/StreamMergeSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala index 61ca547d46..c52dbe6be8 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamMergeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala @@ -8,8 +8,9 @@ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import org.reactivestreams.api.Producer import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.scala_api.Flow -class StreamMergeSpec extends AkkaSpec { +class FlowMergeSpec extends AkkaSpec { val gen = new ActorBasedProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -21,10 +22,10 @@ class StreamMergeSpec extends AkkaSpec { "work in the happy case" in { // Different input sizes (4 and 6) - val source1 = Stream((1 to 4).iterator).toProducer(gen) - val source2 = Stream((5 to 10).iterator).toProducer(gen) - val source3 = Stream(List.empty[Int].iterator).toProducer(gen) - val p = Stream(source1).merge(source2).merge(source3).toProducer(gen) + val source1 = Flow((1 to 4).iterator).toProducer(gen) + val source2 = Flow((5 to 10).iterator).toProducer(gen) + val source3 = Flow(List.empty[Int].iterator).toProducer(gen) + val p = Flow(source1).merge(source2).merge(source3).toProducer(gen) val probe = StreamTestKit.consumerProbe[Int] p.produceTo(probe) diff --git a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala similarity index 97% rename from akka-stream/src/test/scala/akka/stream/StreamSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowSpec.scala index da1f62f67d..af480166a9 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala @@ -9,9 +9,10 @@ import akka.testkit._ import org.reactivestreams.api.Producer import org.scalatest.FreeSpecLike import com.typesafe.config.ConfigFactory +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { +class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { import system.dispatcher @@ -21,10 +22,10 @@ class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.re initialFanOutBufferSize = 1, maxFanOutBufferSize = 16) - val identity: Stream[Any] ⇒ Stream[Any] = in ⇒ in.map(e ⇒ e) - val identity2: Stream[Any] ⇒ Stream[Any] = in ⇒ identity(in) + val identity: Flow[Any] ⇒ Flow[Any] = in ⇒ in.map(e ⇒ e) + val identity2: Flow[Any] ⇒ Flow[Any] = in ⇒ identity(in) - "A Stream" must { + "A Flow" must { for ((name, op) ← List("identity" -> identity, "identity2" -> identity2); n ← List(1, 2, 4)) { s"requests initial elements from upstream ($name, $n)" in { @@ -99,7 +100,7 @@ class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.re } - "A Stream with multiple subscribers (FanOutBox)" must { + "A Flow with multiple subscribers (FanOutBox)" must { "adapt speed to the currently slowest consumer" in { new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) { val downstream2 = StreamTestKit.consumerProbe[Any]() diff --git a/akka-stream/src/test/scala/akka/stream/StreamSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala similarity index 94% rename from akka-stream/src/test/scala/akka/stream/StreamSplitWhenSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala index f459731440..1a6b6f10f8 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamSplitWhenSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala @@ -8,6 +8,7 @@ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import org.reactivestreams.api.Producer import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class StreamSplitWhenSpec extends AkkaSpec { @@ -31,8 +32,8 @@ class StreamSplitWhenSpec extends AkkaSpec { } class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) { - val source = Stream((1 to elementCount).iterator).toProducer(gen) - val groupStream = Stream(source).splitWhen(_ == splitWhen).toProducer(gen) + val source = Flow((1 to elementCount).iterator).toProducer(gen) + val groupStream = Flow(source).splitWhen(_ == splitWhen).toProducer(gen) val masterConsumer = StreamTestKit.consumerProbe[Producer[Int]] groupStream.produceTo(masterConsumer) diff --git a/akka-stream/src/test/scala/akka/stream/StreamTakeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala similarity index 100% rename from akka-stream/src/test/scala/akka/stream/StreamTakeSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala diff --git a/akka-stream/src/test/scala/akka/stream/StreamToFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala similarity index 84% rename from akka-stream/src/test/scala/akka/stream/StreamToFutureSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala index bdc77d71aa..4c05d12f62 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamToFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala @@ -10,8 +10,9 @@ import akka.stream.testkit.StreamTestKit import scala.concurrent.Await import scala.concurrent.duration._ import scala.util.Failure +import akka.stream.scala_api.Flow -class StreamToFutureSpec extends AkkaSpec with ScriptedTest { +class FlowToFutureSpec extends AkkaSpec with ScriptedTest { val gen = ProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -19,11 +20,11 @@ class StreamToFutureSpec extends AkkaSpec with ScriptedTest { initialFanOutBufferSize = 1, maxFanOutBufferSize = 16)) - "A Stream with toFuture" must { + "A Flow with toFuture" must { "yield the first value" in { val p = StreamTestKit.producerProbe[Int] - val f = Stream(p).toFuture(gen) + val f = Flow(p).toFuture(gen) val proc = p.expectSubscription proc.expectRequestMore() proc.sendNext(42) @@ -33,7 +34,7 @@ class StreamToFutureSpec extends AkkaSpec with ScriptedTest { "yield the first error" in { val p = StreamTestKit.producerProbe[Int] - val f = Stream(p).toFuture(gen) + val f = Flow(p).toFuture(gen) val proc = p.expectSubscription proc.expectRequestMore() val ex = new RuntimeException("ex") diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala similarity index 85% rename from akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala index d8e9c10ef2..715c6df593 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala @@ -9,9 +9,10 @@ import akka.testkit.AkkaSpec import akka.testkit.EventFilter import scala.util.Failure import scala.util.control.NoStackTrace +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamTransformRecoverSpec extends AkkaSpec { +class FlowTransformRecoverSpec extends AkkaSpec { val gen = ProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -19,10 +20,10 @@ class StreamTransformRecoverSpec extends AkkaSpec { initialFanOutBufferSize = 2, maxFanOutBufferSize = 2)) - "A Stream with transformRecover operations" must { + "A Flow with transformRecover operations" must { "produce one-to-one transformation as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, List(tot + elem.get))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -38,8 +39,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "produce one-to-several transformation as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, Vector.fill(elem.get)(tot + elem.get))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -58,8 +59,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "produce dropping transformation as expected" in { - val p = Stream(List(1, 2, 3, 4).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3, 4).iterator).toProducer(gen) + val p2 = Flow(p). transformRecover(0)((tot, elem) ⇒ (tot + elem.get, if (elem.get % 2 == 0) Nil else List(tot + elem.get))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -75,8 +76,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "produce multi-step transformation as expected" in { - val p = Stream(List("a", "bc", "def").iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List("a", "bc", "def").iterator).toProducer(gen) + val p2 = Flow(p). transformRecover("") { (str, elem) ⇒ val concat = str + elem (concat, List(concat.length)) @@ -105,8 +106,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "invoke onComplete when done" in { - val p = Stream(List("a").iterator).toProducer(gen) - val p2 = Stream(p).transformRecover("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) + val p = Flow(List("a").iterator).toProducer(gen) + val p2 = Flow(p).transformRecover("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) val c = StreamTestKit.consumerProbe[String] p2.produceTo(c) val s = c.expectSubscription() @@ -117,7 +118,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { "allow cancellation using isComplete" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transformRecover("")((s, in) ⇒ (s + in, List(in.get)), isComplete = _ == "Success(1)").toProducer(gen) + val p2 = Flow(p).transformRecover("")((s, in) ⇒ (s + in, List(in.get)), isComplete = _ == "Success(1)").toProducer(gen) val proc = p.expectSubscription val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -132,7 +133,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { "call onComplete after isComplete signaled completion" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transformRecover("")( + val p2 = Flow(p).transformRecover("")( (s, in) ⇒ (s + in, List(in.get)), onComplete = x ⇒ List(x.size + 10), isComplete = _ == "Success(1)") @@ -151,8 +152,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "report error when exception is thrown" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transformRecover(0) { (_, elem) ⇒ if (elem.get == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem.get, elem.get)) }. @@ -173,7 +174,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { "transform errors when received" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transformRecover("")( + val p2 = Flow(p).transformRecover("")( { case (s, Failure(ex)) ⇒ (s + ex.getMessage, List(ex)) }, onComplete = x ⇒ List(TE(x.size + "10"))) .toProducer(gen) @@ -190,7 +191,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { "forward errors when received and thrown" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transformRecover("")((_, in) ⇒ "" -> List(in.get)).toProducer(gen) + val p2 = Flow(p).transformRecover("")((_, in) ⇒ "" -> List(in.get)).toProducer(gen) val proc = p.expectSubscription() val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -203,8 +204,8 @@ class StreamTransformRecoverSpec extends AkkaSpec { } "support cancel as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transformRecover(0) { (_, elem) ⇒ (0, List(elem.get, elem.get)) }. toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala similarity index 81% rename from akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index e5647394b0..ba0dc27350 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -8,9 +8,10 @@ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import akka.testkit.EventFilter import com.typesafe.config.ConfigFactory +import akka.stream.scala_api.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { +class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { import system.dispatcher @@ -20,10 +21,10 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor initialFanOutBufferSize = 2, maxFanOutBufferSize = 2)) - "A Stream with transform operations" must { + "A Flow with transform operations" must { "produce one-to-one transformation as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transform(0)((tot, elem) ⇒ (tot + elem, List(tot + elem))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -39,8 +40,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "produce one-to-several transformation as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transform(0)((tot, elem) ⇒ (tot + elem, Vector.fill(elem)(tot + elem))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -59,8 +60,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "produce dropping transformation as expected" in { - val p = Stream(List(1, 2, 3, 4).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3, 4).iterator).toProducer(gen) + val p2 = Flow(p). transform(0)((tot, elem) ⇒ (tot + elem, if (elem % 2 == 0) Nil else List(tot + elem))). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -76,8 +77,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "produce multi-step transformation as expected" in { - val p = Stream(List("a", "bc", "def").iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List("a", "bc", "def").iterator).toProducer(gen) + val p2 = Flow(p). transform("") { (str, elem) ⇒ val concat = str + elem (concat, List(concat.length)) @@ -106,8 +107,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "invoke onComplete when done" in { - val p = Stream(List("a").iterator).toProducer(gen) - val p2 = Stream(p).transform("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) + val p = Flow(List("a").iterator).toProducer(gen) + val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen) val c = StreamTestKit.consumerProbe[String] p2.produceTo(c) val s = c.expectSubscription() @@ -118,7 +119,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor "allow cancellation using isComplete" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transform("")((s, in) ⇒ (s + in, List(in)), isComplete = _ == "1").toProducer(gen) + val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, List(in)), isComplete = _ == "1").toProducer(gen) val proc = p.expectSubscription val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -133,7 +134,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor "call onComplete after isComplete signaled completion" in { val p = StreamTestKit.producerProbe[Int] - val p2 = Stream(p).transform("")((s, in) ⇒ (s + in, List(in)), onComplete = x ⇒ List(x.size + 10), isComplete = _ == "1").toProducer(gen) + val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, List(in)), onComplete = x ⇒ List(x.size + 10), isComplete = _ == "1").toProducer(gen) val proc = p.expectSubscription val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) @@ -148,8 +149,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "report error when exception is thrown" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transform(0) { (_, elem) ⇒ if (elem == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem, elem)) }. @@ -167,8 +168,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "support cancel as expected" in { - val p = Stream(List(1, 2, 3).iterator).toProducer(gen) - val p2 = Stream(p). + val p = Flow(List(1, 2, 3).iterator).toProducer(gen) + val p2 = Flow(p). transform(0) { (_, elem) ⇒ (0, List(elem, elem)) }. toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] @@ -184,8 +185,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor } "support producing elements from empty inputs" in { - val p = Stream(List.empty[Int].iterator).toProducer(gen) - val p2 = Stream(p).transform(List(1, 2, 3))((s, _) ⇒ (s, Nil), onComplete = s ⇒ s). + val p = Flow(List.empty[Int].iterator).toProducer(gen) + val p2 = Flow(p).transform(List(1, 2, 3))((s, _) ⇒ (s, Nil), onComplete = s ⇒ s). toProducer(gen) val consumer = StreamTestKit.consumerProbe[Int] p2.produceTo(consumer) diff --git a/akka-stream/src/test/scala/akka/stream/StreamZipSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala similarity index 77% rename from akka-stream/src/test/scala/akka/stream/StreamZipSpec.scala rename to akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala index 9da57bd7c7..44eeaff89f 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamZipSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala @@ -6,8 +6,9 @@ package akka.stream import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec +import akka.stream.scala_api.Flow -class StreamZipSpec extends AkkaSpec { +class FlowZipSpec extends AkkaSpec { val gen = new ActorBasedProcessorGenerator(GeneratorSettings( initialInputBufferSize = 2, @@ -19,9 +20,9 @@ class StreamZipSpec extends AkkaSpec { "work in the happy case" in { // Different input sizes (4 and 6) - val source1 = Stream((1 to 4).iterator).toProducer(gen) - val source2 = Stream(List("A", "B", "C", "D", "E", "F").iterator).toProducer(gen) - val p = Stream(source1).zip(source2).toProducer(gen) + val source1 = Flow((1 to 4).iterator).toProducer(gen) + val source2 = Flow(List("A", "B", "C", "D", "E", "F").iterator).toProducer(gen) + val p = Flow(source1).zip(source2).toProducer(gen) val probe = StreamTestKit.consumerProbe[(Int, String)] p.produceTo(probe) diff --git a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala index 292c2a5ce8..1bbd567896 100644 --- a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala @@ -14,6 +14,7 @@ import akka.stream.impl.Ast import akka.testkit.TestEvent import akka.testkit.EventFilter import akka.stream.impl.ActorBasedProcessorGenerator +import akka.stream.scala_api.Flow class IdentityProcessorTest extends IdentityProcessorVerification[Int] with WithActorSystem with TestNGSuiteLike { @@ -40,7 +41,7 @@ class IdentityProcessorTest extends IdentityProcessorVerification[Int] with With val gen = ProcessorGenerator(GeneratorSettings( maximumInputBufferSize = 512))(system) val iter = Iterator from 1000 - Stream(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher + Flow(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher } } diff --git a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala index 8cd0cb5606..3e905c11d4 100644 --- a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala @@ -7,6 +7,7 @@ import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher import org.reactivestreams.tck.PublisherVerification import scala.collection.immutable +import akka.stream.scala_api.Flow class IterableProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { @@ -19,10 +20,10 @@ class IterableProducerTest extends PublisherVerification[Int] with WithActorSyst new immutable.Iterable[Int] { override def iterator = Iterator from 0 } else 0 until elements - Stream(iterable).toProducer(gen).getPublisher + Flow(iterable).toProducer(gen).getPublisher } override def createCompletedStatePublisher(): Publisher[Int] = - Stream[Int](Nil).toProducer(gen).getPublisher + Flow[Int](Nil).toProducer(gen).getPublisher } \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala index 3874383a84..6d5a116637 100644 --- a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala @@ -6,6 +6,7 @@ package akka.stream import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher import org.reactivestreams.tck.PublisherVerification +import akka.stream.scala_api.Flow class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { @@ -18,10 +19,10 @@ class IteratorProducerTest extends PublisherVerification[Int] with WithActorSyst Iterator from 0 else (Iterator from 0).take(elements) - Stream(iter).toProducer(gen).getPublisher + Flow(iter).toProducer(gen).getPublisher } override def createCompletedStatePublisher(): Publisher[Int] = - Stream(List.empty[Int].iterator).toProducer(gen).getPublisher + Flow(List.empty[Int].iterator).toProducer(gen).getPublisher } \ No newline at end of file diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala b/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala index e011a2d4ca..c64ad218e5 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala @@ -3,14 +3,15 @@ */ package akka.stream.testkit -import akka.stream.{ GeneratorSettings, Stream, ProcessorGenerator } +import akka.stream.{ GeneratorSettings, ProcessorGenerator } import akka.actor.ActorSystem +import akka.stream.scala_api.Flow -class ChainSetup[I, O](stream: Stream[I] ⇒ Stream[O], val settings: GeneratorSettings)(implicit val system: ActorSystem) { +class ChainSetup[I, O](stream: Flow[I] ⇒ Flow[O], val settings: GeneratorSettings)(implicit val system: ActorSystem) { val upstream = StreamTestKit.producerProbe[I]() val downstream = StreamTestKit.consumerProbe[O]() - private val s = stream(Stream(upstream)) + private val s = stream(Flow(upstream)) val producer = s.toProducer(ProcessorGenerator(settings)) val upstreamSubscription = upstream.expectSubscription() producer.produceTo(downstream) diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala index 18f1b0fd6e..45f91819b5 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -10,7 +10,8 @@ import scala.collection.immutable import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.duration._ import scala.util.control.NonFatal -import akka.stream._ +import akka.stream.scala_api.Flow +import akka.stream.GeneratorSettings trait ScriptedTest extends ShouldMatchers { @@ -76,7 +77,7 @@ trait ScriptedTest extends ShouldMatchers { } class ScriptRunner[In, Out]( - op: Stream[In] ⇒ Stream[Out], + op: Flow[In] ⇒ Flow[Out], gen: GeneratorSettings, script: Script[In, Out], maximumOverrun: Int, @@ -186,7 +187,7 @@ trait ScriptedTest extends ShouldMatchers { } def runScript[In, Out](script: Script[In, Out], gen: GeneratorSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)( - op: Stream[In] ⇒ Stream[Out])(implicit system: ActorSystem): Unit = { + op: Flow[In] ⇒ Flow[Out])(implicit system: ActorSystem): Unit = { new ScriptRunner(op, gen, script, maximumOverrun, maximumRequest, maximumBuffer).run() }