diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 71e716159d..5112edcede 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -105,6 +105,53 @@ public class FlowTest extends StreamTest { Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); } + @Test + public void mustBeAbleToUseIntersperse() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source source = Source.from(Arrays.asList("0", "1", "2", "3")); + final Flow flow = Flow.of(String.class).intersperse("[", ",", "]"); + + final Future future = source.via(flow).runWith(Sink.foreach(new Procedure() { // Scala Future + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }), materializer); + + probe.expectMsgEquals("["); + probe.expectMsgEquals("0"); + probe.expectMsgEquals(","); + probe.expectMsgEquals("1"); + probe.expectMsgEquals(","); + probe.expectMsgEquals("2"); + probe.expectMsgEquals(","); + probe.expectMsgEquals("3"); + probe.expectMsgEquals("]"); + Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); + } + + @Test + public void mustBeAbleToUseIntersperseAndConcat() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source source = Source.from(Arrays.asList("0", "1", "2", "3")); + final Flow flow = Flow.of(String.class).intersperse(","); + + final Future future = Source.single(">> ").concat(source.via(flow)).runWith(Sink.foreach(new Procedure() { // Scala Future + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }), materializer); + + probe.expectMsgEquals(">> "); + probe.expectMsgEquals("0"); + probe.expectMsgEquals(","); + probe.expectMsgEquals("1"); + probe.expectMsgEquals(","); + probe.expectMsgEquals("2"); + probe.expectMsgEquals(","); + probe.expectMsgEquals("3"); + Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); + } + @Test public void mustBeAbleToUseTakeWhile() throws Exception { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIntersperseSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIntersperseSpec.scala new file mode 100644 index 0000000000..0f1360692e --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIntersperseSpec.scala @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import org.scalatest.concurrent.ScalaFutures + +import scala.concurrent.duration._ + +class FlowIntersperseSpec extends AkkaSpec with ScalaFutures { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val materializer = ActorMaterializer(settings) + + "A Intersperse" must { + + "inject element between existing elements" in { + val probe = Source(List(1, 2, 3)) + .map(_.toString) + .intersperse(",") + .runWith(TestSink.probe) + + probe.expectSubscription() + probe.toStrict(1.second).mkString("") should ===(List(1, 2, 3).mkString(",")) + } + + "inject element between existing elements, when downstream is fold" in { + val concated = Source(List(1, 2, 3)) + .map(_.toString) + .intersperse(",") + .runFold("")(_ + _) + + concated.futureValue should ===("1,2,3") + } + + "inject element between existing elements, and surround with []" in { + val probe = Source(List(1, 2, 3)) + .map(_.toString) + .intersperse("[", ",", "]") + .runWith(TestSink.probe) + + probe.toStrict(1.second).mkString("") should ===(List(1, 2, 3).mkString("[", ",", "]")) + } + + "demonstrate how to prepend only" in { + val probe = ( + Source.single(">> ") ++ Source(List("1", "2", "3")).intersperse(",")) + .runWith(TestSink.probe) + + probe.toStrict(1.second).mkString("") should ===(List(1, 2, 3).mkString(">> ", ",", "")) + } + + "surround empty stream with []" in { + val probe = Source(List()) + .map(_.toString) + .intersperse("[", ",", "]") + .runWith(TestSink.probe) + + probe.expectSubscription() + probe.toStrict(1.second).mkString("") should ===(List().mkString("[", ",", "]")) + } + + "surround single element stream with []" in { + val probe = Source(List(1)) + .map(_.toString) + .intersperse("[", ",", "]") + .runWith(TestSink.probe) + + probe.expectSubscription() + probe.toStrict(1.second).mkString("") should ===(List(1).mkString("[", ",", "]")) + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index e0fb944365..1916b160e8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -314,6 +314,7 @@ private[akka] object ActorProcessorFactory { case Collect(pf, _) ⇒ interp(fusing.Collect(pf, settings.supervisionDecider)) case Scan(z, f, _) ⇒ interp(fusing.Scan(z, f, settings.supervisionDecider)) case Fold(z, f, _) ⇒ interp(fusing.Fold(z, f, settings.supervisionDecider)) + case Intersperse(s, i, e, _) ⇒ interp(fusing.Intersperse(s, i, e)) case Recover(pf, _) ⇒ interp(fusing.Recover(pf)) case Expand(s, f, _) ⇒ interp(fusing.Expand(s, f)) case Conflate(s, f, _) ⇒ interp(fusing.Conflate(s, f, settings.supervisionDecider)) diff --git a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala index 838046b491..f9638cfd6c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/CompletedPublishers.scala @@ -26,6 +26,8 @@ private[akka] case object EmptyPublisher extends Publisher[Nothing] { * INTERNAL API */ private[akka] final case class ErrorPublisher(t: Throwable, name: String) extends Publisher[Nothing] { + ReactiveStreamsCompliance.requireNonNullElement(t) + import ReactiveStreamsCompliance._ override def subscribe(subscriber: Subscriber[_ >: Nothing]): Unit = try { @@ -42,6 +44,8 @@ private[akka] final case class ErrorPublisher(t: Throwable, name: String) extend private[akka] final case class SingleElementPublisher[T](value: T, name: String) extends Publisher[T] { import ReactiveStreamsCompliance._ + requireNonNullElement(value) + private[this] class SingleElementSubscription(subscriber: Subscriber[_ >: T]) extends Subscription { private[this] var done: Boolean = false override def cancel(): Unit = done = true diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 51e712e378..041100040d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -36,6 +36,7 @@ private[stream] object Stages { val dropWhile = name("dropWhile") val scan = name("scan") val fold = name("fold") + val intersperse = name("intersperse") val buffer = name("buffer") val conflate = name("conflate") val expand = name("expand") @@ -176,6 +177,10 @@ private[stream] object Stages { override def withAttributes(attributes: Attributes) = copy(attributes = attributes) } + final case class Intersperse(start: Option[Any], inject: Any, end: Option[Any], attributes: Attributes = intersperse) extends StageModule { + override def withAttributes(attributes: Attributes) = copy(attributes = attributes) + } + final case class Buffer(size: Int, overflowStrategy: OverflowStrategy, attributes: Attributes = buffer) extends StageModule { require(size > 0, s"Buffer size must be larger than zero but was [$size]") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 545104b56c..7a23dd4b44 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -236,6 +236,49 @@ private[akka] final case class Fold[In, Out](zero: Out, f: (Out, In) ⇒ Out, de override def restart(): Fold[In, Out] = copy() } +/** + * INTERNAL API + */ +private[akka] final case class Intersperse[T](start: Option[T], inject: T, end: Option[T]) extends StatefulStage[T, T] { + private var needsToEmitStart = start.isDefined + + override def initial: StageState[T, T] = + start match { + case Some(initial) ⇒ firstWithInitial(initial) + case _ ⇒ first + } + + def firstWithInitial(initial: T) = new StageState[T, T] { + override def onPush(elem: T, ctx: Context[T]) = { + needsToEmitStart = false + emit(Iterator(initial, elem), ctx, running) + } + } + + def first = new StageState[T, T] { + override def onPush(elem: T, ctx: Context[T]) = { + become(running) + ctx.push(elem) + } + } + + def running = new StageState[T, T] { + override def onPush(elem: T, ctx: Context[T]): SyncDirective = + emit(Iterator(inject, elem), ctx) + } + + override def onUpstreamFinish(ctx: Context[T]): TerminationDirective = { + end match { + case Some(e) if needsToEmitStart ⇒ + terminationEmit(Iterator(start.get, end.get), ctx) + case Some(e) ⇒ + terminationEmit(Iterator(end.get), ctx) + case _ ⇒ + terminationEmit(Iterator(), ctx) + } + } +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index b428ceedbb..58fdf9b49d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -5,7 +5,8 @@ package akka.stream.javadsl import akka.event.LoggingAdapter import akka.japi.{ Pair, function } -import akka.stream.impl.StreamLayout +import akka.stream.impl.Stages.Intersperse +import akka.stream.impl.{ ReactiveStreamsCompliance, StreamLayout } import akka.stream.{ scaladsl, _ } import akka.stream.stage.Stage import org.reactivestreams.Processor @@ -406,6 +407,65 @@ class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Graph def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Flow[In, T, Mat] = new Flow(delegate.fold(zero)(f.apply)) + /** + * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] + * injects a separator between a List's elements. + * + * Additionally can inject start and end marker elements to stream. + * + * Examples: + * + * {{{ + * Source nums = Source.from(Arrays.asList(0, 1, 2, 3)); + * nums.intersperse(","); // 1 , 2 , 3 + * nums.intersperse("[", ",", "]"); // [ 1 , 2 , 3 ] + * }}} + * + * In case you want to only prepend or only append an element (yet still use the `intercept` feature + * to inject a separator between elements, you may want to use the following pattern instead of the 3-argument + * version of intersperse (See [[Source.concat]] for semantics details): + * + * {{{ + * Source.single(">> ").concat(flow.intersperse(",")) + * flow.intersperse(",").concat(Source.single("END")) + * }}} + * + * '''Emits when''' upstream emits (or before with the `start` element if provided) + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def intersperse[T >: Out](start: T, inject: T, end: T): javadsl.Flow[In, T, Mat] = + new Flow(delegate.intersperse(start, inject, end)) + + /** + * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] + * injects a separator between a List's elements. + * + * Additionally can inject start and end marker elements to stream. + * + * Examples: + * + * {{{ + * Source nums = Source.from(Arrays.asList(0, 1, 2, 3)); + * nums.intersperse(","); // 1 , 2 , 3 + * nums.intersperse("[", ",", "]"); // [ 1 , 2 , 3 ] + * }}} + * + * '''Emits when''' upstream emits (or before with the `start` element if provided) + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def intersperse[T >: Out](inject: T): javadsl.Flow[In, T, Mat] = + new Flow(delegate.intersperse(inject)) + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the given number of elements, whatever happens first. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index c9107b6443..b88134c7aa 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -553,6 +553,64 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour def fold[T](zero: T)(f: function.Function2[T, Out, T]): javadsl.Source[T, Mat] = new Source(delegate.fold(zero)(f.apply)) + /** + * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] + * injects a separator between a List's elements. + * + * Additionally can inject start and end marker elements to stream. + * + * Examples: + * + * {{{ + * Source nums = Source.from(Arrays.asList(0, 1, 2, 3)); + * nums.intersperse(","); // 1 , 2 , 3 + * nums.intersperse("[", ",", "]"); // [ 1 , 2 , 3 ] + * }}} + * + * In case you want to only prepend or only append an element (yet still use the `intercept` feature + * to inject a separator between elements, you may want to use the following pattern instead of the 3-argument + * version of intersperse (See [[Source.concat]] for semantics details): + * + * {{{ + * Source.single(">> ").concat(list.intersperse(",")) + * list.intersperse(",").concat(Source.single("END")) + * }}} + * '''Emits when''' upstream emits (or before with the `start` element if provided) + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def intersperse[T >: Out](start: T, inject: T, end: T): javadsl.Source[T, Mat] = + new Source(delegate.intersperse(start, inject, end)) + + /** + * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] + * injects a separator between a List's elements. + * + * Additionally can inject start and end marker elements to stream. + * + * Examples: + * + * {{{ + * Source nums = Source.from(Arrays.asList(0, 1, 2, 3)); + * nums.intersperse(","); // 1 , 2 , 3 + * nums.intersperse("[", ",", "]"); // [ 1 , 2 , 3 ] + * }}} + * + * '''Emits when''' upstream emits (or before with the `start` element if provided) + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def intersperse[T >: Out](inject: T): javadsl.Source[T, Mat] = + new Source(delegate.intersperse(inject)) + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the given number of elements, whatever happens first. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index e9f4ccd3e3..348a3aed46 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -10,7 +10,7 @@ import akka.stream.impl.SplitDecision._ import akka.stream.impl.Stages.{ DirectProcessor, MaterializingStageFactory, StageModule } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin } -import akka.stream.impl.{ Stages, StreamLayout } +import akka.stream.impl.{ ReactiveStreamsCompliance, Stages, StreamLayout } import akka.stream.stage._ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } @@ -604,6 +604,71 @@ trait FlowOps[+Out, +Mat] { */ def fold[T](zero: T)(f: (T, Out) ⇒ T): Repr[T, Mat] = andThen(Fold(zero, f.asInstanceOf[(Any, Any) ⇒ Any])) + /** + * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] + * injects a separator between a List's elements. + * + * Additionally can inject start and end marker elements to stream. + * + * Examples: + * + * {{{ + * val nums = Source(List(1,2,3)).map(_.toString) + * nums.intersperse(",") // 1 , 2 , 3 + * nums.intersperse("[", ",", "]") // [ 1 , 2 , 3 ] + * }}} + * + * In case you want to only prepend or only append an element (yet still use the `intercept` feature + * to inject a separator between elements, you may want to use the following pattern instead of the 3-argument + * version of intersperse (See [[Source.concat]] for semantics details): + * + * {{{ + * Source.single(">> ") ++ Source(List("1", "2", "3")).intersperse(",") + * Source(List("1", "2", "3")).intersperse(",") ++ Source.single("END") + * }}} + * + * '''Emits when''' upstream emits (or before with the `start` element if provided) + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def intersperse[T >: Out](start: T, inject: T, end: T): Repr[T, Mat] = { + ReactiveStreamsCompliance.requireNonNullElement(start) + ReactiveStreamsCompliance.requireNonNullElement(inject) + ReactiveStreamsCompliance.requireNonNullElement(end) + andThen(Intersperse(Some(start), inject, Some(end))) + } + + /** + * Intersperses stream with provided element, similar to how [[scala.collection.immutable.List.mkString]] + * injects a separator between a List's elements. + * + * Additionally can inject start and end marker elements to stream. + * + * Examples: + * + * {{{ + * val nums = Source(List(1,2,3)).map(_.toString) + * nums.intersperse(",") // 1 , 2 , 3 + * nums.intersperse("[", ",", "]") // [ 1 , 2 , 3 ] + * }}} + * + * '''Emits when''' upstream emits (or before with the `start` element if provided) + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def intersperse[T >: Out](inject: T): Repr[T, Mat] = { + ReactiveStreamsCompliance.requireNonNullElement(inject) + andThen(Intersperse(None, inject, None)) + } + /** * Chunk up this stream into groups of elements received within a time window, * or limited by the given number of elements, whatever happens first. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index f50964f817..5fb321e93d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -233,16 +233,19 @@ object Source extends SourceApply { /** * Create a `Source` that will continually emit the given element. */ - def repeat[T](element: T): Source[T, Unit] = + def repeat[T](element: T): Source[T, Unit] = { + ReactiveStreamsCompliance.requireNonNullElement(element) new Source( new PublisherSource( SingleElementPublisher( new immutable.Iterable[T] { override val iterator: Iterator[T] = Iterator.continually(element) + override def toString: String = "repeat(" + element + ")" }, "RepeatSource"), DefaultAttributes.repeat, shape("RepeatSource"))).mapConcat(id) + } /** * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.