parent
d7b45e0fc3
commit
e6e476d82a
146 changed files with 910 additions and 740 deletions
|
|
@ -6,11 +6,11 @@ import akka.http.scaladsl.model.Uri
|
|||
import akka.stream.scaladsl._
|
||||
import akka.stream._
|
||||
import akka.stream.stage.{ OutHandler, InHandler, GraphStageLogic, GraphStage }
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber }
|
||||
|
||||
import scala.concurrent.{ Future, ExecutionContext, Promise }
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import scala.util.{ Failure, Random, Success, Try }
|
||||
|
||||
class MigrationsScala extends AkkaSpec {
|
||||
|
||||
|
|
@ -110,8 +110,26 @@ class MigrationsScala extends AkkaSpec {
|
|||
promise.trySuccess(Some(()))
|
||||
|
||||
val ticks = Source.tick(1.second, 3.seconds, "tick")
|
||||
|
||||
val pubSource = Source.fromPublisher(TestPublisher.manualProbe[Int]())
|
||||
|
||||
val itSource = Source.fromIterator(() => Iterator.continually(Random.nextGaussian))
|
||||
|
||||
val futSource = Source.fromFuture(Future.successful(42))
|
||||
|
||||
val subSource = Source.asSubscriber
|
||||
//#source-creators
|
||||
|
||||
//#sink-creators
|
||||
val subSink = Sink.fromSubscriber(TestSubscriber.manualProbe[Int]())
|
||||
//#sink-creators
|
||||
|
||||
//#sink-as-publisher
|
||||
val pubSink = Sink.asPublisher(fanout = false)
|
||||
|
||||
val pubSinkFanout = Sink.asPublisher(fanout = true)
|
||||
//#sink-as-publisher
|
||||
|
||||
//#flatMapConcat
|
||||
Flow[Source[Int, Any]].flatMapConcat(identity)
|
||||
//#flatMapConcat
|
||||
|
|
|
|||
|
|
@ -100,7 +100,7 @@ class FlowDocSpec extends AkkaSpec {
|
|||
Source(List(1, 2, 3))
|
||||
|
||||
// Create a source from a Future
|
||||
Source(Future.successful("Hello Streams!"))
|
||||
Source.fromFuture(Future.successful("Hello Streams!"))
|
||||
|
||||
// Create a source from a single element
|
||||
Source.single("only one element")
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ class GraphCyclesSpec extends AkkaSpec {
|
|||
implicit val materializer = ActorMaterializer()
|
||||
|
||||
"Cycle demonstration" must {
|
||||
val source = Source(() => Iterator.from(0))
|
||||
val source = Source.fromIterator(() => Iterator.from(0))
|
||||
|
||||
"include a deadlocked cycle" in {
|
||||
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ class RateTransformationDocSpec extends AkkaSpec {
|
|||
}
|
||||
//#conflate-summarize
|
||||
|
||||
val fut = Source(() => Iterator.continually(Random.nextGaussian))
|
||||
val fut = Source.fromIterator(() => Iterator.continually(Random.nextGaussian))
|
||||
.via(statsFlow)
|
||||
.grouped(10)
|
||||
.runWith(Sink.head)
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
|
|||
|
||||
val impl = new Fixture {
|
||||
override def tweets: Publisher[Tweet] =
|
||||
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher(false))
|
||||
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.asPublisher(false))
|
||||
|
||||
override def storage = TestSubscriber.manualProbe[Author]
|
||||
|
||||
|
|
@ -66,7 +66,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
|
|||
val storage = impl.storage
|
||||
|
||||
//#connect-all
|
||||
Source(tweets).via(authors).to(Sink(storage)).run()
|
||||
Source.fromPublisher(tweets).via(authors).to(Sink.fromSubscriber(storage)).run()
|
||||
//#connect-all
|
||||
|
||||
assertResult(storage)
|
||||
|
|
@ -92,7 +92,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
|
|||
|
||||
//#source-publisher
|
||||
val authorPublisher: Publisher[Author] =
|
||||
Source(tweets).via(authors).runWith(Sink.publisher(fanout = false))
|
||||
Source.fromPublisher(tweets).via(authors).runWith(Sink.asPublisher(fanout = false))
|
||||
|
||||
authorPublisher.subscribe(storage)
|
||||
//#source-publisher
|
||||
|
|
@ -107,8 +107,8 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
|
|||
|
||||
//#source-fanoutPublisher
|
||||
val authorPublisher: Publisher[Author] =
|
||||
Source(tweets).via(authors)
|
||||
.runWith(Sink.publisher(fanout = true))
|
||||
Source.fromPublisher(tweets).via(authors)
|
||||
.runWith(Sink.asPublisher(fanout = true))
|
||||
|
||||
authorPublisher.subscribe(storage)
|
||||
authorPublisher.subscribe(alert)
|
||||
|
|
@ -125,7 +125,7 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
|
|||
|
||||
//#sink-subscriber
|
||||
val tweetSubscriber: Subscriber[Tweet] =
|
||||
authors.to(Sink(storage)).runWith(Source.subscriber[Tweet])
|
||||
authors.to(Sink.fromSubscriber(storage)).runWith(Source.asSubscriber[Tweet])
|
||||
|
||||
tweets.subscribe(tweetSubscriber)
|
||||
//#sink-subscriber
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
|
|||
|
||||
// prepare graph elements
|
||||
val zip = b.add(Zip[Int, Int]())
|
||||
def ints = Source(() => Iterator.from(1))
|
||||
def ints = Source.fromIterator(() => Iterator.from(1))
|
||||
|
||||
// connect the graph
|
||||
ints.filter(_ % 2 != 0) ~> zip.in0
|
||||
|
|
|
|||
|
|
@ -19,8 +19,8 @@ class RecipeCollectingMetrics extends RecipeSpec {
|
|||
//
|
||||
// val loadPub = TestPublisher.manualProbe[Int]()
|
||||
// val tickPub = TestPublisher.manualProbe[Tick]()
|
||||
// val reportTicks = Source(tickPub)
|
||||
// val loadUpdates = Source(loadPub)
|
||||
// val reportTicks = Source.fromPublisher(tickPub)
|
||||
// val loadUpdates = Source.fromPublisher(loadPub)
|
||||
// val futureSink = Sink.head[immutable.Seq[String]]
|
||||
// val sink = Flow[String].grouped(10).to(futureSink)
|
||||
//
|
||||
|
|
|
|||
|
|
@ -13,15 +13,15 @@ class RecipeDroppyBroadcast extends RecipeSpec {
|
|||
"Recipe for a droppy broadcast" must {
|
||||
"work" in {
|
||||
val pub = TestPublisher.probe[Int]()
|
||||
val myElements = Source(pub)
|
||||
val myElements = Source.fromPublisher(pub)
|
||||
|
||||
val sub1 = TestSubscriber.manualProbe[Int]()
|
||||
val sub2 = TestSubscriber.manualProbe[Int]()
|
||||
val sub3 = TestSubscriber.probe[Int]()
|
||||
val futureSink = Sink.head[Seq[Int]]
|
||||
val mySink1 = Sink(sub1)
|
||||
val mySink2 = Sink(sub2)
|
||||
val mySink3 = Sink(sub3)
|
||||
val mySink1 = Sink.fromSubscriber(sub1)
|
||||
val mySink2 = Sink.fromSubscriber(sub2)
|
||||
val mySink3 = Sink.fromSubscriber(sub3)
|
||||
|
||||
//#droppy-bcast
|
||||
val graph = RunnableGraph.fromGraph(GraphDSL.create(mySink1, mySink2, mySink3)((_, _, _)) { implicit b =>
|
||||
|
|
|
|||
|
|
@ -94,15 +94,15 @@ class RecipeGlobalRateLimit extends RecipeSpec {
|
|||
// Use a large period and emulate the timer by hand instead
|
||||
val limiter = system.actorOf(Limiter.props(2, 100.days, 1), "limiter")
|
||||
|
||||
val source1 = Source(() => Iterator.continually("E1")).via(limitGlobal(limiter, 2.seconds))
|
||||
val source2 = Source(() => Iterator.continually("E2")).via(limitGlobal(limiter, 2.seconds))
|
||||
val source1 = Source.fromIterator(() => Iterator.continually("E1")).via(limitGlobal(limiter, 2.seconds))
|
||||
val source2 = Source.fromIterator(() => Iterator.continually("E2")).via(limitGlobal(limiter, 2.seconds))
|
||||
|
||||
val probe = TestSubscriber.manualProbe[String]()
|
||||
|
||||
RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
|
||||
import GraphDSL.Implicits._
|
||||
val merge = b.add(Merge[String](2))
|
||||
source1 ~> merge ~> Sink(probe)
|
||||
source1 ~> merge ~> Sink.fromSubscriber(probe)
|
||||
source2 ~> merge
|
||||
ClosedShape
|
||||
}).run()
|
||||
|
|
|
|||
|
|
@ -54,8 +54,8 @@ class RecipeHold extends RecipeSpec {
|
|||
|
||||
val pub = TestPublisher.probe[Int]()
|
||||
val sub = TestSubscriber.manualProbe[Int]()
|
||||
val source = Source(pub)
|
||||
val sink = Sink(sub)
|
||||
val source = Source.fromPublisher(pub)
|
||||
val sink = Sink.fromSubscriber(sub)
|
||||
|
||||
source.transform(() => new HoldWithInitial(0)).to(sink).run()
|
||||
|
||||
|
|
@ -84,8 +84,8 @@ class RecipeHold extends RecipeSpec {
|
|||
|
||||
val pub = TestPublisher.probe[Int]()
|
||||
val sub = TestSubscriber.manualProbe[Int]()
|
||||
val source = Source(pub)
|
||||
val sink = Sink(sub)
|
||||
val source = Source.fromPublisher(pub)
|
||||
val sink = Sink.fromSubscriber(sub)
|
||||
|
||||
source.transform(() => new HoldWithWait).to(sink).run()
|
||||
|
||||
|
|
|
|||
|
|
@ -14,8 +14,8 @@ class RecipeManualTrigger extends RecipeSpec {
|
|||
val elements = Source(List("1", "2", "3", "4"))
|
||||
val pub = TestPublisher.probe[Trigger]()
|
||||
val sub = TestSubscriber.manualProbe[Message]()
|
||||
val triggerSource = Source(pub)
|
||||
val sink = Sink(sub)
|
||||
val triggerSource = Source.fromPublisher(pub)
|
||||
val sink = Sink.fromSubscriber(sub)
|
||||
|
||||
//#manually-triggered-stream
|
||||
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
|
||||
|
|
@ -53,8 +53,8 @@ class RecipeManualTrigger extends RecipeSpec {
|
|||
val elements = Source(List("1", "2", "3", "4"))
|
||||
val pub = TestPublisher.probe[Trigger]()
|
||||
val sub = TestSubscriber.manualProbe[Message]()
|
||||
val triggerSource = Source(pub)
|
||||
val sink = Sink(sub)
|
||||
val triggerSource = Source.fromPublisher(pub)
|
||||
val sink = Sink.fromSubscriber(sub)
|
||||
|
||||
//#manually-triggered-stream-zipwith
|
||||
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
|
||||
|
|
|
|||
|
|
@ -15,8 +15,8 @@ class RecipeMissedTicks extends RecipeSpec {
|
|||
|
||||
val pub = TestPublisher.probe[Tick]()
|
||||
val sub = TestSubscriber.manualProbe[Int]()
|
||||
val tickStream = Source(pub)
|
||||
val sink = Sink(sub)
|
||||
val tickStream = Source.fromPublisher(pub)
|
||||
val sink = Sink.fromSubscriber(sub)
|
||||
|
||||
//#missed-ticks
|
||||
val missedTicks: Flow[Tick, Int, Unit] =
|
||||
|
|
|
|||
|
|
@ -22,8 +22,8 @@ class RecipeSimpleDrop extends RecipeSpec {
|
|||
|
||||
val pub = TestPublisher.probe[Message]()
|
||||
val sub = TestSubscriber.manualProbe[Message]()
|
||||
val messageSource = Source(pub)
|
||||
val sink = Sink(sub)
|
||||
val messageSource = Source.fromPublisher(pub)
|
||||
val sink = Sink.fromSubscriber(sub)
|
||||
|
||||
messageSource.via(realDroppyStream).to(sink).run()
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue