+str #17383 implement intersperse

This commit is contained in:
Konrad Malawski 2015-10-16 01:55:20 +02:00
parent 99158f515c
commit 61c2213e02
10 changed files with 368 additions and 3 deletions

View file

@ -105,6 +105,53 @@ public class FlowTest extends StreamTest {
Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS));
}
@Test
public void mustBeAbleToUseIntersperse() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Source<String, ?> source = Source.from(Arrays.asList("0", "1", "2", "3"));
final Flow<String, String, ?> flow = Flow.of(String.class).intersperse("[", ",", "]");
final Future<BoxedUnit> future = source.via(flow).runWith(Sink.foreach(new Procedure<String>() { // Scala Future
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}), materializer);
probe.expectMsgEquals("[");
probe.expectMsgEquals("0");
probe.expectMsgEquals(",");
probe.expectMsgEquals("1");
probe.expectMsgEquals(",");
probe.expectMsgEquals("2");
probe.expectMsgEquals(",");
probe.expectMsgEquals("3");
probe.expectMsgEquals("]");
Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS));
}
@Test
public void mustBeAbleToUseIntersperseAndConcat() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final Source<String, ?> source = Source.from(Arrays.asList("0", "1", "2", "3"));
final Flow<String, String, ?> flow = Flow.of(String.class).intersperse(",");
final Future<BoxedUnit> future = Source.single(">> ").concat(source.via(flow)).runWith(Sink.foreach(new Procedure<String>() { // Scala Future
public void apply(String elem) {
probe.getRef().tell(elem, ActorRef.noSender());
}
}), materializer);
probe.expectMsgEquals(">> ");
probe.expectMsgEquals("0");
probe.expectMsgEquals(",");
probe.expectMsgEquals("1");
probe.expectMsgEquals(",");
probe.expectMsgEquals("2");
probe.expectMsgEquals(",");
probe.expectMsgEquals("3");
Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS));
}
@Test
public void mustBeAbleToUseTakeWhile() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);

View file

@ -0,0 +1,79 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.testkit._
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import org.scalatest.concurrent.ScalaFutures
import scala.concurrent.duration._
class FlowIntersperseSpec extends AkkaSpec with ScalaFutures {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorMaterializer(settings)
"A Intersperse" must {
"inject element between existing elements" in {
val probe = Source(List(1, 2, 3))
.map(_.toString)
.intersperse(",")
.runWith(TestSink.probe)
probe.expectSubscription()
probe.toStrict(1.second).mkString("") should ===(List(1, 2, 3).mkString(","))
}
"inject element between existing elements, when downstream is fold" in {
val concated = Source(List(1, 2, 3))
.map(_.toString)
.intersperse(",")
.runFold("")(_ + _)
concated.futureValue should ===("1,2,3")
}
"inject element between existing elements, and surround with []" in {
val probe = Source(List(1, 2, 3))
.map(_.toString)
.intersperse("[", ",", "]")
.runWith(TestSink.probe)
probe.toStrict(1.second).mkString("") should ===(List(1, 2, 3).mkString("[", ",", "]"))
}
"demonstrate how to prepend only" in {
val probe = (
Source.single(">> ") ++ Source(List("1", "2", "3")).intersperse(","))
.runWith(TestSink.probe)
probe.toStrict(1.second).mkString("") should ===(List(1, 2, 3).mkString(">> ", ",", ""))
}
"surround empty stream with []" in {
val probe = Source(List())
.map(_.toString)
.intersperse("[", ",", "]")
.runWith(TestSink.probe)
probe.expectSubscription()
probe.toStrict(1.second).mkString("") should ===(List().mkString("[", ",", "]"))
}
"surround single element stream with []" in {
val probe = Source(List(1))
.map(_.toString)
.intersperse("[", ",", "]")
.runWith(TestSink.probe)
probe.expectSubscription()
probe.toStrict(1.second).mkString("") should ===(List(1).mkString("[", ",", "]"))
}
}
}

View file

