!str The Big Rename
Stream as a name is taken, so we use Flow (which does not happen to be in scope for every Scala program already). This also makes it clear that this constructs just the the pipe, which needs to be activated to become a Producer. Then, the different language bindings need to live in different packages, otherwise they would not be able to use the same name of the central abstraction. The plan is to use scala_api, java_api and java8_api, for starters.
This commit is contained in:
parent
289c03d1a1
commit
9b78618c3a
36 changed files with 328 additions and 296 deletions
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream
|
||||
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.actor.ActorRefFactory
|
||||
import akka.stream.impl.ActorBasedProcessorGenerator
|
||||
import akka.stream.impl.Ast
|
||||
import org.reactivestreams.api.Producer
|
||||
import scala.concurrent.duration._
|
||||
|
||||
// FIXME is Processor the right naming here?
|
||||
object ProcessorGenerator {
|
||||
def apply(settings: GeneratorSettings)(implicit context: ActorRefFactory): ProcessorGenerator =
|
||||
new ActorBasedProcessorGenerator(settings, context)
|
||||
}
|
||||
|
||||
trait ProcessorGenerator {
|
||||
/**
|
||||
* INTERNAL API
|
||||
* ops are stored in reverse order
|
||||
*/
|
||||
private[akka] def toProducer[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Producer[O]
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def consume[I](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Unit
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def produce[T](f: () ⇒ T): Producer[T]
|
||||
}
|
||||
|
||||
// FIXME default values? Should we have an extension that reads from config?
|
||||
case class GeneratorSettings(
|
||||
initialFanOutBufferSize: Int = 4,
|
||||
maxFanOutBufferSize: Int = 16,
|
||||
initialInputBufferSize: Int = 4,
|
||||
maximumInputBufferSize: Int = 16,
|
||||
upstreamSubscriptionTimeout: FiniteDuration = 3.seconds,
|
||||
downstreamSubscriptionTimeout: FiniteDuration = 3.seconds) {
|
||||
|
||||
private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0
|
||||
require(initialFanOutBufferSize > 0, "initialFanOutBufferSize must be > 0")
|
||||
require(maxFanOutBufferSize > 0, "maxFanOutBufferSize must be > 0")
|
||||
require(initialFanOutBufferSize <= maxFanOutBufferSize,
|
||||
s"initialFanOutBufferSize($initialFanOutBufferSize) must be <= maxFanOutBufferSize($maxFanOutBufferSize)")
|
||||
|
||||
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
|
||||
require(isPowerOfTwo(initialInputBufferSize), "initialInputBufferSize must be a power of two")
|
||||
require(maximumInputBufferSize > 0, "maximumInputBufferSize must be > 0")
|
||||
require(isPowerOfTwo(maximumInputBufferSize), "initialInputBufferSize must be a power of two")
|
||||
require(initialInputBufferSize <= maximumInputBufferSize,
|
||||
s"initialInputBufferSize($initialInputBufferSize) must be <= maximumInputBufferSize($maximumInputBufferSize)")
|
||||
}
|
||||
|
||||
|
|
@ -1,105 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream
|
||||
|
||||
import scala.collection.immutable
|
||||
import org.reactivestreams.api.{ Consumer, Producer }
|
||||
import scala.util.control.NonFatal
|
||||
import akka.stream.impl._
|
||||
import akka.actor.ActorRefFactory
|
||||
import scala.concurrent.ExecutionContext
|
||||
import scala.concurrent.Future
|
||||
import scala.util.Try
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.stream.impl.Ast.IteratorProducerNode
|
||||
import akka.stream.impl.Ast.IterableProducerNode
|
||||
import akka.stream.impl.Ast.ExistingProducer
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
|
||||
object Stream {
|
||||
def apply[T](producer: Producer[T]): Stream[T] = StreamImpl(ExistingProducer(producer), Nil)
|
||||
def apply[T](iterator: Iterator[T]): Stream[T] = StreamImpl(IteratorProducerNode(iterator), Nil)
|
||||
def apply[T](iterable: immutable.Iterable[T]): Stream[T] = StreamImpl(IterableProducerNode(iterable), Nil)
|
||||
|
||||
def apply[T](gen: ProcessorGenerator, f: () ⇒ T): Stream[T] = apply(gen.produce(f))
|
||||
|
||||
object Stop extends RuntimeException with NoStackTrace
|
||||
}
|
||||
|
||||
trait Stream[+T] {
|
||||
def map[U](f: T ⇒ U): Stream[U]
|
||||
def filter(p: T ⇒ Boolean): Stream[T]
|
||||
def foreach(c: T ⇒ Unit): Stream[Unit]
|
||||
def fold[U](zero: U)(f: (U, T) ⇒ U): Stream[U]
|
||||
def drop(n: Int): Stream[T]
|
||||
def take(n: Int): Stream[T]
|
||||
def grouped(n: Int): Stream[immutable.Seq[T]]
|
||||
def mapConcat[U](f: T ⇒ immutable.Seq[U]): Stream[U]
|
||||
def transform[S, U](zero: S)(
|
||||
f: (S, T) ⇒ (S, immutable.Seq[U]),
|
||||
onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil,
|
||||
isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U]
|
||||
def transformRecover[S, U](zero: S)(
|
||||
f: (S, Try[T]) ⇒ (S, immutable.Seq[U]),
|
||||
onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil,
|
||||
isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U]
|
||||
|
||||
def groupBy[K](f: T ⇒ K): Stream[(K, Producer[T @uncheckedVariance])]
|
||||
def splitWhen(p: T ⇒ Boolean): Stream[Producer[T @uncheckedVariance]]
|
||||
|
||||
def merge[U >: T](other: Producer[U]): Stream[U]
|
||||
def zip[U](other: Producer[U]): Stream[(T, U)]
|
||||
def concat[U >: T](next: Producer[U]): Stream[U]
|
||||
|
||||
def toFuture(generator: ProcessorGenerator): Future[T]
|
||||
def consume(generator: ProcessorGenerator): Unit
|
||||
def toProducer(generator: ProcessorGenerator): Producer[T @uncheckedVariance]
|
||||
}
|
||||
|
||||
// FIXME is Processor the right naming here?
|
||||
object ProcessorGenerator {
|
||||
def apply(settings: GeneratorSettings)(implicit context: ActorRefFactory): ProcessorGenerator =
|
||||
new ActorBasedProcessorGenerator(settings, context)
|
||||
}
|
||||
|
||||
trait ProcessorGenerator {
|
||||
/**
|
||||
* INTERNAL API
|
||||
* ops are stored in reverse order
|
||||
*/
|
||||
private[akka] def toProducer[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Producer[O]
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def consume[I](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]): Unit
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def produce[T](f: () ⇒ T): Producer[T]
|
||||
}
|
||||
|
||||
// FIXME default values? Should we have an extension that reads from config?
|
||||
case class GeneratorSettings(
|
||||
initialFanOutBufferSize: Int = 4,
|
||||
maxFanOutBufferSize: Int = 16,
|
||||
initialInputBufferSize: Int = 4,
|
||||
maximumInputBufferSize: Int = 16,
|
||||
upstreamSubscriptionTimeout: FiniteDuration = 3.seconds,
|
||||
downstreamSubscriptionTimeout: FiniteDuration = 3.seconds) {
|
||||
|
||||
private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0
|
||||
require(initialFanOutBufferSize > 0, "initialFanOutBufferSize must be > 0")
|
||||
require(maxFanOutBufferSize > 0, "maxFanOutBufferSize must be > 0")
|
||||
require(initialFanOutBufferSize <= maxFanOutBufferSize,
|
||||
s"initialFanOutBufferSize($initialFanOutBufferSize) must be <= maxFanOutBufferSize($maxFanOutBufferSize)")
|
||||
|
||||
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
|
||||
require(isPowerOfTwo(initialInputBufferSize), "initialInputBufferSize must be a power of two")
|
||||
require(maximumInputBufferSize > 0, "maximumInputBufferSize must be > 0")
|
||||
require(isPowerOfTwo(maximumInputBufferSize), "initialInputBufferSize must be a power of two")
|
||||
require(initialInputBufferSize <= maximumInputBufferSize,
|
||||
s"initialInputBufferSize($initialInputBufferSize) must be <= maximumInputBufferSize($maximumInputBufferSize)")
|
||||
}
|
||||
|
||||
8
akka-stream/src/main/scala/akka/stream/Support.scala
Normal file
8
akka-stream/src/main/scala/akka/stream/Support.scala
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream
|
||||
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
case object Stop extends RuntimeException("Stop this flow") with NoStackTrace
|
||||
|
|
@ -16,7 +16,7 @@ import scala.concurrent.duration.Duration
|
|||
import scala.util.control.NonFatal
|
||||
import akka.actor.Props
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.stream.Stream
|
||||
import akka.stream.Stop
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -123,7 +123,6 @@ private[akka] object ActorProducerImpl {
|
|||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) extends Actor with ActorLogging with SubscriberManagement[T] {
|
||||
import Stream._
|
||||
import ActorProducerImpl._
|
||||
|
||||
type S = ActorSubscription[T]
|
||||
|
|
|
|||
85
akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala
Normal file
85
akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala
Normal file
|
|
@ -0,0 +1,85 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.util.Try
|
||||
|
||||
import org.reactivestreams.api.Producer
|
||||
|
||||
import Ast.{ AstNode, Recover, Transform }
|
||||
import akka.stream.ProcessorGenerator
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Flow[O] {
|
||||
import Ast._
|
||||
// Storing ops in reverse order
|
||||
private def andThen[U](op: AstNode): Flow[U] = this.copy(ops = op :: ops)
|
||||
|
||||
def map[U](f: O ⇒ U): Flow[U] = transform(())((_, in) ⇒ ((), List(f(in))))
|
||||
|
||||
def filter(p: O ⇒ Boolean): Flow[O] = transform(())((_, in) ⇒ if (p(in)) ((), List(in)) else ((), Nil))
|
||||
|
||||
def foreach(c: O ⇒ Unit): Flow[Unit] = transform(())((_, in) ⇒ c(in) -> Nil, _ ⇒ List(()))
|
||||
|
||||
def fold[U](zero: U)(f: (U, O) ⇒ U): Flow[U] = transform(zero)((z, in) ⇒ f(z, in) -> Nil, z ⇒ List(z))
|
||||
|
||||
def drop(n: Int): Flow[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> List(in) else (x - 1) -> Nil)
|
||||
|
||||
def take(n: Int): Flow[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> Nil else (x - 1) -> List(in), isComplete = _ == 0)
|
||||
|
||||
def grouped(n: Int): Flow[immutable.Seq[O]] =
|
||||
transform(immutable.Seq.empty[O])((buf, in) ⇒ {
|
||||
val group = buf :+ in
|
||||
if (group.size == n) (Nil, List(group))
|
||||
else (group, Nil)
|
||||
}, x ⇒ if (x.isEmpty) Nil else List(x))
|
||||
|
||||
def mapConcat[U](f: O ⇒ immutable.Seq[U]): Flow[U] = transform(())((_, in) ⇒ ((), f(in)))
|
||||
|
||||
def transform[S, U](zero: S)(
|
||||
f: (S, O) ⇒ (S, immutable.Seq[U]),
|
||||
onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil,
|
||||
isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U] =
|
||||
andThen(Transform(
|
||||
zero,
|
||||
f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])],
|
||||
onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]],
|
||||
isComplete.asInstanceOf[Any ⇒ Boolean]))
|
||||
|
||||
def transformRecover[S, U](zero: S)(
|
||||
f: (S, Try[O]) ⇒ (S, immutable.Seq[U]),
|
||||
onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil,
|
||||
isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U] =
|
||||
andThen(Recover(Transform(
|
||||
zero,
|
||||
f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])],
|
||||
onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]],
|
||||
isComplete.asInstanceOf[Any ⇒ Boolean])))
|
||||
|
||||
override def zip[O2](other: Producer[O2]): Flow[(O, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]]))
|
||||
|
||||
override def concat[U >: O](next: Producer[U]): Flow[U] = andThen(Concat(next.asInstanceOf[Producer[Any]]))
|
||||
|
||||
override def merge[U >: O](other: Producer[U]): Flow[U] = andThen(Merge(other.asInstanceOf[Producer[Any]]))
|
||||
|
||||
override def splitWhen(p: (O) ⇒ Boolean): Flow[Producer[O]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean]))
|
||||
|
||||
override def groupBy[K](f: (O) ⇒ K): Flow[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any]))
|
||||
|
||||
def toFuture(generator: ProcessorGenerator): Future[O] = {
|
||||
val p = Promise[O]()
|
||||
transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil }, isComplete = _ == 1).consume(generator)
|
||||
p.future
|
||||
}
|
||||
|
||||
def consume(generator: ProcessorGenerator): Unit = generator.consume(producerNode, ops)
|
||||
|
||||
def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producerNode, ops)
|
||||
}
|
||||
|
||||
|
|
@ -5,7 +5,7 @@ package akka.stream.impl
|
|||
|
||||
import org.reactivestreams.spi.Subscription
|
||||
import akka.actor.{ Terminated, Props, ActorRef }
|
||||
import akka.stream.{ Stream, GeneratorSettings }
|
||||
import akka.stream.GeneratorSettings
|
||||
import akka.stream.impl._
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ package akka.stream.impl
|
|||
|
||||
import akka.actor.Props
|
||||
import akka.stream.GeneratorSettings
|
||||
import akka.stream.Stream
|
||||
import akka.stream.Stop
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -13,7 +13,7 @@ import akka.stream.Stream
|
|||
private[akka] object IteratorProducer {
|
||||
def props(iterator: Iterator[Any], settings: GeneratorSettings): Props = {
|
||||
def f(): Any = {
|
||||
if (!iterator.hasNext) throw Stream.Stop
|
||||
if (!iterator.hasNext) throw Stop
|
||||
iterator.next()
|
||||
}
|
||||
ActorProducer.props(settings, f)
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@
|
|||
package akka.stream.impl
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.util.{Failure, Success}
|
||||
import scala.util.{ Failure, Success }
|
||||
|
||||
import akka.actor.Props
|
||||
import akka.stream.GeneratorSettings
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ package akka.stream.impl
|
|||
import akka.actor.{ Props, ActorRef }
|
||||
import org.reactivestreams.spi.Subscription
|
||||
import akka.stream.impl._
|
||||
import akka.stream.{ Stream, GeneratorSettings }
|
||||
import akka.stream.GeneratorSettings
|
||||
import akka.actor.Terminated
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -1,84 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.util.Try
|
||||
|
||||
import org.reactivestreams.api.Producer
|
||||
|
||||
import Ast.{ AstNode, Recover, Transform }
|
||||
import akka.stream.{ ProcessorGenerator, Stream }
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] case class StreamImpl[I, O](producerNode: Ast.ProducerNode[I], ops: List[Ast.AstNode]) extends Stream[O] {
|
||||
import Ast._
|
||||
// Storing ops in reverse order
|
||||
private def andThen[U](op: AstNode): Stream[U] = this.copy(ops = op :: ops)
|
||||
|
||||
def map[U](f: O ⇒ U): Stream[U] = transform(())((_, in) ⇒ ((), List(f(in))))
|
||||
|
||||
def filter(p: O ⇒ Boolean): Stream[O] = transform(())((_, in) ⇒ if (p(in)) ((), List(in)) else ((), Nil))
|
||||
|
||||
def foreach(c: O ⇒ Unit): Stream[Unit] = transform(())((_, in) ⇒ c(in) -> Nil, _ ⇒ List(()))
|
||||
|
||||
def fold[U](zero: U)(f: (U, O) ⇒ U): Stream[U] = transform(zero)((z, in) ⇒ f(z, in) -> Nil, z ⇒ List(z))
|
||||
|
||||
def drop(n: Int): Stream[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> List(in) else (x - 1) -> Nil)
|
||||
|
||||
def take(n: Int): Stream[O] = transform(n)((x, in) ⇒ if (x == 0) 0 -> Nil else (x - 1) -> List(in), isComplete = _ == 0)
|
||||
|
||||
def grouped(n: Int): Stream[immutable.Seq[O]] =
|
||||
transform(immutable.Seq.empty[O])((buf, in) ⇒ {
|
||||
val group = buf :+ in
|
||||
if (group.size == n) (Nil, List(group))
|
||||
else (group, Nil)
|
||||
}, x ⇒ if (x.isEmpty) Nil else List(x))
|
||||
|
||||
def mapConcat[U](f: O ⇒ immutable.Seq[U]): Stream[U] = transform(())((_, in) ⇒ ((), f(in)))
|
||||
|
||||
def transform[S, U](zero: S)(
|
||||
f: (S, O) ⇒ (S, immutable.Seq[U]),
|
||||
onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil,
|
||||
isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] =
|
||||
andThen(Transform(
|
||||
zero,
|
||||
f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])],
|
||||
onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]],
|
||||
isComplete.asInstanceOf[Any ⇒ Boolean]))
|
||||
|
||||
def transformRecover[S, U](zero: S)(
|
||||
f: (S, Try[O]) ⇒ (S, immutable.Seq[U]),
|
||||
onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil,
|
||||
isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] =
|
||||
andThen(Recover(Transform(
|
||||
zero,
|
||||
f.asInstanceOf[(Any, Any) ⇒ (Any, immutable.Seq[Any])],
|
||||
onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]],
|
||||
isComplete.asInstanceOf[Any ⇒ Boolean])))
|
||||
|
||||
override def zip[O2](other: Producer[O2]): Stream[(O, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]]))
|
||||
|
||||
override def concat[U >: O](next: Producer[U]): Stream[U] = andThen(Concat(next.asInstanceOf[Producer[Any]]))
|
||||
|
||||
override def merge[U >: O](other: Producer[U]): Stream[U] = andThen(Merge(other.asInstanceOf[Producer[Any]]))
|
||||
|
||||
override def splitWhen(p: (O) ⇒ Boolean): Stream[Producer[O]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean]))
|
||||
|
||||
override def groupBy[K](f: (O) ⇒ K): Stream[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any]))
|
||||
|
||||
def toFuture(generator: ProcessorGenerator): Future[O] = {
|
||||
val p = Promise[O]()
|
||||
transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil }, isComplete = _ == 1).consume(generator)
|
||||
p.future
|
||||
}
|
||||
|
||||
def consume(generator: ProcessorGenerator): Unit = generator.consume(producerNode, ops)
|
||||
|
||||
def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producerNode, ops)
|
||||
}
|
||||
|
||||
55
akka-stream/src/main/scala/akka/stream/scala_api/Flow.scala
Normal file
55
akka-stream/src/main/scala/akka/stream/scala_api/Flow.scala
Normal file
|
|
@ -0,0 +1,55 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scala_api
|
||||
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.util.Try
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
import org.reactivestreams.api.Producer
|
||||
|
||||
import akka.stream.ProcessorGenerator
|
||||
import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode }
|
||||
import akka.stream.impl.FlowImpl
|
||||
|
||||
object Flow {
|
||||
def apply[T](producer: Producer[T]): Flow[T] = FlowImpl(ExistingProducer(producer), Nil)
|
||||
def apply[T](iterator: Iterator[T]): Flow[T] = FlowImpl(IteratorProducerNode(iterator), Nil)
|
||||
def apply[T](iterable: immutable.Iterable[T]): Flow[T] = FlowImpl(IterableProducerNode(iterable), Nil)
|
||||
|
||||
def apply[T](gen: ProcessorGenerator, f: () ⇒ T): Flow[T] = apply(gen.produce(f))
|
||||
}
|
||||
|
||||
trait Flow[+T] {
|
||||
def map[U](f: T ⇒ U): Flow[U]
|
||||
def filter(p: T ⇒ Boolean): Flow[T]
|
||||
def foreach(c: T ⇒ Unit): Flow[Unit]
|
||||
def fold[U](zero: U)(f: (U, T) ⇒ U): Flow[U]
|
||||
def drop(n: Int): Flow[T]
|
||||
def take(n: Int): Flow[T]
|
||||
def grouped(n: Int): Flow[immutable.Seq[T]]
|
||||
def mapConcat[U](f: T ⇒ immutable.Seq[U]): Flow[U]
|
||||
def transform[S, U](zero: S)(
|
||||
f: (S, T) ⇒ (S, immutable.Seq[U]),
|
||||
onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil,
|
||||
isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U]
|
||||
def transformRecover[S, U](zero: S)(
|
||||
f: (S, Try[T]) ⇒ (S, immutable.Seq[U]),
|
||||
onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil,
|
||||
isComplete: S ⇒ Boolean = (_: S) ⇒ false): Flow[U]
|
||||
|
||||
def groupBy[K](f: T ⇒ K): Flow[(K, Producer[T @uncheckedVariance])]
|
||||
def splitWhen(p: T ⇒ Boolean): Flow[Producer[T @uncheckedVariance]]
|
||||
|
||||
def merge[U >: T](other: Producer[U]): Flow[U]
|
||||
def zip[U](other: Producer[U]): Flow[(T, U)]
|
||||
def concat[U >: T](next: Producer[U]): Flow[U]
|
||||
|
||||
def toFuture(generator: ProcessorGenerator): Future[T]
|
||||
def consume(generator: ProcessorGenerator): Unit
|
||||
def toProducer(generator: ProcessorGenerator): Producer[T @uncheckedVariance]
|
||||
}
|
||||
|
||||
|
|
@ -8,6 +8,7 @@ import org.reactivestreams.spi.Publisher
|
|||
import org.reactivestreams.tck.PublisherVerification
|
||||
import akka.stream.impl.ActorBasedProcessorGenerator
|
||||
import org.reactivestreams.api.Producer
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike {
|
||||
import system.dispatcher
|
||||
|
|
@ -17,14 +18,14 @@ class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem
|
|||
private def createProducer(elements: Int): Producer[Int] = {
|
||||
val iter = Iterator from 1000
|
||||
val iter2 = if (elements > 0) iter take elements else iter
|
||||
Stream(factory, () ⇒ if (iter2.hasNext) iter2.next() else throw Stream.Stop).toProducer(factory)
|
||||
Flow(factory, () ⇒ if (iter2.hasNext) iter2.next() else throw Stop).toProducer(factory)
|
||||
}
|
||||
|
||||
def createPublisher(elements: Int): Publisher[Int] = createProducer(elements).getPublisher
|
||||
|
||||
override def createCompletedStatePublisher(): Publisher[Int] = {
|
||||
val pub = createProducer(1)
|
||||
Stream(pub).consume(factory)
|
||||
Flow(pub).consume(factory)
|
||||
Thread.sleep(100)
|
||||
pub.getPublisher
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,8 +6,9 @@ package akka.stream
|
|||
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
|
||||
import akka.stream.testkit.StreamTestKit
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
class StreamConcatSpec extends AkkaSpec {
|
||||
class FlowConcatSpec extends AkkaSpec {
|
||||
|
||||
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
|
||||
initialInputBufferSize = 2,
|
||||
|
|
@ -18,10 +19,10 @@ class StreamConcatSpec extends AkkaSpec {
|
|||
"Concat" must {
|
||||
|
||||
"work in the happy case" in {
|
||||
val source0 = Stream(List.empty[Int].iterator).toProducer(gen)
|
||||
val source1 = Stream((1 to 4).iterator).toProducer(gen)
|
||||
val source2 = Stream((5 to 10).iterator).toProducer(gen)
|
||||
val p = Stream(source0).concat(source1).concat(source2).toProducer(gen)
|
||||
val source0 = Flow(List.empty[Int].iterator).toProducer(gen)
|
||||
val source1 = Flow((1 to 4).iterator).toProducer(gen)
|
||||
val source2 = Flow((5 to 10).iterator).toProducer(gen)
|
||||
val p = Flow(source0).concat(source1).concat(source2).toProducer(gen)
|
||||
|
||||
val probe = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(probe)
|
||||
|
|
@ -7,7 +7,7 @@ import akka.testkit.AkkaSpec
|
|||
import akka.stream.testkit.ScriptedTest
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
|
||||
|
||||
class StreamForeachSpec extends AkkaSpec with ScriptedTest {
|
||||
class FlowForeachSpec extends AkkaSpec with ScriptedTest {
|
||||
|
||||
val genSettings = GeneratorSettings(
|
||||
initialInputBufferSize = 2,
|
||||
|
|
@ -8,6 +8,7 @@ import akka.stream.testkit._
|
|||
import akka.testkit.AkkaSpec
|
||||
import org.reactivestreams.api.Producer
|
||||
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class StreamGroupBySpec extends AkkaSpec {
|
||||
|
|
@ -31,8 +32,8 @@ class StreamGroupBySpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) {
|
||||
val source = Stream((1 to elementCount).iterator).toProducer(gen)
|
||||
val groupStream = Stream(source).groupBy(_ % groupCount).toProducer(gen)
|
||||
val source = Flow((1 to elementCount).iterator).toProducer(gen)
|
||||
val groupStream = Flow(source).groupBy(_ % groupCount).toProducer(gen)
|
||||
val masterConsumer = StreamTestKit.consumerProbe[(Int, Producer[Int])]
|
||||
|
||||
groupStream.produceTo(masterConsumer)
|
||||
|
|
@ -11,16 +11,17 @@ import akka.dispatch.OnComplete
|
|||
import akka.stream.testkit.OnComplete
|
||||
import akka.stream.testkit.OnError
|
||||
import akka.stream.testkit.OnSubscribe
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class StreamIterableSpec extends AkkaSpec {
|
||||
class FlowIterableSpec extends AkkaSpec {
|
||||
|
||||
val gen = ProcessorGenerator(GeneratorSettings(
|
||||
maximumInputBufferSize = 512))
|
||||
|
||||
"A Stream based on an iterable" must {
|
||||
"A Flow based on an iterable" must {
|
||||
"produce elements" in {
|
||||
val p = Stream(List(1, 2, 3)).toProducer(gen)
|
||||
val p = Flow(List(1, 2, 3)).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c)
|
||||
val sub = c.expectSubscription()
|
||||
|
|
@ -34,7 +35,7 @@ class StreamIterableSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"complete empty" in {
|
||||
val p = Stream(List.empty[Int]).toProducer(gen)
|
||||
val p = Flow(List.empty[Int]).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c)
|
||||
c.expectComplete()
|
||||
|
|
@ -46,7 +47,7 @@ class StreamIterableSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"produce elements with multiple subscribers" in {
|
||||
val p = Stream(List(1, 2, 3)).toProducer(gen)
|
||||
val p = Flow(List(1, 2, 3)).toProducer(gen)
|
||||
val c1 = StreamTestKit.consumerProbe[Int]
|
||||
val c2 = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c1)
|
||||
|
|
@ -70,7 +71,7 @@ class StreamIterableSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"produce elements to later subscriber" in {
|
||||
val p = Stream(List(1, 2, 3)).toProducer(gen)
|
||||
val p = Flow(List(1, 2, 3)).toProducer(gen)
|
||||
val c1 = StreamTestKit.consumerProbe[Int]
|
||||
val c2 = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c1)
|
||||
|
|
@ -96,7 +97,7 @@ class StreamIterableSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"produce elements with one transformation step" in {
|
||||
val p = Stream(List(1, 2, 3)).map(_ * 2).toProducer(gen)
|
||||
val p = Flow(List(1, 2, 3)).map(_ * 2).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c)
|
||||
val sub = c.expectSubscription()
|
||||
|
|
@ -108,7 +109,7 @@ class StreamIterableSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"produce elements with two transformation steps" in {
|
||||
val p = Stream(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(gen)
|
||||
val p = Flow(List(1, 2, 3, 4)).filter(_ % 2 == 0).map(_ * 2).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c)
|
||||
val sub = c.expectSubscription()
|
||||
|
|
@ -120,7 +121,7 @@ class StreamIterableSpec extends AkkaSpec {
|
|||
|
||||
"allow cancel before receiving all elements" in {
|
||||
val count = 100000
|
||||
val p = Stream(1 to count).toProducer(gen)
|
||||
val p = Flow(1 to count).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c)
|
||||
val sub = c.expectSubscription()
|
||||
|
|
@ -9,9 +9,10 @@ import akka.testkit.AkkaSpec
|
|||
import akka.stream.testkit.OnNext
|
||||
import akka.stream.testkit.OnComplete
|
||||
import akka.stream.testkit.OnError
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class StreamIteratorSpec extends AkkaSpec {
|
||||
class FlowIteratorSpec extends AkkaSpec {
|
||||
|
||||
val gen = ProcessorGenerator(GeneratorSettings(
|
||||
initialInputBufferSize = 2,
|
||||
|
|
@ -19,9 +20,9 @@ class StreamIteratorSpec extends AkkaSpec {
|
|||
initialFanOutBufferSize = 4,
|
||||
maxFanOutBufferSize = 4))
|
||||
|
||||
"A Stream based on an iterator" must {
|
||||
"A Flow based on an iterator" must {
|
||||
"produce elements" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c)
|
||||
val sub = c.expectSubscription()
|
||||
|
|
@ -35,7 +36,7 @@ class StreamIteratorSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"complete empty" in {
|
||||
val p = Stream(List.empty[Int].iterator).toProducer(gen)
|
||||
val p = Flow(List.empty[Int].iterator).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c)
|
||||
c.expectComplete()
|
||||
|
|
@ -47,7 +48,7 @@ class StreamIteratorSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"produce elements with multiple subscribers" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val c1 = StreamTestKit.consumerProbe[Int]
|
||||
val c2 = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c1)
|
||||
|
|
@ -71,7 +72,7 @@ class StreamIteratorSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"produce elements to later subscriber" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val c1 = StreamTestKit.consumerProbe[Int]
|
||||
val c2 = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c1)
|
||||
|
|
@ -94,7 +95,7 @@ class StreamIteratorSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"produce elements with one transformation step" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).map(_ * 2).toProducer(gen)
|
||||
val p = Flow(List(1, 2, 3).iterator).map(_ * 2).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c)
|
||||
val sub = c.expectSubscription()
|
||||
|
|
@ -106,7 +107,7 @@ class StreamIteratorSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"produce elements with two transformation steps" in {
|
||||
val p = Stream(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(gen)
|
||||
val p = Flow(List(1, 2, 3, 4).iterator).filter(_ % 2 == 0).map(_ * 2).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c)
|
||||
val sub = c.expectSubscription()
|
||||
|
|
@ -118,7 +119,7 @@ class StreamIteratorSpec extends AkkaSpec {
|
|||
|
||||
"allow cancel before receiving all elements" in {
|
||||
val count = 100000
|
||||
val p = Stream((1 to count).iterator).toProducer(gen)
|
||||
val p = Flow((1 to count).iterator).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(c)
|
||||
val sub = c.expectSubscription()
|
||||
|
|
@ -6,7 +6,7 @@ package akka.stream
|
|||
import akka.testkit.AkkaSpec
|
||||
import akka.stream.testkit.ScriptedTest
|
||||
|
||||
class StreamMapConcatSpec extends AkkaSpec with ScriptedTest {
|
||||
class FlowMapConcatSpec extends AkkaSpec with ScriptedTest {
|
||||
|
||||
val genSettings = GeneratorSettings(
|
||||
initialInputBufferSize = 2,
|
||||
|
|
@ -8,8 +8,9 @@ import akka.stream.testkit.StreamTestKit
|
|||
import akka.testkit.AkkaSpec
|
||||
import org.reactivestreams.api.Producer
|
||||
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
class StreamMergeSpec extends AkkaSpec {
|
||||
class FlowMergeSpec extends AkkaSpec {
|
||||
|
||||
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
|
||||
initialInputBufferSize = 2,
|
||||
|
|
@ -21,10 +22,10 @@ class StreamMergeSpec extends AkkaSpec {
|
|||
|
||||
"work in the happy case" in {
|
||||
// Different input sizes (4 and 6)
|
||||
val source1 = Stream((1 to 4).iterator).toProducer(gen)
|
||||
val source2 = Stream((5 to 10).iterator).toProducer(gen)
|
||||
val source3 = Stream(List.empty[Int].iterator).toProducer(gen)
|
||||
val p = Stream(source1).merge(source2).merge(source3).toProducer(gen)
|
||||
val source1 = Flow((1 to 4).iterator).toProducer(gen)
|
||||
val source2 = Flow((5 to 10).iterator).toProducer(gen)
|
||||
val source3 = Flow(List.empty[Int].iterator).toProducer(gen)
|
||||
val p = Flow(source1).merge(source2).merge(source3).toProducer(gen)
|
||||
|
||||
val probe = StreamTestKit.consumerProbe[Int]
|
||||
p.produceTo(probe)
|
||||
|
|
@ -9,9 +9,10 @@ import akka.testkit._
|
|||
import org.reactivestreams.api.Producer
|
||||
import org.scalatest.FreeSpecLike
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) {
|
||||
class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) {
|
||||
|
||||
import system.dispatcher
|
||||
|
||||
|
|
@ -21,10 +22,10 @@ class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.re
|
|||
initialFanOutBufferSize = 1,
|
||||
maxFanOutBufferSize = 16)
|
||||
|
||||
val identity: Stream[Any] ⇒ Stream[Any] = in ⇒ in.map(e ⇒ e)
|
||||
val identity2: Stream[Any] ⇒ Stream[Any] = in ⇒ identity(in)
|
||||
val identity: Flow[Any] ⇒ Flow[Any] = in ⇒ in.map(e ⇒ e)
|
||||
val identity2: Flow[Any] ⇒ Flow[Any] = in ⇒ identity(in)
|
||||
|
||||
"A Stream" must {
|
||||
"A Flow" must {
|
||||
|
||||
for ((name, op) ← List("identity" -> identity, "identity2" -> identity2); n ← List(1, 2, 4)) {
|
||||
s"requests initial elements from upstream ($name, $n)" in {
|
||||
|
|
@ -99,7 +100,7 @@ class StreamSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.re
|
|||
|
||||
}
|
||||
|
||||
"A Stream with multiple subscribers (FanOutBox)" must {
|
||||
"A Flow with multiple subscribers (FanOutBox)" must {
|
||||
"adapt speed to the currently slowest consumer" in {
|
||||
new ChainSetup(identity, genSettings.copy(initialInputBufferSize = 1, maxFanOutBufferSize = 1)) {
|
||||
val downstream2 = StreamTestKit.consumerProbe[Any]()
|
||||
|
|
@ -8,6 +8,7 @@ import akka.stream.testkit.StreamTestKit
|
|||
import akka.testkit.AkkaSpec
|
||||
import org.reactivestreams.api.Producer
|
||||
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class StreamSplitWhenSpec extends AkkaSpec {
|
||||
|
|
@ -31,8 +32,8 @@ class StreamSplitWhenSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) {
|
||||
val source = Stream((1 to elementCount).iterator).toProducer(gen)
|
||||
val groupStream = Stream(source).splitWhen(_ == splitWhen).toProducer(gen)
|
||||
val source = Flow((1 to elementCount).iterator).toProducer(gen)
|
||||
val groupStream = Flow(source).splitWhen(_ == splitWhen).toProducer(gen)
|
||||
val masterConsumer = StreamTestKit.consumerProbe[Producer[Int]]
|
||||
|
||||
groupStream.produceTo(masterConsumer)
|
||||
|
|
@ -10,8 +10,9 @@ import akka.stream.testkit.StreamTestKit
|
|||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.Failure
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
class StreamToFutureSpec extends AkkaSpec with ScriptedTest {
|
||||
class FlowToFutureSpec extends AkkaSpec with ScriptedTest {
|
||||
|
||||
val gen = ProcessorGenerator(GeneratorSettings(
|
||||
initialInputBufferSize = 2,
|
||||
|
|
@ -19,11 +20,11 @@ class StreamToFutureSpec extends AkkaSpec with ScriptedTest {
|
|||
initialFanOutBufferSize = 1,
|
||||
maxFanOutBufferSize = 16))
|
||||
|
||||
"A Stream with toFuture" must {
|
||||
"A Flow with toFuture" must {
|
||||
|
||||
"yield the first value" in {
|
||||
val p = StreamTestKit.producerProbe[Int]
|
||||
val f = Stream(p).toFuture(gen)
|
||||
val f = Flow(p).toFuture(gen)
|
||||
val proc = p.expectSubscription
|
||||
proc.expectRequestMore()
|
||||
proc.sendNext(42)
|
||||
|
|
@ -33,7 +34,7 @@ class StreamToFutureSpec extends AkkaSpec with ScriptedTest {
|
|||
|
||||
"yield the first error" in {
|
||||
val p = StreamTestKit.producerProbe[Int]
|
||||
val f = Stream(p).toFuture(gen)
|
||||
val f = Flow(p).toFuture(gen)
|
||||
val proc = p.expectSubscription
|
||||
proc.expectRequestMore()
|
||||
val ex = new RuntimeException("ex")
|
||||
|
|
@ -9,9 +9,10 @@ import akka.testkit.AkkaSpec
|
|||
import akka.testkit.EventFilter
|
||||
import scala.util.Failure
|
||||
import scala.util.control.NoStackTrace
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class StreamTransformRecoverSpec extends AkkaSpec {
|
||||
class FlowTransformRecoverSpec extends AkkaSpec {
|
||||
|
||||
val gen = ProcessorGenerator(GeneratorSettings(
|
||||
initialInputBufferSize = 2,
|
||||
|
|
@ -19,10 +20,10 @@ class StreamTransformRecoverSpec extends AkkaSpec {
|
|||
initialFanOutBufferSize = 2,
|
||||
maxFanOutBufferSize = 2))
|
||||
|
||||
"A Stream with transformRecover operations" must {
|
||||
"A Flow with transformRecover operations" must {
|
||||
"produce one-to-one transformation as expected" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transformRecover(0)((tot, elem) ⇒ (tot + elem.get, List(tot + elem.get))).
|
||||
toProducer(gen)
|
||||
val consumer = StreamTestKit.consumerProbe[Int]
|
||||
|
|
@ -38,8 +39,8 @@ class StreamTransformRecoverSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"produce one-to-several transformation as expected" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transformRecover(0)((tot, elem) ⇒ (tot + elem.get, Vector.fill(elem.get)(tot + elem.get))).
|
||||
toProducer(gen)
|
||||
val consumer = StreamTestKit.consumerProbe[Int]
|
||||
|
|
@ -58,8 +59,8 @@ class StreamTransformRecoverSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"produce dropping transformation as expected" in {
|
||||
val p = Stream(List(1, 2, 3, 4).iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List(1, 2, 3, 4).iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transformRecover(0)((tot, elem) ⇒ (tot + elem.get, if (elem.get % 2 == 0) Nil else List(tot + elem.get))).
|
||||
toProducer(gen)
|
||||
val consumer = StreamTestKit.consumerProbe[Int]
|
||||
|
|
@ -75,8 +76,8 @@ class StreamTransformRecoverSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"produce multi-step transformation as expected" in {
|
||||
val p = Stream(List("a", "bc", "def").iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List("a", "bc", "def").iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transformRecover("") { (str, elem) ⇒
|
||||
val concat = str + elem
|
||||
(concat, List(concat.length))
|
||||
|
|
@ -105,8 +106,8 @@ class StreamTransformRecoverSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"invoke onComplete when done" in {
|
||||
val p = Stream(List("a").iterator).toProducer(gen)
|
||||
val p2 = Stream(p).transformRecover("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen)
|
||||
val p = Flow(List("a").iterator).toProducer(gen)
|
||||
val p2 = Flow(p).transformRecover("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[String]
|
||||
p2.produceTo(c)
|
||||
val s = c.expectSubscription()
|
||||
|
|
@ -117,7 +118,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
|
|||
|
||||
"allow cancellation using isComplete" in {
|
||||
val p = StreamTestKit.producerProbe[Int]
|
||||
val p2 = Stream(p).transformRecover("")((s, in) ⇒ (s + in, List(in.get)), isComplete = _ == "Success(1)").toProducer(gen)
|
||||
val p2 = Flow(p).transformRecover("")((s, in) ⇒ (s + in, List(in.get)), isComplete = _ == "Success(1)").toProducer(gen)
|
||||
val proc = p.expectSubscription
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p2.produceTo(c)
|
||||
|
|
@ -132,7 +133,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
|
|||
|
||||
"call onComplete after isComplete signaled completion" in {
|
||||
val p = StreamTestKit.producerProbe[Int]
|
||||
val p2 = Stream(p).transformRecover("")(
|
||||
val p2 = Flow(p).transformRecover("")(
|
||||
(s, in) ⇒ (s + in, List(in.get)),
|
||||
onComplete = x ⇒ List(x.size + 10),
|
||||
isComplete = _ == "Success(1)")
|
||||
|
|
@ -151,8 +152,8 @@ class StreamTransformRecoverSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"report error when exception is thrown" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transformRecover(0) { (_, elem) ⇒
|
||||
if (elem.get == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem.get, elem.get))
|
||||
}.
|
||||
|
|
@ -173,7 +174,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
|
|||
|
||||
"transform errors when received" in {
|
||||
val p = StreamTestKit.producerProbe[Int]
|
||||
val p2 = Stream(p).transformRecover("")(
|
||||
val p2 = Flow(p).transformRecover("")(
|
||||
{ case (s, Failure(ex)) ⇒ (s + ex.getMessage, List(ex)) },
|
||||
onComplete = x ⇒ List(TE(x.size + "10")))
|
||||
.toProducer(gen)
|
||||
|
|
@ -190,7 +191,7 @@ class StreamTransformRecoverSpec extends AkkaSpec {
|
|||
|
||||
"forward errors when received and thrown" in {
|
||||
val p = StreamTestKit.producerProbe[Int]
|
||||
val p2 = Stream(p).transformRecover("")((_, in) ⇒ "" -> List(in.get)).toProducer(gen)
|
||||
val p2 = Flow(p).transformRecover("")((_, in) ⇒ "" -> List(in.get)).toProducer(gen)
|
||||
val proc = p.expectSubscription()
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p2.produceTo(c)
|
||||
|
|
@ -203,8 +204,8 @@ class StreamTransformRecoverSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"support cancel as expected" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transformRecover(0) { (_, elem) ⇒ (0, List(elem.get, elem.get)) }.
|
||||
toProducer(gen)
|
||||
val consumer = StreamTestKit.consumerProbe[Int]
|
||||
|
|
@ -8,9 +8,10 @@ import akka.stream.testkit.StreamTestKit
|
|||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.EventFilter
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) {
|
||||
class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) {
|
||||
|
||||
import system.dispatcher
|
||||
|
||||
|
|
@ -20,10 +21,10 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
|
|||
initialFanOutBufferSize = 2,
|
||||
maxFanOutBufferSize = 2))
|
||||
|
||||
"A Stream with transform operations" must {
|
||||
"A Flow with transform operations" must {
|
||||
"produce one-to-one transformation as expected" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transform(0)((tot, elem) ⇒ (tot + elem, List(tot + elem))).
|
||||
toProducer(gen)
|
||||
val consumer = StreamTestKit.consumerProbe[Int]
|
||||
|
|
@ -39,8 +40,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
|
|||
}
|
||||
|
||||
"produce one-to-several transformation as expected" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transform(0)((tot, elem) ⇒ (tot + elem, Vector.fill(elem)(tot + elem))).
|
||||
toProducer(gen)
|
||||
val consumer = StreamTestKit.consumerProbe[Int]
|
||||
|
|
@ -59,8 +60,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
|
|||
}
|
||||
|
||||
"produce dropping transformation as expected" in {
|
||||
val p = Stream(List(1, 2, 3, 4).iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List(1, 2, 3, 4).iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transform(0)((tot, elem) ⇒ (tot + elem, if (elem % 2 == 0) Nil else List(tot + elem))).
|
||||
toProducer(gen)
|
||||
val consumer = StreamTestKit.consumerProbe[Int]
|
||||
|
|
@ -76,8 +77,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
|
|||
}
|
||||
|
||||
"produce multi-step transformation as expected" in {
|
||||
val p = Stream(List("a", "bc", "def").iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List("a", "bc", "def").iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transform("") { (str, elem) ⇒
|
||||
val concat = str + elem
|
||||
(concat, List(concat.length))
|
||||
|
|
@ -106,8 +107,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
|
|||
}
|
||||
|
||||
"invoke onComplete when done" in {
|
||||
val p = Stream(List("a").iterator).toProducer(gen)
|
||||
val p2 = Stream(p).transform("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen)
|
||||
val p = Flow(List("a").iterator).toProducer(gen)
|
||||
val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, Nil), x ⇒ List(x + "B")).toProducer(gen)
|
||||
val c = StreamTestKit.consumerProbe[String]
|
||||
p2.produceTo(c)
|
||||
val s = c.expectSubscription()
|
||||
|
|
@ -118,7 +119,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
|
|||
|
||||
"allow cancellation using isComplete" in {
|
||||
val p = StreamTestKit.producerProbe[Int]
|
||||
val p2 = Stream(p).transform("")((s, in) ⇒ (s + in, List(in)), isComplete = _ == "1").toProducer(gen)
|
||||
val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, List(in)), isComplete = _ == "1").toProducer(gen)
|
||||
val proc = p.expectSubscription
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p2.produceTo(c)
|
||||
|
|
@ -133,7 +134,7 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
|
|||
|
||||
"call onComplete after isComplete signaled completion" in {
|
||||
val p = StreamTestKit.producerProbe[Int]
|
||||
val p2 = Stream(p).transform("")((s, in) ⇒ (s + in, List(in)), onComplete = x ⇒ List(x.size + 10), isComplete = _ == "1").toProducer(gen)
|
||||
val p2 = Flow(p).transform("")((s, in) ⇒ (s + in, List(in)), onComplete = x ⇒ List(x.size + 10), isComplete = _ == "1").toProducer(gen)
|
||||
val proc = p.expectSubscription
|
||||
val c = StreamTestKit.consumerProbe[Int]
|
||||
p2.produceTo(c)
|
||||
|
|
@ -148,8 +149,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
|
|||
}
|
||||
|
||||
"report error when exception is thrown" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transform(0) { (_, elem) ⇒
|
||||
if (elem == 2) throw new IllegalArgumentException("two not allowed") else (0, List(elem, elem))
|
||||
}.
|
||||
|
|
@ -167,8 +168,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
|
|||
}
|
||||
|
||||
"support cancel as expected" in {
|
||||
val p = Stream(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Stream(p).
|
||||
val p = Flow(List(1, 2, 3).iterator).toProducer(gen)
|
||||
val p2 = Flow(p).
|
||||
transform(0) { (_, elem) ⇒ (0, List(elem, elem)) }.
|
||||
toProducer(gen)
|
||||
val consumer = StreamTestKit.consumerProbe[Int]
|
||||
|
|
@ -184,8 +185,8 @@ class StreamTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor
|
|||
}
|
||||
|
||||
"support producing elements from empty inputs" in {
|
||||
val p = Stream(List.empty[Int].iterator).toProducer(gen)
|
||||
val p2 = Stream(p).transform(List(1, 2, 3))((s, _) ⇒ (s, Nil), onComplete = s ⇒ s).
|
||||
val p = Flow(List.empty[Int].iterator).toProducer(gen)
|
||||
val p2 = Flow(p).transform(List(1, 2, 3))((s, _) ⇒ (s, Nil), onComplete = s ⇒ s).
|
||||
toProducer(gen)
|
||||
val consumer = StreamTestKit.consumerProbe[Int]
|
||||
p2.produceTo(consumer)
|
||||
|
|
@ -6,8 +6,9 @@ package akka.stream
|
|||
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
|
||||
import akka.stream.testkit.StreamTestKit
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
class StreamZipSpec extends AkkaSpec {
|
||||
class FlowZipSpec extends AkkaSpec {
|
||||
|
||||
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
|
||||
initialInputBufferSize = 2,
|
||||
|
|
@ -19,9 +20,9 @@ class StreamZipSpec extends AkkaSpec {
|
|||
|
||||
"work in the happy case" in {
|
||||
// Different input sizes (4 and 6)
|
||||
val source1 = Stream((1 to 4).iterator).toProducer(gen)
|
||||
val source2 = Stream(List("A", "B", "C", "D", "E", "F").iterator).toProducer(gen)
|
||||
val p = Stream(source1).zip(source2).toProducer(gen)
|
||||
val source1 = Flow((1 to 4).iterator).toProducer(gen)
|
||||
val source2 = Flow(List("A", "B", "C", "D", "E", "F").iterator).toProducer(gen)
|
||||
val p = Flow(source1).zip(source2).toProducer(gen)
|
||||
|
||||
val probe = StreamTestKit.consumerProbe[(Int, String)]
|
||||
p.produceTo(probe)
|
||||
|
|
@ -14,6 +14,7 @@ import akka.stream.impl.Ast
|
|||
import akka.testkit.TestEvent
|
||||
import akka.testkit.EventFilter
|
||||
import akka.stream.impl.ActorBasedProcessorGenerator
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
class IdentityProcessorTest extends IdentityProcessorVerification[Int] with WithActorSystem with TestNGSuiteLike {
|
||||
|
||||
|
|
@ -40,7 +41,7 @@ class IdentityProcessorTest extends IdentityProcessorVerification[Int] with With
|
|||
val gen = ProcessorGenerator(GeneratorSettings(
|
||||
maximumInputBufferSize = 512))(system)
|
||||
val iter = Iterator from 1000
|
||||
Stream(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher
|
||||
Flow(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import org.scalatest.testng.TestNGSuiteLike
|
|||
import org.reactivestreams.spi.Publisher
|
||||
import org.reactivestreams.tck.PublisherVerification
|
||||
import scala.collection.immutable
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
class IterableProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike {
|
||||
|
||||
|
|
@ -19,10 +20,10 @@ class IterableProducerTest extends PublisherVerification[Int] with WithActorSyst
|
|||
new immutable.Iterable[Int] { override def iterator = Iterator from 0 }
|
||||
else
|
||||
0 until elements
|
||||
Stream(iterable).toProducer(gen).getPublisher
|
||||
Flow(iterable).toProducer(gen).getPublisher
|
||||
}
|
||||
|
||||
override def createCompletedStatePublisher(): Publisher[Int] =
|
||||
Stream[Int](Nil).toProducer(gen).getPublisher
|
||||
Flow[Int](Nil).toProducer(gen).getPublisher
|
||||
|
||||
}
|
||||
|
|
@ -6,6 +6,7 @@ package akka.stream
|
|||
import org.scalatest.testng.TestNGSuiteLike
|
||||
import org.reactivestreams.spi.Publisher
|
||||
import org.reactivestreams.tck.PublisherVerification
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike {
|
||||
|
||||
|
|
@ -18,10 +19,10 @@ class IteratorProducerTest extends PublisherVerification[Int] with WithActorSyst
|
|||
Iterator from 0
|
||||
else
|
||||
(Iterator from 0).take(elements)
|
||||
Stream(iter).toProducer(gen).getPublisher
|
||||
Flow(iter).toProducer(gen).getPublisher
|
||||
}
|
||||
|
||||
override def createCompletedStatePublisher(): Publisher[Int] =
|
||||
Stream(List.empty[Int].iterator).toProducer(gen).getPublisher
|
||||
Flow(List.empty[Int].iterator).toProducer(gen).getPublisher
|
||||
|
||||
}
|
||||
|
|
@ -3,14 +3,15 @@
|
|||
*/
|
||||
package akka.stream.testkit
|
||||
|
||||
import akka.stream.{ GeneratorSettings, Stream, ProcessorGenerator }
|
||||
import akka.stream.{ GeneratorSettings, ProcessorGenerator }
|
||||
import akka.actor.ActorSystem
|
||||
import akka.stream.scala_api.Flow
|
||||
|
||||
class ChainSetup[I, O](stream: Stream[I] ⇒ Stream[O], val settings: GeneratorSettings)(implicit val system: ActorSystem) {
|
||||
class ChainSetup[I, O](stream: Flow[I] ⇒ Flow[O], val settings: GeneratorSettings)(implicit val system: ActorSystem) {
|
||||
val upstream = StreamTestKit.producerProbe[I]()
|
||||
val downstream = StreamTestKit.consumerProbe[O]()
|
||||
|
||||
private val s = stream(Stream(upstream))
|
||||
private val s = stream(Flow(upstream))
|
||||
val producer = s.toProducer(ProcessorGenerator(settings))
|
||||
val upstreamSubscription = upstream.expectSubscription()
|
||||
producer.produceTo(downstream)
|
||||
|
|
|
|||
|
|
@ -10,7 +10,8 @@ import scala.collection.immutable
|
|||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NonFatal
|
||||
import akka.stream._
|
||||
import akka.stream.scala_api.Flow
|
||||
import akka.stream.GeneratorSettings
|
||||
|
||||
trait ScriptedTest extends ShouldMatchers {
|
||||
|
||||
|
|
@ -76,7 +77,7 @@ trait ScriptedTest extends ShouldMatchers {
|
|||
}
|
||||
|
||||
class ScriptRunner[In, Out](
|
||||
op: Stream[In] ⇒ Stream[Out],
|
||||
op: Flow[In] ⇒ Flow[Out],
|
||||
gen: GeneratorSettings,
|
||||
script: Script[In, Out],
|
||||
maximumOverrun: Int,
|
||||
|
|
@ -186,7 +187,7 @@ trait ScriptedTest extends ShouldMatchers {
|
|||
}
|
||||
|
||||
def runScript[In, Out](script: Script[In, Out], gen: GeneratorSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)(
|
||||
op: Stream[In] ⇒ Stream[Out])(implicit system: ActorSystem): Unit = {
|
||||
op: Flow[In] ⇒ Flow[Out])(implicit system: ActorSystem): Unit = {
|
||||
new ScriptRunner(op, gen, script, maximumOverrun, maximumRequest, maximumBuffer).run()
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue