2014-08-26 17:59:12 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
|
|
|
|
*/
|
|
|
|
|
package akka.stream.scaladsl2
|
2014-08-18 11:28:30 +03:00
|
|
|
|
2014-08-22 12:15:04 +03:00
|
|
|
import akka.stream.impl.Ast
|
|
|
|
|
import org.reactivestreams.{ Subscriber, Publisher }
|
|
|
|
|
|
2014-08-20 09:06:46 +03:00
|
|
|
import scala.collection.immutable
|
2014-08-18 11:28:30 +03:00
|
|
|
import scala.concurrent.Future
|
2014-08-20 09:06:46 +03:00
|
|
|
import scala.concurrent.duration.FiniteDuration
|
2014-08-22 12:15:04 +03:00
|
|
|
import akka.stream.{ Transformer, OverflowStrategy, FlattenStrategy }
|
2014-08-18 11:28:30 +03:00
|
|
|
|
2014-08-20 09:06:46 +03:00
|
|
|
/**
 * Sealed base trait of the flow types declared in this file
 * (`ProcessorFlow`, `SubscriberFlow`, `PublisherFlow` and `RunnableFlow`).
 *
 * It is contravariant in `In` and covariant in `Out`.
 */
sealed trait Flow[-In, +Out] {

  /** The [[Transform]] carried by this flow. */
  val transform: Transform[In, Out]

}
|
|
|
|
|
|
|
|
|
|
/**
 * Factory methods for creating flows, either without an input or with an input
 * taken from an `Iterable`, a `Future` or a `Publisher`.
 */
object From {

  /**
   * Helper to create a Flow without Input.
   * Example usage: `From[Int]`
   *
   * @tparam T type used for both type parameters of the returned flow
   * @return a [[ProcessorFlow]] wrapping a new [[EmptyTransform]]
   */
  def apply[T]: ProcessorFlow[T, T] =
    ProcessorFlow[T, T](EmptyTransform[T, T]())

  /**
   * Helper to create a Flow with Input from an Iterable.
   * Example usage: `From(Seq(1, 2, 3))`
   *
   * @param i the iterable, wrapped in an [[IterableIn]] and attached as the input
   */
  def apply[T](i: immutable.Iterable[T]): PublisherFlow[T, T] =
    From[T].withInput[T](IterableIn[T](i))

  /**
   * Helper to create a Flow with Input from a Future.
   * Example usage: `From(Future { 1 })`
   *
   * @param f the future, wrapped in a [[FutureIn]] and attached as the input
   */
  def apply[T](f: Future[T]): PublisherFlow[T, T] =
    From[T].withInput[T](FutureIn[T](f))

  /**
   * Helper to create a Flow with Input from a Publisher.
   *
   * @param p the publisher, wrapped in a [[PublisherIn]] and attached as the input
   */
  def apply[T](p: Publisher[T]): PublisherFlow[T, T] =
    From[T].withInput[T](PublisherIn[T](p))

}
|
|
|
|
|
|
|
|
|
|
/**
 * Marker trait for the input side of a flow; contravariant in `In`.
 *
 * It declares no members. The implementations visible in this file are
 * `SubscriberIn`, `PublisherIn`, `IterableIn` and `FutureIn`.
 */
trait Input[-In]
|
|
|
|
|
|
2014-08-22 12:15:04 +03:00
|
|
|
/**
 * Default input.
 * Allows a Flow with this input to be materialized to a Subscriber.
 */
final case class SubscriberIn[-In]() extends Input[In] {

  /**
   * The subscriber for this input.
   *
   * Not implemented yet: the body is `???`, so calling it throws `NotImplementedError`.
   *
   * @tparam I element type of the requested subscriber, a subtype of `In`
   */
  def subscriber[I <: In]: Subscriber[I] =
    ???

}
|
2014-08-22 12:15:04 +03:00
|
|
|
|
|
|
|
|
/**
 * Input from a Publisher.
 *
 * @param p the wrapped publisher; its element type is some supertype of `In`
 */
final case class PublisherIn[-In](p: Publisher[_ >: In])
  extends Input[In]
