+str #15998 Add Source singleton, empty, failed factories

This commit is contained in:
Patrik Nordwall 2014-10-06 11:28:13 +02:00
parent afd45a09f4
commit 7f9b018fe7
4 changed files with 94 additions and 6 deletions

View file

@ -26,7 +26,7 @@ class FlowForeachSpec extends AkkaSpec {
} }
"complete the future for an empty stream" in { "complete the future for an empty stream" in {
val mf = Source(Nil).foreach(testActor ! _) onSuccess { Source.empty.foreach(testActor ! _) onSuccess {
case _ testActor ! "done" case _ testActor ! "done"
} }
expectMsg("done") expectMsg("done")
@ -34,7 +34,7 @@ class FlowForeachSpec extends AkkaSpec {
"yield the first error" in { "yield the first error" in {
val p = StreamTestKit.PublisherProbe[Int]() val p = StreamTestKit.PublisherProbe[Int]()
val mf = Source(p).foreach(testActor ! _) onFailure { Source(p).foreach(testActor ! _) onFailure {
case ex testActor ! ex case ex testActor ! ex
} }
val proc = p.expectSubscription val proc = p.expectSubscription

View file

@ -4,7 +4,6 @@
package akka.stream.scaladsl2 package akka.stream.scaladsl2
import scala.collection.immutable import scala.collection.immutable
import akka.stream.impl.EmptyPublisher
import akka.stream.testkit.{ AkkaSpec, StreamTestKit } import akka.stream.testkit.{ AkkaSpec, StreamTestKit }
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
import scala.concurrent.Await import scala.concurrent.Await
@ -29,7 +28,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
"work on empty input" in { "work on empty input" in {
val futureDrain = newFutureDrain val futureDrain = newFutureDrain
val fut = Source(Nil).prefixAndTail(10).runWith(futureDrain) val fut = Source.empty.prefixAndTail(10).runWith(futureDrain)
val (prefix, tailFlow) = Await.result(fut, 3.seconds) val (prefix, tailFlow) = Await.result(fut, 3.seconds)
prefix should be(Nil) prefix should be(Nil)
val tailSubscriber = SubscriberProbe[Int] val tailSubscriber = SubscriberProbe[Int]

View file

@ -0,0 +1,71 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl2
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
class SourceSpec extends AkkaSpec {
implicit val materializer = FlowMaterializer()
"Singleton Source" must {
"produce element" in {
val p = Source.singleton(1).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
val sub = c.expectSubscription()
sub.request(1)
c.expectNext(1)
c.expectComplete()
}
"produce elements to later subscriber" in {
val p = Source.singleton(1).runWith(PublisherDrain())
val c1 = StreamTestKit.SubscriberProbe[Int]()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c1)
val sub1 = c1.expectSubscription()
sub1.request(1)
c1.expectNext(1)
c1.expectComplete()
p.subscribe(c2)
val sub2 = c2.expectSubscription()
sub2.request(3)
c2.expectNext(1)
c2.expectComplete()
}
}
"Empty Source" must {
"complete immediately" in {
val p = Source.empty.runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectComplete()
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c2)
c2.expectComplete()
}
}
"Failed Source" must {
"emit error immediately" in {
val ex = new RuntimeException with NoStackTrace
val p = Source.failed(ex).runWith(PublisherDrain())
val c = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c)
c.expectError(ex)
val c2 = StreamTestKit.SubscriberProbe[Int]()
p.subscribe(c2)
c2.expectError(ex)
}
}
}

View file

@ -4,14 +4,15 @@
package akka.stream.scaladsl2 package akka.stream.scaladsl2
import org.reactivestreams.{ Subscriber, Publisher } import org.reactivestreams.{ Subscriber, Publisher }
import scala.annotation.unchecked.uncheckedVariance import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable import scala.collection.immutable
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
import scala.language.higherKinds import scala.language.higherKinds
import scala.language.implicitConversions import scala.language.implicitConversions
import akka.stream.impl.SynchronousPublisherFromIterable
import akka.stream.impl.EmptyPublisher
import akka.stream.impl.ErrorPublisher
/** /**
* A `Source` is a set of stream processing steps that has one open output and an attached input. * A `Source` is a set of stream processing steps that has one open output and an attached input.
@ -118,4 +119,21 @@ object Source {
def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): Source[T] = def apply[T](initialDelay: FiniteDuration, interval: FiniteDuration, tick: () T): Source[T] =
TickTap(initialDelay, interval, tick) TickTap(initialDelay, interval, tick)
/**
* Create a `Source` with one element.
* Every connected `Sink` of this stream will see an individual stream consisting of one element.
*/
def singleton[T](element: T): Source[T] = apply(SynchronousPublisherFromIterable(List(element)))
/**
* Create a `Source` with no elements, i.e. an empty stream that is completed immediately
* for every connected `Sink`.
*/
def empty[T](): Source[T] = apply(EmptyPublisher[T])
/**
* Create a `Source` that immediately ends the stream with the `cause` error to every connected `Sink`.
*/
def failed[T](cause: Throwable): Source[T] = apply(ErrorPublisher(cause))
} }