diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala index 551a34c63c..394b270c79 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StreamRefSpec.scala @@ -167,7 +167,7 @@ abstract class StreamRefSpec extends MultiNodeSpec(StreamRefSpec) with MultiNode val ref = expectMsgType[ActorIdentity].ref.get ref ! RequestLogs(1337) val dataSourceRef = expectMsgType[LogsOffer].sourceRef - destinationForSource = dataSourceRef.runWith(TestSink.probe) + destinationForSource = dataSourceRef.runWith(TestSink()) destinationForSource.request(3).expectNext("elem-1").expectNext("elem-2").expectNext("elem-3") } runOn(second) { diff --git a/akka-docs/src/test/scala/docs/stream/GraphStageDocSpec.scala b/akka-docs/src/test/scala/docs/stream/GraphStageDocSpec.scala index 0bcd735f70..5d23d2014c 100644 --- a/akka-docs/src/test/scala/docs/stream/GraphStageDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/GraphStageDocSpec.scala @@ -328,8 +328,8 @@ class GraphStageDocSpec extends AkkaSpec { val switch = Promise[Unit]() val duplicator = Flow.fromGraph(new KillSwitch[Int](switch.future)) - val in = TestPublisher.probe[Int]() - val out = TestSubscriber.probe[Int]() + val in = TestPublisher.Probe[Int]() + val out = TestSubscriber.Probe[Int]() Source .fromPublisher(in) @@ -524,8 +524,8 @@ class GraphStageDocSpec extends AkkaSpec { Await.result(result1, 3.seconds) should ===(Vector(1, 2, 3)) - val subscriber = TestSubscriber.manualProbe[Int]() - val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.ManualProbe[Int]() + val publisher = TestPublisher.Probe[Int]() val flow2 = Source.fromPublisher(publisher).via(new TwoBuffer).to(Sink.fromSubscriber(subscriber)) diff --git a/akka-docs/src/test/scala/docs/stream/RateTransformationDocSpec.scala b/akka-docs/src/test/scala/docs/stream/RateTransformationDocSpec.scala index 8bdd4b585a..5d91a9349b 100644 --- a/akka-docs/src/test/scala/docs/stream/RateTransformationDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/RateTransformationDocSpec.scala @@ -88,7 +88,7 @@ class RateTransformationDocSpec extends AkkaSpec { case (d, _) => latch.countDown(); Iterator.from(1).map(d -> _) } - val (pub, sub) = TestSource.probe[Double].via(realDriftFlow).toMat(TestSink.probe[(Double, Int)])(Keep.both).run() + val (pub, sub) = TestSource.probe[Double].via(realDriftFlow).toMat(TestSink[(Double, Int)]())(Keep.both).run() sub.request(1) pub.sendNext(1.0) @@ -109,7 +109,7 @@ class RateTransformationDocSpec extends AkkaSpec { val latch = TestLatch(2) val realDriftFlow = Flow[Double].expand(d => { latch.countDown(); Iterator.from(0).map(d -> _) }) - val (pub, sub) = TestSource.probe[Double].via(realDriftFlow).toMat(TestSink.probe[(Double, Int)])(Keep.both).run() + val (pub, sub) = TestSource.probe[Double].via(realDriftFlow).toMat(TestSink[(Double, Int)]())(Keep.both).run() sub.request(1) pub.sendNext(1.0) diff --git a/akka-docs/src/test/scala/docs/stream/StreamTestKitDocSpec.scala b/akka-docs/src/test/scala/docs/stream/StreamTestKitDocSpec.scala index e4d5d46975..84af7bc629 100644 --- a/akka-docs/src/test/scala/docs/stream/StreamTestKitDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/StreamTestKitDocSpec.scala @@ -112,7 +112,7 @@ class StreamTestKitDocSpec extends AkkaSpec { //#test-sink-probe val sourceUnderTest = Source(1 to 4).filter(_ % 2 == 0).map(_ * 2) - sourceUnderTest.runWith(TestSink.probe[Int]).request(2).expectNext(4, 8).expectComplete() + sourceUnderTest.runWith(TestSink[Int]()).request(2).expectNext(4, 8).expectComplete() //#test-sink-probe } @@ -142,7 +142,7 @@ class StreamTestKitDocSpec extends AkkaSpec { pattern.after(10.millis * sleep, using = system.scheduler)(Future.successful(sleep)) } - val (pub, sub) = TestSource.probe[Int].via(flowUnderTest).toMat(TestSink.probe[Int])(Keep.both).run() + val (pub, sub) = TestSource.probe[Int].via(flowUnderTest).toMat(TestSink[Int]())(Keep.both).run() sub.request(n = 3) pub.sendNext(3) diff --git a/akka-docs/src/test/scala/docs/stream/cookbook/RecipeAdhocSource.scala b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeAdhocSource.scala index 45d2c3e602..0e0d1a28ed 100644 --- a/akka-docs/src/test/scala/docs/stream/cookbook/RecipeAdhocSource.scala +++ b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeAdhocSource.scala @@ -32,13 +32,13 @@ class RecipeAdhocSource extends RecipeSpec { "not start the source if there is no demand" taggedAs TimingTest in { val isStarted = new AtomicBoolean() adhocSource(Source.empty.mapMaterializedValue(_ => isStarted.set(true)), 200.milliseconds, 3) - .runWith(TestSink.probe[Int]) + .runWith(TestSink[Int]()) Thread.sleep(300) isStarted.get() should be(false) } "start the source when there is a demand" taggedAs TimingTest in { - val sink = adhocSource(Source.repeat("a"), 200.milliseconds, 3).runWith(TestSink.probe[String]) + val sink = adhocSource(Source.repeat("a"), 200.milliseconds, 3).runWith(TestSink[String]()) sink.requestNext("a") } @@ -46,7 +46,7 @@ class RecipeAdhocSource extends RecipeSpec { val shutdown = Promise[Done]() val sink = adhocSource(Source.repeat("a").watchTermination() { (_, term) => shutdown.completeWith(term) - }, 200.milliseconds, 3).runWith(TestSink.probe[String]) + }, 200.milliseconds, 3).runWith(TestSink[String]()) sink.requestNext("a") Thread.sleep(200) @@ -57,7 +57,7 @@ class RecipeAdhocSource extends RecipeSpec { val shutdown = Promise[Done]() val sink = adhocSource(Source.repeat("a").watchTermination() { (_, term) => shutdown.completeWith(term) - }, 200.milliseconds, 3).runWith(TestSink.probe[String]) + }, 200.milliseconds, 3).runWith(TestSink[String]()) sink.requestNext("a") Thread.sleep(100) @@ -81,7 +81,7 @@ class RecipeAdhocSource extends RecipeSpec { val sink = adhocSource(source.watchTermination() { (_, term) => shutdown.completeWith(term) - }, 200.milliseconds, 3).runWith(TestSink.probe[String]) + }, 200.milliseconds, 3).runWith(TestSink[String]()) sink.requestNext("a") startedCount.get() should be(1) @@ -97,7 +97,7 @@ class RecipeAdhocSource extends RecipeSpec { val sink = adhocSource(source.watchTermination() { (_, term) => shutdown.completeWith(term) - }, 200.milliseconds, 3).runWith(TestSink.probe[String]) + }, 200.milliseconds, 3).runWith(TestSink[String]()) sink.requestNext("a") startedCount.get() should be(1) diff --git a/akka-docs/src/test/scala/docs/stream/cookbook/RecipeDroppyBroadcast.scala b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeDroppyBroadcast.scala index 94f6151675..1719c570cf 100644 --- a/akka-docs/src/test/scala/docs/stream/cookbook/RecipeDroppyBroadcast.scala +++ b/akka-docs/src/test/scala/docs/stream/cookbook/RecipeDroppyBroadcast.scala @@ -12,12 +12,12 @@ class RecipeDroppyBroadcast extends RecipeSpec { "Recipe for a droppy broadcast" must { "work" in { - val pub = TestPublisher.probe[Int]() + val pub = TestPublisher.Probe[Int]() val myElements = Source.fromPublisher(pub) - val sub1 = TestSubscriber.manualProbe[Int]() - val sub2 = TestSubscriber.manualProbe[Int]() - val sub3 = TestSubscriber.probe[Int]() + val sub1 = TestSubscriber.ManualProbe[Int]() + val sub2 = TestSubscriber.ManualProbe[Int]() + val sub3 = TestSubscriber.Probe[Int]() val futureSink = Sink.head[Seq[Int]] val mySink1 = Sink.fromSubscriber(sub1) val mySink2 = Sink.fromSubscriber(sub2) diff --git a/akka-stream-testkit/src/main/mima-filters/2.5.x.backwards.excludes/2.6.0.excludes b/akka-stream-testkit/src/main/mima-filters/2.5.x.backwards.excludes/2.6.0.excludes new file mode 100644 index 0000000000..f911ef8bc7 --- /dev/null +++ b/akka-stream-testkit/src/main/mima-filters/2.5.x.backwards.excludes/2.6.0.excludes @@ -0,0 +1,10 @@ +# Akka 2.6.0 changes to cancellation +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.stream.testkit.StreamTestKit#PublisherProbeSubscription.expectCancellation") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.testkit.TestPublisher#CancelSubscription.copy") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.testkit.TestPublisher#CancelSubscription.this") +ProblemFilters.exclude[MissingTypesProblem]("akka.stream.testkit.TestPublisher$CancelSubscription$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.testkit.TestPublisher#CancelSubscription.apply") +ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.testkit.TestPublisher#CancelSubscription.unapply") +# Throwable in errors +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.testkit.StreamTestKit#PublisherProbeSubscription.sendError") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.testkit.TestPublisher#Probe.sendError") diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index b1f72bed65..c2ac10efd2 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -13,8 +13,8 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.reflect.ClassTag +import akka.actor.ClassicActorSystemProvider import org.reactivestreams.{ Publisher, Subscriber, Subscription } - import akka.actor.{ ActorRef, ActorSystem, DeadLetterSuppression, NoSerializationVerificationNeeded } import akka.stream._ import akka.stream.impl._ @@ -75,6 +75,15 @@ object TestPublisher { def probe[T](initialPendingRequests: Long = 0)(implicit system: ActorSystem): Probe[T] = new Probe(initialPendingRequests) + object ManualProbe { + + /** + * Probe that implements [[org.reactivestreams.Publisher]] interface. + */ + def apply[T](autoOnSubscribe: Boolean = true)(implicit system: ClassicActorSystemProvider): ManualProbe[T] = + new ManualProbe(autoOnSubscribe)(system.classicSystem) + } + /** * Implementation of [[org.reactivestreams.Publisher]] that allows various assertions. * This probe does not track demand. Therefore you need to expect demand before sending @@ -209,6 +218,11 @@ object TestPublisher { def within[T](max: FiniteDuration)(f: => T): T = executeAfterSubscription { probe.within(max)(f) } } + object Probe { + def apply[T](initialPendingRequests: Long = 0)(implicit system: ClassicActorSystemProvider): Probe[T] = + new Probe(initialPendingRequests)(system.classicSystem) + } + /** * Single subscription and demand tracking for [[TestPublisher.ManualProbe]]. */ @@ -306,6 +320,11 @@ object TestSubscriber { def probe[T]()(implicit system: ActorSystem): Probe[T] = new Probe() + object ManualProbe { + def apply[T]()(implicit system: ClassicActorSystemProvider): ManualProbe[T] = + new ManualProbe()(system.classicSystem) + } + /** * Implementation of [[org.reactivestreams.Subscriber]] that allows various assertions. * @@ -785,6 +804,10 @@ object TestSubscriber { def onError(cause: Throwable): Unit = probe.ref ! OnError(cause) } + object Probe { + def apply[T]()(implicit system: ClassicActorSystemProvider): Probe[T] = new Probe()(system.classicSystem) + } + /** * Single subscription tracking for [[ManualProbe]]. */ diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSink.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSink.scala index 183a6c719c..b07cd051f9 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSink.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSink.scala @@ -5,6 +5,7 @@ package akka.stream.testkit.javadsl import akka.actor.ActorSystem +import akka.actor.ClassicActorSystemProvider import akka.stream.javadsl.Sink import akka.stream.testkit._ @@ -17,4 +18,10 @@ object TestSink { def probe[T](system: ActorSystem): Sink[T, TestSubscriber.Probe[T]] = new Sink(scaladsl.TestSink.probe[T](system)) + /** + * A Sink that materialized to a [[akka.stream.testkit.TestSubscriber.Probe]]. + */ + def create[T](system: ClassicActorSystemProvider): Sink[T, TestSubscriber.Probe[T]] = + probe(system.classicSystem) + } diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSource.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSource.scala index 2a6cfa9258..093edcaf4f 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSource.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/javadsl/TestSource.scala @@ -5,6 +5,7 @@ package akka.stream.testkit.javadsl import akka.actor.ActorSystem +import akka.actor.ClassicActorSystemProvider import akka.stream.javadsl.Source import akka.stream.testkit._ @@ -17,4 +18,10 @@ object TestSource { def probe[T](system: ActorSystem): Source[T, TestPublisher.Probe[T]] = new Source(scaladsl.TestSource.probe[T](system)) + /** + * A Source that materializes to a [[akka.stream.testkit.TestPublisher.Probe]]. + */ + def create[T](system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]] = + probe(system.classicSystem) + } diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala index d212414a94..dc455ca7bd 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSink.scala @@ -5,6 +5,7 @@ package akka.stream.testkit.scaladsl import akka.actor.ActorSystem +import akka.actor.ClassicActorSystemProvider import akka.stream._ import akka.stream.Attributes.none import akka.stream.scaladsl._ @@ -23,4 +24,10 @@ object TestSink { def probe[T](implicit system: ActorSystem): Sink[T, Probe[T]] = Sink.fromGraph[T, TestSubscriber.Probe[T]](new ProbeSink(none, SinkShape(Inlet("ProbeSink.in")))) + /** + * A Sink that materialized to a [[akka.stream.testkit.TestSubscriber.Probe]]. + */ + def apply[T]()(implicit system: ClassicActorSystemProvider): Sink[T, Probe[T]] = + probe(system.classicSystem) + } diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala index 6f8cb7d932..918a352e99 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/scaladsl/TestSource.scala @@ -5,6 +5,7 @@ package akka.stream.testkit.scaladsl import akka.actor.ActorSystem +import akka.actor.ClassicActorSystemProvider import akka.stream._ import akka.stream.Attributes.none import akka.stream.scaladsl._ @@ -19,7 +20,13 @@ object TestSource { /** * A Source that materializes to a [[akka.stream.testkit.TestPublisher.Probe]]. */ - def probe[T](implicit system: ActorSystem) = + def probe[T](implicit system: ActorSystem): Source[T, TestPublisher.Probe[T]] = Source.fromGraph[T, TestPublisher.Probe[T]](new ProbeSource(none, SourceShape(Outlet("ProbeSource.out")))) + /** + * A Source that materializes to a [[akka.stream.testkit.TestPublisher.Probe]]. + */ + def apply[T]()(implicit system: ClassicActorSystemProvider): Source[T, TestPublisher.Probe[T]] = + probe(system.classicSystem) + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala index 32f5ad1630..90349e7fe3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphDSLCompileSpec.scala @@ -317,10 +317,10 @@ class GraphDSLCompileSpec extends StreamSpec { "build with implicits and variance" in { RunnableGraph.fromGraph(GraphDSL.create() { implicit b => - def appleSource = b.add(Source.fromPublisher(TestPublisher.manualProbe[Apple]())) - def fruitSource = b.add(Source.fromPublisher(TestPublisher.manualProbe[Fruit]())) - val outA = b.add(Sink.fromSubscriber(TestSubscriber.manualProbe[Fruit]())) - val outB = b.add(Sink.fromSubscriber(TestSubscriber.manualProbe[Fruit]())) + def appleSource = b.add(Source.fromPublisher(TestPublisher.ManualProbe[Apple]())) + def fruitSource = b.add(Source.fromPublisher(TestPublisher.ManualProbe[Fruit]())) + val outA = b.add(Sink.fromSubscriber(TestSubscriber.ManualProbe[Fruit]())) + val outB = b.add(Sink.fromSubscriber(TestSubscriber.ManualProbe[Fruit]())) val merge = b.add(Merge[Fruit](11)) val unzip = b.add(Unzip[Int, String]()) val whatever = b.add(Sink.asPublisher[Any](false)) diff --git a/build.sbt b/build.sbt index fc6a223a34..862f78aeac 100644 --- a/build.sbt +++ b/build.sbt @@ -420,7 +420,6 @@ lazy val streamTestkit = akkaModule("akka-stream-testkit") .settings(Dependencies.streamTestkit) .settings(AutomaticModuleName.settings("akka.stream.testkit")) .settings(OSGi.streamTestkit) - .disablePlugins(MimaPlugin) lazy val streamTests = akkaModule("akka-stream-tests") .configs(akka.Jdk9.TestJdk9)