@ -314,6 +314,7 @@ private[akka] object ActorProcessorFactory {
case Collect(pf, _) interp(fusing.Collect(pf, settings.supervisionDecider))
case Scan(z, f, _) interp(fusing.Scan(z, f, settings.supervisionDecider))
case Fold(z, f, _) interp(fusing.Fold(z, f, settings.supervisionDecider))
case Intersperse(s, i, e, _) interp(fusing.Intersperse(s, i, e))
case Recover(pf, _) interp(fusing.Recover(pf))
case Expand(s, f, _) interp(fusing.Expand(s, f))
case Conflate(s, f, _) interp(fusing.Conflate(s, f, settings.supervisionDecider))

View file

@ -26,6 +26,8 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] {
* INTERNAL API
*/
private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] {
ReactiveStreamsCompliance.requireNonNullElement(t)
import ReactiveStreamsCompliance._
override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit =
try {
@ -42,6 +44,8 @@ private[akka] final case class ErrorPublisher(t: Throwable, name: String) extend
private[akka] final case class SingleElementPublisher[T](value: T, name: String) extends Publisher[T] {
import ReactiveStreamsCompliance._
requireNonNullElement(value)
private[this] class SingleElementSubscription(subscriber: Subscriber[_ >: T]) extends Subscription {
private[this] var done: Boolean = false
override def cancel(): Unit = done = true

View file

@ -36,6 +36,7 @@ private[stream] object Stages {
val dropWhile = name("dropWhile")
val scan = name("scan")
val fold = name("fold")
val intersperse = name("intersperse")
val buffer = name("buffer")
val conflate = name("conflate")
val expand = name("expand")
@ -176,6 +177,10 @@ private[stream] object Stages {
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
}
final case class Intersperse(start: Option[Any], inject: Any, end: Option[Any], attributes: Attributes = intersperse) extends StageModule {
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
}
final case class Buffer(size: Int, overflowStrategy: OverflowStrategy, attributes: Attributes = buffer) extends StageModule {
require(size > 0, s"Buffer size must be larger than zero but was [$size]")

View file

@ -236,6 +236,49 @@ private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, de
override def restart(): Fold[In, Out] = copy()
}
/**
* INTERNAL API
*/
private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends StatefulStage[T, T] {
private var needsToEmitStart = start.isDefined
override def initial: StageState[T, T] =
start match {
case Some(initial) firstWithInitial(initial)
case _ first
}
def firstWithInitial(initial: T) = new StageState[T, T] {
override def onPush(elem: T, ctx: Context[T]) = {
needsToEmitStart = false
emit(Iterator(initial, elem), ctx, running)
}
}
def first = new StageState[T, T] {
override def onPush(elem: T, ctx: Context[T]) = {
become(running)
ctx.push(elem)
}
}
def running = new StageState[T, T] {
override def onPush(elem: T, ctx: Context[T]): SyncDirective =
emit(Iterator(inject, elem), ctx)
}
override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = {
end match {
case Some(e) if needsToEmitStart
terminationEmit(Iterator(start.get, end.get), ctx)
case Some(e)
terminationEmit(Iterator(end.get), ctx)
case _
terminationEmit(Iterator(), ctx)
}
}
}
/**
* INTERNAL API
*/

View file

@ -5,7 +5,8 @@ package akka.stream.javadsl
import akka.event.LoggingAdapter
import akka.japi.{ Pair, function }
import akka.stream.impl.StreamLayout
import akka.stream.impl.Stages.Intersperse
import akka.stream.impl.{ ReactiveStreamsCompliance, StreamLayout }
import akka.stream.{ scaladsl, _ }
import akka.stream.stage.Stage
import org.reactivestreams.Processor
@ -406,6 +407,65 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.fold(zero)(f.apply))
/**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements.
*
* Additionally can inject start and end marker elements to stream.
*
* Examples:
*
* {{{
* Source<Integer, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3));
* nums.intersperse(","); // 1 , 2 , 3
* nums.intersperse("[", ",", "]"); // [ 1 , 2 , 3 ]
* }}}
*
* In case you want to only prepend or only append an element (yet still use the `intercept` feature
* to inject a separator between elements, you may want to use the following pattern instead of the 3-argument
* version of intersperse (See [[Source.concat]] for semantics details):
*
* {{{
* Source.single(">> ").concat(flow.intersperse(","))
* flow.intersperse(",").concat(Source.single("END"))
* }}}
*
* '''Emits when''' upstream emits (or before with the `start` element if provided)
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def intersperse[T >: Out](start: T, inject: T, end: T): javadsl.Flow[In, T, Mat] =
new Flow(delegate.intersperse(start, inject, end))
/**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements.
*
* Additionally can inject start and end marker elements to stream.
*
* Examples:
*
* {{{
* Source<Integer, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3));
* nums.intersperse(","); // 1 , 2 , 3
* nums.intersperse("[", ",", "]"); // [ 1 , 2 , 3 ]
* }}}
*
* '''Emits when''' upstream emits (or before with the `start` element if provided)
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def intersperse[T >: Out](inject: T): javadsl.Flow[In, T, Mat] =
new Flow(delegate.intersperse(inject))
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the given number of elements, whatever happens first.

View file

@ -553,6 +553,64 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] =
new Source(delegate.fold(zero)(f.apply))
/**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements.
*
* Additionally can inject start and end marker elements to stream.
*
* Examples:
*
* {{{
* Source<Integer, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3));
* nums.intersperse(","); // 1 , 2 , 3
* nums.intersperse("[", ",", "]"); // [ 1 , 2 , 3 ]
* }}}
*
* In case you want to only prepend or only append an element (yet still use the `intercept` feature
* to inject a separator between elements, you may want to use the following pattern instead of the 3-argument
* version of intersperse (See [[Source.concat]] for semantics details):
*
* {{{
* Source.single(">> ").concat(list.intersperse(","))
* list.intersperse(",").concat(Source.single("END"))
* }}}
* '''Emits when''' upstream emits (or before with the `start` element if provided)
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def intersperse[T >: Out](start: T, inject: T, end: T): javadsl.Source[T, Mat] =
new Source(delegate.intersperse(start, inject, end))
/**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements.
*
* Additionally can inject start and end marker elements to stream.
*
* Examples:
*
* {{{
* Source<Integer, ?> nums = Source.from(Arrays.asList(0, 1, 2, 3));
* nums.intersperse(","); // 1 , 2 , 3
* nums.intersperse("[", ",", "]"); // [ 1 , 2 , 3 ]
* }}}
*
* '''Emits when''' upstream emits (or before with the `start` element if provided)
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def intersperse[T >: Out](inject: T): javadsl.Source[T, Mat] =
new Source(delegate.intersperse(inject))
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the given number of elements, whatever happens first.

View file

@ -10,7 +10,7 @@ import akka.stream.impl.SplitDecision._
import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin }
import akka.stream.impl.{ Stages, StreamLayout }
import akka.stream.impl.{ ReactiveStreamsCompliance, Stages, StreamLayout }
import akka.stream.stage._
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
@ -604,6 +604,71 @@ trait FlowOps[+Out, +Mat] {
*/
def fold[T](zero: T)(f: (T, Out) T): Repr[T, Mat] = andThen(Fold(zero, f.asInstanceOf[(Any, Any) Any]))
/**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements.
*
* Additionally can inject start and end marker elements to stream.
*
* Examples:
*
* {{{
* val nums = Source(List(1,2,3)).map(_.toString)
* nums.intersperse(",") // 1 , 2 , 3
* nums.intersperse("[", ",", "]") // [ 1 , 2 , 3 ]
* }}}
*
* In case you want to only prepend or only append an element (yet still use the `intercept` feature
* to inject a separator between elements, you may want to use the following pattern instead of the 3-argument
* version of intersperse (See [[Source.concat]] for semantics details):
*
* {{{
* Source.single(">> ") ++ Source(List("1", "2", "3")).intersperse(",")
* Source(List("1", "2", "3")).intersperse(",") ++ Source.single("END")
* }}}
*
* '''Emits when''' upstream emits (or before with the `start` element if provided)
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def intersperse[T >: Out](start: T, inject: T, end: T): Repr[T, Mat] = {
ReactiveStreamsCompliance.requireNonNullElement(start)
ReactiveStreamsCompliance.requireNonNullElement(inject)
ReactiveStreamsCompliance.requireNonNullElement(end)
andThen(Intersperse(Some(start), inject, Some(end)))
}
/**
* Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]]
* injects a separator between a List's elements.
*
* Additionally can inject start and end marker elements to stream.
*
* Examples:
*
* {{{
* val nums = Source(List(1,2,3)).map(_.toString)
* nums.intersperse(",") // 1 , 2 , 3
* nums.intersperse("[", ",", "]") // [ 1 , 2 , 3 ]
* }}}
*
* '''Emits when''' upstream emits (or before with the `start` element if provided)
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def intersperse[T >: Out](inject: T): Repr[T, Mat] = {
ReactiveStreamsCompliance.requireNonNullElement(inject)
andThen(Intersperse(None, inject, None))
}
/**
* Chunk up this stream into groups of elements received within a time window,
* or limited by the given number of elements, whatever happens first.

View file

@ -233,16 +233,19 @@ object Source extends SourceApply {
/**
* Create a `Source` that will continually emit the given element.
*/
def repeat[T](element: T): Source[T, Unit] =
def repeat[T](element: T): Source[T, Unit] = {
ReactiveStreamsCompliance.requireNonNullElement(element)
new Source(
new PublisherSource(
SingleElementPublisher(
new immutable.Iterable[T] {
override val iterator: Iterator[T] = Iterator.continually(element)
override def toString: String = "repeat(" + element + ")"
}, "RepeatSource"),
DefaultAttributes.repeat,
shape("RepeatSource"))).mapConcat(id)
}
/**
* A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.