|
|
|
|
|
|
2014-08-18 11:28:30 +03:00
|
|
|
/**
 * Input from an Iterable.
 *
 * `In` is declared contravariant, like in [[Input]], while `immutable.Iterable` is
 * covariant in its element type. The parameter is therefore typed with an
 * existential lower bound (`_ >: In`) instead of `In` itself.
 *
 * NOTE(review): an earlier version of this comment said that `In` had to be changed
 * from contravariant to covariant because of `Iterable[+A]`, and that doing so breaks
 * the variance of `IterableIn`, giving `IterableIn(Seq(1,2,3)): IterableIn[Any]`.
 * That no longer matches the declaration below; confirm whether the inference
 * concern still applies.
 *
 * @param i the wrapped iterable; its element type is some supertype of `In`
 */
final case class IterableIn[-In](i: immutable.Iterable[_ >: In])
  extends Input[In]
|
2014-08-18 11:28:30 +03:00
|
|
|
|
|
|
|
|
/**
 * Input from a Future.
 *
 * `In` is declared contravariant, like in [[Input]], while `Future` is covariant in
 * its result type. The parameter is therefore typed with an existential lower bound
 * (`_ >: In`) instead of `In` itself.
 *
 * NOTE(review): an earlier version of this comment said that `In` had to be changed
 * from contravariant to covariant because of `Future[+A]`, and that doing so breaks
 * the variance of `FutureIn`, giving `FutureIn(Future{1}): FutureIn[Any]`.
 * That no longer matches the declaration below; confirm whether the inference
 * concern still applies.
 *
 * @param f the wrapped future; its result type is some supertype of `In`
 */
final case class FutureIn[-In](f: Future[_ >: In])
  extends Input[In]
|
2014-08-18 11:28:30 +03:00
|
|
|
|
2014-08-22 14:19:41 +03:00
|
|
|
/**
 * Marker trait for the output side of a flow; covariant in `Out`.
 *
 * It declares no members. The implementations visible in this file are
 * `PublisherOut`, `SubscriberOut` and `FoldOut`.
 */
trait Output[+Out]
|
2014-08-18 11:28:30 +03:00
|
|
|
|
2014-08-22 12:15:04 +03:00
|
|
|
/**
 * Default output.
 * Allows a Flow with this output to be materialized to a Publisher.
 */
final case class PublisherOut[+Out]() extends Output[Out] {

  /**
   * The publisher for this output.
   *
   * Not implemented yet: the body is `???`, so calling it throws `NotImplementedError`.
   *
   * @tparam O element type of the requested publisher, a supertype of `Out`
   */
  def publisher[O >: Out]: Publisher[O] =
    ???

}
|
2014-08-18 11:28:30 +03:00
|
|
|
|
2014-08-22 12:15:04 +03:00
|
|
|
/**
 * Output to a Subscriber.
 *
 * @param s the wrapped subscriber; its element type is some subtype of `Out`
 */
final case class SubscriberOut[+Out](s: Subscriber[_ <: Out])
  extends Output[Out]
|
|
|
|
|
|
|
|
|
|
/**
 * Fold output. Reduces the output stream according to the given fold function.
 *
 * @param zero starting value of the fold
 * @param f    fold function, taking the value so far and an element
 * @tparam T   type of the folded value
 */
final case class FoldOut[T, +Out](zero: T)(f: (T, Out) ⇒ T) extends Output[Out] {

  /**
   * Future of the folded value.
   *
   * Not implemented yet: the body is `???`, so calling it throws `NotImplementedError`.
   */
  def future: Future[T] =
    ???

}
|
|
|
|
|
|
2014-08-18 11:28:30 +03:00
|
|
|
/**
 * Operations with a Flow which has open (no attached) Input.
 *
 * Dropping the Out type parameter would be useful for Graph signatures, but it is
 * needed here for the `withInput` and `prependTransform` methods.
 */
sealed trait HasOpenInput[-In, +Out] {

  /** Flow type returned by `prependTransform` and `prepend`; it still has an open input. */
  type Repr[-In, +Out] <: HasOpenInput[In, Out]

  /** Flow type returned by `withInput`. */
  type AfterCloseInput[-In, +Out] <: Flow[In, Out]

