=str #18501 make TestSink.probe consistent in style with Sink.head

Also known as, we do not use () for those
This commit is contained in:
Konrad Malawski 2015-09-17 13:10:09 +02:00
parent ab96ebfca0
commit dcfbaed206
9 changed files with 41 additions and 38 deletions

View file

@ -181,8 +181,11 @@ object TestSubscriber {
/**
* Implementation of [[org.reactivestreams.Subscriber]] that allows various assertions.
*
* All timeouts are dilated automatically, for more details about time dilation refer to [[akka.testkit.TestKit]].
*/
class ManualProbe[I] private[TestSubscriber] ()(implicit system: ActorSystem) extends Subscriber[I] {
import akka.testkit._
type Self <: ManualProbe[I]
@ -206,12 +209,18 @@ object TestSubscriber {
def expectEvent(): SubscriberEvent =
probe.expectMsgType[SubscriberEvent]
/**
* Expect and return [[SubscriberEvent]] (any of: `OnSubscribe`, `OnNext`, `OnError` or `OnComplete`).
*/
def expectEvent(max: FiniteDuration): SubscriberEvent =
probe.expectMsgType[SubscriberEvent](max.dilated)
/**
* Fluent DSL
*
* Expect [[SubscriberEvent]] (any of: `OnSubscribe`, `OnNext`, `OnError` or `OnComplete`).
*/
def expectEvent(event: SubscriberEvent): Self = { // TODO it's more "signal" than event, shall we rename? -- ktoso
def expectEvent(event: SubscriberEvent): Self = {
probe.expectMsg(event)
self
}
@ -492,20 +501,14 @@ object TestSubscriber {
val deadline = Deadline.now + atMost
val b = immutable.Seq.newBuilder[I]
def checkDeadline(): Unit = {
if (deadline.isOverdue())
throw new TimeoutException(s"toStrict did not drain the stream within $atMost! Accumulated elements: ${b.result()}")
}
@tailrec def drain(): immutable.Seq[I] =
self.expectEvent() match {
self.expectEvent(deadline.timeLeft) match {
case OnError(ex)
throw new AssertionError(s"toStrict received OnError($ex) while draining stream! Accumulated elements: ${b.result()}")
// TODO once on JDK7+ this could be made an AssertionError, since it can carry ex in its cause param
throw new AssertionError(s"toStrict received OnError(${ex.getMessage}) while draining stream! Accumulated elements: ${b.result()}")
case OnComplete
checkDeadline()
b.result()
case OnNext(i: I @unchecked)
checkDeadline()
b += i
drain()
}

View file

@ -14,6 +14,6 @@ object TestSink {
* A Sink that materialized to a [[TestSubscriber.Probe]].
*/
def probe[T](system: ActorSystem): Sink[T, TestSubscriber.Probe[T]] =
new Sink(scaladsl.TestSink.probe[T]()(system))
new Sink(scaladsl.TestSink.probe[T](system))
}

View file

@ -14,6 +14,6 @@ object TestSource {
* A Source that materializes to a [[TestPublisher.Probe]].
*/
def probe[T](system: ActorSystem): Source[T, TestPublisher.Probe[T]] =
new Source(scaladsl.TestSource.probe[T]()(system))
new Source(scaladsl.TestSource.probe[T](system))
}

View file

@ -18,7 +18,7 @@ object TestSink {
/**
* A Sink that materialized to a [[TestSubscriber.Probe]].
*/
def probe[T]()(implicit system: ActorSystem): Sink[T, Probe[T]] =
def probe[T](implicit system: ActorSystem): Sink[T, Probe[T]] =
new Sink[T, TestSubscriber.Probe[T]](new StreamTestKit.ProbeSink(none, SinkShape(Inlet("ProbeSink.in"))))
}

View file

@ -19,6 +19,6 @@ object TestSource {
/**
* A Source that materializes to a [[TestPublisher.Probe]].
*/
def probe[T]()(implicit system: ActorSystem) = new Source[T, TestPublisher.Probe[T]](new StreamTestKit.ProbeSource(none, SourceShape(Outlet("ProbeSource.out"))))
def probe[T](implicit system: ActorSystem) = new Source[T, TestPublisher.Probe[T]](new StreamTestKit.ProbeSource(none, SourceShape(Outlet("ProbeSource.out"))))
}

View file

@ -17,7 +17,7 @@ class StreamTestKitSpec extends AkkaSpec {
"A TestSink Probe" must {
"#toStrict" in {
Source(1 to 4).runWith(TestSink.probe())
Source(1 to 4).runWith(TestSink.probe)
.toStrict(300.millis) should ===(List(1, 2, 3, 4))
}
@ -33,7 +33,7 @@ class StreamTestKitSpec extends AkkaSpec {
case n n
}
}
}).runWith(TestSink.probe())
}).runWith(TestSink.probe)
.toStrict(300.millis)
}.getMessage
@ -42,40 +42,40 @@ class StreamTestKitSpec extends AkkaSpec {
}
"#toStrict when subscription was already obtained" in {
val p = Source(1 to 4).runWith(TestSink.probe())
val p = Source(1 to 4).runWith(TestSink.probe)
p.expectSubscription()
p.toStrict(300.millis) should ===(List(1, 2, 3, 4))
}
"#expectNextOrError with right element" in {
Source(1 to 4).runWith(TestSink.probe())
Source(1 to 4).runWith(TestSink.probe)
.request(4)
.expectNextOrError(1, ex)
}
"#expectNextOrError with right exception" in {
Source.failed[Int](ex).runWith(TestSink.probe())
Source.failed[Int](ex).runWith(TestSink.probe)
.request(4)
.expectNextOrError(1, ex)
}
"#expectNextOrError fail if the next element is not the expected one" in {
intercept[AssertionError] {
Source(1 to 4).runWith(TestSink.probe())
Source(1 to 4).runWith(TestSink.probe)
.request(4)
.expectNextOrError(100, ex)
}.getMessage should include("OnNext(1)")
}
"#expectError" in {
Source.failed[Int](ex).runWith(TestSink.probe())
Source.failed[Int](ex).runWith(TestSink.probe)
.request(1)
.expectError() should ===(ex)
}
"#expectError fail if no error signalled" in {
intercept[AssertionError] {
Source(1 to 4).runWith(TestSink.probe())
Source(1 to 4).runWith(TestSink.probe)
.request(1)
.expectError()
}.getMessage should include("OnNext")
@ -83,7 +83,7 @@ class StreamTestKitSpec extends AkkaSpec {
"#expectComplete should fail if error signalled" in {
intercept[AssertionError] {
Source.failed[Int](ex).runWith(TestSink.probe())
Source.failed[Int](ex).runWith(TestSink.probe)
.request(1)
.expectComplete()
}.getMessage should include("OnError")
@ -91,33 +91,33 @@ class StreamTestKitSpec extends AkkaSpec {
"#expectComplete should fail if next element signalled" in {
intercept[AssertionError] {
Source(1 to 4).runWith(TestSink.probe())
Source(1 to 4).runWith(TestSink.probe)
.request(1)
.expectComplete()
}.getMessage should include("OnNext")
}
"#expectNextOrComplete with right element" in {
Source(1 to 4).runWith(TestSink.probe())
Source(1 to 4).runWith(TestSink.probe)
.request(4)
.expectNextOrComplete(1)
}
"#expectNextOrComplete with completion" in {
Source.single(1).runWith(TestSink.probe())
Source.single(1).runWith(TestSink.probe)
.request(4)
.expectNextOrComplete(1)
.expectNextOrComplete(1337)
}
"#expectNextN given a number of elements" in {
Source(1 to 4).runWith(TestSink.probe())
Source(1 to 4).runWith(TestSink.probe)
.request(4)
.expectNextN(4) should ===(List(1, 2, 3, 4))
}
"#expectNextN given specific elements" in {
Source(1 to 4).runWith(TestSink.probe())
Source(1 to 4).runWith(TestSink.probe)
.request(4)
.expectNextN(4) should ===(List(1, 2, 3, 4))
}

View file

@ -172,7 +172,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
implicit val timeout = Timeout(500.millis)
try {
val p = SynchronousFileSource(manyLines).runWith(TestSink.probe())(mat)
val p = SynchronousFileSource(manyLines).runWith(TestSink.probe)(mat)
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get
@ -188,7 +188,7 @@ class SynchronousFileSourceSpec extends AkkaSpec(UnboundedMailboxConfig) {
try {
val p = SynchronousFileSource(manyLines)
.withAttributes(ActorAttributes.dispatcher("akka.actor.default-dispatcher"))
.runWith(TestSink.probe())(mat)
.runWith(TestSink.probe)(mat)
mat.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "File").get

View file

@ -4,14 +4,14 @@
package akka.stream.scaladsl
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision}
import akka.stream.{ ActorAttributes, ActorMaterializer, ActorMaterializerSettings, Supervision }
import akka.stream.testkit.AkkaSpec
import akka.stream.testkit.Utils._
import scala.collection.immutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.concurrent.forkjoin.ThreadLocalRandom.{current => random}
import scala.concurrent.forkjoin.ThreadLocalRandom.{ current random }
class FlowScanSpec extends AkkaSpec {
@ -41,8 +41,8 @@ class FlowScanSpec extends AkkaSpec {
}
"emit values promptly" in {
Source.single(1).concat(Source.lazyEmpty).scan(0)(_ + _).grouped(2).runWith(TestSink.probe())
.toStrict(1.second) should ===(Seq(0, 1))
val f = Source.single(1).concat(Source.lazyEmpty).scan(0)(_ + _).grouped(2).runWith(Sink.head)
Await.result(f, 1.second) should ===(Seq(0, 1))
}
"fail properly" in {
@ -51,7 +51,7 @@ class FlowScanSpec extends AkkaSpec {
require(current > 0)
old + current
}.withAttributes(supervisionStrategy(Supervision.restartingDecider))
Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink.probe())
Source(List(1, 3, -1, 5, 7)).via(scan).runWith(TestSink.probe)
.toStrict(1.second) should ===(Seq(0, 1, 4, 0, 5, 12))
}
}

View file

@ -300,7 +300,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
emit(Iterator(elem, elem), ctx)
}
})
.runWith(TestSink.probe[Int]())
.runWith(TestSink.probe[Int])
.request(1000)
.expectNext(1)
.cancel()
@ -444,7 +444,7 @@ class FlowStageSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug
terminationEmit(Iterator("byebye"), ctx)
}
})
.runWith(TestSink.probe[String]())
.runWith(TestSink.probe[String])
.request(1)
.expectNext("hi1")
.request(2)