diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala index 96fb3e8d43..549bf1c22d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowForeachSpec.scala @@ -26,7 +26,7 @@ class FlowForeachSpec extends AkkaSpec { } "complete the future for an empty stream" in { - val mf = Source(Nil).foreach(testActor ! _) onSuccess { + Source.empty.foreach(testActor ! _) onSuccess { case _ ⇒ testActor ! "done" } expectMsg("done") @@ -34,7 +34,7 @@ class FlowForeachSpec extends AkkaSpec { "yield the first error" in { val p = StreamTestKit.PublisherProbe[Int]() - val mf = Source(p).foreach(testActor ! _) onFailure { + Source(p).foreach(testActor ! _) onFailure { case ex ⇒ testActor ! ex } val proc = p.expectSubscription diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala index aa78ba20d2..8f84a6bc06 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/FlowPrefixAndTailSpec.scala @@ -4,7 +4,6 @@ package akka.stream.scaladsl2 import scala.collection.immutable -import akka.stream.impl.EmptyPublisher import akka.stream.testkit.{ AkkaSpec, StreamTestKit } import org.reactivestreams.Publisher import scala.concurrent.Await @@ -29,7 +28,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { "work on empty input" in { val futureDrain = newFutureDrain - val fut = Source(Nil).prefixAndTail(10).runWith(futureDrain) + val fut = Source.empty.prefixAndTail(10).runWith(futureDrain) val (prefix, tailFlow) = Await.result(fut, 3.seconds) prefix should be(Nil) val tailSubscriber = SubscriberProbe[Int] diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/SourceSpec.scala new file mode 100644 index 0000000000..7a878e211a --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl2/SourceSpec.scala @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace +import akka.stream.testkit.AkkaSpec +import akka.stream.testkit.StreamTestKit + +class SourceSpec extends AkkaSpec { + + implicit val materializer = FlowMaterializer() + + "Singleton Source" must { + "produce element" in { + val p = Source.singleton(1).runWith(PublisherDrain()) + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + val sub = c.expectSubscription() + sub.request(1) + c.expectNext(1) + c.expectComplete() + } + + "produce elements to later subscriber" in { + val p = Source.singleton(1).runWith(PublisherDrain()) + val c1 = StreamTestKit.SubscriberProbe[Int]() + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c1) + + val sub1 = c1.expectSubscription() + sub1.request(1) + c1.expectNext(1) + c1.expectComplete() + p.subscribe(c2) + val sub2 = c2.expectSubscription() + sub2.request(3) + c2.expectNext(1) + c2.expectComplete() + } + + } + + "Empty Source" must { + "complete immediately" in { + val p = Source.empty.runWith(PublisherDrain()) + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + c.expectComplete() + + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c2) + c2.expectComplete() + } + } + + "Failed Source" must { + "emit error immediately" in { + val ex = new RuntimeException with NoStackTrace + val p = Source.failed(ex).runWith(PublisherDrain()) + val c = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c) + c.expectError(ex) + + val c2 = StreamTestKit.SubscriberProbe[Int]() + p.subscribe(c2) + c2.expectError(ex) + } + } +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala index df072ef750..dc1904721d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala @@ -4,14 +4,15 @@ package akka.stream.scaladsl2 import org.reactivestreams.{ Subscriber, Publisher } - import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration - import scala.language.higherKinds import scala.language.implicitConversions +import akka.stream.impl.SynchronousPublisherFromIterable +import akka.stream.impl.EmptyPublisher +import akka.stream.impl.ErrorPublisher /** * A `Source` is a set of stream processing steps that has one open output and an attached input. @@ -118,4 +119,21 @@ object Source { def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () ⇒ T): Source[T] = TickTap(initialDelay, interval, tick) + /** + * Create a `Source` with one element. + * Every connected `Sink` of this stream will see an individual stream consisting of one element. + */ + def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element))) + + /** + * Create a `Source` with no elements, i.e. an empty stream that is completed immediately + * for every connected `Sink`. + */ + def empty[T](): Source[T] = apply(EmptyPublisher[T]) + + /** + * Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`. + */ + def failed[T](cause: Throwable): Source[T] = apply(ErrorPublisher(cause)) + }