Merge pull request #30373 from akka/wip-state-merge-patriknw

merge master into dev-durable-state
This commit is contained in:
Patrik Nordwall 2021-07-08 15:54:39 +02:00 committed by GitHub
commit 6c74dd7b04
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
86 changed files with 874 additions and 652 deletions

63
.github/workflows/publish.yml vendored Normal file
View file

@ -0,0 +1,63 @@
name: Publish
on:
push:
branches:
- master
# for testing the GH Action without merging to master,
# in some cases
- test-publish-snapshots
tags: ["*"]
jobs:
sbt:
name: sbt publish
runs-on: ubuntu-18.04
steps:
- name: Checkout
uses: actions/checkout@v2
with:
# we don't know what commit the last tag was it's safer to get entire repo so previousStableVersion resolves
fetch-depth: 0
- name: Set up JDK 11
uses: olafurpg/setup-scala@v10
with:
java-version: adopt@1.11.0-9
- name: Publish
run: |-
sudo apt-get install graphviz
sbt +mimaReportBinaryIssues
sbt whitesourceCheckPolicies
sbt ci-release
env:
PGP_PASSPHRASE: ${{ secrets.PGP_PASSPHRASE }}
PGP_SECRET: ${{ secrets.PGP_SECRET }}
SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }}
SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }}
WHITESOURCE_PASSWORD: ${{ secrets.WHITESOURCE_PASSWORD }}
# TODO publish gradle from here as well
documentation:
name: Documentation
runs-on: ubuntu-18.04
steps:
- name: Checkout
uses: actions/checkout@v2
with:
# we don't know what commit the last tag was it's safer to get entire repo so previousStableVersion resolves
fetch-depth: 0
- name: Set up JDK 11
uses: olafurpg/setup-scala@v10
with:
java-version: adopt@1.11.0-9
- name: Publish
run: |-
eval "$(ssh-agent -s)"
echo $SCP_SECRET | base64 -d > /tmp/id_rsa
chmod 600 /tmp/id_rsa
ssh-add /tmp/id_rsa
# using Scala 2.13 here to avoid the infamous problem with missing AskSupport in classpath
sbt -Dakka.build.scalaVersion=2.13.0 -Dakka.genjavadoc.enabled=true publishRsync
env:
SCP_SECRET: ${{ secrets.SCP_SECRET }}

View file

@ -38,7 +38,7 @@ jobs:
- stage: scala3
name: scala3
# separate job since only a few modules compile with Scala 3 yet
script: jabba install adopt@1.11-0 && jabba use adopt@1.11-0 && sbt -Dakka.build.scalaVersion=3.0 "akka-actor-tests/test:compile" akka-actor-typed/compile
script: jabba install adopt@1.11-0 && jabba use adopt@1.11-0 && sbt -Dakka.build.scalaVersion=3.0 "akka-actor-tests/test:compile" akka-actor-typed/compile akka-stream/compile
stages:
- name: whitesource

View file

