From 3ae85e8cd09d2fa8f0a2b6c2dc9a4aa98ca80398 Mon Sep 17 00:00:00 2001 From: Alex Date: Thu, 27 May 2021 10:53:18 +0400 Subject: [PATCH] Change tests to use new Source.queue api, #29801 (#30070) --- .../akka/remote/artery/SendQueueBenchmark.scala | 2 +- .../java/jdocs/stream/IntegrationDocTest.java | 4 ++-- .../scala/docs/stream/IntegrationDocSpec.scala | 5 +++-- .../java/akka/stream/javadsl/SourceTest.java | 15 +++++++-------- .../akka/stream/scaladsl/FlowGroupBySpec.scala | 2 +- .../scala/akka/stream/scaladsl/SourceSpec.scala | 16 ++++++++-------- .../javadsl/ActorSourceSinkCompileTest.java | 4 ++-- .../typed/scaladsl/ActorSourceSinkSpec.scala | 6 +++--- 8 files changed, 27 insertions(+), 27 deletions(-) diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala index cfe62fe035..31e367a2b9 100644 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala +++ b/akka-bench-jmh/src/main/scala/akka/remote/artery/SendQueueBenchmark.scala @@ -53,7 +53,7 @@ class SendQueueBenchmark { val N = 100000 val burstSize = 1000 - val source = Source.queue[Int](1024, OverflowStrategy.dropBuffer) + val source = Source.queue[Int](1024) val (queue, killSwitch) = source .viaMat(KillSwitches.single)(Keep.both) diff --git a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java index db8b8fc8a5..5ae3a48609 100644 --- a/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java +++ b/akka-docs/src/test/java/jdocs/stream/IntegrationDocTest.java @@ -749,8 +749,8 @@ public class IntegrationDocTest extends AbstractJavaTest { int bufferSize = 10; int elementsToProcess = 5; - SourceQueueWithComplete sourceQueue = - Source.queue(bufferSize, OverflowStrategy.backpressure()) + BoundedSourceQueue sourceQueue = + Source.queue(bufferSize) .throttle(elementsToProcess, Duration.ofSeconds(3)) .map(x -> x * x) .to(Sink.foreach(x -> System.out.println("got: " + x))) diff --git a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala index 74a104fe0c..9833b7de49 100644 --- a/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala +++ b/akka-docs/src/test/scala/docs/stream/IntegrationDocSpec.scala @@ -22,6 +22,7 @@ import scala.concurrent.ExecutionContext import java.util.concurrent.atomic.AtomicInteger import akka.stream.scaladsl.Flow +import org.scalacheck.Gen.const object IntegrationDocSpec { import TwitterStreamQuickstartDocSpec._ @@ -469,7 +470,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { val elementsToProcess = 5 val queue = Source - .queue[Int](bufferSize, OverflowStrategy.backpressure) + .queue[Int](bufferSize) .throttle(elementsToProcess, 3.second) .map(x => x * x) .toMat(Sink.foreach(x => println(s"completed $x")))(Keep.left) @@ -479,7 +480,7 @@ class IntegrationDocSpec extends AkkaSpec(IntegrationDocSpec.config) { implicit val ec = system.dispatcher source - .mapAsync(1)(x => { + .map(x => { queue.offer(x).map { case QueueOfferResult.Enqueued => println(s"enqueued $x") case QueueOfferResult.Dropped => println(s"dropped $x") diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 8f16150472..7ae5dcabb0 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -636,9 +636,9 @@ public class SourceTest extends StreamTest { @Test public void mustBeAbleToUseQueue() throws Exception { - final Pair, CompletionStage>> x = - Flow.of(String.class).runWith(Source.queue(2, OverflowStrategy.fail()), Sink.seq(), system); - final SourceQueueWithComplete source = x.first(); + final Pair, CompletionStage>> x = + Flow.of(String.class).runWith(Source.queue(2), Sink.seq(), system); + final BoundedSourceQueue source = x.first(); final CompletionStage> result = x.second(); source.offer("hello"); source.offer("world"); @@ -833,20 +833,19 @@ public class SourceTest extends StreamTest { @Test public void mustBeAbleToCombineMat() throws Exception { final TestKit probe = new TestKit(system); - final Source> source1 = - Source.queue(1, OverflowStrategy.dropNew()); + final Source> source1 = Source.queue(2); final Source source2 = Source.from(Arrays.asList(2, 3)); - // compiler to check the correct materialized value of type = SourceQueueWithComplete + // compiler to check the correct materialized value of type = BoundedSourceQueue // available - final Source> combined = + final Source> combined = Source.combineMat( source1, source2, width -> Concat.create(width), Keep.left()); // Keep.left() (i.e. preserve queueSource's materialized value) - SourceQueueWithComplete queue = + BoundedSourceQueue queue = combined .toMat( Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), Keep.left()) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index e4837d747e..9b0154065f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -656,7 +656,7 @@ class FlowGroupBySpec extends StreamSpec(""" "not block all substreams when one is blocked but has a buffer in front" in assertAllStagesStopped { case class Elem(id: Int, substream: Int, f: () => Any) val queue = Source - .queue[Elem](3, OverflowStrategy.backpressure) + .queue[Elem](3) .groupBy(2, _.substream) .buffer(2, OverflowStrategy.backpressure) .map { _.f() } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 7359bcab32..67c52b3d24 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -171,11 +171,11 @@ class SourceSpec extends StreamSpec with DefaultTimeout { } "combine from two inputs with combinedMat and take a materialized value" in { - val queueSource = Source.queue[Int](1, OverflowStrategy.dropBuffer) + val queueSource = Source.queue[Int](3) val intSeqSource = Source(1 to 3) // compiler to check the correct materialized value of type = SourceQueueWithComplete[Int] available - val combined1: Source[Int, SourceQueueWithComplete[Int]] = + val combined1: Source[Int, BoundedSourceQueue[Int]] = Source.combineMat(queueSource, intSeqSource)(Concat(_))(Keep.left) //Keep.left (i.e. preserve queueSource's materialized value) val (queue1, sinkProbe1) = combined1.toMat(TestSink.probe[Int])(Keep.both).run() @@ -192,7 +192,7 @@ class SourceSpec extends StreamSpec with DefaultTimeout { sinkProbe1.expectNext(3) // compiler to check the correct materialized value of type = SourceQueueWithComplete[Int] available - val combined2: Source[Int, SourceQueueWithComplete[Int]] = + val combined2: Source[Int, BoundedSourceQueue[Int]] = //queueSource to be the second of combined source Source.combineMat(intSeqSource, queueSource)(Concat(_))(Keep.right) //Keep.right (i.e. preserve queueSource's materialized value) @@ -390,7 +390,7 @@ class SourceSpec extends StreamSpec with DefaultTimeout { } "allow for multiple downstream materialized sources" in { - val matValPoweredSource = Source.queue[String](Int.MaxValue, OverflowStrategy.fail) + val matValPoweredSource = Source.queue[String](Int.MaxValue) val (mat, src) = matValPoweredSource.preMaterialize() val probe1 = src.runWith(TestSink.probe[String]) @@ -398,25 +398,25 @@ class SourceSpec extends StreamSpec with DefaultTimeout { probe1.request(1) probe2.request(1) - mat.offer("One").futureValue + mat.offer("One") probe1.expectNext("One") probe2.expectNext("One") } "survive cancellations of downstream materialized sources" in { - val matValPoweredSource = Source.queue[String](Int.MaxValue, OverflowStrategy.fail) + val matValPoweredSource = Source.queue[String](Int.MaxValue) val (mat, src) = matValPoweredSource.preMaterialize() val probe1 = src.runWith(TestSink.probe[String]) src.runWith(Sink.cancelled) probe1.request(1) - mat.offer("One").futureValue + mat.offer("One") probe1.expectNext("One") } "propagate failures to downstream materialized sources" in { - val matValPoweredSource = Source.queue[String](Int.MaxValue, OverflowStrategy.fail) + val matValPoweredSource = Source.queue[String](Int.MaxValue) val (mat, src) = matValPoweredSource.preMaterialize() val probe1 = src.runWith(TestSink.probe[String]) diff --git a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java index 153fa0c58e..d11f735484 100644 --- a/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java +++ b/akka-stream-typed/src/test/java/akka/stream/typed/javadsl/ActorSourceSinkCompileTest.java @@ -34,7 +34,7 @@ public class ActorSourceSinkCompileTest { { final ActorRef ref = null; - Source.queue(10, OverflowStrategy.dropBuffer()) + Source.queue(10) .map(s -> s + "!") .to(ActorSink.actorRef(ref, "DONE", ex -> "FAILED: " + ex.getMessage())); } @@ -42,7 +42,7 @@ public class ActorSourceSinkCompileTest { { final ActorRef ref = null; - Source.queue(10, OverflowStrategy.dropBuffer()) + Source.queue(10) .to( ActorSink.actorRefWithBackpressure( ref, diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala index 33caf4981f..68a27946d4 100644 --- a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/ActorSourceSinkSpec.scala @@ -33,7 +33,7 @@ class ActorSourceSinkSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike val in = Source - .queue[String](10, OverflowStrategy.dropBuffer) + .queue[String](10) .map(_ + "!") .to(ActorSink.actorRef(p.ref, "DONE", ex => "FAILED: " + ex.getMessage)) .run() @@ -65,7 +65,7 @@ class ActorSourceSinkSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike val in = Source - .queue[String](10, OverflowStrategy.dropBuffer) + .queue[String](10) .to(ActorSink.actorRefWithBackpressure(pilotRef, Msg.apply, Init.apply, "ACK", Complete, _ => Failed)) .run() @@ -102,7 +102,7 @@ class ActorSourceSinkSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike val in = Source - .queue[String](10, OverflowStrategy.dropBuffer) + .queue[String](10) .to(ActorSink.actorRefWithBackpressure(pilotRef, Msg.apply, Init.apply, Complete, _ => Failed)) .run()