  /**
   * Attaches the given input to this flow.
   *
   * @param in the input to attach
   * @tparam I element type of the input, a subtype of `In`
   */
  def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out]

  /**
   * Hook used by the `prepend` combinators; implemented by the concrete flow types.
   *
   * @param t transform whose output type is this flow's `In`
   */
  protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out]

  // linear combinators with flows

  /** Prepends the transform of `f`, a flow without input or output. */
  def prepend[T](f: ProcessorFlow[T, In]): Repr[T, Out] =
    prependTransform[T](f.transform)

  /** Prepends the transform of `f` and then attaches the input of `f`. */
  def prepend[T](f: PublisherFlow[T, In]): Repr[T, Out]#AfterCloseInput[T, Out] =
    prependTransform[T](f.transform).withInput[T](f.input)

}
|
|
|
|
|
|
|
|
|
|
/**
 * Operations with a Flow which has open (no attached) Output.
 *
 * Dropping the In type parameter would be useful for Graph signatures, but it is
 * needed here for the `withOutput` and `appendTransform` methods.
 *
 * All combinators from `map` through `flatten` are placeholders in this version:
 * each one ignores its arguments and appends an `EmptyTransform` whose type
 * parameters match the declared result type.
 */
trait HasOpenOutput[-In, +Out] {

  /** Flow type returned by the combinators; it still has an open output. */
  type Repr[-In, +Out] <: HasOpenOutput[In, Out]

  /** Flow type returned by `withOutput`. */
  type AfterCloseOutput[-In, +Out] <: Flow[In, Out]

  /**
   * Attaches the given output to this flow.
   *
   * @param out the output to attach
   * @tparam O element type of the output, a supertype of `Out`
   */
  def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O]

  /**
   * Hook used by every combinator below; implemented by the concrete flow types.
   *
   * @param t transform whose input type is this flow's `Out`
   */
  protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T]

  // linear simple combinators

  /** Placeholder: `f` is not used yet. */
  def map[T](f: Out ⇒ T): Repr[In, T] =
    appendTransform[T](EmptyTransform[Out, T]())

  /** Placeholder: `f` is not used yet. */
  def mapFuture[T](f: Out ⇒ Future[T]): Repr[In, T] =
    appendTransform[T](EmptyTransform[Out, T]())

  /** Placeholder: `p` is not used yet. */
  def filter(p: Out ⇒ Boolean): Repr[In, Out] =
    appendTransform[Out](EmptyTransform[Out, Out]())

  /** Placeholder: `pf` is not used yet. */
  def collect[T](pf: PartialFunction[Out, T]): Repr[In, T] =
    appendTransform[T](EmptyTransform[Out, T]())

  /** Placeholder: `n` is not used yet. */
  def drop(n: Int): Repr[In, Out] =
    appendTransform[Out](EmptyTransform[Out, Out]())

  /** Placeholder: `d` is not used yet. */
  def dropWithin(d: FiniteDuration): Repr[In, Out] =
    appendTransform[Out](EmptyTransform[Out, Out]())

  /** Placeholder: `n` is not used yet. */
  def take(n: Int): Repr[In, Out] =
    appendTransform[Out](EmptyTransform[Out, Out]())

  /** Placeholder: `d` is not used yet. */
  def takeWithin(d: FiniteDuration): Repr[In, Out] =
    appendTransform[Out](EmptyTransform[Out, Out]())

  /** Placeholder: `n` is not used yet. */
  def grouped(n: Int): Repr[In, immutable.Seq[Out]] =
    appendTransform[immutable.Seq[Out]](EmptyTransform[Out, immutable.Seq[Out]]())

  /** Placeholder: `n` and `d` are not used yet. */
  def groupedWithin(n: Int, d: FiniteDuration): Repr[In, immutable.Seq[Out]] =
    appendTransform[immutable.Seq[Out]](EmptyTransform[Out, immutable.Seq[Out]]())

  /** Placeholder: `f` is not used yet. */
  def mapConcat[T](f: Out ⇒ immutable.Seq[T]): Repr[In, T] =
    appendTransform[T](EmptyTransform[Out, T]())

  /** Placeholder: `transformer` is not used yet. */
  def transform[T](transformer: Transformer[Out, T]): Repr[In, T] =
    appendTransform[T](EmptyTransform[Out, T]())

  /** Placeholder: `seed` and `aggregate` are not used yet. */
  def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Repr[In, S] =
    appendTransform[S](EmptyTransform[Out, S]())

  /** Placeholder: `seed` and `extrapolate` are not used yet. */
  def expand[S, O](seed: Out ⇒ S, extrapolate: S ⇒ (O, S)): Repr[In, O] =
    appendTransform[O](EmptyTransform[Out, O]())

  /** Placeholder: `size` and `overflowStrategy` are not used yet. */
  def buffer(size: Int, overflowStrategy: OverflowStrategy): Repr[In, Out] =
    appendTransform[Out](EmptyTransform[Out, Out]())

  // linear combinators which produce multiple flows

  /** Placeholder: `n` is not used yet. */
  def prefixAndTail[O >: Out](n: Int): Repr[In, (immutable.Seq[O], PublisherFlow[O, O])] =
    appendTransform[(immutable.Seq[O], PublisherFlow[O, O])](
      EmptyTransform[Out, (immutable.Seq[O], PublisherFlow[O, O])]())

  /** Placeholder: `f` is not used yet. */
  def groupBy[O >: Out, K](f: O ⇒ K): Repr[In, (K, PublisherFlow[O, O])] =
    appendTransform[(K, PublisherFlow[O, O])](
      EmptyTransform[Out, (K, PublisherFlow[O, O])]())

  /** Placeholder: `p` is not used yet. */
  def splitWhen[O >: Out](p: Out ⇒ Boolean): Repr[In, PublisherFlow[O, O]] =
    appendTransform[PublisherFlow[O, O]](EmptyTransform[Out, PublisherFlow[O, O]]())

  // linear combinators which consume multiple flows

  /** Placeholder: `strategy` is not used yet. */
  def flatten[T](strategy: FlattenStrategy[Out, T]): Repr[In, T] =
    appendTransform[T](EmptyTransform[Out, T]())

  // linear combinators with flows

  /** Appends the transform of `f`, a flow without input or output. */
  def append[T](f: ProcessorFlow[Out, T]): Repr[In, T] =
    appendTransform[T](f.transform)

  /** Appends the transform of `f` and then attaches the output of `f`. */
  def append[T](f: SubscriberFlow[Out, T]): Repr[In, T]#AfterCloseOutput[In, T] =
    appendTransform[T](f.transform).withOutput[T](f.output)

}
|
|
|
|
|
|
2014-08-26 18:11:11 +02:00
|
|
|
/**
 * Flow without attached input and without attached output, can be used as a `Processor`.
 *
 * @param transform the transform carried by this flow
 */