@ -12,7 +12,7 @@ For resilience, we adopt the "Let it crash" model which the telecom industry has
Actors also provide the abstraction for transparent distribution and the basis for truly scalable and fault-tolerant applications.
Learn more at [akka.io](http://akka.io/).
Learn more at [akka.io](https://akka.io/).
Reference Documentation
-----------------------

View file

@ -1,5 +1,13 @@
# Releasing
Create a new issue from the [Release Train Issue Template](scripts/release-train-issue-template.md):
```
$ sh ./scripts/create-release-issue.sh 0.x.y
```
# Manually
## Prerequisites
### JDK 8 and JDK 11
@ -45,20 +53,14 @@ your `~/.sbt/1.0/private-credentials.sbt`.
[Graphvis](https://graphviz.gitlab.io/download/) is needed for the
scaladoc generation build task, which is part of the release.
### Release script instructions
Make sure you have completed the setup in `project/scripts/release`.
## Snapshot releases
Nightly snapshot releases are created from master and published to
https://repo.akka.io/snapshots by https://jenkins.akka.io:8498/job/akka-publish-nightly/
Snapshot releases are created from master and published to
https://oss.sonatype.org/content/repositories/snapshots/com/typesafe/akka/
To create snapshot versions manually, use `sbt clean publish`.
The release artifacts are created in `akka-*/target/repository` and can be
copied over to a maven server. If you have access, the Jenkins job at
https://jenkins.akka.io:8498/job/akka-publish-wip/ can be used to publish
a snapshot to https://repo.akka.io/snapshots from any branch.
To create snapshot versions manually, use `sbt clean publishLocal`.
If you have access, you can use `+publishSigned` to publish them to
sonatype.
## Releasing only updated docs
@ -80,55 +82,3 @@ It is possible to release a revised documentation to the already existing releas
sbt akka-docs/publishRsync
```
1. Do not forget to push the new branch back to GitHub.
## Release steps
* Tag the release: `git tag -am "Version 2.6.x" v2.6.x`
* Do a `project/scripts/release` to build and release to sonatype
* Log into sonatype, 'close' the staging repo.
* Test the artifacts by adding `resolvers += "Staging Repo" at "https://oss.sonatype.org/content/repositories/comtypesafe-xxxx"` to a test project
* If all is well, 'release' the staging repo.
* Push the release tag to github
## Announcing
* Prepare milestone on github:
* go to the [Milestones tab](https://github.com/akka/akka/milestones)
* move all open issues so that this milestone contains completed work only
* close that milestone
* create a new milestone for next patch version
* In case of a new minor release:
* update the branch descriptions at CONTRIBUTING.md#branches-summary
* write blog post for akka.io and lightbend.com
* Create an announcement as a PR against akka/akka.github.com .
* credits can be generated with `scripts/authors.scala v2.3.5 v2.3.6`
* also update the `latest` variable in `_config.yml`.
Now wait until all artifacts have been properly propagated. Then:
* Update `MiMa.latestPatchOf` and PR that change (`project/MiMa.scala`)
* Change the symbolic links from 'current': `ssh akkarepo@gustav.akka.io ./update-akka-current-version.sh <x.y.z>`
* Publish the release announcement
* Tweet about it
* Post about it on Discuss
* Post about it on Gitter
* Create a GitHub 'release' for the tag via https://github.com/akka/akka/releases with a link to the release notes
## Update references
Update the versions used in:
* https://github.com/akka/akka-samples
* https://github.com/lightbend/lightbend-platform-docs/blob/master/docs/modules/getting-help/examples/build.sbt (this populates https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-help/build-dependencies.html#_akka)
These are autoupdated by latest stable on maven central:
* https://github.com/akka/akka-quickstart-java.g8
* https://github.com/akka/akka-quickstart-scala.g8
* https://github.com/akka/akka-http-quickstart-java.g8
* https://github.com/akka/akka-http-quickstart-scala.g8
* https://github.com/akka/akka-grpc-quickstart-java.g8
* https://github.com/akka/akka-grpc-quickstart-scala.g8

View file

@ -203,7 +203,7 @@ private[io] final class AsyncDnsResolver(
* INTERNAL API
*/
@InternalApi
private[io] object AsyncDnsResolver {
private[akka] object AsyncDnsResolver {
private val ipv4Address =
"""^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$""".r
@ -211,10 +211,15 @@ private[io] object AsyncDnsResolver {
private val ipv6Address =
"""^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$""".r
private def isInetAddress(name: String): Boolean =
ipv4Address.findAllMatchIn(name).nonEmpty ||
private[akka] def isIpv4Address(name: String): Boolean =
ipv4Address.findAllMatchIn(name).nonEmpty
private[akka] def isIpv6Address(name: String): Boolean =
ipv6Address.findAllMatchIn(name).nonEmpty
private def isInetAddress(name: String): Boolean =
isIpv4Address(name) || isIpv6Address(name)
private val Empty =
Future.successful(Answer(-1, immutable.Seq.empty[ResourceRecord], immutable.Seq.empty[ResourceRecord]))

View file

@ -15,12 +15,10 @@ import akka.util.ccompat.JavaConverters._
import java.util.{ Set => JSet }
import akka.actor.typed.Behavior
import akka.annotation.ApiMayChange
import akka.cluster.sharding.typed.internal.EntityTypeKeyImpl
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.ReplicationId.Separator
@ApiMayChange
object ReplicatedEntityProvider {
/**
@ -138,7 +136,6 @@ object ReplicatedEntityProvider {
*
* @tparam M The type of messages the replicated entity accepts
*/
@ApiMayChange
final class ReplicatedEntityProvider[M] private (
val replicas: immutable.Seq[(ReplicatedEntity[M], String)],
val directReplication: Boolean) {
@ -155,7 +152,6 @@ final class ReplicatedEntityProvider[M] private (
}
@ApiMayChange
object ReplicatedEntity {
/**
@ -181,5 +177,4 @@ object ReplicatedEntity {
* Settings for a specific replica id in replicated sharding
* Currently only Entity's with ShardingEnvelope are supported but this may change in the future
*/
@ApiMayChange
final class ReplicatedEntity[M] private (val replicaId: ReplicaId, val entity: Entity[M, ShardingEnvelope[M]])

View file

@ -7,7 +7,6 @@ package akka.cluster.sharding.typed
import akka.actor.typed.ActorSystem
import akka.actor.typed.Extension
import akka.actor.typed.ExtensionId
import akka.annotation.ApiMayChange
import akka.annotation.DoNotInherit
import akka.cluster.sharding.typed.internal.ReplicatedShardingExtensionImpl
import akka.cluster.sharding.typed.scaladsl.EntityRef
@ -18,7 +17,6 @@ import java.util.{ Map => JMap }
* Extension for running Replicated Event Sourcing in sharding by starting one separate instance of sharding per replica.
* The sharding instances can be confined to datacenters or cluster roles or run on the same set of cluster nodes.
*/
@ApiMayChange
object ReplicatedShardingExtension extends ExtensionId[ReplicatedShardingExtension] {
override def createExtension(system: ActorSystem[_]): ReplicatedShardingExtension =
@ -32,7 +30,6 @@ object ReplicatedShardingExtension extends ExtensionId[ReplicatedShardingExtensi
* Not for user extension.
*/
@DoNotInherit
@ApiMayChange
trait ReplicatedShardingExtension extends Extension {
/**
@ -61,7 +58,6 @@ trait ReplicatedShardingExtension extends Extension {
* Not for user extension.
*/
@DoNotInherit
@ApiMayChange
trait ReplicatedSharding[M] {
/**

View file

@ -87,7 +87,7 @@ akka.cluster.sharding {
state-store-mode = "ddata"
# The shard saves persistent snapshots after this number of persistent
# events. Snapshots are used to reduce recovery times.
# events. Snapshots are used to reduce recovery times. A snapshot trigger might be delayed if a batch of updates is processed.
# Only used when state-store-mode=persistence
snapshot-after = 1000
@ -95,7 +95,8 @@ akka.cluster.sharding {
# keeping this number of old persistent batches.
# Batch is of size `snapshot-after`.
# When set to 0 after snapshot is successfully done all events with equal or lower sequence number will be deleted.
# Default value of 2 leaves last maximum 2*`snapshot-after` events and 3 snapshots (2 old ones + latest snapshot)
# Default value of 2 leaves last maximum 2*`snapshot-after` events and 3 snapshots (2 old ones + latest snapshot).
# If larger than 0, one additional batch of journal messages is kept when state-store-mode=persistence to include messages from delayed snapshots.
keep-nr-of-batches = 2
# Settings for LeastShardAllocationStrategy.

View file

@ -96,13 +96,17 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
(if (started.nonEmpty) EntitiesStarted(started) :: Nil else Nil) :::
(if (stopped.nonEmpty) EntitiesStopped(stopped) :: Nil else Nil)
var left = events.size
var saveSnap = false
def persistEventsAndHandleComplete(evts: List[StateChange]): Unit = {
persistAll(evts) { _ =>
left -= 1
saveSnap = saveSnap || isSnapshotNeeded
if (left == 0) {
sender() ! RememberEntitiesShardStore.UpdateDone(started, stopped)
state = state.copy(state.entities.union(started).diff(stopped))
saveSnapshotWhenNeeded()
if (saveSnap) {
saveSnapshot()
}
}
}
}
@ -126,7 +130,9 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
case DeleteMessagesSuccess(toSequenceNr) =>
val deleteTo = toSequenceNr - 1
val deleteFrom = math.max(0, deleteTo - (keepNrOfBatches * snapshotAfter))
// keeping one additional batch of messages in case snapshotAfter has been delayed to the end of a processed batch
val keepNrOfBatchesWithSafetyBatch = if (keepNrOfBatches == 0) 0 else keepNrOfBatches + 1
val deleteFrom = math.max(0, deleteTo - (keepNrOfBatchesWithSafetyBatch * snapshotAfter))
log.debug(
"Messages to [{}] deleted successfully. Deleting snapshots from [{}] to [{}]",
toSequenceNr,
@ -151,10 +157,17 @@ private[akka] final class EventSourcedRememberEntitiesShardStore(
}
def saveSnapshotWhenNeeded(): Unit = {
if (lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0) {
log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
saveSnapshot(state)
if (isSnapshotNeeded) {
saveSnapshot()
}
}
private def saveSnapshot(): Unit = {
log.debug("Saving snapshot, sequence number [{}]", snapshotSequenceNr)
saveSnapshot(state)
}
private def isSnapshotNeeded = {
lastSequenceNr % snapshotAfter == 0 && lastSequenceNr != 0
}
}

View file

@ -973,6 +973,10 @@ class ClusterSingletonManager(singletonProps: Props, terminationMessage: Any, se
case None =>
tryAcquireLease()
}
case Event(HandOverDone, _) =>
// nothing to do
stay()
}
when(WasOldest) {

View file

@ -31,6 +31,5 @@ These are the current complete modules marked as **may change**:
* @ref:[Multi Node Testing](../multi-node-testing.md)
* @ref:[Reliable Delivery](../typed/reliable-delivery.md)
* @ref:[Sharded Daemon Process](../typed/cluster-sharded-daemon-process.md)
* @ref:[Replicated Event Sourcing](../typed/replicated-eventsourcing.md)

View file

@ -32,12 +32,7 @@ All Akka releases are published via Sonatype to Maven Central, see
## Snapshots Repository
Nightly builds are available in [https://repo.akka.io/snapshots](https://repo.akka.io/snapshots/) as both `SNAPSHOT` and
timestamped versions.
For timestamped versions, pick a timestamp from
[https://repo.akka.io/snapshots/com/typesafe/akka/](https://repo.akka.io/snapshots/com/typesafe/akka/).
All Akka modules that belong to the same build have the same timestamp.
Snapshot builds are available at [https://oss.sonatype.org/content/repositories/snapshots/com/typesafe/akka/](https://oss.sonatype.org/content/repositories/snapshots/com/typesafe/akka/). All Akka modules that belong to the same build have the same version.
@@@ warning
@ -50,14 +45,14 @@ The use of Akka SNAPSHOTs, nightlies and milestone releases is discouraged unles
Make sure that you add the repository to the sbt resolvers:
```
resolvers += "Akka Snapshots" at "https://repo.akka.io/snapshots/"
resolvers += Resolver.sonatypeRepo("snapshots")
```
Define the library dependencies with the timestamp as version. For example:
Define the library dependencies with the complete version. For example:
@@@vars
```
libraryDependencies += "com.typesafe.akka" % "akka-remote_$scala.binary.version$" % "2.5-20170510-230859"
libraryDependencies += "com.typesafe.akka" % "akka-remote_$scala.binary.version$" % "2.6.14+72-53943d99-SNAPSHOT"
```
@@@
@ -68,10 +63,12 @@ Make sure that you add the repository to the Maven repositories in pom.xml:
```
<repositories>
<repository>
<id>akka-snapshots</id>
<name>Akka Snapshots</name>
<url>https://repo.akka.io/snapshots/</url>
<id>oss-sonatype</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<layout>default</layout>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
```
@ -84,7 +81,7 @@ Define the library dependencies with the timestamp as version. For example:
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-remote_$scala.binary.version$</artifactId>
<version>2.5-20170510-230859</version>
<version>2.6.14+72-53943d99-SNAPSHOT</version>
</dependency>
</dependencies>
```

View file

@ -6,11 +6,7 @@ Combine several sinks into one using a user specified strategy
## Signature
@apidoc[Sink.combine](Sink$) { scala="
#combine[T,U](first:akka.stream.scaladsl.Sink[U,_],second:akka.stream.scaladsl.Sink[U,_],rest:akka.stream.scaladsl.Sink[U,_]*)(
strategy:Int=&gt;akka.stream.Graph[akka.stream.UniformFanOutShape[T,U],akka.NotUsed]):
akka.stream.scaladsl.Sink[T,akka.NotUsed]" java="#combine(
akka.stream.javadsl.Sink,akka.stream.javadsl.Sink,java.util.List,akka.japi.function.Function)" }
@apidoc[Sink.combine](Sink$) { scala="#combine[T,U](first:akka.stream.scaladsl.Sink[U,_],second:akka.stream.scaladsl.Sink[U,_],rest:akka.stream.scaladsl.Sink[U,_]*)(strategy:Int=&gt;akka.stream.Graph[akka.stream.UniformFanOutShape[T,U],akka.NotUsed]):akka.stream.scaladsl.Sink[T,akka.NotUsed]" java="#combine(akka.stream.javadsl.Sink,akka.stream.javadsl.Sink,java.util.List,akka.japi.function.Function)" }
## Description
@ -22,12 +18,10 @@ This example shows how to combine multiple sinks with a Fan-out Junction.
Scala
:
@@snip [StreamPartialGraphDSLDocSpec.scala](/akka-docs/src/test/scala/docs/stream/StreamPartialGraphDSLDocSpec.scala) {
#sink-combine }
@@snip [StreamPartialGraphDSLDocSpec.scala](/akka-docs/src/test/scala/docs/stream/StreamPartialGraphDSLDocSpec.scala) {#sink-combine }
Java
: @@snip [StreamPartialGraphDSLDocTest.java](/akka-docs/src/test/java/jdocs/stream/StreamPartialGraphDSLDocTest.java)
{ #sink-combine }
: @@snip [StreamPartialGraphDSLDocTest.java](/akka-docs/src/test/java/jdocs/stream/StreamPartialGraphDSLDocTest.java) { #sink-combine }
## Reactive Streams semantics

View file

@ -55,8 +55,8 @@ parameters.
Please note that when combining a `Flow` using that method, the termination signals are not carried
"through" as the `Sink` and `Source` are assumed to be fully independent. If however you want to construct
a `Flow` like this but need the termination events to trigger "the other side" of the composite flow, you can use
`Flow.fromSinkAndSourceCoupled` or `Flow.fromSinkAndSourceCoupledMat` which does just that. For example the cancelation of the composite flows
source-side will then lead to completion of its sink-side. Read @apidoc[Flow]'s API documentation for a
`Flow.fromSinkAndSourceCoupled` or `Flow.fromSinkAndSourceCoupledMat` which does just that. For example the cancellation
of the composite flows source-side will then lead to completion of its sink-side. Read @apidoc[Flow]'s API documentation for a
detailed explanation how this works.
The example `BidiFlow` demonstrates that internally a module can be of arbitrary complexity, and the exposed
@ -88,7 +88,7 @@ Java
: @@snip [CompositionDocTest.java](/akka-docs/src/test/java/jdocs/stream/CompositionDocTest.java) { #non-nested-flow }
It is clear however that there is no nesting present in our first attempt, since the library cannot figure out
It is clear however that there is no nesting present in our first attempt. Since the library cannot figure out
where we intended to put composite module boundaries, it is our responsibility to do that. If we are using the
DSL provided by the `Flow`, `Source`, `Sink` classes then nesting can be achieved by calling one of the
methods `withAttributes()` or `named()` (where the latter is a shorthand for adding a name attribute).
@ -311,15 +311,20 @@ of combining materialized values without nesting and hierarchy involved.
## Attributes
We have seen that we can use `named()` to introduce a nesting level in the fluid DSL (and also explicit nesting by using
`create()` from `GraphDSL`). Apart from having the effect of adding a nesting level, `named()` is actually
a shorthand for calling `withAttributes(Attributes.name("someName"))`. Attributes provide a way to fine-tune certain
aspects of the materialized running entity. For example buffer sizes for asynchronous operators can be controlled via
attributes (see @ref:[Buffers for asynchronous operators](stream-rate.md#async-stream-buffers)). When it comes to hierarchic composition, attributes are inherited
by nested modules, unless they override them with a custom value.
We have seen that we can use `named()` to introduce a nesting level in the fluid DSL and also explicit nesting by using
`create()` from `GraphDSL`. Apart from having the effect of adding a nesting level, `named()` is actually
a shorthand for calling `addAttributes(Attributes.name("someName"))`, adding the `name` attribute to the graph.
Attributes provide a way to fine-tune certain aspects of the materialized running entity. Attributes are inherited by
nested modules, unless they override them with a custom value. This means the attribute specified closest to the operator
in the graph will be the one that is in effect for that operator.
Another example of an attribute is the `inputBuffer` attribute which has the main purpose to provide control over buffer sizes
for asynchronous boundaries (see @ref:[Buffers for asynchronous operators](stream-rate.md#async-stream-buffers)).
The code below, a modification of an earlier example sets the `inputBuffer` attribute on certain modules, but not
on others:
on others. _Note_ that this is only to show how attributes inheritance works, the actual `inputBuffer` attribute does not
have any specific effect when running these streams:
Scala
: @@snip [CompositionDocSpec.scala](/akka-docs/src/test/scala/docs/stream/CompositionDocSpec.scala) { #attributes-inheritance }

View file

@ -284,6 +284,8 @@ the entity when it's supposed to stop itself due to rebalance or passivation. If
it will be stopped automatically without receiving a specific message. It can be useful to define a custom stop
message if the entity needs to perform some asynchronous cleanup or interactions before stopping.
The stop message is only sent locally, from the shard to the entity so does not require an entity id to end up in the right actor. When using a custom `ShardingMessageExtractor` without envelopes, the extractor will still have to handle the stop message type to please the compiler, even though it will never actually be passed to the extractor.
### Automatic Passivation
The entities are automatically passivated if they haven't received a message within the duration configured in

View file

@ -148,8 +148,8 @@ Please refer to its documentation for more details.
#### Joining configured seed nodes
When a new node is started it sends a message to all seed nodes and then sends join command to the one that
answers first. If no one of the seed nodes replied (might not be started yet)
When a new node is started it sends a message to all seed nodes and then sends a join command to the one that
answers first. If none of the seed nodes replies (might not be started yet)
it retries this procedure until success or shutdown.
You can define the seed nodes in the @ref:[configuration](#configuration) file (application.conf):
@ -169,7 +169,7 @@ This can also be defined as Java system properties when starting the JVM using t
When a new node is started it sends a message to all configured `seed-nodes` and then sends a join command to the
one that answers first. If none of the seed nodes replied (might not be started yet) it retries this procedure
one that answers first. If none of the seed nodes replies (might not be started yet) it retries this procedure
until successful or shutdown.
The seed nodes can be started in any order. It is not necessary to have all
@ -369,14 +369,14 @@ configuration descriptions, default values and options.
A common use case is to start actors after the cluster has been initialized,
members have joined, and the cluster has reached a certain size.
With a configuration option you can define required number of members
With a configuration option you can define the required number of members
before the leader changes member status of 'Joining' members to 'Up'.:
```
akka.cluster.min-nr-of-members = 3
```
In a similar way you can define required number of members of a certain role
In a similar way you can define the required number of members of a certain role
before the leader changes member status of 'Joining' members to 'Up'.:
```

View file

@ -1,13 +1,5 @@
# Replicated Event Sourcing
@@@ warning
This module is marked as @ref:[may change](../common/may-change.md) because it is a new feature that
needs feedback from real usage before finalizing the API. This means that API or semantics can change without
warning or deprecation period. It is also not recommended to use this module in production just yet.
@@@
@ref[Event Sourcing](./persistence.md) with `EventSourcedBehavior`s is based on the single writer principle, which means that there can only be one active instance of a `EventSourcedBehavior`
with a given `persistenceId`. Otherwise, multiple instances would store interleaving events based on different states, and when these events would later be replayed it would not be possible to reconstruct the correct state.

View file

@ -4,10 +4,13 @@
package docs.akka.persistence.typed
import scala.annotation.nowarn
import akka.actor.typed.ActorSystem
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.ReplicationId
import akka.persistence.typed.scaladsl.{ EventSourcedBehavior, ReplicatedEventSourcing }
import scala.annotation.nowarn
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.scaladsl.ReplicatedEventSourcing
@nowarn("msg=never used")
object ReplicatedEventSourcingCompileOnlySpec {
@ -24,21 +27,36 @@ object ReplicatedEventSourcingCompileOnlySpec {
trait State
trait Event
//#factory-shared
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("entityTypeHint", "entityId", DCA),
AllReplicas,
queryPluginId) { context =>
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
object Shared {
//#factory-shared
def apply(
system: ActorSystem[_],
entityId: String,
replicaId: ReplicaId): EventSourcedBehavior[Command, State, Event] = {
ReplicatedEventSourcing.commonJournalConfig(
ReplicationId("MyReplicatedEntity", entityId, replicaId),
AllReplicas,
queryPluginId) { replicationContext =>
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
}
}
//#factory-shared
}
//#factory-shared
//#factory
ReplicatedEventSourcing.perReplicaJournalConfig(
ReplicationId("entityTypeHint", "entityId", DCA),
Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { context =>
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
object PerReplica {
//#factory
def apply(
system: ActorSystem[_],
entityId: String,
replicaId: ReplicaId): EventSourcedBehavior[Command, State, Event] = {
ReplicatedEventSourcing.perReplicaJournalConfig(
ReplicationId("MyReplicatedEntity", entityId, replicaId),
Map(DCA -> "journalForDCA", DCB -> "journalForDCB")) { replicationContext =>
EventSourcedBehavior[Command, State, Event](???, ???, ???, ???)
}
}
//#factory
}
//#factory
}

View file

@ -3,9 +3,7 @@
*/
package akka.persistence.typed.crdt
import akka.annotation.ApiMayChange
@ApiMayChange
object Counter {
val empty: Counter = Counter(0)
@ -23,7 +21,6 @@ object Counter {
}
}
@ApiMayChange
final case class Counter(value: BigInt) extends OpCrdt[Counter.Updated] {
override type T = Counter

View file

@ -3,14 +3,12 @@
*/
package akka.persistence.typed.crdt
import akka.annotation.ApiMayChange
import akka.persistence.typed.ReplicaId
/**
* Utility class for comparing timestamp replica
* identifier when implementing last-writer wins.
*/
@ApiMayChange
final case class LwwTime(timestamp: Long, originReplica: ReplicaId) {
/**

View file

@ -7,12 +7,11 @@ package akka.persistence.typed.crdt
import scala.annotation.tailrec
import scala.collection.immutable
import akka.util.HashCode
import akka.annotation.{ ApiMayChange, InternalApi }
import akka.annotation.InternalApi
import akka.persistence.typed.ReplicaId
import akka.persistence.typed.crdt.ORSet.DeltaOp
import akka.persistence.typed.internal.{ ManyVersionVector, OneVersionVector, VersionVector }
@ApiMayChange
object ORSet {
def empty[A](originReplica: ReplicaId): ORSet[A] = new ORSet(originReplica.id, Map.empty, VersionVector.empty)
def apply[A](originReplica: ReplicaId): ORSet[A] = empty(originReplica)
@ -274,7 +273,6 @@ object ORSet {
*
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
@ApiMayChange
final class ORSet[A] private[akka] (
val originReplica: String,
private[akka] val elementsMap: Map[A, ORSet.Dot],

View file

@ -4,9 +4,8 @@
package akka.persistence.typed.crdt
import akka.annotation.{ ApiMayChange, DoNotInherit }
import akka.annotation.DoNotInherit
@ApiMayChange
@DoNotInherit
trait OpCrdt[Operation] { self =>
type T <: OpCrdt[Operation] { type T = self.T }

View file

@ -305,7 +305,6 @@ object ReplicatedEventMetadata {
* For a journal supporting Replicated Event Sourcing needing to add test coverage, use this instance as metadata and defer
* to the built in serializer for serialization format
*/
@ApiMayChange
def instanceForJournalTest: Any = ReplicatedEventMetadata(ReplicaId("DC-A"), 1L, VersionVector.empty + "DC-A", true)
}
@ -328,7 +327,6 @@ object ReplicatedSnapshotMetadata {
* For a snapshot store supporting Replicated Event Sourcing needing to add test coverage, use this instance as metadata and defer
* to the built in serializer for serialization format
*/
@ApiMayChange
def instanceForSnapshotStoreTest: Any =
ReplicatedSnapshotMetadata(
VersionVector.empty + "DC-B" + "DC-A",

View file

@ -268,7 +268,7 @@ private[akka] object Running {
state: Running.RunningState[S],
envelope: ReplicatedEventEnvelope[E],
replication: ReplicationSetup): Behavior[InternalProtocol] = {
setup.internalLogger.infoN(
setup.internalLogger.debugN(
"Replica {} received replicated event. Replica seqs nrs: {}. Envelope {}",
setup.replication,
state.seenPerReplica,

View file

@ -9,14 +9,12 @@ import java.util.Optional
import akka.actor.typed.BackoffSupervisorStrategy
import akka.actor.typed.Behavior
import akka.actor.typed.TypedActorContext
import akka.annotation.ApiMayChange
import akka.annotation.InternalApi
import akka.persistence.typed.internal.ReplicationContextImpl
/**
* Base class for replicated event sourced behaviors.
*/
@ApiMayChange
abstract class ReplicatedEventSourcedBehavior[Command, Event, State](
replicationContext: ReplicationContext,
onPersistFailure: Optional[BackoffSupervisorStrategy])

View file

@ -134,7 +134,9 @@ public interface StashingExample {
private Effect<Event, State> onEndTask(State state, EndTask command) {
if (state.taskIdInProgress.isPresent()) {
if (state.taskIdInProgress.get().equals(command.taskId))
return Effect().persist(new TaskCompleted(command.taskId));
return Effect()
.persist(new TaskCompleted(command.taskId))
.thenUnstashAll(); // continue with next task
else return Effect().stash(); // other task in progress, wait with new task until later
} else {
return Effect().unhandled();

View file

@ -496,7 +496,7 @@ private[remote] class ReliableDeliverySupervisor(
// If we have not confirmed the remote UID we cannot transfer the system message at this point just buffer it.
// GotUid will kick resendAll() causing the messages to be properly written.
// Flow control by not sending more when we already have many outstanding.
if (uidConfirmed && resendBuffer.nonAcked.size <= settings.SysResendLimit)
if (uidConfirmed && resendBuffer.nonAcked.length <= settings.SysResendLimit)
writer ! sequencedSend
} else writer ! send
@ -1130,7 +1130,7 @@ private[remote] class EndpointReader(
override def receive: Receive = {
case Disassociated(info) => handleDisassociated(info)
case InboundPayload(p) if p.size <= transport.maximumPayloadBytes =>
case InboundPayload(p) if p.length <= transport.maximumPayloadBytes =>
val (ackOption, msgOption) = tryDecodeMessageAndAck(p)
for (ack <- ackOption; reliableDelivery <- reliableDeliverySupervisor) reliableDelivery ! ack
@ -1180,7 +1180,7 @@ private[remote] class EndpointReader(
case StopReading(writer, replyTo) =>
replyTo ! StoppedReading(writer)
case InboundPayload(p) if p.size <= transport.maximumPayloadBytes =>
case InboundPayload(p) if p.length <= transport.maximumPayloadBytes =>
val (ackOption, msgOption) = tryDecodeMessageAndAck(p)
for (ack <- ackOption; reliableDelivery <- reliableDeliverySupervisor) reliableDelivery ! ack

View file

@ -20,6 +20,7 @@ import akka.util.Helpers.Requiring
import akka.util.Helpers.toRootLowerCase
import akka.util.WildcardIndex
import akka.util.ccompat.JavaConverters._
import akka.io.dns.internal.AsyncDnsResolver
/** INTERNAL API */
private[akka] final class ArterySettings private (config: Config) {
@ -292,7 +293,11 @@ private[akka] object ArterySettings {
def getHostname(key: String, config: Config): String = config.getString(key) match {
case "<getHostAddress>" => InetAddress.getLocalHost.getHostAddress
case "<getHostName>" => InetAddress.getLocalHost.getHostName
case other => other
case other =>
if (other.startsWith("[") && other.endsWith("]")) other
else if (AsyncDnsResolver.isIpv6Address(other)) {
"[" + other + "]"
} else other
}
sealed trait Transport {

View file

@ -69,7 +69,7 @@ import akka.util.ByteString
case object ReadMagic extends Step {
override def parse(reader: ByteReader): ParseResult[EnvelopeBuffer] = {
val magic = reader.take(TcpFraming.Magic.size)
val magic = reader.take(TcpFraming.Magic.length)
if (magic == TcpFraming.Magic)
ParseResult(None, ReadStreamId)
else

View file

@ -32,6 +32,7 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.time.Duration;
@ -290,7 +291,6 @@ public class FlowTest extends StreamTest {
probe.expectMsgEquals(6);
}
@SuppressWarnings("unchecked")
@Test
public void mustBeAbleToUseGroupBy() throws Exception {
final Iterable<String> input = Arrays.asList("Aaa", "Abb", "Bcc", "Cdd", "Cee");
@ -308,22 +308,14 @@ public class FlowTest extends StreamTest {
final CompletionStage<List<List<String>>> future =
Source.from(input).via(flow).limit(10).runWith(Sink.seq(), system);
final Object[] result = future.toCompletableFuture().get(1, TimeUnit.SECONDS).toArray();
Arrays.sort(
result,
(Comparator<Object>)
(Object)
new Comparator<List<String>>() {
@Override
public int compare(List<String> o1, List<String> o2) {
return o1.get(0).charAt(0) - o2.get(0).charAt(0);
}
});
final List<List<String>> result =
future.toCompletableFuture().get(1, TimeUnit.SECONDS).stream()
.sorted(Comparator.comparingInt(list -> list.get(0).charAt(0)))
.collect(Collectors.toList());
assertArrayEquals(
new Object[] {
Arrays.asList("Aaa", "Abb"), Arrays.asList("Bcc"), Arrays.asList("Cdd", "Cee")
},
assertEquals(
Arrays.asList(
Arrays.asList("Aaa", "Abb"), Arrays.asList("Bcc"), Arrays.asList("Cdd", "Cee")),
result);
}

View file

@ -36,6 +36,7 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@ -177,7 +178,6 @@ public class SourceTest extends StreamTest {
probe.expectMsgEquals(6);
}
@SuppressWarnings("unchecked")
@Test
public void mustBeAbleToUseGroupBy() throws Exception {
final Iterable<String> input = Arrays.asList("Aaa", "Abb", "Bcc", "Cdd", "Cee");
@ -195,22 +195,14 @@ public class SourceTest extends StreamTest {
final CompletionStage<List<List<String>>> future =
source.grouped(10).runWith(Sink.head(), system);
final Object[] result = future.toCompletableFuture().get(1, TimeUnit.SECONDS).toArray();
Arrays.sort(
result,
(Comparator<Object>)
(Object)
new Comparator<List<String>>() {
@Override
public int compare(List<String> o1, List<String> o2) {
return o1.get(0).charAt(0) - o2.get(0).charAt(0);
}
});
final List<List<String>> result =
future.toCompletableFuture().get(1, TimeUnit.SECONDS).stream()
.sorted(Comparator.comparingInt(list -> list.get(0).charAt(0)))
.collect(Collectors.toList());
assertArrayEquals(
new Object[] {
Arrays.asList("Aaa", "Abb"), Arrays.asList("Bcc"), Arrays.asList("Cdd", "Cee")
},
assertEquals(
Arrays.asList(
Arrays.asList("Aaa", "Abb"), Arrays.asList("Bcc"), Arrays.asList("Cdd", "Cee")),
result);
}

View file

@ -29,7 +29,8 @@ class DslFactoriesConsistencySpec extends AnyWordSpec with Matchers {
"lazyFutureFlow", // lazyCompletionStageFlow
"futureFlow", // completionStageFlow
"futureSink", // completionStageSink
"lazyFutureSink" // lazyCompletionStageSink
"lazyFutureSink", // lazyCompletionStageSink
"createGraph" // renamed/overload of create for getting type inference working in Scala 3
)
val javaIgnore =

View file

@ -0,0 +1,106 @@
/*
* Copyright (C) 2020-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl.io
import akka.stream.TLSClientAuth
import akka.stream.TLSProtocol.NegotiateNewSession
import org.scalatest.matchers.should.Matchers
import javax.net.ssl.{ SSLContext, SSLEngine, SSLParameters }
import org.scalatest.wordspec.AnyWordSpecLike
class TLSUtilsSpec extends AnyWordSpecLike with Matchers {
"TlsUtils.applySessionParameters" must {
"use defaults if not requested otherwise" in {
// this test confirms the default expectation that forms the basis
// of the follow-up tests
val sslEngine: SSLEngine = SSLContext.getDefault.createSSLEngine()
// by default, need client auth is false
sslEngine.getSSLParameters.getNeedClientAuth shouldBe false
val sslParams: SSLParameters = new SSLParameters()
sslParams.getNeedClientAuth shouldBe false // by default is false
val negotiableSession: NegotiateNewSession =
// NegotiateNewSession requested nothing
NegotiateNewSession(None, None, None, Some(sslParams))
TlsUtils.applySessionParameters(sslEngine, negotiableSession)
// NeedClientAuth stick to defaults
sslEngine.getSSLParameters.getNeedClientAuth shouldBe false
}
"set NeedClientAuth to true by applying SSLParameters" in {
val sslEngine: SSLEngine = SSLContext.getDefault.createSSLEngine()
val sslParams: SSLParameters = new SSLParameters()
sslParams.setNeedClientAuth(true)
val negotiableSession: NegotiateNewSession =
// NegotiateNewSession requested nothing, so sslParams should prevail
NegotiateNewSession(None, None, None, Some(sslParams))
TlsUtils.applySessionParameters(sslEngine, negotiableSession)
sslEngine.getSSLParameters.getNeedClientAuth shouldBe true
}
"set NeedClientAuth to true when TLSClientAuth.Need is requested" in {
val sslEngine: SSLEngine = SSLContext.getDefault.createSSLEngine()
val sslParams: SSLParameters = new SSLParameters()
val negotiableSession: NegotiateNewSession =
NegotiateNewSession(None, None, Some(TLSClientAuth.Need), Some(sslParams))
TlsUtils.applySessionParameters(sslEngine, negotiableSession)
// true because explicitly asked when setting TLSClientAuth.Need
sslEngine.getSSLParameters.getNeedClientAuth shouldBe true
}
"set NeedClientAuth to false when TLSClientAuth.None is requested" in {
val sslEngine: SSLEngine = SSLContext.getDefault.createSSLEngine()
val sslParams: SSLParameters = new SSLParameters()
sslParams.setNeedClientAuth(true)
{
val negotiableSession: NegotiateNewSession =
// NegotiateNewSession requested nothing, so sslParams should prevail
NegotiateNewSession(None, None, None, Some(sslParams))
TlsUtils.applySessionParameters(sslEngine, negotiableSession)
// NeedClientAuth is true because ssl param is applied
sslEngine.getSSLParameters.getNeedClientAuth shouldBe true
}
{
val negotiableSession: NegotiateNewSession =
// NegotiateNewSession requested TLSClientAuth.None, sslParams is overridden
NegotiateNewSession(None, None, Some(TLSClientAuth.None), Some(sslParams))
TlsUtils.applySessionParameters(sslEngine, negotiableSession)
// despite ssl params set it to true,
// NegotiateNewSession explicitly sets it to false
sslEngine.getSSLParameters.getNeedClientAuth shouldBe false
}
}
"set WantClientAuth to true when TLSClientAuth.Want is requested" in {
val sslEngine: SSLEngine = SSLContext.getDefault.createSSLEngine()
val sslParams: SSLParameters = new SSLParameters()
val negotiableSession: NegotiateNewSession =
NegotiateNewSession(None, None, Some(TLSClientAuth.Want), Some(sslParams))
TlsUtils.applySessionParameters(sslEngine, negotiableSession)
// true because explicitly asked when setting TLSClientAuth.Want
sslEngine.getSSLParameters.getWantClientAuth shouldBe true
sslEngine.getSSLParameters.getNeedClientAuth shouldBe false
}
}
}

View file

@ -22,7 +22,7 @@ private[stream] abstract class GraphCreate {
*/
def create[S1 <: Shape, S <: Shape, M](g1: Graph[S1, M],
block: function.Function2[GraphDSL.Builder[M], S1, S]): Graph[S, M] =
scaladsl.GraphDSL.create(g1) { b => s => block.apply(b.asJava, s) }
scaladsl.GraphDSL.createGraph(g1) { b => s => block.apply(b.asJava, s) }
/**
* Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s
@ -30,7 +30,7 @@ private[stream] abstract class GraphCreate {
*/
def create[S1 <: Shape, S2 <: Shape, S <: Shape, M1, M2, M](g1: Graph[S1, M1], g2: Graph[S2, M2], combineMat: function.Function2[M1, M2, M],
block: function.Function3[GraphDSL.Builder[M], S1, S2, S]): Graph[S, M] =
scaladsl.GraphDSL.create(g1, g2)(combineMat.apply) { b => (s1, s2) => block.apply(b.asJava, s1, s2) }
scaladsl.GraphDSL.createGraph(g1, g2)(combineMat.apply) { b => (s1, s2) => block.apply(b.asJava, s1, s2) }
[3..21#/**
* Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s
@ -38,7 +38,7 @@ private[stream] abstract class GraphCreate {
*/
def create1[[#S1 <: Shape#], S <: Shape, [#M1#], M]([#g1: Graph[S1, M1]#], combineMat: function.Function1[[#M1#], M],
block: function.Function2[GraphDSL.Builder[M], [#S1#], S]): Graph[S, M] =
scaladsl.GraphDSL.create([#g1#])(combineMat.apply) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) }#
scaladsl.GraphDSL.createGraph([#g1#])(combineMat.apply) { b => ([#s1#]) => block.apply(b.asJava, [#s1#]) }#
]
}

View file

@ -21,6 +21,8 @@ trait GraphApply {
/**
* Creates a new [[Graph]] by importing the given graph `g1` and passing its [[Shape]]
* along with the [[GraphDSL.Builder]] to the given create function.
*
* Deprecated: this method signature does not work with Scala 3 type inference, kept for binary compatiblity. Use createGraph instead.
*/
def create[S <: Shape, Mat](g1: Graph[Shape, Mat])(buildBlock: GraphDSL.Builder[Mat] => (g1.Shape) => S): Graph[S, Mat] = {
val builder = new GraphDSL.Builder
@ -30,12 +32,26 @@ trait GraphApply {
createGraph(s, builder)
}
/**
* Creates a new [[Graph]] by importing the given graph `g1` and passing its [[Shape]]
* along with the [[GraphDSL.Builder]] to the given create function.
*/
def createGraph[S <: Shape, S1 <: Shape, Mat](g1: Graph[S1, Mat])(buildBlock: GraphDSL.Builder[Mat] => S1 => S): Graph[S, Mat] = {
val builder = new GraphDSL.Builder
val s1 = builder.add(g1, Keep.right)
val s = buildBlock(builder)(s1)
createGraph(s, builder)
}
[2..#
/**
* Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s
* along with the [[GraphDSL.Builder]] to the given create function.
*
* Deprecated: this method signature does not work with Scala 3 type inference, kept for binary compatiblity. Use createGraph instead.
*/
def create[S <: Shape, Mat, [#M1#]]([#g1: Graph[Shape, M1]#])(combineMat: ([#M1#]) => Mat)(buildBlock: GraphDSL.Builder[Mat] => ([#g1.Shape#]) => S): Graph[S, Mat] = {
val builder = new GraphDSL.Builder
@ -46,8 +62,22 @@ trait GraphApply {
val s = buildBlock(builder)([#s1#])
createGraph(s, builder)
}#
}
/**
* Creates a new [[Graph]] by importing the given graphs and passing their [[Shape]]s
* along with the [[GraphDSL.Builder]] to the given create function.
*/
def createGraph[S <: Shape, Mat, [#M1#], [#S1 <: Shape#]]([#g1: Graph[S1, M1]#])(combineMat: ([#M1#]) => Mat)(buildBlock: GraphDSL.Builder[Mat] => ([#S1#]) => S): Graph[S, Mat] = {
val builder = new GraphDSL.Builder
val curried = combineMat.curried
val s##1 = builder.add(g##1, (m##1: M##1) => curried(m##1))
[2..#val s1 = builder.add(g1, (f: M1 => Any, m1: M1) => f(m1))#
]
val s = buildBlock(builder)([#s1#])
createGraph(s, builder)
}#
]
private def createGraph[S <: Shape, Mat](shape: S, graphBuilder: GraphDSL.Builder[Mat]): Graph[S, Mat] =

View file

@ -5,6 +5,7 @@
package akka.stream.scaladsl
import akka.stream._
import akka.stream.impl.ContextPropagation
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
trait ZipLatestWithApply {
@ -34,7 +35,8 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[F
// Without this field the completion signalling would take one extra pull
private var willShutDown = false
[#val inlet0 = new ZipLatestInlet(in0)#
private val contextPropagation = ContextPropagation()
[#private val inlet0 = new ZipLatestInlet(in0)#
]
private var waitingForTuple = false
private var staleTupleValues = true
@ -68,7 +70,8 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[F
private def hasAllValues = [#inlet0.hasValue#&&]
private def pushOutput(): Unit = {
push(out, zipper([#inlet0.value#,]))
contextPropagation.resumeContext()
push(out, zipper([#inlet0.value#]))
if (willShutDown) completeStage()
staleTupleValues = true
}
@ -85,8 +88,10 @@ class ZipLatestWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[F
var hasValue = false
override def onPush() = {
value = outer.grab(in)
hasValue = true
value = outer.grab(in)
// Only one context can be propagated.
if (outer.hasAllValues) contextPropagation.suspendContext()
outer.staleTupleValues = false
if (outer.waitingForTuple && outer.hasAllValues) {
outer.pushOutput()

View file

@ -5,6 +5,7 @@
package akka.stream.scaladsl
import akka.stream._
import akka.stream.impl.ContextPropagation
import akka.stream.stage._
trait ZipWithApply {
@ -34,8 +35,10 @@ class ZipWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[FanInSh
var pending = ##0
// Without this field the completion signalling would take one extra pull
var willShutDown = false
private val contextPropagation = ContextPropagation()
private def pushAll(): Unit = {
contextPropagation.resumeContext()
push(out, zipper([#grab(in0)#]))
if (willShutDown) completeStage()
else {
@ -51,6 +54,8 @@ class ZipWith1[[#A1#], O] (val zipper: ([#A1#]) => O) extends GraphStage[FanInSh
[#setHandler(in0, new InHandler {
override def onPush(): Unit = {
// Only one context can be propagated. Picked the first element as an arbitrary but deterministic choice.
if (0 == ##0) contextPropagation.suspendContext()
pending -= ##1
if (pending == ##0) pushAll()
}

View file

@ -0,0 +1,8 @@
# internal
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.MergeHub#MergedSourceLogic.enqueue")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.BoundedBuffer#FixedQueue.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.BoundedBuffer#DynamicQueue.this")
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.impl.PhasedFusingActorMaterializer.makeLogger")
# Effectively internal
ProblemFilters.exclude[IncompatibleSignatureProblem]("akka.stream.MaterializerLoggingProvider.makeLogger")

View file

@ -87,6 +87,39 @@ akka {
# slightly more bytes than this limit (at most one element more). It can be set to 0
# to disable the usage of the buffer.
write-buffer-size = 16 KiB
# In addition to the buffering described for property write-buffer-size, try to collect
# more consecutive writes from the upstream stream producers.
#
# The rationale is to increase write efficiency by avoiding separate small
# writes to the network which is expensive to do. Merging those writes together
# (up to `write-buffer-size`) improves throughput for small writes.
#
# The idea is that a running stream may produce multiple small writes consecutively
# in one go without waiting for any external input. To probe the stream for
# data, this features delays sending a write immediately by probing the stream
# for more writes. This works by rescheduling the TCP connection stage via the
# actor mailbox of the underlying actor. Thus, before the stage is reactivated
# the upstream gets another opportunity to emit writes.
#
# When the stage is reactivated and if new writes are detected another round-trip
# is scheduled. The loop repeats until either the number of round trips given in this
# setting is reached, the buffer reaches `write-buffer-size`, or no new writes
# were detected during the last round-trip.
#
# This mechanism ensures that a write is guaranteed to be sent when the remaining stream
# becomes idle waiting for external signals.
#
# In most cases, the extra latency this mechanism introduces should be negligible,
# but depending on the stream setup it may introduce a noticeable delay,
# if the upstream continuously produces small amounts of writes in a
# blocking (CPU-bound) way.
#
# In that case, the feature can either be disabled, or the producing CPU-bound
# work can be taken off-stream to avoid excessive delays (e.g. using `mapAsync` instead of `map`).
#
# A value of 0 disables this feature.
coalesce-writes = 10
}
# Time to wait for async materializer creation before throwing an exception

View file

@ -0,0 +1,35 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import akka.actor.ActorSystem
import akka.actor.ExtensionId
import akka.annotation.DoNotInherit
import akka.stream.scaladsl.Tcp
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import scala.annotation.nowarn
/*
* Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define
* on Scala 2 and Scala 3
*/
/**
* Not for user extension
*/
@DoNotInherit
trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] {
def apply()(implicit system: ActorSystem): Tcp = super.apply(system)
}
/**
* Not for user extension
*/
@DoNotInherit
@nowarn("msg=deprecated")
trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] {
def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system)
}

View file

@ -0,0 +1,35 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import akka.actor.ActorSystem
import akka.actor.ExtensionId
import akka.annotation.DoNotInherit
import akka.stream.scaladsl.Tcp
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import scala.annotation.nowarn
/*
* Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define
* on Scala 2 and Scala 3
*/
/**
* Not for user extension
*/
@DoNotInherit
trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] {
def apply()(implicit system: ActorSystem): Tcp = super.apply(system)
}
/**
* Not for user extension
*/
@DoNotInherit
@nowarn("msg=deprecated")
trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] {
def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system)
}

View file

@ -0,0 +1,35 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import akka.actor.ActorSystem
import akka.actor.ExtensionId
import akka.annotation.DoNotInherit
import akka.stream.scaladsl.Tcp
import com.typesafe.sslconfig.akka.AkkaSSLConfig
import scala.annotation.nowarn
/*
* Some extensions here provide an apply that takes an implicit actor system which needs different slightly syntax to define
* on Scala 2 and Scala 3
*/
/**
* Not for user extension
*/
@DoNotInherit
trait TcpImplicitExtensionIdApply extends ExtensionId[Tcp] {
override def apply(implicit system: ActorSystem): Tcp = super.apply(system)
}
/**
* Not for user extension
*/
@DoNotInherit
@nowarn("msg=deprecated")
trait AkkaSSLConfigExtensionIdApply extends ExtensionId[AkkaSSLConfig] {
override def apply(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system)
}

View file

@ -778,7 +778,9 @@ object IOSettings {
"Use setting 'akka.stream.materializer.io.tcp.write-buffer-size' or attribute TcpAttributes.writeBufferSize instead",
"2.6.0")
def apply(config: Config): IOSettings =
new IOSettings(tcpWriteBufferSize = math.min(Int.MaxValue, config.getBytes("tcp.write-buffer-size")).toInt)
new IOSettings(
tcpWriteBufferSize = math.min(Int.MaxValue, config.getBytes("tcp.write-buffer-size")).toInt,
coalesceWrites = config.getInt("tcp.coalesce-writes"))
@deprecated(
"Use setting 'akka.stream.materializer.io.tcp.write-buffer-size' or attribute TcpAttributes.writeBufferSize instead",
@ -809,19 +811,30 @@ object IOSettings {
@nowarn("msg=deprecated")
final class IOSettings private (
@deprecated("Use attribute 'TcpAttributes.TcpWriteBufferSize' to read the concrete setting value", "2.6.0")
val tcpWriteBufferSize: Int) {
val tcpWriteBufferSize: Int,
val coalesceWrites: Int) {
// constructor for binary compatibility with version 2.6.15 and earlier
@deprecated("Use attribute 'TcpAttributes.TcpWriteBufferSize' to read the concrete setting value", "2.6.0")
def this(tcpWriteBufferSize: Int) = this(tcpWriteBufferSize, coalesceWrites = 10)
def withTcpWriteBufferSize(value: Int): IOSettings = copy(tcpWriteBufferSize = value)
private def copy(tcpWriteBufferSize: Int): IOSettings = new IOSettings(tcpWriteBufferSize = tcpWriteBufferSize)
def withCoalesceWrites(value: Int): IOSettings = copy(coalesceWrites = value)
private def copy(tcpWriteBufferSize: Int = tcpWriteBufferSize, coalesceWrites: Int = coalesceWrites): IOSettings =
new IOSettings(tcpWriteBufferSize, coalesceWrites)
override def equals(other: Any): Boolean = other match {
case s: IOSettings => s.tcpWriteBufferSize == tcpWriteBufferSize
case s: IOSettings => s.tcpWriteBufferSize == tcpWriteBufferSize && s.coalesceWrites == coalesceWrites
case _ => false
}
override def hashCode(): Int =
31 * tcpWriteBufferSize + coalesceWrites
override def toString =
s"""IoSettings(${tcpWriteBufferSize})"""
s"""IoSettings($tcpWriteBufferSize,$coalesceWrites)"""
}
object StreamSubscriptionTimeoutSettings {

View file

@ -4,14 +4,15 @@
package akka.stream
import akka.annotation.DoNotInherit
import akka.event.LoggingAdapter
/**
* SPI intended only to be extended by custom [[Materializer]] implementations,
* that also want to provide operators they materialize with specialized [[akka.event.LoggingAdapter]] instances.
* Not for user extension
*/
@DoNotInherit
trait MaterializerLoggingProvider { this: Materializer =>
def makeLogger(logSource: Class[_]): LoggingAdapter
def makeLogger(logSource: Class[Any]): LoggingAdapter
}

View file

@ -233,7 +233,7 @@ import akka.util.unused
protected def downstreamRunning: Actor.Receive = {
case SubscribePending =>
subscribePending(exposedPublisher.takePendingSubscribers())
subscribePending(exposedPublisher.takePendingSubscribers().asInstanceOf[Seq[Subscriber[Any]]])
case RequestMore(_, elements) =>
if (elements < 1) {
error(ReactiveStreamsCompliance.numberOfElementsInRequestMustBePositiveException)

View file

@ -5,8 +5,7 @@
package akka.stream.impl
import java.{ util => ju }
import akka.annotation.InternalApi
import akka.annotation.{ InternalApi, InternalStableApi }
import akka.stream._
/**
@ -35,7 +34,7 @@ private[akka] object Buffer {
def apply[T](size: Int, effectiveAttributes: Attributes): Buffer[T] =
apply(size, effectiveAttributes.mandatoryAttribute[ActorAttributes.MaxFixedBufferSize].size)
def apply[T](size: Int, max: Int): Buffer[T] =
@InternalStableApi def apply[T](size: Int, max: Int): Buffer[T] =
if (size < FixedQueueSize || size < max) FixedSizeBuffer(size)
else new BoundedBuffer(size)
}
@ -54,7 +53,7 @@ private[akka] object Buffer {
*
* Returns a specialized instance for power-of-two sized buffers.
*/
@InternalApi private[akka] def apply[T](size: Int): FixedSizeBuffer[T] =
@InternalStableApi private[akka] def apply[T](size: Int): FixedSizeBuffer[T] =
if (size < 1) throw new IllegalArgumentException("size must be positive")
else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
else new ModuloFixedSizeBuffer(size)
@ -144,27 +143,42 @@ private[akka] object Buffer {
*/
@InternalApi private[akka] final class BoundedBuffer[T](val capacity: Int) extends Buffer[T] {
import BoundedBuffer._
def used: Int = q.used
def isFull: Boolean = q.isFull
def isEmpty: Boolean = q.isEmpty
def nonEmpty: Boolean = q.nonEmpty
def enqueue(elem: T): Unit = q.enqueue(elem)
def dequeue(): T = q.dequeue()
def peek(): T = q.peek()
def clear(): Unit = q.clear()
def dropHead(): Unit = q.dropHead()
def dropTail(): Unit = q.dropTail()
private final class FixedQueue extends Buffer[T] {
private var q: Buffer[T] = new FixedQueue[T](capacity, newBuffer => q = newBuffer)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object BoundedBuffer {
private final class FixedQueue[T](override val capacity: Int, switchBuffer: Buffer[T] => Unit) extends Buffer[T] {
import Buffer._
private val queue = new Array[AnyRef](FixedQueueSize)
private var head = 0
private var tail = 0
override def capacity = BoundedBuffer.this.capacity
override def used = tail - head
override def isFull = used == capacity
override def isEmpty = tail == head
@ -172,11 +186,11 @@ private[akka] object Buffer {
override def enqueue(elem: T): Unit =
if (tail - head == FixedQueueSize) {
val queue = new DynamicQueue()
val queue = new DynamicQueue[T](capacity)
while (nonEmpty) {
queue.enqueue(dequeue())
}
q = queue
switchBuffer(queue)
queue.enqueue(elem)
} else {
queue(tail & FixedQueueMask) = elem.asInstanceOf[AnyRef]
@ -204,8 +218,7 @@ private[akka] object Buffer {
}
}
private final class DynamicQueue() extends ju.LinkedList[T] with Buffer[T] {
override def capacity = BoundedBuffer.this.capacity
private final class DynamicQueue[T](override val capacity: Int) extends ju.LinkedList[T] with Buffer[T] {
override def used = size
override def isFull = size == capacity
override def nonEmpty = !isEmpty()
@ -216,6 +229,4 @@ private[akka] object Buffer {
override def dropHead(): Unit = remove()
override def dropTail(): Unit = removeLast()
}
private var q: Buffer[T] = new FixedQueue
}

View file

@ -0,0 +1,31 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.impl
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi private[akka] trait ContextPropagation {
def suspendContext(): Unit
def resumeContext(): Unit
}
/**
* INTERNAL API
*/
@InternalApi private[akka] object ContextPropagation {
/**
* INTERNAL API
*/
@InternalApi def apply(): ContextPropagation = new ContextPropagationImpl
}
private[akka] final class ContextPropagationImpl extends ContextPropagation {
def suspendContext(): Unit = ()
def resumeContext(): Unit = ()
}

View file

@ -610,7 +610,7 @@ private final case class SavedIslandData(
}
}
override def makeLogger(logSource: Class[_]): LoggingAdapter =
override def makeLogger(logSource: Class[Any]): LoggingAdapter =
Logging(system, logSource)
/**

View file

@ -7,7 +7,7 @@ package akka.stream.impl
import scala.annotation.tailrec
import scala.util.control.NoStackTrace
import ResizableMultiReaderRingBuffer._
import ResizableMultiReaderRingBuffer.{ Cursor, Cursors, NothingToReadException }
import akka.annotation.InternalApi

View file

@ -299,8 +299,6 @@ import akka.util.ccompat._
require(maxConcurrentPulls > 0, "Max concurrent pulls must be greater than 0")
type Requested[E] = Promise[Option[E]]
val in = Inlet[T]("queueSink.in")
override def initialAttributes = DefaultAttributes.queueSink
override val shape: SinkShape[T] = SinkShape.of(in)
@ -309,14 +307,13 @@ import akka.util.ccompat._
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stageLogic = new GraphStageLogic(shape) with InHandler with SinkQueueWithCancel[T] {
type Received[E] = Try[Option[E]]
val maxBuffer = inheritedAttributes.get[InputBuffer](InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
// Allocates one additional element to hold stream closed/failure indicators
val buffer: Buffer[Received[T]] = Buffer(maxBuffer + 1, inheritedAttributes)
val currentRequests: Buffer[Requested[T]] = Buffer(maxConcurrentPulls, inheritedAttributes)
val buffer: Buffer[Try[Option[T]]] = Buffer(maxBuffer + 1, inheritedAttributes)
val currentRequests: Buffer[Promise[Option[T]]] = Buffer(maxConcurrentPulls, inheritedAttributes)
override def preStart(): Unit = {
setKeepGoing(true)
@ -324,7 +321,7 @@ import akka.util.ccompat._
}
private val callback = getAsyncCallback[Output[T]] {
case QueueSink.Pull(pullPromise) =>
case QueueSink.Pull(pullPromise: Promise[Option[T]] @unchecked) =>
if (currentRequests.isFull)
pullPromise.failure(
new IllegalStateException(s"Too many concurrent pulls. Specified maximum is $maxConcurrentPulls. " +
@ -337,7 +334,7 @@ import akka.util.ccompat._
case QueueSink.Cancel => completeStage()
}
def sendDownstream(promise: Requested[T]): Unit = {
def sendDownstream(promise: Promise[Option[T]]): Unit = {
val e = buffer.dequeue()
promise.complete(e)
e match {
@ -445,17 +442,19 @@ import akka.util.ccompat._
@InternalApi private[akka] final class MutableCollectorState[T, R](
collector: java.util.stream.Collector[T, Any, R],
accumulator: java.util.function.BiConsumer[Any, T],
val accumulated: Any)
_accumulated: Any)
extends CollectorState[T, R] {
override def accumulated(): Any = _accumulated
override def update(elem: T): CollectorState[T, R] = {
accumulator.accept(accumulated, elem)
accumulator.accept(_accumulated, elem)
this
}
override def finish(): R = {
// only called if completed without elements
collector.finisher().apply(accumulated)
collector.finisher().apply(_accumulated)
}
}

View file

@ -40,7 +40,7 @@ import akka.util.NanoTimeTokenBucket
private val nanosBetweenTokens = per.toNanos / cost
// 100 ms is a realistic minimum between tokens, otherwise the maximumBurst is adjusted
// to be able to support higher rates
val effectiveMaximumBurst =
val effectiveMaximumBurst: Long =
if (maximumBurst == Throttle.AutomaticMaximumBurst) math.max(1, ((100 * 1000 * 1000) / nanosBetweenTokens))
else maximumBurst
require(!(mode == ThrottleMode.Enforcing && effectiveMaximumBurst < 0), "maximumBurst must be > 0 in Enforcing mode")

View file

@ -230,6 +230,7 @@ import akka.stream.stage._
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private var nextDeadline: Long = System.nanoTime + timeout.toNanos
private val contextPropagation = ContextPropagation()
setHandlers(in, out, this)
@ -242,6 +243,8 @@ import akka.stream.stage._
if (isAvailable(out)) {
push(out, grab(in))
pull(in)
} else {
contextPropagation.suspendContext()
}
}
@ -251,6 +254,7 @@ import akka.stream.stage._
override def onPull(): Unit = {
if (isAvailable(in)) {
contextPropagation.resumeContext()
push(out, grab(in))
if (isClosed(in)) completeStage()
else pull(in)

View file

@ -109,7 +109,7 @@ import akka.stream.stage._
* when this accidentally leaks onto threads that are not stopped when this
* class should be unloaded.
*/
override def initialValue = new Array(1)
override def initialValue: Array[AnyRef] = new Array(1)
}
/**

View file

@ -18,7 +18,7 @@ import akka.dispatch.ExecutionContexts
import akka.event.Logging
import akka.stream.{ Shape, _ }
import akka.stream.FlowMonitorState._
import akka.stream.impl.{ LinearTraversalBuilder, ReactiveStreamsCompliance }
import akka.stream.impl.{ ContextPropagation, LinearTraversalBuilder, ReactiveStreamsCompliance }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.StreamLayout._
import akka.stream.scaladsl._
@ -82,11 +82,14 @@ import akka.stream.stage._
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private val contextPropagation = ContextPropagation()
def onPush(): Unit = {
if (isAvailable(out)) {
push(out, grab(in))
tryPull(in)
} else {
contextPropagation.suspendContext()
}
}
@ -96,6 +99,7 @@ import akka.stream.stage._
def onPull(): Unit = {
if (isAvailable(in)) {
contextPropagation.resumeContext()
push(out, grab(in))
if (isClosed(in)) completeStage()
else pull(in)

View file

@ -15,7 +15,7 @@ import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl }
import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl, ContextPropagation }
import akka.stream.scaladsl.{ DelayStrategy, Source }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
@ -78,6 +78,7 @@ import akka.util.ccompat._
def decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
private var buffer: OptionVal[T] = OptionVal.none
private val contextPropagation = ContextPropagation()
override def preStart(): Unit = pull(in)
override def onPush(): Unit =
@ -87,9 +88,10 @@ import akka.util.ccompat._
if (isAvailable(out)) {
push(out, elem)
pull(in)
} else
} else {
buffer = OptionVal.Some(elem)
else pull(in)
contextPropagation.suspendContext()
} else pull(in)
} catch {
case NonFatal(ex) =>
decider(ex) match {
@ -101,6 +103,7 @@ import akka.util.ccompat._
override def onPull(): Unit =
buffer match {
case OptionVal.Some(value) =>
contextPropagation.resumeContext()
push(out, value)
buffer = OptionVal.none
if (!isClosed(in)) pull(in)
@ -1059,6 +1062,7 @@ private[stream] object Collect {
private var agg: Out = null.asInstanceOf[Out]
private var left: Long = max
private var pending: In = null.asInstanceOf[In]
private val contextPropagation = ContextPropagation()
private def flush(): Unit = {
if (agg != null) {
@ -1089,6 +1093,7 @@ private[stream] object Collect {
def onPush(): Unit = {
val elem = grab(in)
val cost = costFn(elem)
contextPropagation.suspendContext()
if (agg == null) {
try {
@ -1133,6 +1138,7 @@ private[stream] object Collect {
if (isClosed(in)) completeStage()
else if (!hasBeenPulled(in)) pull(in)
} else if (isClosed(in)) {
contextPropagation.resumeContext()
push(out, agg)
if (pending == null) completeStage()
else {
@ -1151,6 +1157,7 @@ private[stream] object Collect {
pending = null.asInstanceOf[In]
}
} else {
contextPropagation.resumeContext()
flush()
if (!hasBeenPulled(in)) pull(in)
}
@ -1182,12 +1189,14 @@ private[stream] object Collect {
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler {
private var iterator: Iterator[Out] = Iterator.empty
private var expanded = false
private val contextPropagation = ContextPropagation()
override def preStart(): Unit = pull(in)
def onPush(): Unit = {
iterator = extrapolate(grab(in))
if (iterator.hasNext) {
contextPropagation.suspendContext()
if (isAvailable(out)) {
expanded = true
pull(in)
@ -1203,6 +1212,7 @@ private[stream] object Collect {
def onPull(): Unit = {
if (iterator.hasNext) {
contextPropagation.resumeContext()
if (!expanded) {
expanded = true
if (isClosed(in)) {
@ -1765,6 +1775,7 @@ private[stream] object Collect {
private var totalWeight = 0L
private var totalNumber = 0
private var hasElements = false
private val contextPropagation = ContextPropagation()
override def preStart() = {
scheduleWithFixedDelay(GroupedWeightedWithin.groupedWeightedWithinTimer, interval, interval)
@ -1823,6 +1834,7 @@ private[stream] object Collect {
private def emitGroup(): Unit = {
groupEmitted = true
contextPropagation.resumeContext()
push(out, buf.result())
buf.clear()
if (!finished) startNewGroup()
@ -1849,6 +1861,7 @@ private[stream] object Collect {
}
override def onPush(): Unit = {
contextPropagation.suspendContext()
if (pending == null) nextElement(grab(in)) // otherwise keep the element for next round
}
@ -2072,6 +2085,7 @@ private[stream] object Collect {
override def toString = s"Reduce.Logic(aggregator=$aggregator)"
private var aggregator: T = _
private val empty: T = aggregator
private def decider =
inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
@ -2100,7 +2114,7 @@ private[stream] object Collect {
decider(ex) match {
case Supervision.Stop => failStage(ex)
case Supervision.Restart =>
aggregator = _: T
aggregator = empty
setInitialInHandler()
case _ => ()
@ -2199,15 +2213,20 @@ private[akka] final class StatefulMapConcat[In, Out](val f: () => In => Iterable
lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
var currentIterator: Iterator[Out] = _
var plainFun = f()
val contextPropagation = ContextPropagation()
def hasNext = if (currentIterator != null) currentIterator.hasNext else false
setHandlers(in, out, this)
def pushPull(): Unit =
def pushPull(shouldResumeContext: Boolean): Unit =
if (hasNext) {
if (shouldResumeContext) contextPropagation.resumeContext()
push(out, currentIterator.next())
if (!hasNext && isClosed(in)) completeStage()
if (hasNext) {
// suspend context for the next element
contextPropagation.suspendContext()
} else if (isClosed(in)) completeStage()
} else if (!isClosed(in))
pull(in)
else completeStage()
@ -2217,13 +2236,13 @@ private[akka] final class StatefulMapConcat[In, Out](val f: () => In => Iterable
override def onPush(): Unit =
try {
currentIterator = plainFun(grab(in)).iterator
pushPull()
pushPull(shouldResumeContext = false)
} catch handleException
override def onUpstreamFinish(): Unit = onFinish()
override def onPull(): Unit =
try pushPull()
try pushPull(shouldResumeContext = true)
catch handleException
private def handleException: Catcher[Unit] = {

View file

@ -185,8 +185,8 @@ import akka.util.ByteString
private[this] var off = 0
def hasRemaining: Boolean = off < input.size
def remainingSize: Int = input.size - off
def hasRemaining: Boolean = off < input.length
def remainingSize: Int = input.length - off
def currentOffset: Int = off

View file

@ -425,7 +425,7 @@ import akka.util.ByteString
case BUFFER_OVERFLOW =>
flushToUser()
transportInChoppingBlock.putBack(transportInBuffer)
case s => fail(new IllegalStateException(s"unexpected status $s in doUnwrap()"))
case null => fail(new IllegalStateException(s"unexpected status 'null' in doUnwrap()"))
}
}
@ -497,14 +497,15 @@ import akka.util.ByteString
def applySessionParameters(engine: SSLEngine, sessionParameters: NegotiateNewSession): Unit = {
sessionParameters.enabledCipherSuites.foreach(cs => engine.setEnabledCipherSuites(cs.toArray))
sessionParameters.enabledProtocols.foreach(p => engine.setEnabledProtocols(p.toArray))
sessionParameters.sslParameters.foreach(engine.setSSLParameters)
sessionParameters.clientAuth match {
case Some(TLSClientAuth.None) => engine.setNeedClientAuth(false)
case Some(TLSClientAuth.Want) => engine.setWantClientAuth(true)
case Some(TLSClientAuth.Need) => engine.setNeedClientAuth(true)
case _ => // do nothing
}
sessionParameters.sslParameters.foreach(engine.setSSLParameters)
}
def cloneParameters(old: SSLParameters): SSLParameters = {

View file

@ -11,7 +11,6 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong }
import scala.collection.immutable
import scala.concurrent.{ Future, Promise }
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.annotation.nowarn
import akka.{ Done, NotUsed }
@ -178,7 +177,7 @@ import akka.util.ByteString
private def unbindCompleted(): Unit = {
stageActor.unwatch(listener)
unbindPromise.trySuccess(Done)
unbindPromise.trySuccess(())
if (connectionFlowsAwaitingInitialization.get() == 0) completeStage()
else scheduleOnce(BindShutdownTimer, bindShutdownTimeout)
}
@ -193,7 +192,7 @@ import akka.util.ByteString
override def postStop(): Unit = {
// a bit unexpected to succeed here rather than fail with abrupt stage termination
// but there was an existing test case covering this behavior
unbindPromise.trySuccess(Done)
unbindPromise.trySuccess(())
bindingPromise.tryFailure(new NoSuchElementException("Binding was unbound before it was completely finished"))
}
}
@ -214,6 +213,9 @@ private[stream] object ConnectionSourceStage {
@InternalApi private[stream] object TcpConnectionStage {
case object WriteAck extends Tcp.Event
private case object WriteDelayAck extends Tcp.Event
private val WriteDelayMessage = Write(ByteString.empty, WriteDelayAck)
trait TcpRole {
def halfClose: Boolean
}
@ -253,8 +255,7 @@ private[stream] object ConnectionSourceStage {
@nowarn("msg=deprecated")
private val writeBufferSize = inheritedAttributes
.get[TcpAttributes.TcpWriteBufferSize](
TcpAttributes.TcpWriteBufferSize(
ActorMaterializerHelper.downcast(eagerMaterializer).settings.ioSettings.tcpWriteBufferSize))
TcpAttributes.TcpWriteBufferSize(eagerMaterializer.settings.ioSettings.tcpWriteBufferSize))
.size
private var writeBuffer = ByteString.empty
@ -264,6 +265,12 @@ private[stream] object ConnectionSourceStage {
// upstream already finished but are still writing the last data to the connection
private var connectionClosePending = false
@nowarn("msg=deprecated")
private val coalesceWrites = eagerMaterializer.settings.ioSettings.coalesceWrites
private def coalesceWritesDisabled = coalesceWrites == 0
private var writeDelayCountDown = 0
private var previousWriteBufferSize = 0
// No reading until role have been decided
setHandler(bytesOut, new OutHandler {
override def onPull(): Unit = ()
@ -309,6 +316,23 @@ private[stream] object ConnectionSourceStage {
}
}
private def sendWriteBuffer(): Unit = {
connection ! Write(writeBuffer, WriteAck)
writeInProgress = true
writeBuffer = ByteString.empty
}
/*
* Coalesce more frames by collecting more frames while waiting for round trip to the
* connection actor. WriteDelayMessage is an empty Write message and WriteDelayAck will
* be sent back as reply.
*/
private def sendWriteDelay(): Unit = {
previousWriteBufferSize = writeBuffer.length
writeInProgress = true
connection ! WriteDelayMessage
}
// Used for both inbound and outbound connections
private def connected(evt: (ActorRef, Any)): Unit = {
val msg = evt._2
@ -318,13 +342,24 @@ private[stream] object ConnectionSourceStage {
if (isClosed(bytesOut)) connection ! ResumeReading
else push(bytesOut, data)
case WriteDelayAck =>
// Immediately flush the write buffer if no more frames have been collected during the WriteDelayMessage
// round trip to the connection actor, or if reaching the configured maximum number of round trips, or
// if writeBuffer capacity has been exceeded.
writeDelayCountDown -= 1
if (writeDelayCountDown == 0 || previousWriteBufferSize == writeBuffer.length || writeBuffer.length >= writeBufferSize)
sendWriteBuffer()
else
sendWriteDelay()
case WriteAck =>
if (writeBuffer.isEmpty)
writeInProgress = false
else if (coalesceWritesDisabled || writeBuffer.length >= writeBufferSize)
sendWriteBuffer()
else {
connection ! Write(writeBuffer, WriteAck)
writeInProgress = true
writeBuffer = ByteString.empty
writeDelayCountDown = coalesceWrites
sendWriteDelay()
}
if (!writeInProgress && connectionClosePending) {
@ -417,12 +452,15 @@ private[stream] object ConnectionSourceStage {
ReactiveStreamsCompliance.requireNonNullElement(elem)
if (writeInProgress) {
writeBuffer = writeBuffer ++ elem
} else if (coalesceWritesDisabled || writeBuffer.length >= writeBufferSize) {
writeBuffer = writeBuffer ++ elem
sendWriteBuffer()
} else {
connection ! Write(writeBuffer ++ elem, WriteAck)
writeInProgress = true
writeBuffer = ByteString.empty
writeBuffer = writeBuffer ++ elem
writeDelayCountDown = coalesceWrites
sendWriteDelay()
}
if (writeBuffer.size < writeBufferSize)
if (writeBuffer.length < writeBufferSize)
pull(bytesIn)
}

View file

@ -19,12 +19,12 @@ import akka.util.{ ByteString, ByteStringBuilder }
protected lazy val deflater = new Deflater(level, nowrap)
override final def compressAndFlush(input: ByteString): ByteString = {
val buffer = newTempBuffer(input.size)
val buffer = newTempBuffer(input.length)
compressWithBuffer(input, buffer) ++ flushWithBuffer(buffer)
}
override final def compressAndFinish(input: ByteString): ByteString = {
val buffer = newTempBuffer(input.size)
val buffer = newTempBuffer(input.length)
compressWithBuffer(input, buffer) ++ finishWithBuffer(buffer)
}

View file

@ -12,7 +12,7 @@ import akka.stream.StreamRefSettings
/** INTERNAL API */
@InternalApi
private[akka] final case class StreamRefSettingsImpl private (
private[akka] final case class StreamRefSettingsImpl(
override val bufferCapacity: Int,
override val demandRedeliveryInterval: FiniteDuration,
override val subscriptionTimeout: FiniteDuration,

View file

@ -25,8 +25,8 @@ object BidiFlow {
*/
def fromGraph[I1, O1, I2, O2, M](g: Graph[BidiShape[I1, O1, I2, O2], M]): BidiFlow[I1, O1, I2, O2, M] =
g match {
case bidi: BidiFlow[I1, O1, I2, O2, M] => bidi
case other => new BidiFlow(scaladsl.BidiFlow.fromGraph(other))
case bidi: BidiFlow[I1, O1, I2, O2, M] @unchecked => bidi
case other => new BidiFlow(scaladsl.BidiFlow.fromGraph(other))
}
/**

View file

@ -12,6 +12,8 @@ import java.util.concurrent.CompletionStage
import akka.stream.{ javadsl, scaladsl, IOResult }
import akka.util.ByteString
import akka.util.ccompat.JavaConverters._
import akka.stream.scaladsl.SinkToCompletionStage
import akka.stream.scaladsl.SourceToCompletionStage
/**
* Java API: Factories to create sinks and sources from files

View file

@ -64,8 +64,8 @@ object Flow {
*/
def fromGraph[I, O, M](g: Graph[FlowShape[I, O], M]): Flow[I, O, M] =
g match {
case f: Flow[I, O, M] => f
case other => new Flow(scaladsl.Flow.fromGraph(other))
case f: Flow[I, O, M] @unchecked => f
case other => new Flow(scaladsl.Flow.fromGraph(other))
}
/**
@ -1679,9 +1679,9 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def recoverWith(
clazz: Class[_ <: Throwable],
supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): javadsl.Flow[In, Out, Mat] =
recoverWith {
recoverWith({
case elem if clazz.isInstance(elem) => supplier.get()
}
}: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
/**
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after

View file

@ -668,15 +668,15 @@ object GraphDSL extends GraphCreate {
def to[I, O](j: UniformFanInShape[I, O]): ReverseOps[I] = new ReverseOps(findIn(delegate, j, 0))
def to[I, O](j: UniformFanOutShape[I, O]): ReverseOps[I] = new ReverseOps(j.in)
final class ForwardOps[T](out: Outlet[T]) {
def toInlet(in: Inlet[_ >: T]): Builder[Mat] = { out ~> in; self }
def to(dst: SinkShape[_ >: T]): Builder[Mat] = { out ~> dst; self }
def toFanIn[U](j: UniformFanInShape[_ >: T, U]): Builder[Mat] = { out ~> j; self }
def toFanOut[U](j: UniformFanOutShape[_ >: T, U]): Builder[Mat] = { out ~> j; self }
def via[U](f: FlowShape[_ >: T, U]): ForwardOps[U] = from((out ~> f).outlet)
def viaFanIn[U](j: UniformFanInShape[_ >: T, U]): ForwardOps[U] = from((out ~> j).outlet)
def viaFanOut[U](j: UniformFanOutShape[_ >: T, U]): ForwardOps[U] = from((out ~> j).outlet)
def out(): Outlet[T] = out
final class ForwardOps[T](_out: Outlet[T]) {
def toInlet(in: Inlet[_ >: T]): Builder[Mat] = { _out ~> in; self }
def to(dst: SinkShape[_ >: T]): Builder[Mat] = { _out ~> dst; self }
def toFanIn[U](j: UniformFanInShape[_ >: T, U]): Builder[Mat] = { _out ~> j; self }
def toFanOut[U](j: UniformFanOutShape[_ >: T, U]): Builder[Mat] = { _out ~> j; self }
def via[U](f: FlowShape[_ >: T, U]): ForwardOps[U] = from((_out ~> f).outlet)
def viaFanIn[U](j: UniformFanInShape[_ >: T, U]): ForwardOps[U] = from((_out ~> j).outlet)
def viaFanOut[U](j: UniformFanOutShape[_ >: T, U]): ForwardOps[U] = from((_out ~> j).outlet)
def out(): Outlet[T] = _out
}
final class ReverseOps[T](out: Inlet[T]) {

View file

@ -25,6 +25,7 @@ import akka.japi.function
import akka.japi.function.Creator
import akka.stream.{ javadsl, scaladsl, _ }
import akka.stream.impl.LinearTraversalBuilder
import akka.stream.scaladsl.SinkToCompletionStage
/** Java API */
object Sink {
@ -313,8 +314,8 @@ object Sink {
*/
def fromGraph[T, M](g: Graph[SinkShape[T], M]): Sink[T, M] =
g match {
case s: Sink[T, M] => s
case other => new Sink(scaladsl.Sink.fromGraph(other))
case s: Sink[T, M] @unchecked => s
case other => new Sink(scaladsl.Sink.fromGraph(other))
}
/**

View file

@ -622,7 +622,7 @@ object Source {
*/
def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] =
g match {
case s: Source[T, M] => s
case s: Source[T, M] @unchecked => s
case s if s eq scaladsl.Source.empty => empty().asInstanceOf[Source[T, M]]
case other => new Source(scaladsl.Source.fromGraph(other))
}
@ -2131,9 +2131,9 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def recoverWith(
clazz: Class[_ <: Throwable],
supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
recoverWith {
recoverWith({
case elem if clazz.isInstance(elem) => supplier.get()
}
}: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
/**
* RecoverWithRetries allows to switch to alternative Source on flow failure. It will stay in effect after
@ -2195,7 +2195,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
supplier: Supplier[Graph[SourceShape[Out], NotUsed]]): Source[Out, Mat] =
recoverWithRetries(attempts, {
case elem if clazz.isInstance(elem) => supplier.get()
})
}: PartialFunction[Throwable, Graph[SourceShape[Out], NotUsed]])
/**
* Transform each input element into an `Iterable` of output elements that is

View file

@ -17,6 +17,8 @@ import akka.japi.function
import akka.stream.{ javadsl, scaladsl }
import akka.stream.IOResult
import akka.util.ByteString
import akka.stream.scaladsl.SinkToCompletionStage
import akka.stream.scaladsl.SourceToCompletionStage
/**
* Converters for interacting with the blocking `java.io` streams APIs and Java 8 Streams

View file

@ -232,8 +232,8 @@ object BidiFlow {
*/
def fromGraph[I1, O1, I2, O2, Mat](graph: Graph[BidiShape[I1, O1, I2, O2], Mat]): BidiFlow[I1, O1, I2, O2, Mat] =
graph match {
case bidi: BidiFlow[I1, O1, I2, O2, Mat] => bidi
case bidi: javadsl.BidiFlow[I1, O1, I2, O2, Mat] => bidi.asScala
case bidi: BidiFlow[I1, O1, I2, O2, Mat] => bidi
case bidi: javadsl.BidiFlow[I1, O1, I2, O2, Mat] @unchecked => bidi.asScala
case other =>
new BidiFlow(other.traversalBuilder, other.shape)
}

View file

@ -379,7 +379,7 @@ object Flow {
def fromGraph[I, O, M](g: Graph[FlowShape[I, O], M]): Flow[I, O, M] =
g match {
case f: Flow[I, O, M] => f
case f: javadsl.Flow[I, O, M] => f.asScala
case f: javadsl.Flow[I, O, M] @unchecked => f.asScala
case g: GraphStageWithMaterializedValue[FlowShape[I, O], M] =>
// move these from the operator itself to make the returned source
// behave as it is the operator with regards to attributes
@ -464,7 +464,7 @@ object Flow {
*/
def fromSinkAndSourceMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(
combine: (M1, M2) => M): Flow[I, O, M] =
fromGraph(GraphDSL.create(sink, source)(combine) { _ => (in, out) =>
fromGraph(GraphDSL.createGraph(sink, source)(combine) { _ => (in, out) =>
FlowShape(in.in, out.out)
})
@ -560,7 +560,7 @@ object Flow {
def fromSinkAndSourceCoupledMat[I, O, M1, M2, M](sink: Graph[SinkShape[I], M1], source: Graph[SourceShape[O], M2])(
combine: (M1, M2) => M): Flow[I, O, M] =
// format: OFF
Flow.fromGraph(GraphDSL.create(sink, source)(combine) { implicit b => (i, o) =>
Flow.fromGraph(GraphDSL.createGraph(sink, source)(combine) { implicit b => (i, o) =>
import GraphDSL.Implicits._
val bidi = b.add(new CoupledTerminationBidi[I, O])
/* bidi.in1 ~> */ bidi.out1 ~> i; o ~> bidi.in2 /* ~> bidi.out2 */
@ -2699,7 +2699,7 @@ trait FlowOps[+Out, +Mat] {
}
protected def zipGraph[U, M](that: Graph[SourceShape[U], M]): Graph[FlowShape[Out @uncheckedVariance, (Out, U)], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val zip = b.add(Zip[Out, U]())
r ~> zip.in1
FlowShape(zip.in0, zip.out)
@ -2725,7 +2725,7 @@ trait FlowOps[+Out, +Mat] {
protected def zipLatestGraph[U, M](
that: Graph[SourceShape[U], M]): Graph[FlowShape[Out @uncheckedVariance, (Out, U)], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val zip = b.add(ZipLatest[Out, U]())
r ~> zip.in1
FlowShape(zip.in0, zip.out)
@ -2748,7 +2748,7 @@ trait FlowOps[+Out, +Mat] {
protected def zipWithGraph[Out2, Out3, M](that: Graph[SourceShape[Out2], M])(
combine: (Out, Out2) => Out3): Graph[FlowShape[Out @uncheckedVariance, Out3], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val zip = b.add(ZipWith[Out, Out2, Out3](combine))
r ~> zip.in1
FlowShape(zip.in0, zip.out)
@ -2776,7 +2776,7 @@ trait FlowOps[+Out, +Mat] {
protected def zipLatestWithGraph[Out2, Out3, M](that: Graph[SourceShape[Out2], M])(
combine: (Out, Out2) => Out3): Graph[FlowShape[Out @uncheckedVariance, Out3], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val zip = b.add(ZipLatestWith[Out, Out2, Out3](combine))
r ~> zip.in1
FlowShape(zip.in0, zip.out)
@ -2858,7 +2858,7 @@ trait FlowOps[+Out, +Mat] {
that: Graph[SourceShape[U], M],
segmentSize: Int,
eagerClose: Boolean = false): Graph[FlowShape[Out @uncheckedVariance, U], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val interleave = b.add(Interleave[U](2, segmentSize, eagerClose))
r ~> interleave.in(1)
FlowShape(interleave.in(0), interleave.out)
@ -2882,7 +2882,7 @@ trait FlowOps[+Out, +Mat] {
protected def mergeGraph[U >: Out, M](
that: Graph[SourceShape[U], M],
eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val merge = b.add(Merge[U](2, eagerComplete))
r ~> merge.in(1)
FlowShape(merge.in(0), merge.out)
@ -2904,7 +2904,7 @@ trait FlowOps[+Out, +Mat] {
protected def mergeLatestGraph[U >: Out, M](
that: Graph[SourceShape[U], M],
eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, immutable.Seq[U]], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val merge = b.add(MergeLatest[U](2, eagerComplete))
r ~> merge.in(1)
FlowShape(merge.in(0), merge.out)
@ -2929,7 +2929,7 @@ trait FlowOps[+Out, +Mat] {
that: Graph[SourceShape[U], M],
priority: Boolean,
eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val merge = b.add(MergePreferred[U](1, eagerComplete))
r ~> merge.in(if (priority) 0 else 1)
FlowShape(merge.in(if (priority) 1 else 0), merge.out)
@ -2956,7 +2956,7 @@ trait FlowOps[+Out, +Mat] {
leftPriority: Int,
rightPriority: Int,
eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val merge = b.add(MergePrioritized[U](Seq(leftPriority, rightPriority), eagerComplete))
r ~> merge.in(1)
FlowShape(merge.in(0), merge.out)
@ -2982,7 +2982,7 @@ trait FlowOps[+Out, +Mat] {
protected def mergeSortedGraph[U >: Out, M](that: Graph[SourceShape[U], M])(
implicit ord: Ordering[U]): Graph[FlowShape[Out @uncheckedVariance, U], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val merge = b.add(new MergeSorted[U])
r ~> merge.in1
FlowShape(merge.in0, merge.out)
@ -3017,7 +3017,7 @@ trait FlowOps[+Out, +Mat] {
protected def concatGraph[U >: Out, Mat2](
that: Graph[SourceShape[U], Mat2],
detached: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val merge = b.add(Concat[U](2, detached))
r ~> merge.in(1)
FlowShape(merge.in(0), merge.out)
@ -3088,7 +3088,7 @@ trait FlowOps[+Out, +Mat] {
protected def prependGraph[U >: Out, Mat2](
that: Graph[SourceShape[U], Mat2],
detached: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
val merge = b.add(Concat[U](2, detached))
r ~> merge.in(0)
FlowShape(merge.in(1), merge.out)
@ -3144,7 +3144,7 @@ trait FlowOps[+Out, +Mat] {
protected def orElseGraph[U >: Out, Mat2](
secondary: Graph[SourceShape[U], Mat2]): Graph[FlowShape[Out @uncheckedVariance, U], Mat2] =
GraphDSL.create(secondary) { implicit b => secondary =>
GraphDSL.createGraph(secondary) { implicit b => secondary =>
val orElse = b.add(OrElse[U]())
secondary ~> orElse.in(1)
@ -3200,7 +3200,7 @@ trait FlowOps[+Out, +Mat] {
def alsoTo(that: Graph[SinkShape[Out], _]): Repr[Out] = via(alsoToGraph(that))
protected def alsoToGraph[M](that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
import GraphDSL.Implicits._
val bcast = b.add(Broadcast[Out](2, eagerCancel = true))
bcast.out(1) ~> r
@ -3224,7 +3224,7 @@ trait FlowOps[+Out, +Mat] {
protected def divertToGraph[M](
that: Graph[SinkShape[Out], M],
when: Out => Boolean): Graph[FlowShape[Out @uncheckedVariance, Out], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
import GraphDSL.Implicits._
val partition = b.add(new Partition[Out](2, out => if (when(out)) 1 else 0, true))
partition.out(1) ~> r
@ -3250,7 +3250,7 @@ trait FlowOps[+Out, +Mat] {
def wireTap(that: Graph[SinkShape[Out], _]): Repr[Out] = via(wireTapGraph(that))
protected def wireTapGraph[M](that: Graph[SinkShape[Out], M]): Graph[FlowShape[Out @uncheckedVariance, Out], M] =
GraphDSL.create(that) { implicit b => r =>
GraphDSL.createGraph(that) { implicit b => r =>
import GraphDSL.Implicits._
val bcast = b.add(WireTap[Out]())
bcast.out1 ~> r

View file

@ -265,27 +265,27 @@ object Framing {
s"Read ${possibleMatchPos - previous} bytes " +
s"which is more than $maximumLineBytes without seeing a line terminator"))
} else if (possibleMatchPos == -1) {
if (buffer.size - previous > maximumLineBytes)
if (buffer.length - previous > maximumLineBytes)
failStage(
new FramingException(
s"Read ${buffer.size - previous} bytes " +
s"Read ${buffer.length - previous} bytes " +
s"which is more than $maximumLineBytes without seeing a line terminator"))
else {
// No matching character, we need to accumulate more bytes into the buffer
nextPossibleMatch = buffer.size
nextPossibleMatch = buffer.length
doParse()
}
} else if (possibleMatchPos + separatorBytes.size > buffer.size) {
} else if (possibleMatchPos + separatorBytes.length > buffer.length) {
// We have found a possible match (we found the first character of the terminator
// sequence) but we don't have yet enough bytes. We remember the position to
// retry from next time.
nextPossibleMatch = possibleMatchPos
doParse()
} else if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.size) == separatorBytes) {
} else if (buffer.slice(possibleMatchPos, possibleMatchPos + separatorBytes.length) == separatorBytes) {
// Found a match, mark start and end position and iterate if possible
indices += (previous -> possibleMatchPos)
nextPossibleMatch = possibleMatchPos + separatorBytes.size
if (nextPossibleMatch == buffer.size || indices.isFull) {
nextPossibleMatch = possibleMatchPos + separatorBytes.length
if (nextPossibleMatch == buffer.length || indices.isFull) {
doParse()
} else {
searchIndices()

View file

@ -1203,10 +1203,13 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U
// Without this field the completion signalling would take one extra pull
var willShutDown = false
private val contextPropagation = ContextPropagation()
val grabInlet = grab[A] _
val pullInlet = pull[A] _
private def pushAll(): Unit = {
contextPropagation.resumeContext()
push(out, zipper(shape.inlets.map(grabInlet)))
if (willShutDown) completeStage()
else shape.inlets.foreach(pullInlet)
@ -1216,20 +1219,22 @@ class ZipWithN[A, O](zipper: immutable.Seq[A] => O)(n: Int) extends GraphStage[U
shape.inlets.foreach(pullInlet)
}
shape.inlets.foreach(in => {
setHandler(in, new InHandler {
override def onPush(): Unit = {
pending -= 1
if (pending == 0) pushAll()
}
shape.inlets.zipWithIndex.foreach {
case (in, i) =>
setHandler(in, new InHandler {
override def onPush(): Unit = {
// Only one context can be propagated. Picked the first element as an arbitrary but deterministic choice.
if (i == 0) contextPropagation.suspendContext()
pending -= 1
if (pending == 0) pushAll()
}
override def onUpstreamFinish(): Unit = {
if (!isAvailable(in)) completeStage()
willShutDown = true
}
})
})
override def onUpstreamFinish(): Unit = {
if (!isAvailable(in)) completeStage()
willShutDown = true
}
})
}
def onPull(): Unit = {
pending += n

View file

@ -239,7 +239,7 @@ private[akka] class MergeHub[T](perProducerBufferSize: Int, drainingEnabled: Boo
def isDraining: Boolean = drainingEnabled && draining
// External API
def enqueue(ev: Event): Unit = {
private[MergeHub] def enqueue(ev: Event): Unit = {
queue.add(ev)
/*
* Simple volatile var is enough, there is no need for a CAS here. The first important thing to note

View file

@ -139,7 +139,7 @@ object Sink {
def fromGraph[T, M](g: Graph[SinkShape[T], M]): Sink[T, M] =
g match {
case s: Sink[T, M] => s
case s: javadsl.Sink[T, M] => s.asScala
case s: javadsl.Sink[T, M] @unchecked => s.asScala
case g: GraphStageWithMaterializedValue[SinkShape[T], M] =>
// move these from the stage itself to make the returned source
// behave as it is the stage with regards to attributes
@ -161,7 +161,7 @@ object Sink {
def fromMaterializer[T, M](factory: (Materializer, Attributes) => Sink[T, M]): Sink[T, Future[M]] =
Flow
.fromMaterializer({ (mat, attr) =>
Flow.fromGraph(GraphDSL.create(factory(mat, attr)) { b => sink =>
Flow.fromGraph(GraphDSL.createGraph(factory(mat, attr)) { b => sink =>
FlowShape(sink.in, b.materializedValue.outlet)
})
})
@ -579,7 +579,7 @@ object Sink {
onInitMessage: Any,
ackMessage: Any,
onCompleteMessage: Any,
onFailureMessage: (Throwable) => Any = Status.Failure): Sink[T, NotUsed] =
onFailureMessage: (Throwable) => Any = Status.Failure.apply): Sink[T, NotUsed] =
actorRefWithAck(ref, _ => identity, _ => onInitMessage, Some(ackMessage), onCompleteMessage, onFailureMessage)
/**

View file

@ -315,7 +315,7 @@ object Source {
*/
def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match {
case s: Source[T, M] => s
case s: javadsl.Source[T, M] => s.asScala
case s: javadsl.Source[T, M] @unchecked => s.asScala
case g: GraphStageWithMaterializedValue[SourceShape[T], M] =>
// move these from the stage itself to make the returned source
// behave as it is the stage with regards to attributes
@ -783,7 +783,7 @@ object Source {
*/
def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])(
strategy: Int => Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) => M): Source[U, M] = {
val secondPartiallyCombined = GraphDSL.create(second) { implicit b => secondShape =>
val secondPartiallyCombined = GraphDSL.createGraph(second) { implicit b => secondShape =>
import GraphDSL.Implicits._
val c = b.add(strategy(2))
secondShape ~> c.in(1)

View file

@ -135,7 +135,7 @@ object StreamConverters {
if (parallelism == 1) javaCollector[T, R](collectorFactory)
else {
Sink
.fromGraph(GraphDSL.create(Sink.head[R]) { implicit b => sink =>
.fromGraph(GraphDSL.createGraph(Sink.head[R]) { implicit b => sink =>
import GraphDSL.Implicits._
val factory = collectorFactory.asInstanceOf[() => Collector[T, Any, R]]
val balance = b.add(Balance[T](parallelism))

View file

@ -9,7 +9,6 @@ import java.util.concurrent.TimeoutException
import javax.net.ssl.SSLContext
import javax.net.ssl.SSLEngine
import javax.net.ssl.SSLSession
import scala.collection.immutable
import scala.concurrent.Future
import scala.concurrent.duration.Duration
@ -17,9 +16,7 @@ import scala.concurrent.duration.FiniteDuration
import scala.util.Success
import scala.util.Try
import scala.util.control.NoStackTrace
import scala.annotation.nowarn
import akka.Done
import akka.NotUsed
import akka.actor._
@ -30,6 +27,7 @@ import akka.io.Inet.SocketOption
import akka.stream._
import akka.stream.Attributes.Attribute
import akka.stream.TLSProtocol.NegotiateNewSession
import akka.stream.impl.TcpImplicitExtensionIdApply
import akka.stream.impl.fusing.GraphStages.detacher
import akka.stream.impl.io.ConnectionSourceStage
import akka.stream.impl.io.OutgoingConnectionStage
@ -38,7 +36,7 @@ import akka.util.ByteString
import akka.util.JavaDurationConverters._
import akka.util.unused
object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
object Tcp extends ExtensionId[Tcp] with TcpImplicitExtensionIdApply with ExtensionIdProvider {
/**
* Represents a successful TCP server binding.
@ -79,8 +77,6 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
*/
final case class OutgoingConnection(remoteAddress: InetSocketAddress, localAddress: InetSocketAddress)
def apply()(implicit system: ActorSystem): Tcp = super.apply(system)
override def get(system: ActorSystem): Tcp = super.get(system)
override def get(system: ClassicActorSystemProvider): Tcp = super.get(system)
@ -90,7 +86,7 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
// just wraps/unwraps the TLS byte events to provide ByteString, ByteString flows
private val tlsWrapping: BidiFlow[ByteString, TLSProtocol.SendBytes, TLSProtocol.SslTlsInbound, ByteString, NotUsed] =
BidiFlow.fromFlows(Flow[ByteString].map(TLSProtocol.SendBytes), Flow[TLSProtocol.SslTlsInbound].collect {
BidiFlow.fromFlows(Flow[ByteString].map(TLSProtocol.SendBytes.apply), Flow[TLSProtocol.SslTlsInbound].collect {
case sb: TLSProtocol.SessionBytes => sb.bytes
// ignore other kinds of inbounds (currently only Truncated)
})

View file

@ -282,7 +282,7 @@ private[akka] object ConcurrentAsyncCallbackState {
// stream is initialized and so no threads can just send events without any synchronization overhead
case object Initialized extends State[Nothing]
// Event with feedback promise
final case class Event[E](e: E, handlingPromise: Promise[Done])
final case class Event[+E](e: E, handlingPromise: Promise[Done])
val NoPendingEvents = Pending[Nothing](Nil)
}
@ -1243,7 +1243,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
// started - can just dispatch async message to interpreter
onAsyncInput(event, promise)
case list @ Pending(l) =>
case list @ Pending(l: List[Event[T]]) =>
// not started yet
if (!currentState.compareAndSet(list, Pending[T](Event[T](event, promise) :: l)))
invokeWithPromise(event, promise)

View file

@ -30,7 +30,7 @@ trait StageLogging { self: GraphStageLogic =>
if (_log eq null) {
materializer match {
case p: MaterializerLoggingProvider =>
_log = p.makeLogger(logSource)
_log = p.makeLogger(logSource.asInstanceOf[Class[Any]])
case _ =>
_log = NoLogging
}

View file

@ -8,23 +8,21 @@ import java.security.KeyStore
import java.security.cert.CertPathValidatorException
import java.util.Collections
import javax.net.ssl._
import com.typesafe.sslconfig.akka.util.AkkaLoggerFactory
import com.typesafe.sslconfig.ssl._
import com.typesafe.sslconfig.util.LoggerFactory
import akka.actor._
import akka.annotation.InternalApi
import akka.event.Logging
import akka.stream.impl.AkkaSSLConfigExtensionIdApply
@deprecated("Use Tcp and TLS with SSLEngine parameters instead. Setup the SSLEngine with needed parameters.", "2.6.0")
object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with ExtensionIdProvider {
object AkkaSSLConfig extends ExtensionId[AkkaSSLConfig] with AkkaSSLConfigExtensionIdApply with ExtensionIdProvider {
//////////////////// EXTENSION SETUP ///////////////////
override def get(system: ActorSystem): AkkaSSLConfig = super.get(system)
override def get(system: ClassicActorSystemProvider): AkkaSSLConfig = super.get(system)
def apply()(implicit system: ActorSystem): AkkaSSLConfig = super.apply(system)
override def lookup = AkkaSSLConfig
@ -44,7 +42,7 @@ final class AkkaSSLConfig(system: ExtendedActorSystem, val config: SSLConfigSett
private val mkLogger = new AkkaLoggerFactory(system)
private val log = Logging(system, getClass)
private val log = Logging(system, classOf[AkkaSSLConfig])
log.debug("Initializing AkkaSSLConfig extension...")
/** Can be used to modify the underlying config, most typically used to change a few values in the default config */

View file

@ -27,9 +27,9 @@ object Dependencies {
val jacksonVersion = "2.11.4"
val scala212Version = "2.12.13"
val scala212Version = "2.12.14"
val scala213Version = "2.13.5"
val scala3Version = "3.0.0"
val scala3Version = "3.0.1-RC1"
val reactiveStreamsVersion = "1.0.3"
@ -93,7 +93,7 @@ object Dependencies {
val sigar = "org.fusesource" % "sigar" % "1.6.4" // ApacheV2
val jctools = "org.jctools" % "jctools-core" % "3.2.0" // ApacheV2
val jctools = "org.jctools" % "jctools-core" % "3.3.0" // ApacheV2
// reactive streams
val reactiveStreams = "org.reactivestreams" % "reactive-streams" % reactiveStreamsVersion // CC0
@ -131,7 +131,7 @@ object Dependencies {
object Docs {
val sprayJson = "io.spray" %% "spray-json" % "1.3.6" % "test"
val gson = "com.google.code.gson" % "gson" % "2.8.6" % "test"
val gson = "com.google.code.gson" % "gson" % "2.8.7" % "test"
}
object Test {

View file

@ -13,7 +13,7 @@ import com.typesafe.tools.mima.plugin.MimaPlugin.autoImport._
object MiMa extends AutoPlugin {
private val latestPatchOf25 = 32
private val latestPatchOf26 = 14
private val latestPatchOf26 = 15
override def requires = MimaPlugin
override def trigger = allRequirements

View file

@ -9,6 +9,7 @@ import sbt.Keys._
import java.io.File
import sbtwhitesource.WhiteSourcePlugin.autoImport.whitesourceIgnore
import com.lightbend.sbt.publishrsync.PublishRsyncPlugin.autoImport.publishRsyncHost
import xerial.sbt.Sonatype.autoImport.sonatypeProfileName
object Publish extends AutoPlugin {
@ -22,6 +23,7 @@ object Publish extends AutoPlugin {
credentials ++= akkaCredentials,
organizationName := "Lightbend Inc.",
organizationHomepage := Some(url("https://www.lightbend.com")),
sonatypeProfileName := "com.typesafe",
startYear := Some(2009),
developers := List(
Developer(

View file

@ -16,15 +16,15 @@ addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.27")
// sbt-osgi 0.9.6 is available but breaks populating akka-protobuf-v3
addSbtPlugin("com.typesafe.sbt" % "sbt-osgi" % "0.9.4")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.8.1")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "2.1.1")
addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.4.3")
addSbtPlugin("com.thoughtworks.sbt-api-mappings" % "sbt-api-mappings" % "3.0.0")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.0")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.4.3")
addSbtPlugin("io.spray" % "sbt-boilerplate" % "0.6.1")
addSbtPlugin("com.lightbend.akka" % "sbt-paradox-akka" % "0.38")
addSbtPlugin("com.lightbend" % "sbt-whitesource" % "0.1.18")
addSbtPlugin("de.heikoseeberger" % "sbt-header" % "5.6.0")
addSbtPlugin("com.hpe.sbt" % "sbt-pull-request-validator" % "1.0.0")
addSbtPlugin("net.bzzt" % "sbt-reproducible-builds" % "0.25")
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "4.1.1")
addSbtPlugin("com.geirsson" % "sbt-ci-release" % "1.5.7")
addSbtPlugin("com.lightbend.sbt" % "sbt-publish-rsync" % "0.2")

View file

@ -1,290 +0,0 @@
#!/usr/bin/env bash
#
# Release script for Akka.
#
# ATTENTION: This script involves calling `git clean -fxd` which will remove all untracked
# files from your working directory (including IDE settings).
#
# Prerequisites and Installation Instructions
#
# 1) You must be able to sign the artifacts with PGP
#
# 1.1) If you don't have PGP and a PGP key
# On OS X from othe command line:
# shell> brew install gnupg
# shell> gpg --gen-key
#
# On OS X the following should be added to ~/.bash_profile
# GPG_TTY=$(tty)
# export GPG_TTY
#
# Default values for the key type and 2048 bits is OK.
# Make sure to use the email address that you will use later to register
# with Sonatype.
#
# 1.2) Check that signing works
# From inside sbt do the following
# sbt> publishLocalSigned
# It should should ask you for your pass phrase, and create .asc files for
# all artifacts
#
# 1.3) Publish your key to a server that Sonatype use
# From the command line:
# shell> gpg --keyserver hkp://pool.sks-keyservers.net/ --send-keys <your key id>
# To find out your key id do this from the command line:
# shell> gpg --list-keys
# pub 2048/<your key id> ...
# You can verify the existence of your key here, if you don't trust your tool:
# https://sks-keyservers.net/i/#extract
#
# 2) You must have publishing rights to oss.sonatype.org
#
# 2.1) Register with oss.sonatype.org by only following the instructions under
# sign up here https://docs.sonatype.org/display/Repository/Sonatype+OSS+Maven+Repository+Usage+Guide
# Use the same email address as you used for the pgp key.
#
# 2.2) Ask Jonas who is the original creator of this ticket https://issues.sonatype.org/browse/OSSRH-3097
# to add a comment that says that your username (not your full name) should
# have publish access to that project. There is manual administration of
# the ticket at Sonatype, so it could take a little while.
#
# 2.3) Add your credentials to sbt by adding a global.sbt file in your sbt home
# directory containing the following.
# credentials += Credentials("Sonatype Nexus Repository Manager",
# "oss.sonatype.org",
# "<your username>",
# "<your password>")
#
# 3) You must have access to gustav.akka.io
# Please note that gustav.akka.io is the same as repo.akka.io,
# but the latter domain is pointed at cloudflare so one could not ssh into it.
#
# 3.1) Ask someone in the team for login information for the akkarepo user.
#
# 3.2) Install your public ssh key to avoid typing in your password.
# From the command line:
# shell> cat ~/.ssh/id_rsa.pub | ssh akkarepo@gustav.akka.io "cat >> ~/.ssh/authorized_keys"
#
# 3.3) Also make it available for publishing snapshots.
# From the command line:
# shell> cp ~/.ssh/id_rsa ~/.ssh/id_rsa_gustav.pem
# shell> ssh-keygen -p -f ~/.ssh/id_rsa_gustav.pem -m pem
#
# 4) Have access to github.com/akka/akka. This should be a given.
#
# Now you should be all set to run the script
#
# Run the script:
#
# shell> project/scripts/release
#
# The artifacts published to oss.sonatype.org need to be released by following the
# instructions under release here
# https://docs.sonatype.org/display/Repository/Sonatype+OSS+Maven+Repository+Usage+Guide
# defaults
declare -r default_server="akkarepo@gustav.akka.io"
declare -r default_path="www"
# settings
declare release_server=${default_server}
declare release_path=${default_path}
# get the source location for this script; handles symlinks
function get_script_path {
local source="${BASH_SOURCE[0]}"
while [ -h "${source}" ] ; do
source="$(readlink "${source}")";
done
echo ${source}
}
# path, name, and dir for this script
declare -r script_path=$(get_script_path)
declare -r script_name=$(basename "${script_path}")
declare -r script_dir="$(cd -P "$(dirname "${script_path}")" && pwd)"
# print usage info
function usage {
cat <<EOM
Dry run is be default.
Usage: ${script_name} [options]
-h | --help Print this usage message
-s | --server SERVER Set the release server (default ${default_server})
-p | --path PATH Set the path on the release server (default ${default_path})
EOM
}
# echo a log message
function echolog {
echo "[${script_name}] $@"
}
# echo an error message
function echoerr {
echo "[${script_name}] $@" 1>&2
}
# fail the script with an error message
function fail {
echoerr "$@"
exit 1
}
# process options and set flags
while true; do
case "$1" in
-h | --help ) usage; exit 1 ;;
-s | --server ) release_server=$2; shift 2 ;;
-p | --path ) release_path=$2; shift 2 ;;
* ) break ;;
esac
done
declare -r publish_path="${release_server}:${release_path}"
JAVA_VERSION=`java -version 2>&1 | grep -E "java version|openjdk version" | cut -d '"' -f2 | cut -d '.' -f1`
[[ $JAVA_VERSION -ge 11 ]] || fail "Java version is not at least 11"
# check for a git command
type -P git &> /dev/null || fail "git command not found"
# check for an sbt command
type -P sbt &> /dev/null || fail "sbt command not found"
# check for an rsync command
type -P rsync &> /dev/null || fail "rsync command not found"
# check for a tar command
type -P tar &> /dev/null || fail "tar command not found"
# get the current project version from sbt
# a little messy as the ansi escape codes are included
function get_current_version {
local result=$(sbt version | grep -v warn | grep -v "coordinated shutdown" | tail -1 | cut -f2)
# remove ansi escape code from end
local code0=$(echo -e "\033[0m")
echo ${result%$code0}
}
# check that we have a clean status
[[ -z "$(git status --porcelain)" ]] || {
git status
fail "There are uncommitted changes - please commit before releasing"
}
(read -p "The working directory will now be cleaned from all non-tracked files. Are you sure you want this? " x; test "$x" = yes) || fail "bailing out"
git clean -fxd || fail "cannot git clean -fxd"
# try to run a cleanup command - these shouldn't actually fail
function safely {
"$@" || fail "Failed to clean up release - please check current state"
}
# perform a clean up when a failure has occurred
function git_cleanup {
echoerr "Cleaning up..."
safely git reset --hard
safely git clean -f
}
# clean up and fail the script with an error message
function bail_out {
echoerr "Bailing out!"
git_cleanup
echoerr "Cleaned up failed release"
fail "$@"
}
# bail out for signals
function signal_bail_out {
echoerr "Interrupted by signal"
bail_out "Received signal to stop release"
}
# bail out on signals
trap signal_bail_out SIGHUP SIGINT SIGTERM
# try to run a command or otherwise bail out
function try {
"$@" || bail_out "Failed to create release"
}
declare -r version=$(get_current_version)
if [ ${#version} -gt 6 ]; then
fail "Version [$version] is too long to be a regular release, have you created a tag?"
fi
echolog "Creating release ${version}..."
echolog "Publishing to ${publish_path}"
# try ssh'ing to the release server
echolog "Checking ssh connection to ${release_server}"
try ssh -t ${release_server} echo "Successfully contacted release server."
# start clean
try sbt clean
echolog "Running migration manager report..."
try sbt $RELEASE_OPT +mimaReportBinaryIssues
echolog "Finished migration manager report"
# build the release
echolog "Building the release..."
RELEASE_OPT="-Dakka.genjavadoc.enabled=true -Dpublish.maven.central=true"
# Release artifacts
try sbt $RELEASE_OPT +publishSigned
echolog "Successfully released artifacts"
try sbt $RELEASE_OPT whitesourceCheckPolicies
# use a special failure from now on
function arrgh {
cat 1>&2 <<EOM
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
Release failed while pushing to servers!
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
EOM
fail "Could not complete release - please check current state"
}
# try running a pushing command or otherwise fail
function important {
"$@" || arrgh
}
# new interrupted bail out for signals
function arrgh_interrupt {
echoerr "Interrupted by signal"
arrgh
}
# new exit on signals
trap arrgh_interrupt SIGHUP SIGINT SIGTERM
# push the commits and tags to git origin
echolog "Pushing to git origin..."
important git push origin --tags
echolog "Building docs and pushing to the server..."
for section in docs api japi; do
important ssh ${release_server} "cd ${release_path}/${section}/akka; git add .; git commit -m 'before publishing version $version $section'; true"
done
# using Scala 2.13 here to avoid the infamous problem with missing AskSupport in classpath
important sbt -Dakka.build.scalaVersion=2.13.0 $RELEASE_OPT publishRsync
for section in docs api japi; do
important ssh ${release_server} "cd ${release_path}/${section}/akka; git add .; git commit -m 'publish version $version $section'"
done
echolog "*****"
echolog "Do not forget to update https://github.com/akka/akka.io/blob/master/versions.json !"
echolog "*****"
echolog "Successfully created release ${version}"

View file

@ -0,0 +1,10 @@
#!/bin/bash
VERSION=$1
if [ -z $VERSION ]
then
echo specify the version name to be released, eg. 1.0.0
else
sed -e 's/\$VERSION\$/'$VERSION'/g' scripts/release-train-issue-template$2.md > /tmp/release-$VERSION.md
echo Created $(hub issue create -F /tmp/release-$VERSION.md --browse)
fi

View file

@ -0,0 +1,69 @@
Release Akka $VERSION$
### Before the release
- [ ] Make sure all important / big PRs have been merged by now
- [ ] Create a news item draft PR on [akka.io](https://github.com/akka/akka.io), using the milestone and `scripts/authors.scala v2.6.14 v2.6.15`
- [ ] Make sure to update `_config.yml` in it
- In case of a new minor release:
- [ ] update the branch descriptions at CONTRIBUTING.md#branches-summary
### Cutting the release
- [ ] Make sure any running [actions](https://github.com/akka/akka/actions) for the commit you would like to release have completed.
- [ ] Tag the release `git tag -a -s -m 'Release v$VERSION$' v$VERSION$` and push the tag `git push --tags`
- [ ] Create a [new milestone](https://github.com/akka/akka/milestones) for the next version and close the current one.
- [ ] Check that the GitHub Actions release build has executed successfully (it should publish artifacts to Sonatype and documentation to Gustav)
- [ ] Update `MiMa.latestPatchOf` and PR that change (`project/MiMa.scala`)
### Check availability
- [ ] Check [reference](https://doc.akka.io/docs/akka/$VERSION$/) documentation
- [ ] Check the release on [Maven central](https://repo1.maven.org/maven2/com/typesafe/akka/akka-actor_2.13/$VERSION$/)
### When everything is on maven central
- [ ] `ssh akkarepo@gustav.akka.io`
- [ ] update the `current` links on `repo.akka.io` to point to the latest version with
```
ln -nsf $VERSION$ www/docs/akka/current
ln -nsf $VERSION$ www/api/akka/current
ln -nsf $VERSION$ www/japi/akka/current
```
- [ ] check changes and commit the new version to the local git repository
```
cd ~/www
git add docs/akka/current docs/akka/$VERSION$
git add api/akka/current api/akka/$VERSION$
git add japi/akka/current japi/akka/$VERSION$
git commit -m "Akka $VERSION$"
```
- [ ] push changes to the [remote git repository](https://github.com/akka/doc.akka.io)
```
cd ~/www
git push origin master
```
### Announcements
- [ ] Merge draft news item for [akka.io](https://github.com/akka/akka.github.com)
- [ ] Create a [GitHub release](https://github.com/akka/akka/releases) with the next tag version `v$VERSION$`, title and a link to the announcement
- [ ] Post about it on the [forum](https://discuss.akka.io)
- [ ] Tweet using the [@akkateam](https://twitter.com/akkateam) account (or ask someone to) about the new release
- [ ] Announce on [Gitter akka/akka](https://gitter.im/akka/akka)
- [ ] Announce internally
## Update references
Update the versions used in:
* [ ] https://github.com/akka/akka-samples
* [ ] https://github.com/lightbend/lightbend-platform-docs/blob/master/docs/modules/getting-help/examples/build.sbt (this populates https://developer.lightbend.com/docs/lightbend-platform/introduction/getting-help/build-dependencies.html#_akka)
These are autoupdated by latest stable on maven central:
* https://github.com/akka/akka-quickstart-java.g8
* https://github.com/akka/akka-quickstart-scala.g8
* https://github.com/akka/akka-http-quickstart-java.g8
* https://github.com/akka/akka-http-quickstart-scala.g8
* https://github.com/akka/akka-grpc-quickstart-java.g8
* https://github.com/akka/akka-grpc-quickstart-scala.g8