Merge remote-tracking branch 'akka/master' into scalatest310

Mikhail Limansky 2020-01-17 16:03:32 +03:00
commit 52c01832da
17 changed files with 321 additions and 28 deletions

View file

@@ -26,7 +26,7 @@ cache:
- $HOME/.jabba/jdk
# script for the default 'test' stage:
script: sbt -jvm-opts .jvmopts-travis -Dakka.build.scalaVersion=$TRAVIS_SCALA_VERSION ";update ;mimaReportBinaryIssues ;test:compile ;validateCompile"
script: sbt -jvm-opts .jvmopts-travis -Dakka.build.scalaVersion=$TRAVIS_SCALA_VERSION ";update ;mimaReportBinaryIssues ;test:compile ;validateCompile ;headerCheckAll"
jobs:
include:

View file

@@ -0,0 +1,25 @@
/*
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor;
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
import org.junit.Test;
import org.scalatest.junit.JUnitSuite;
import java.util.Optional;
import static org.junit.Assert.assertEquals;
public class AddressTest extends JUnitSuite {
@Test
public void portAddressAccessible() {
Address address = new Address("akka", "MySystem", "localhost", 2525);
assertEquals(Optional.of(2525), address.getPort());
assertEquals(Optional.of("localhost"), address.getHost());
}
}

View file

@@ -6,11 +6,12 @@ package akka.actor
import java.net.URI
import java.net.URISyntaxException
import java.net.MalformedURLException
import java.util.Optional
import scala.annotation.tailrec
import scala.collection.immutable
import akka.annotation.InternalApi
import scala.compat.java8.OptionConverters._
/**
* The address specifies the physical location under which an Actor can be
@@ -31,6 +32,16 @@ final case class Address private (protocol: String, system: String, host: Option
def this(protocol: String, system: String) = this(protocol, system, None, None)
def this(protocol: String, system: String, host: String, port: Int) = this(protocol, system, Option(host), Some(port))
/**
* Java API: The hostname if specified, or an empty Optional if not
*/
def getHost(): Optional[String] = host.asJava
/**
* Java API: The port if specified, or an empty Optional if not
*/
def getPort(): Optional[Integer] = port.asJava.asInstanceOf[Optional[Integer]]
/**
* Returns true if this Address is only defined locally. It is not safe to send locally scoped addresses to remote
* hosts. See also [[akka.actor.Address#hasGlobalScope]].

View file

@@ -43,7 +43,10 @@ trait ClusterMetricsCommonConfig extends MultiNodeConfig {
""")
// Activate slf4j logging along with test listener.
def customLogging = parseString("""akka.loggers=["akka.testkit.TestEventListener","akka.event.slf4j.Slf4jLogger"]""")
def customLogging = parseString("""
akka.loggers=["akka.testkit.TestEventListener","akka.event.slf4j.Slf4jLogger"]
akka.logger-startup-timeout = 15s
""")
}
object ClusterMetricsDisabledConfig extends ClusterMetricsCommonConfig {

View file

@@ -4,18 +4,33 @@ Transform each input element into a `Source` whose elements are then flattened i
@ref[Nesting and flattening operators](../index.md#nesting-and-flattening-operators)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #flatMapMerge }
@@@
@apidoc[Flow.flatMapMerge](Flow) { scala="#flatMapMerge[T,M](breadth:Int,f:Out=%3Eakka.stream.Graph[akka.stream.SourceShape[T],M]):FlowOps.this.Repr[T]" java="#flatMapMerge(int,akka.japi.function.Function)" }
## Description
Transform each input element into a `Source` whose elements are then flattened into the output stream through
merging. The maximum number of merged sources has to be specified.
merging. The maximum number of merged sources has to be specified. When this limit is reached `flatMapMerge` does not
request any more elements, meaning that it backpressures until one of the existing `Source`s completes.
The order of the elements within each `Source` is preserved, but there is no deterministic order between elements from
different active `Source`s.
See also: @ref:[flatMapConcat](flatMapConcat.md)
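To make the backpressure behavior concrete, here is a minimal sketch (not part of the linked snippets; the object name and the `lookupCustomerEvents` helper are illustrative). With `breadth` limited to 1, only one substream is consumed at a time:

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object FlatMapMergeBackpressure extends App {
  implicit val system: ActorSystem = ActorSystem()

  // hypothetical per-customer lookup, e.g. a database query
  def lookupCustomerEvents(customerId: String): Source[String, NotUsed] =
    Source(List(s"$customerId-evt-1", s"$customerId-evt-2"))

  // with breadth = 1 only one substream is consumed at a time, so flatMapMerge
  // backpressures: customer-2 events are only requested after the customer-1
  // substream completes, giving the same ordering as flatMapConcat
  Source(List("customer-1", "customer-2"))
    .flatMapMerge(1, lookupCustomerEvents)
    .runForeach(println)
}
```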
## Example
In the following example `flatMapMerge` is used to create a `Source` for each incoming customerId. This could, for example,
be a calculation or a query to a database. There can be up to `breadth` active sources at any given time, so
events for different customers could interleave in any order, but events for the same customer will be in the
order emitted by the underlying `Source`:
Scala
: @@snip [FlatMapMerge.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/FlatMapMerge.scala) { #flatmap-merge }
Java
: @@snip [FlatMapMerge.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/FlatMapMerge.java) { #flatmap-merge }
## Reactive Streams semantics
@@ -23,7 +38,7 @@ merging. The maximum number of merged sources has to be specified.
**emits** when one of the currently consumed substreams has an element available
**backpressures** when downstream backpressures
**backpressures** when downstream backpressures or the max number of substreams is reached
**completes** when upstream completes and all consumed substreams complete

View file

@@ -4,13 +4,9 @@ Provide a sliding window over the incoming stream and pass the windows as groups
@ref[Simple operators](../index.md#simple-operators)
@@@div { .group-scala }
## Signature
@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #sliding }
@@@
@apidoc[Flow.sliding](Flow) { scala="#sliding(n:Int,step:Int):FlowOps.this.Repr[scala.collection.immutable.Seq[Out]]" java="#sliding(int,int)" }
## Description
@@ -18,6 +14,37 @@ Provide a sliding window over the incoming stream and pass the windows as groups
Note: the last window might be smaller than the requested size due to end of stream.
## Examples
In this first example we look at the behavior of the operator itself, with a window of 2 elements and @scala[the default
`step` of 1]@java[a step value of 1].
Scala
: @@snip [Sliding.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Sliding.scala) { #sliding-1 }
Java
: @@snip [Sliding.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Sliding.java) { #sliding-1 }
If the stream stops without having seen enough elements to fill a window, the last window will contain as many elements
as were emitted before the stream ended. Here we also provide a step to move two elements forward for each window:
Scala
: @@snip [Sliding.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Sliding.scala) { #sliding-2 }
Java
: @@snip [Sliding.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Sliding.java) { #sliding-2 }
One use case for sliding is to implement a moving average; here we do that with a "period" of `5`:
Scala
: @@snip [Sliding.scala](/akka-docs/src/test/scala/docs/stream/operators/sourceorflow/Sliding.scala) { #moving-average }
Java
: @@snip [Sliding.java](/akka-docs/src/test/java/jdocs/stream/operators/sourceorflow/Sliding.java) { #moving-average }
Sliding can also be used for simple windowing; see @ref[splitAfter](splitAfter.md).
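As another illustration (a minimal sketch, not taken from the linked snippets; the object name is illustrative), a window of 2 with the default step of 1 pairs each element with its predecessor, for example to emit the difference between consecutive values:

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object SlidingPairwise extends App {
  implicit val system: ActorSystem = ActorSystem()

  // each window holds an element together with its predecessor
  Source(List(1, 3, 10, 2))
    .sliding(2)
    .map { case Seq(previous, current) => current - previous }
    .runForeach(println)
  // prints:
  // 2
  // 7
  // -8
}
```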
## Reactive Streams semantics
@@@div { .callout }

View file

@@ -30,3 +30,9 @@ For the corresponding operator for the Scala standard library `Future` see @ref:
@@@
## Example
Java
: @@snip [SourceFromCompletionStage.java](/akka-docs/src/test/java/jdocs/stream/operators/source/FromCompletionStage.java) { #sourceFromCompletionStage }
For the corresponding operator for the Scala standard library `Future` see @ref:[future](future.md).
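From Scala, a similar result can be sketched by converting the `CompletionStage` to a `Future` with scala-java8-compat and using the `future` operator (object and value names here are illustrative, not part of this commit):

```scala
import java.util.concurrent.CompletableFuture

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import scala.compat.java8.FutureConverters

object CompletionStageFromScala extends App {
  implicit val system: ActorSystem = ActorSystem()

  // a CompletionStage handed over from some Java API
  val stage = CompletableFuture.completedFuture(10)

  // convert to a scala.concurrent.Future and feed it into the stream
  Source.future(FutureConverters.toScala(stage)).runForeach(println) // 10
}
```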

View file

@@ -1,4 +1,4 @@
# fromFuture
# future
Send the single value of the `Future` when it completes and there is demand.
@@ -20,6 +20,7 @@ If the future fails the stream is failed with that exception.
For the corresponding operator for the Java standard library `CompletionStage` see @ref:[completionStage](completionStage.md).
## Example
Scala
: @@snip [SourceFromFuture.scala](/akka-docs/src/test/scala/docs/stream/operators/SourceOperators.scala) { #sourceFromFuture }
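The failure behavior described above can be sketched as follows (not taken from the linked snippet; `recover` is used here only to make the failure visible as an element):

```scala
import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.stream.scaladsl.Source

object FailedFutureSource extends App {
  implicit val system: ActorSystem = ActorSystem()

  // the stream fails with the future's exception; recover replaces the
  // failure with a fallback element so something is printed
  Source
    .future(Future.failed[Int](new RuntimeException("lookup failed")))
    .recover { case _: RuntimeException => -1 }
    .runForeach(println) // prints -1
}
```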

View file

@@ -0,0 +1,34 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.source;
// #sourceFromCompletionStage
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CompletableFuture;
import akka.NotUsed;
import akka.Done;
import akka.actor.typed.ActorSystem;
import akka.stream.javadsl.*;
// #sourceFromCompletionStage
class FromCompletionStage {
public static void sourceFromCompletionStage() {
// Use one ActorSystem per application
ActorSystem system = null;
// #sourceFromCompletionStage
CompletionStage<Integer> stage = CompletableFuture.completedFuture(10);
Source<Integer, NotUsed> source = Source.completionStage(stage);
Sink<Integer, CompletionStage<Done>> sink = Sink.foreach(i -> System.out.println(i.toString()));
source.runWith(sink, system); // 10
// #sourceFromCompletionStage
}
}

View file

@@ -0,0 +1,35 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.sourceorflow;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Source;
import java.util.Arrays;
public class FlatMapMerge {
private static ActorSystem system = null;
// #flatmap-merge
// e.g. could be a query to a database
private Source<String, NotUsed> lookupCustomerEvents(String customerId) {
return Source.from(Arrays.asList(customerId + "-evt-1", customerId + "-evt-2"));
}
// #flatmap-merge
void example() {
// #flatmap-merge
Source.from(Arrays.asList("customer-1", "customer-2"))
.flatMapMerge(10, this::lookupCustomerEvents)
.runForeach(System.out::println, system);
// prints - events from different customers could interleave
// customer-1-evt-1
// customer-2-evt-1
// customer-1-evt-2
// customer-2-evt-2
// #flatmap-merge
}
}

View file

@@ -0,0 +1,56 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.stream.operators.sourceorflow;
import akka.NotUsed;
import akka.actor.typed.ActorSystem;
import akka.stream.javadsl.Source;
import java.util.Arrays;
import java.util.stream.Collectors;
public class Sliding {
private final ActorSystem<Void> system = null;
public void slidingExample1() {
// #sliding-1
Source<Integer, NotUsed> source = Source.range(1, 4);
source.sliding(2, 1).runForeach(n -> System.out.println(n), system);
// prints:
// [1, 2]
// [2, 3]
// [3, 4]
// #sliding-1
}
public void slidingExample2() {
// #sliding-2
Source<Integer, NotUsed> source = Source.range(1, 4);
source.sliding(3, 2).runForeach(n -> System.out.println(n), system);
// prints:
// [1, 2, 3]
// [3, 4] - shorter because stream ended before we got 3 elements
// #sliding-2
}
public void slidingExample3() {
// #moving-average
Source<Integer, NotUsed> numbers = Source.from(Arrays.asList(1, 3, 10, 2, 3, 4, 2, 10, 11));
Source<Float, NotUsed> movingAverage =
numbers
.sliding(5, 1)
.map(window -> ((float) window.stream().mapToInt(i -> i).sum()) / window.size());
movingAverage.runForeach(n -> System.out.println(n), system);
// prints
// 3.8 = average of 1, 3, 10, 2, 3
// 4.4 = average of 3, 10, 2, 3, 4
// 4.2 = average of 10, 2, 3, 4, 2
// 4.2 = average of 2, 3, 4, 2, 10
// 6.0 = average of 3, 4, 2, 10, 11
// #moving-average
}
}

View file

@@ -0,0 +1,32 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
object FlatMapMerge {
implicit val system: ActorSystem = ActorSystem()
// #flatmap-merge
val source: Source[String, NotUsed] = Source(List("customer-1", "customer-2"))
// e.g. could be a query to a database
def lookupCustomerEvents(customerId: String): Source[String, NotUsed] = {
Source(List(s"$customerId-evt-1", s"$customerId-evt-2"))
}
source.flatMapMerge(10, customerId => lookupCustomerEvents(customerId)).runForeach(println)
// prints - events from different customers could interleave
// customer-1-evt-1
// customer-2-evt-1
// customer-1-evt-2
// customer-2-evt-2
// #flatmap-merge
}

View file

@@ -0,0 +1,48 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sourceorflow
import akka.stream.scaladsl.Source
import akka.actor.ActorSystem
object Sliding {
implicit val system: ActorSystem = ???
def slidingExample1(): Unit = {
//#sliding-1
val source = Source(1 to 4)
source.sliding(2).runForeach(println)
// prints:
// Vector(1, 2)
// Vector(2, 3)
// Vector(3, 4)
//#sliding-1
}
def slidingExample2(): Unit = {
//#sliding-2
val source = Source(1 to 4)
source.sliding(n = 3, step = 2).runForeach(println)
// prints:
// Vector(1, 2, 3)
// Vector(3, 4) - shorter because stream ended before we got 3 elements
//#sliding-2
}
def slidingExample3(): Unit = {
//#moving-average
val numbers = Source(1 :: 3 :: 10 :: 2 :: 3 :: 4 :: 2 :: 10 :: 11 :: Nil)
val movingAverage = numbers.sliding(5).map(window => window.sum.toFloat / window.size)
movingAverage.runForeach(println)
// prints
// 3.8 = average of 1, 3, 10, 2, 3
// 4.4 = average of 3, 10, 2, 3, 4
// 4.2 = average of 10, 2, 3, 4, 2
// 4.2 = average of 2, 3, 4, 2, 10
// 6.0 = average of 3, 4, 2, 10, 11
//#moving-average
}
}

View file

@@ -4,7 +4,6 @@
package akka.stream.javadsl;
import akka.Done;
import akka.NotUsed;
import akka.japi.Pair;
import akka.stream.StreamTest;
@@ -60,7 +59,9 @@ public class LazyAndFutureSourcesTest extends StreamTest {
CompletionStage<NotUsed> nestedMatVal = result.first();
CompletionStage<List<String>> list = result.second();
assertEquals(Arrays.asList("one"), list.toCompletableFuture().get(3, TimeUnit.SECONDS));
assertEquals(true, nestedMatVal.toCompletableFuture().isDone());
// Future adaptation to CompletionStage of the materialized value means we cannot count on the
// materialized value future being completed just because the stream is
nestedMatVal.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@@ -87,7 +88,9 @@ public class LazyAndFutureSourcesTest extends StreamTest {
CompletionStage<NotUsed> nestedMatVal = result.first();
CompletionStage<List<String>> list = result.second();
assertEquals(Arrays.asList("one"), list.toCompletableFuture().get(3, TimeUnit.SECONDS));
assertEquals(true, nestedMatVal.toCompletableFuture().isDone());
// Future adaptation to CompletionStage of the materialized value means we cannot count on the
// materialized value future being completed just because the stream is
nestedMatVal.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
@@ -101,6 +104,8 @@ public class LazyAndFutureSourcesTest extends StreamTest {
CompletionStage<NotUsed> nestedMatVal = result.first();
CompletionStage<List<String>> list = result.second();
assertEquals(Arrays.asList("one"), list.toCompletableFuture().get(3, TimeUnit.SECONDS));
assertEquals(true, nestedMatVal.toCompletableFuture().isDone());
// flatMap/thenCompose of the materialized value means we cannot count on the materialized value
// future being completed just because the stream is
nestedMatVal.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
}

View file

@@ -70,7 +70,6 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"orElseGraph",
"divertToGraph",
"zipWithGraph",
"actorRefWithAck" // deprecated
)
// FIXME document these methods as well
@@ -99,12 +98,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"actorSubscriber",
"foldAsync",
"newOnCompleteStage",
"actorRefWithAck" // deprecated
),
"ActorSink" -> Seq("actorRefWithAck" // deprecated
),
"ActorSource" -> Seq("actorRefWithAck" // deprecated
))
)
)
val ignore =
Set("equals", "hashCode", "notify", "notifyAll", "wait", "toString", "getClass") ++

View file

@@ -23,5 +23,5 @@ addSbtPlugin("com.lightbend.akka" % "sbt-paradox-akka" % "0.29")
addSbtPlugin("com.lightbend" % "sbt-whitesource" % "0.1.18")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.4.0") // for maintenance of copyright file header
addSbtPlugin("com.hpe.sbt" % "sbt-pull-request-validator" % "1.0.0")
addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.22")
addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.23")
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.0.0")