From 78a357e0f5a72b3c9561677b4cf21bfdf3df3795 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Tue, 14 Jan 2020 11:22:20 +0100 Subject: [PATCH 1/9] Update sbt-reproducible-builds to 0.23 (#28485) --- project/plugins.sbt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/plugins.sbt b/project/plugins.sbt index 08171600e1..f1e8ae70f0 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -23,5 +23,5 @@ addSbtPlugin("com.lightbend.akka" % "sbt-paradox-akka" % "0.29") addSbtPlugin("com.lightbend" % "sbt-whitesource" % "0.1.18") addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.4.0") // for maintenance of copyright file header addSbtPlugin("com.hpe.sbt" % "sbt-pull-request-validator" % "1.0.0") -addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.22") +addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.23") addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.0.0") From 4c9c05cfde9b19bee61ea75654db6d0d095721e4 Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Tue, 14 Jan 2020 13:41:46 +0100 Subject: [PATCH 2/9] Check headers on travis (#28466) To check them on 2.13 early as well --- .travis.yml | 2 +- .../mima-filters/2.5.27.backwards.excludes/28459-update-sbt | 0 2 files changed, 1 insertion(+), 1 deletion(-) create mode 100644 akka-actor/src/main/mima-filters/2.5.27.backwards.excludes/28459-update-sbt diff --git a/.travis.yml b/.travis.yml index b579fd83e5..34f8aaf72c 100644 --- a/.travis.yml +++ b/.travis.yml @@ -26,7 +26,7 @@ cache: - $HOME/.jabba/jdk # script for the default 'test' stage: -script: sbt -jvm-opts .jvmopts-travis -Dakka.build.scalaVersion=$TRAVIS_SCALA_VERSION ";update ;mimaReportBinaryIssues ;test:compile ;validateCompile" +script: sbt -jvm-opts .jvmopts-travis -Dakka.build.scalaVersion=$TRAVIS_SCALA_VERSION ";update ;mimaReportBinaryIssues ;test:compile ;validateCompile ;headerCheckAll" jobs: include: diff --git a/akka-actor/src/main/mima-filters/2.5.27.backwards.excludes/28459-update-sbt b/akka-actor/src/main/mima-filters/2.5.27.backwards.excludes/28459-update-sbt new file mode 100644 index 0000000000..e69de29bb2 From 044e706eb9339a6055f15836b8b5e9c53efecf55 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 14 Jan 2020 13:53:05 +0100 Subject: [PATCH 3/9] Incorrect assumptions in LazyAndFutureSourcesTest (#28479) --- .../stream/javadsl/LazyAndFutureSourcesTest.java | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureSourcesTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureSourcesTest.java index 820a93fbb6..5e39ae4f34 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureSourcesTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/LazyAndFutureSourcesTest.java @@ -4,7 +4,6 @@ package akka.stream.javadsl; -import akka.Done; import akka.NotUsed; import akka.japi.Pair; import akka.stream.StreamTest; @@ -60,7 +59,9 @@ public class LazyAndFutureSourcesTest extends StreamTest { CompletionStage nestedMatVal = result.first(); CompletionStage> list = result.second(); assertEquals(Arrays.asList("one"), list.toCompletableFuture().get(3, TimeUnit.SECONDS)); - assertEquals(true, nestedMatVal.toCompletableFuture().isDone()); + // Future adaption to completionstage of matval means we cannot count on matval future being + // completed just because stream is + nestedMatVal.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test @@ -87,7 +88,9 @@ public class LazyAndFutureSourcesTest extends StreamTest { CompletionStage nestedMatVal = result.first(); CompletionStage> list = result.second(); assertEquals(Arrays.asList("one"), list.toCompletableFuture().get(3, TimeUnit.SECONDS)); - assertEquals(true, nestedMatVal.toCompletableFuture().isDone()); + // Future adaption to completionstage of matval means we cannot count on matval future being + // completed just because stream is + nestedMatVal.toCompletableFuture().get(3, TimeUnit.SECONDS); } @Test @@ -101,6 +104,8 @@ public class LazyAndFutureSourcesTest extends StreamTest { CompletionStage nestedMatVal = result.first(); CompletionStage> list = result.second(); assertEquals(Arrays.asList("one"), list.toCompletableFuture().get(3, TimeUnit.SECONDS)); - assertEquals(true, nestedMatVal.toCompletableFuture().isDone()); + // flatMap/thenCompose of matval means we cannot count on matval future being completed just + // because stream is + nestedMatVal.toCompletableFuture().get(3, TimeUnit.SECONDS); } } From 3ca7cb0f9492208c8e99fc442e3f627f6cd3bfb9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 14 Jan 2020 16:03:11 +0100 Subject: [PATCH 4/9] Examples for the sliding operator #25468 --- .../operators/Source-or-Flow/sliding.md | 37 ++++++++++-- .../operators/sourceorflow/Sliding.java | 56 +++++++++++++++++++ .../operators/sourceorflow/Sliding.scala | 48 ++++++++++++++++ 3 files changed, 136 insertions(+), 5 deletions(-) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Sliding.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Sliding.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/sliding.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/sliding.md index aca224c22d..83b17ecddb 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/sliding.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/sliding.md @@ -4,13 +4,9 @@ Provide a sliding window over the incoming stream and pass the windows as groups @ref[Simple operators](../index.md#simple-operators) -@@@div { .group-scala } - ## Signature -@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #sliding } - -@@@ +@apidoc[Flow.sliding](Flow) { scala="#sliding(n:Int,step:Int):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#sliding(int,int)" } ## Description @@ -18,6 +14,37 @@ Provide a sliding window over the incoming stream and pass the windows as groups Note: the last window might be smaller than the requested size due to end of stream. +## Examples + +In this first sample we just see the behavior of the operator itself, first with a window of 2 elements and @scala[the default +`step` which is 1]@java[a step value of 1]. + +Scala +: @@snip [Sliding.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Sliding.scala) { #sliding-1 } + +Java +: @@snip [Sliding.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Sliding.java) { #sliding-1 } + +If the stream stops without having seen enough elements to fill a window, the last window will have as many elements +was emitted before the stream ended. Here we also provide a step to move two elements forward for each window: + +Scala +: @@snip [Sliding.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Sliding.scala) { #sliding-2 } + +Java +: @@snip [Sliding.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Sliding.java) { #sliding-2 } + +One use case for sliding is to implement a moving average, here we do that with a "period" of `5`: + +Scala +: @@snip [Sliding.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Sliding.scala) { #moving-average } + +Java +: @@snip [Sliding.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Sliding.java) { #moving-average } + +Sliding can also be used to do simple windowing, see @ref[splitAfter](splitAfter.md). + + ## Reactive Streams semantics @@@div { .callout } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Sliding.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Sliding.java new file mode 100644 index 0000000000..2f8cc3060f --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Sliding.java @@ -0,0 +1,56 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package jdocs.stream.operators.sourceorflow; + +import akka.NotUsed; +import akka.actor.typed.ActorSystem; +import akka.stream.javadsl.Source; + +import java.util.Arrays; +import java.util.stream.Collectors; + +public class Sliding { + + private final ActorSystem system = null; + + public void slidingExample1() { + // #sliding-1 + Source source = Source.range(1, 4); + source.sliding(2, 1).runForeach(n -> System.out.println(n), system); + // prints: + // [1, 2] + // [2, 3] + // [3, 4] + // #sliding-1 + } + + public void slidingExample2() { + // #sliding-2 + Source source = Source.range(1, 4); + source.sliding(3, 2).runForeach(n -> System.out.println(n), system); + // prints: + // Vector(1, 2, 3) + // [1, 2, 3] + // [3, 4] - shorter because stream ended before we got 3 elements + // #sliding-2 + } + + public void slidingExample3() { + // #moving-average + Source numbers = Source.from(Arrays.asList(1, 3, 10, 2, 3, 4, 2, 10, 11)); + Source movingAverage = + numbers + .sliding(5, 1) + .map(window -> ((float) window.stream().mapToInt(i -> i).sum()) / window.size()); + movingAverage.runForeach(n -> System.out.println(n), system); + // prints + // 3.8 = average of 1, 3, 10, 2, 3 + // 4.4 = average of 3, 10, 2, 3, 4 + // 4.2 = average of 10, 2, 3, 4, 2 + // 4.2 = average of 2, 3, 4, 2, 10 + // 6.0 = average of 3, 4, 2, 10, 11 + // #moving-average + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Sliding.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Sliding.scala new file mode 100644 index 0000000000..a07d5c0d1a --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Sliding.scala @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.stream.scaladsl.Source +import akka.actor.ActorSystem + +object Sliding { + implicit val system: ActorSystem = ??? + + def slidingExample1(): Unit = { + //#sliding-1 + val source = Source(1 to 4) + source.sliding(2).runForeach(println) + // prints: + // Vector(1, 2) + // Vector(2, 3) + // Vector(3, 4) + //#sliding-1 + } + + def slidingExample2(): Unit = { + //#sliding-2 + val source = Source(1 to 4) + source.sliding(n = 3, step = 2).runForeach(println) + // prints: + // Vector(1, 2, 3) + // Vector(3, 4) - shorter because stream ended before we got 3 elements + //#sliding-2 + } + + def slidingExample3(): Unit = { + //#moving-average + val numbers = Source(1 :: 3 :: 10 :: 2 :: 3 :: 4 :: 2 :: 10 :: 11 :: Nil) + val movingAverage = numbers.sliding(5).map(window => window.sum.toFloat / window.size) + movingAverage.runForeach(println) + // prints + // 3.8 = average of 1, 3, 10, 2, 3 + // 4.4 = average of 3, 10, 2, 3, 4 + // 4.2 = average of 10, 2, 3, 4, 2 + // 4.2 = average of 2, 3, 4, 2, 10 + // 6.0 = average of 3, 4, 2, 10, 11 + //#moving-average + } + +} From f891f17abdb4241184f59cba8c212140196e4542 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Wed, 15 Jan 2020 10:22:43 +0100 Subject: [PATCH 5/9] Increase logger startup timeout in cluster metric tests #27955 --- .../akka/cluster/metrics/ClusterMetricsExtensionSpec.scala | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsExtensionSpec.scala b/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsExtensionSpec.scala index 5fe930fbac..869cec5193 100644 --- a/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsExtensionSpec.scala +++ b/akka-cluster-metrics/src/multi-jvm/scala/akka/cluster/metrics/ClusterMetricsExtensionSpec.scala @@ -43,7 +43,10 @@ trait ClusterMetricsCommonConfig extends MultiNodeConfig { """) // Activate slf4j logging along with test listener. - def customLogging = parseString("""akka.loggers=["akka.testkit.TestEventListener","akka.event.slf4j.Slf4jLogger"]""") + def customLogging = parseString(""" + akka.loggers=["akka.testkit.TestEventListener","akka.event.slf4j.Slf4jLogger"] + akka.logger-startup-timeout = 15s + """) } object ClusterMetricsDisabledConfig extends ClusterMetricsCommonConfig { From a8086e86e5cc30698a3c956efad3d12fb18a0ac3 Mon Sep 17 00:00:00 2001 From: Christopher Batey Date: Wed, 15 Jan 2020 12:20:41 +0000 Subject: [PATCH 6/9] Operator doc for flatmap merge (#28488) --- .../operators/Source-or-Flow/flatMapMerge.md | 29 +++++++++++---- .../operators/sourceorflow/FlatMapMerge.java | 35 +++++++++++++++++++ .../operators/sourceorflow/FlatMapMerge.scala | 32 +++++++++++++++++ 3 files changed, 89 insertions(+), 7 deletions(-) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapMerge.java create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapMerge.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapMerge.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapMerge.md index f944daac7a..44cbaffd30 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapMerge.md +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/flatMapMerge.md @@ -4,18 +4,33 @@ Transform each input element into a `Source` whose elements are then flattened i @ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators) -@@@div { .group-scala } - ## Signature -@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #flatMapMerge } - -@@@ +@apidoc[Flow.flatMapMerge](Flow) { scala="#flatMapMerge[T,M](breadth:Int,f:Out=%3Eakka.stream.Graph[akka.stream.SourceShape[T],M]):FlowOps.this.Repr[T]" java="#flatMapMerge(int,akka.japi.function.Function)" } ## Description Transform each input element into a `Source` whose elements are then flattened into the output stream through -merging. The maximum number of merged sources has to be specified. +merging. The maximum number of merged sources has to be specified. When this is met `flatMapMerge` does not +request any more elements meaning that it back pressures until one of the existing `Source`s completes. +Order of the elements for each `Source` is preserved but there is no deterministic order between elements from +different active `Source`s. + +See also: @ref:[flatMapConcat](flatMapConcat.md) + +## Example + +In the following example `flatMapMerge` is used to create a `Source` for each incoming customerId. This could, for example, +be a calculation or a query to a database. There can be `breadth` active sources at any given time so +events for different customers could interleave in any order but events for the same customer will be in the +order emitted by the underlying `Source`; + +Scala +: @@snip [FlatMapMerge.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapMerge.scala) { #flatmap-merge } + +Java +: @@snip [FlatMapMerge.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapMerge.java) { #flatmap-merge } + ## Reactive Streams semantics @@ -23,7 +38,7 @@ merging. The maximum number of merged sources has to be specified. **emits** when one of the currently consumed substreams has an element available -**backpressures** when downstream backpressures +**backpressures** when downstream backpressures or the max number of substreams is reached **completes** when upstream completes and all consumed substreams complete diff --git a/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapMerge.java b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapMerge.java new file mode 100644 index 0000000000..c0c8da7ecb --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapMerge.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package jdocs.stream.operators.sourceorflow; + +import akka.NotUsed; +import akka.actor.ActorSystem; +import akka.stream.javadsl.Source; + +import java.util.Arrays; + +public class FlatMapMerge { + private static ActorSystem system = null; + + // #flatmap-merge + // e.g. could be a query to a database + private Source lookupCustomerEvents(String customerId) { + return Source.from(Arrays.asList(customerId + "-evt-1", customerId + "-evt-2")); + } + // #flatmap-merge + + void example() { + // #flatmap-merge + Source.from(Arrays.asList("customer-1", "customer-2")) + .flatMapMerge(10, this::lookupCustomerEvents) + .runForeach(System.out::println, system); + // prints - events from different customers could interleave + // customer-1-evt-1 + // customer-2-evt-1 + // customer-1-evt-2 + // customer-2-evt-2 + // #flatmap-merge + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapMerge.scala b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapMerge.scala new file mode 100644 index 0000000000..33f6a3fe8f --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapMerge.scala @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package docs.stream.operators.sourceorflow + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source + +object FlatMapMerge { + + implicit val system: ActorSystem = ActorSystem() + + // #flatmap-merge + val source: Source[String, NotUsed] = Source(List("customer-1", "customer-2")) + + // e.g. could b a query to a database + def lookupCustomerEvents(customerId: String): Source[String, NotUsed] = { + Source(List(s"$customerId-evt-1", s"$customerId-evt2")) + } + + source.flatMapMerge(10, customerId => lookupCustomerEvents(customerId)).runForeach(println) + + // prints - events from different customers could interleave + // customer-1-evt-1 + // customer-2-evt-1 + // customer-1-evt-2 + // customer-2-evt-2 + // #flatmap-merge + +} From 087ed4b5a5d7c5c14d319a385484d45f32e9d87a Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Wed, 15 Jan 2020 15:04:15 +0100 Subject: [PATCH 7/9] Add source.completionStage example #25468 (#28338) --- .../operators/Source/completionStage.md | 6 ++++ .../paradox/stream/operators/Source/future.md | 3 +- .../operators/source/FromCompletionStage.java | 34 +++++++++++++++++++ 3 files changed, 42 insertions(+), 1 deletion(-) create mode 100644 akka-docs/src/test/java/jdocs/stream/operators/source/FromCompletionStage.java diff --git a/akka-docs/src/main/paradox/stream/operators/Source/completionStage.md b/akka-docs/src/main/paradox/stream/operators/Source/completionStage.md index 4c4358eb83..c95230ffe7 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/completionStage.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/completionStage.md @@ -30,3 +30,9 @@ For the corresponding operator for the Scala standard library `Future` see @ref: @@@ +## Example + +Java +: @@snip [SourceFromCompletionStage.java](/akka-docs/src/test/java/jdocs/stream/operators/source/FromCompletionStage.java) { #sourceFromCompletionStage } + +For the corresponding operator for the Scala standard library `Future` see @ref:[future](future.md). diff --git a/akka-docs/src/main/paradox/stream/operators/Source/future.md b/akka-docs/src/main/paradox/stream/operators/Source/future.md index 8cd087ecd9..f39bb5ca37 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/future.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/future.md @@ -1,4 +1,4 @@ -# fromFuture +# future Send the single value of the `Future` when it completes and there is demand. @@ -20,6 +20,7 @@ If the future fails the stream is failed with that exception. For the corresponding operator for the Java standard library `CompletionStage` see @ref:[completionStage](completionStage.md). ## Example + Scala : @@snip [SourceFromFuture.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #sourceFromFuture } diff --git a/akka-docs/src/test/java/jdocs/stream/operators/source/FromCompletionStage.java b/akka-docs/src/test/java/jdocs/stream/operators/source/FromCompletionStage.java new file mode 100644 index 0000000000..7623b8b693 --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/source/FromCompletionStage.java @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2018-2020 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +// #sourceFromCompletionStage +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CompletableFuture; + +import akka.NotUsed; +import akka.Done; +import akka.actor.typed.ActorSystem; +import akka.stream.javadsl.*; + +// #sourceFromCompletionStage + +class FromCompletionStage { + + public static void sourceFromCompletionStage() { + // Use one ActorSystem per application + ActorSystem system = null; + + // #sourceFromCompletionStage + CompletionStage stage = CompletableFuture.completedFuture(10); + + Source source = Source.completionStage(stage); + + Sink> sink = Sink.foreach(i -> System.out.println(i.toString())); + + source.runWith(sink, system); // 10 + // #sourceFromCompletionStage + } +} From 9b9972a854b2a59f4af60c1aa68d3499f1a6128c Mon Sep 17 00:00:00 2001 From: Arnout Engelen Date: Wed, 15 Jan 2020 16:40:06 +0100 Subject: [PATCH 8/9] Remove deprecated actorRefWithAck from 'pending' lists (#28337) As we don't plan to document this one anymore, so it's already ignored in the 'ignore' list --- project/StreamOperatorsIndexGenerator.scala | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index 46efa2c928..d3f14b8452 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -70,7 +70,6 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "orElseGraph", "divertToGraph", "zipWithGraph", - "actorRefWithAck" // deprecated ) // FIXME document these methods as well @@ -99,12 +98,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "actorSubscriber", "foldAsync", "newOnCompleteStage", - "actorRefWithAck" // deprecated - ), - "ActorSink" -> Seq("actorRefWithAck" // deprecated - ), - "ActorSource" -> Seq("actorRefWithAck" // deprecated - )) + ) + ) val ignore = Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++ From 098118251da06b4173d7ef3ad6b6cc4a0cbea934 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 16 Jan 2020 02:45:38 +0100 Subject: [PATCH 9/9] Missing Java getters for optional fields on akka.actor.Address (#28493) --- .../src/test/java/akka/actor/AddressTest.java | 25 +++++++++++++++++++ .../src/main/scala/akka/actor/Address.scala | 13 +++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) create mode 100644 akka-actor-tests/src/test/java/akka/actor/AddressTest.java diff --git a/akka-actor-tests/src/test/java/akka/actor/AddressTest.java b/akka-actor-tests/src/test/java/akka/actor/AddressTest.java new file mode 100644 index 0000000000..d5410b4cef --- /dev/null +++ b/akka-actor-tests/src/test/java/akka/actor/AddressTest.java @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package akka.actor; +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +import org.junit.Test; +import org.scalatest.junit.JUnitSuite; + +import java.util.Optional; + +import static org.junit.Assert.assertEquals; + +public class AddressTest extends JUnitSuite { + + @Test + public void portAddressAccessible() { + Address address = new Address("akka", "MySystem", "localhost", 2525); + assertEquals(Optional.of(2525), address.getPort()); + assertEquals(Optional.of("localhost"), address.getHost()); + } +} diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala index 6482e26bef..6b57f7c4be 100644 --- a/akka-actor/src/main/scala/akka/actor/Address.scala +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -6,11 +6,12 @@ package akka.actor import java.net.URI import java.net.URISyntaxException import java.net.MalformedURLException +import java.util.Optional import scala.annotation.tailrec import scala.collection.immutable - import akka.annotation.InternalApi +import scala.compat.java8.OptionConverters._ /** * The address specifies the physical location under which an Actor can be @@ -31,6 +32,16 @@ final case class Address private (protocol: String, system: String, host: Option def this(protocol: String, system: String) = this(protocol, system, None, None) def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port)) + /** + * Java API: The hostname if specified or empty optional if not + */ + def getHost(): Optional[String] = host.asJava + + /** + * Java API: The port if specified or empty optional if not + */ + def getPort(): Optional[Integer] = port.asJava.asInstanceOf[Optional[Integer]] + /** * Returns true if this Address is only defined locally. It is not safe to send locally scoped addresses to remote * hosts. See also [[akka.actor.Address#hasGlobalScope]].