final case class ProcessorFlow[-In, +Out](transform: Transform[In, Out])
  extends Flow[In, Out]
  with HasOpenOutput[In, Out]
  with HasOpenInput[In, Out] {

  override type Repr[-In, +Out] = ProcessorFlow[In, Out]
  type AfterCloseOutput[-In, +Out] = SubscriberFlow[In, Out]
  type AfterCloseInput[-In, +Out] = PublisherFlow[In, Out]

  /** Returns a [[SubscriberFlow]] holding `out` and this flow's transform. */
  def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] =
    SubscriberFlow[In, O](out, transform)

  /** Returns a [[PublisherFlow]] holding `in` and this flow's transform. */
  def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] =
    PublisherFlow[I, Out](in, transform)

  /** Returns a new flow whose transform is `t ++ transform`. */
  protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] =
    ProcessorFlow[T, Out](t ++ transform)

  /** Returns a new flow whose transform is `transform ++ t`. */
  protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] =
    ProcessorFlow[In, T](transform ++ t)

}
|
|
|
|
|
|
2014-08-26 18:11:11 +02:00
|
|
|
/**
 * Flow with attached output, can be used as a `Subscriber`.
 *
 * @param output    the attached output
 * @param transform the transform carried by this flow
 */
final case class SubscriberFlow[-In, +Out](output: Output[Out], transform: Transform[In, Out])
  extends Flow[In, Out]
  with HasOpenInput[In, Out] {

  type Repr[-In, +Out] = SubscriberFlow[In, Out]
  type AfterCloseInput[-In, +Out] = RunnableFlow[In, Out]

  /** Returns a [[RunnableFlow]] holding `in`, this flow's output and its transform. */
  def withInput[I <: In](in: Input[I]): AfterCloseInput[I, Out] =
    RunnableFlow[I, Out](in, output, transform)

  /** Returns a [[ProcessorFlow]] holding only this flow's transform. */
  def withoutOutput: ProcessorFlow[In, Out] =
    ProcessorFlow[In, Out](transform)

  /** Returns a new flow with the same output whose transform is `t ++ transform`. */
  protected def prependTransform[T](t: Transform[T, In]): Repr[T, Out] =
    SubscriberFlow[T, Out](output, t ++ transform)

}
|
|
|
|
|
|
2014-08-26 18:11:11 +02:00
|
|
|
/**
 * Flow with attached input, can be used as a `Publisher`.
 *
 * @param input     the attached input
 * @param transform the transform carried by this flow
 */
