=str #17089 stream testkit

This commit is contained in:
Martynas Mickevičius 2015-04-24 11:45:03 +03:00
parent 7b4a640147
commit 8e2cc3e70f
96 changed files with 1411 additions and 1131 deletions

View file

@ -5,9 +5,8 @@ package docs.stream
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Flow
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit._
import akka.stream.scaladsl.Sink
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.stream.scaladsl.Source
class ReactiveStreamsDocSpec extends AkkaSpec {
@ -45,12 +44,12 @@ class ReactiveStreamsDocSpec extends AkkaSpec {
override def tweets: Publisher[Tweet] =
TwitterStreamQuickstartDocSpec.tweets.runWith(Sink.publisher)
override def storage = SubscriberProbe[Author]
override def storage = TestSubscriber.manualProbe[Author]
override def alert = SubscriberProbe[Author]
override def alert = TestSubscriber.manualProbe[Author]
}
def assertResult(storage: SubscriberProbe[Author]): Unit = {
def assertResult(storage: TestSubscriber.ManualProbe[Author]): Unit = {
val sub = storage.expectSubscription()
sub.request(10)
storage.expectNext(Author("rolandkuhn"))

View file

@ -0,0 +1,38 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package docs.stream
import akka.stream._
import akka.stream.scaladsl._
import akka.stream.testkit._
import akka.stream.testkit.scaladsl._
class StreamTestKitDocSpec extends AkkaSpec {
implicit val mat = ActorFlowMaterializer()
"test source probe" in {
//#test-source-probe
TestSource.probe[Int]
.toMat(Sink.cancelled)(Keep.left)
.run()
.expectCancellation()
//#test-source-probe
}
"test sink probe" in {
//#test-sink-probe
Source(1 to 4)
.filter(_ % 2 == 0)
.map(_ * 2)
.runWith(TestSink.probe[Int])
.request(2)
.expectNext(4, 8)
.expectComplete()
//#test-sink-probe
}
}

View file

@ -2,8 +2,7 @@ package docs.stream.cookbook
import akka.stream.{ ActorFlowMaterializerSettings, ActorFlowMaterializer }
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import akka.stream.testkit._
import scala.collection.immutable
import scala.concurrent.Await
@ -18,8 +17,8 @@ class RecipeCollectingMetrics extends RecipeSpec {
"work" in {
// type Tick = Unit
//
// val loadPub = PublisherProbe[Int]()
// val tickPub = PublisherProbe[Tick]()
// val loadPub = TestPublisher.manualProbe[Int]()
// val tickPub = TestPublisher.manualProbe[Tick]()
// val reportTicks = Source(tickPub)
// val loadUpdates = Source(loadPub)
// val futureSink = Sink.head[immutable.Seq[String]]

View file

@ -2,7 +2,7 @@ package docs.stream.cookbook
import akka.stream.OverflowStrategy
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.stream.testkit._
import scala.collection.immutable
import scala.concurrent.Await
@ -14,8 +14,8 @@ class RecipeDroppyBroadcast extends RecipeSpec {
"work" in {
val myElements = Source(immutable.Iterable.tabulate(100)(_ + 1))
val sub1 = SubscriberProbe[Int]()
val sub2 = SubscriberProbe[Int]()
val sub1 = TestSubscriber.manualProbe[Int]()
val sub2 = TestSubscriber.manualProbe[Int]()
val futureSink = Sink.head[Seq[Int]]
val mySink1 = Sink(sub1)
val mySink2 = Sink(sub2)

View file

@ -3,7 +3,7 @@ package docs.stream.cookbook
import akka.actor.{ Props, ActorRef, Actor }
import akka.actor.Actor.Receive
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit.SubscriberProbe
import akka.stream.testkit._
import scala.collection.immutable
import scala.concurrent.duration._
@ -95,7 +95,7 @@ class RecipeGlobalRateLimit extends RecipeSpec {
val source1 = Source(() => Iterator.continually("E1")).via(limitGlobal(limiter, 2.seconds))
val source2 = Source(() => Iterator.continually("E2")).via(limitGlobal(limiter, 2.seconds))
val probe = SubscriberProbe[String]()
val probe = TestSubscriber.manualProbe[String]()
FlowGraph.closed() { implicit b =>
import FlowGraph.Implicits._

View file

@ -1,8 +1,7 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import akka.stream.testkit._
import scala.concurrent.duration._
@ -53,15 +52,13 @@ class RecipeHold extends RecipeSpec {
"work for version 1" in {
val pub = PublisherProbe[Int]()
val sub = SubscriberProbe[Int]()
val pub = TestPublisher.probe[Int]()
val sub = TestSubscriber.manualProbe[Int]()
val source = Source(pub)
val sink = Sink(sub)
source.transform(() => new HoldWithInitial(0)).to(sink).run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
@ -71,46 +68,44 @@ class RecipeHold extends RecipeSpec {
subscription.request(1)
sub.expectNext(0)
manualSource.sendNext(1)
manualSource.sendNext(2)
pub.sendNext(1)
pub.sendNext(2)
subscription.request(2)
sub.expectNext(2)
sub.expectNext(2)
manualSource.sendComplete()
pub.sendComplete()
subscription.request(1)
sub.expectComplete()
}
"work for version 2" in {
val pub = PublisherProbe[Int]()
val sub = SubscriberProbe[Int]()
val pub = TestPublisher.probe[Int]()
val sub = TestSubscriber.manualProbe[Int]()
val source = Source(pub)
val sink = Sink(sub)
source.transform(() => new HoldWithWait).to(sink).run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
subscription.request(1)
sub.expectNoMsg(100.millis)
manualSource.sendNext(1)
pub.sendNext(1)
sub.expectNext(1)
manualSource.sendNext(2)
manualSource.sendNext(3)
pub.sendNext(2)
pub.sendNext(3)
subscription.request(2)
sub.expectNext(3)
sub.expectNext(3)
manualSource.sendComplete()
pub.sendComplete()
subscription.request(1)
sub.expectComplete()
}

View file

@ -1,8 +1,7 @@
package docs.stream.cookbook
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import akka.stream.testkit._
import akka.util.ByteString
class RecipeKeepAlive extends RecipeSpec {
@ -13,9 +12,9 @@ class RecipeKeepAlive extends RecipeSpec {
type Tick = Unit
val tickPub = PublisherProbe[Tick]()
val dataPub = PublisherProbe[ByteString]()
val sub = SubscriberProbe[ByteString]()
val tickPub = TestPublisher.probe[Tick]()
val dataPub = TestPublisher.probe[ByteString]()
val sub = TestSubscriber.manualProbe[ByteString]()
val ticks = Source(tickPub)
val dataStream = Source(dataPub)
@ -38,17 +37,14 @@ class RecipeKeepAlive extends RecipeSpec {
graph.run()
val manualTicks = new StreamTestKit.AutoPublisher(tickPub)
val manualData = new StreamTestKit.AutoPublisher(dataPub)
val subscription = sub.expectSubscription()
manualTicks.sendNext(())
tickPub.sendNext(())
// pending data will overcome the keepalive
manualData.sendNext(ByteString(1))
manualData.sendNext(ByteString(2))
manualData.sendNext(ByteString(3))
dataPub.sendNext(ByteString(1))
dataPub.sendNext(ByteString(2))
dataPub.sendNext(ByteString(3))
subscription.request(1)
sub.expectNext(ByteString(1))
@ -60,11 +56,11 @@ class RecipeKeepAlive extends RecipeSpec {
sub.expectNext(keepaliveMessage)
subscription.request(1)
manualTicks.sendNext(())
tickPub.sendNext(())
sub.expectNext(keepaliveMessage)
manualData.sendComplete()
manualTicks.sendComplete()
dataPub.sendComplete()
tickPub.sendComplete()
sub.expectComplete()

View file

@ -1,8 +1,7 @@
package docs.stream.cookbook
import akka.stream.scaladsl._
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import akka.stream.testkit._
import scala.concurrent.duration._
class RecipeManualTrigger extends RecipeSpec {
@ -12,8 +11,8 @@ class RecipeManualTrigger extends RecipeSpec {
"work" in {
val elements = Source(List("1", "2", "3", "4"))
val pub = PublisherProbe[Trigger]()
val sub = SubscriberProbe[Message]()
val pub = TestPublisher.probe[Trigger]()
val sub = TestSubscriber.manualProbe[Message]()
val triggerSource = Source(pub)
val sink = Sink(sub)
@ -28,22 +27,21 @@ class RecipeManualTrigger extends RecipeSpec {
//#manually-triggered-stream
graph.run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
sub.expectSubscription().request(1000)
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
pub.sendNext(())
sub.expectNext("1")
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
manualSource.sendNext(())
pub.sendNext(())
pub.sendNext(())
sub.expectNext("2")
sub.expectNext("3")
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
pub.sendNext(())
sub.expectNext("4")
sub.expectComplete()
}
@ -51,8 +49,8 @@ class RecipeManualTrigger extends RecipeSpec {
"work with ZipWith" in {
val elements = Source(List("1", "2", "3", "4"))
val pub = PublisherProbe[Trigger]()
val sub = SubscriberProbe[Message]()
val pub = TestPublisher.probe[Trigger]()
val sub = TestSubscriber.manualProbe[Message]()
val triggerSource = Source(pub)
val sink = Sink(sub)
@ -68,22 +66,21 @@ class RecipeManualTrigger extends RecipeSpec {
//#manually-triggered-stream-zipwith
graph.run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
sub.expectSubscription().request(1000)
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
pub.sendNext(())
sub.expectNext("1")
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
manualSource.sendNext(())
pub.sendNext(())
pub.sendNext(())
sub.expectNext("2")
sub.expectNext("3")
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
pub.sendNext(())
sub.expectNext("4")
sub.expectComplete()
}

View file

@ -1,8 +1,7 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import akka.stream.testkit._
import scala.concurrent.duration._
@ -13,8 +12,8 @@ class RecipeMissedTicks extends RecipeSpec {
"work" in {
type Tick = Unit
val pub = PublisherProbe[Tick]()
val sub = SubscriberProbe[Int]()
val pub = TestPublisher.probe[Tick]()
val sub = TestSubscriber.manualProbe[Int]()
val tickStream = Source(pub)
val sink = Sink(sub)
@ -26,12 +25,11 @@ class RecipeMissedTicks extends RecipeSpec {
//#missed-ticks
missedTicks.to(sink).run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
manualSource.sendNext(())
manualSource.sendNext(())
manualSource.sendNext(())
manualSource.sendNext(())
pub.sendNext(())
pub.sendNext(())
pub.sendNext(())
pub.sendNext(())
val subscription = sub.expectSubscription()
subscription.request(1)
@ -40,10 +38,10 @@ class RecipeMissedTicks extends RecipeSpec {
subscription.request(1)
sub.expectNoMsg(100.millis)
manualSource.sendNext(())
pub.sendNext(())
sub.expectNext(0)
manualSource.sendComplete()
pub.sendComplete()
subscription.request(1)
sub.expectComplete()
}

View file

@ -1,8 +1,7 @@
package docs.stream.cookbook
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit.StreamTestKit.{ SubscriberProbe, PublisherProbe }
import akka.stream.testkit._
import scala.concurrent.duration._
@ -17,26 +16,24 @@ class RecipeSimpleDrop extends RecipeSpec {
Flow[Message].conflate(seed = identity)((lastMessage, newMessage) => newMessage)
//#simple-drop
val pub = PublisherProbe[Message]()
val sub = SubscriberProbe[Message]()
val pub = TestPublisher.probe[Message]()
val sub = TestSubscriber.manualProbe[Message]()
val messageSource = Source(pub)
val sink = Sink(sub)
messageSource.via(droppyStream).to(sink).run()
val manualSource = new StreamTestKit.AutoPublisher(pub)
val subscription = sub.expectSubscription()
sub.expectNoMsg(100.millis)
manualSource.sendNext("1")
manualSource.sendNext("2")
manualSource.sendNext("3")
pub.sendNext("1")
pub.sendNext("2")
pub.sendNext("3")
subscription.request(1)
sub.expectNext("3")
manualSource.sendComplete()
pub.sendComplete()
subscription.request(1)
sub.expectComplete()
}

View file

@ -8,11 +8,11 @@ import java.io.File
import akka.stream._
import akka.stream.io.SynchronousFileSource
import akka.stream.io.SynchronousFileSink
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.StreamTestKit
import akka.stream.testkit._
import akka.stream.testkit.Utils._
import akka.util.ByteString
class StreamFileDocSpec extends AkkaSpec(StreamTestKit.UnboundedMailboxConfig) {
class StreamFileDocSpec extends AkkaSpec(UnboundedMailboxConfig) {
implicit val ec = system.dispatcher
implicit val mat = ActorFlowMaterializer()