final case class PublisherFlow[-In, +Out](input: Input[In], transform: Transform[In, Out])
  extends Flow[In, Out]
  with HasOpenOutput[In, Out] {

  override type Repr[-In, +Out] = PublisherFlow[In, Out]
  type AfterCloseOutput[-In, +Out] = RunnableFlow[In, Out]

  /** Returns a [[RunnableFlow]] holding this flow's input, `out` and this flow's transform. */
  def withOutput[O >: Out](out: Output[O]): AfterCloseOutput[In, O] =
    RunnableFlow(input, out, transform)

  /** Returns a [[ProcessorFlow]] holding only this flow's transform. */
  def withoutInput: ProcessorFlow[In, Out] =
    ProcessorFlow(transform)

  /**
   * Returns a new flow with the same input whose transform is `transform ++ t`.
   *
   * The result type is spelled out as `Repr[In, T]`, like the abstract declaration in
   * `HasOpenOutput` and the implementation in `ProcessorFlow`, instead of being inferred.
   */
  protected def appendTransform[T](t: Transform[Out, T]): Repr[In, T] =
    PublisherFlow[In, T](input, transform ++ t)

}
|
|
|
|
|
|
2014-08-26 18:11:11 +02:00
|
|
|
/**
 * Flow with attached input and output, can be executed.
 *
 * @param input     the attached input
 * @param output    the attached output
 * @param transform the transform carried by this flow
 */
final case class RunnableFlow[-In, +Out](
  input: Input[In],
  output: Output[Out],
  transform: Transform[In, Out]) extends Flow[In, Out] {

  /** Returns a [[PublisherFlow]] holding this flow's input and transform. */
  def withoutOutput: PublisherFlow[In, Out] =
    PublisherFlow[In, Out](input, transform)

  /** Returns a [[SubscriberFlow]] holding this flow's output and transform. */
  def withoutInput: SubscriberFlow[In, Out] =
    SubscriberFlow[In, Out](output, transform)

  /** Placeholder: does nothing in this version. */
  def run(): Unit = {}

}
|
|
|
|
|
|
|
|
|
|
/**
 * A transformation from `In` to `Out`; contravariant in `In`, covariant in `Out`.
 */
trait Transform[-In, +Out] {

  /**
   * Combines this transform with `t`.
   *
   * Placeholder in this version: neither this transform nor `t` is consulted, and
   * the result is always a new `EmptyTransform[In, T]`.
   *
   * @param t transform whose input type is this transform's `Out`
   */
  def ++[T](t: Transform[Out, T]): Transform[In, T] =
    EmptyTransform[In, T]()

}
|
2014-08-20 09:06:46 +03:00
|
|
|
/**
 * A [[Transform]] with no members of its own. It is what `Transform.++` returns and
 * what the combinators in `HasOpenOutput` append.
 */
final case class EmptyTransform[-In, +Out]() extends Transform[In, Out]
|
2014-08-22 12:15:04 +03:00
|
|
|
|
|
|
|
|
/**
 * `FlattenStrategy` instances whose input elements are flows.
 *
 * NOTE(review): judging by the names these are meant to concatenate the flows, but
 * no behaviour is defined here; the case classes have no members of their own.
 */
object FlattenStrategy {

  /** Strategy value for elements of type `PublisherFlow[In, Out]`. */
  final case class ConcatPublisherFlow[In, Out]()
    extends FlattenStrategy[PublisherFlow[In, Out], Out]

  /** Strategy value for elements of type `ProcessorFlow[In, Out]`. */
  final case class ConcatProcessorFlow[In, Out]()
    extends FlattenStrategy[ProcessorFlow[In, Out], Out]

  /** Returns a new [[ConcatPublisherFlow]]. */
  def concatPublisherFlow[In, Out]: FlattenStrategy[PublisherFlow[In, Out], Out] =
    ConcatPublisherFlow[In, Out]()

  /** Returns a new [[ConcatProcessorFlow]]. */
  def concatProcessorFlow[In, Out]: FlattenStrategy[ProcessorFlow[In, Out], Out] =
    ConcatProcessorFlow[In, Out]()

}
|