Merge branch 'master' into 23111-AsyncCallbacks_lost_finished_stage-agolubev

This commit is contained in:
Patrik Nordwall 2017-11-09 14:48:47 +01:00 committed by GitHub
commit 26f0f2c898
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
64 changed files with 1413 additions and 384 deletions

.github/ISSUE_TEMPLATE.md
View file

@ -0,0 +1,13 @@
<!--
Please report issues regarding specific projects in their respective issue trackers, e.g.:
- Akka-HTTP: https://github.com/akka/akka-http/issues
- Alpakka: https://github.com/akka/alpakka/issues
- Akka Persistence Cassandra Plugin: https://github.com/akka/akka-persistence-cassandra/issues
- ...
Please explain your issue precisely, and if possible provide a reproducer snippet (this helps resolve issues much quicker).
For general questions and discussion please use https://groups.google.com/group/akka-user/ or https://gitter.im/akka/akka instead.
Thanks, happy hakking!
-->

View file

@ -14,10 +14,9 @@ import akka.util.Timeout
import scala.concurrent.duration._
import com.typesafe.config._
import akka.pattern.ask
import org.apache.commons.codec.binary.Hex.encodeHex
import org.apache.commons.codec.binary.Hex.decodeHex
import java.nio.ByteOrder
import java.nio.ByteBuffer
import akka.actor.NoSerializationVerificationNeeded
import test.akka.serialization.NoVerification
object SerializationTests {
@ -351,7 +350,7 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR
"Cross-version serialization compatibility" must {
def verify(obj: SystemMessage, asExpected: String): Unit = {
val bytes = javax.xml.bind.DatatypeConverter.parseHexBinary(asExpected)
val bytes = decodeHex(asExpected.toCharArray)
val stream = new ObjectInputStream(new ByteArrayInputStream(bytes))
val read = stream.readObject()
read should ===(obj)
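The replacement above swaps `javax.xml.bind.DatatypeConverter` (part of JAXB, no longer on the default classpath from JDK 9 onwards) for commons-codec. A minimal sketch of the hex round-trip, assuming commons-codec is available (the hex string is illustrative):

```scala
import org.apache.commons.codec.binary.Hex.{ decodeHex, encodeHex }

// hex string to bytes, as the spec now does it
val bytes: Array[Byte] = decodeHex("aced0005".toCharArray)
// ...and back again
val hex: String = new String(encodeHex(bytes)) // "aced0005"
```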

View file

@ -5,6 +5,3 @@ akka {
}
}
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster.run-coordinated-shutdown-when-down = off

View file

@ -158,8 +158,9 @@ object ShardCoordinator {
val mostShards = currentShardAllocations.collect {
case (_, v) ⇒ v.filterNot(s ⇒ rebalanceInProgress(s))
}.maxBy(_.size)
if (mostShards.size - leastShards.size >= rebalanceThreshold)
Future.successful(mostShards.take(maxSimultaneousRebalance - rebalanceInProgress.size).toSet)
val difference = mostShards.size - leastShards.size
if (difference >= rebalanceThreshold)
Future.successful(mostShards.take(math.min(difference, maxSimultaneousRebalance - rebalanceInProgress.size)).toSet)
else
emptyRebalanceResult
} else emptyRebalanceResult
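The fix above caps the number of shards selected for rebalance at the difference between the most and least loaded regions, so a large `maxSimultaneousRebalance` no longer moves more shards than needed. A standalone sketch of just that selection logic (names and values are illustrative, not the actual `ShardCoordinator` API):

```scala
// illustrative, simplified version of the capped selection above
def shardsToRebalance(
  mostShards: IndexedSeq[String], leastShards: IndexedSeq[String],
  rebalanceThreshold: Int, maxSimultaneousRebalance: Int, inProgress: Int): Set[String] = {
  val difference = mostShards.size - leastShards.size
  if (difference >= rebalanceThreshold)
    mostShards.take(math.min(difference, maxSimultaneousRebalance - inProgress)).toSet
  else Set.empty
}

// 8 vs 4 shards, threshold 2, room for 5 rebalances: picks min(4, 5) = 4 shards
shardsToRebalance((1 to 8).map("shard" + _), (9 to 12).map("shard" + _), 2, 5, 0)
```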

View file

@ -5,6 +5,3 @@ akka {
}
}
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster.run-coordinated-shutdown-when-down = off

View file

@ -3,8 +3,6 @@
*/
package akka.cluster.sharding
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.Props
import akka.testkit.AkkaSpec
@ -20,7 +18,7 @@ class LeastShardAllocationStrategySpec extends AkkaSpec {
"LeastShardAllocationStrategy" must {
"allocate to region with least number of shards" in {
val allocations = Map(regionA → Vector("shard1"), regionB → Vector("shard2"), regionC → Vector.empty)
Await.result(allocationStrategy.allocateShard(regionA, "shard3", allocations), 3.seconds) should ===(regionC)
allocationStrategy.allocateShard(regionA, "shard3", allocations).futureValue should ===(regionC)
}
"rebalance from region with most number of shards" in {
@ -28,14 +26,14 @@ class LeastShardAllocationStrategySpec extends AkkaSpec {
regionC → Vector.empty)
// so far regionB has 2 shards and regionC has 0 shards, but the diff is less than rebalanceThreshold
Await.result(allocationStrategy.rebalance(allocations, Set.empty), 3.seconds) should ===(Set.empty[String])
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set.empty[String])
val allocations2 = allocations.updated(regionB, Vector("shard2", "shard3", "shard4"))
Await.result(allocationStrategy.rebalance(allocations2, Set.empty), 3.seconds) should ===(Set("shard2", "shard3"))
Await.result(allocationStrategy.rebalance(allocations2, Set("shard4")), 3.seconds) should ===(Set.empty[String])
allocationStrategy.rebalance(allocations2, Set.empty).futureValue should ===(Set("shard2", "shard3"))
allocationStrategy.rebalance(allocations2, Set("shard4")).futureValue should ===(Set.empty[String])
val allocations3 = allocations2.updated(regionA, Vector("shard1", "shard5", "shard6"))
Await.result(allocationStrategy.rebalance(allocations3, Set("shard1")), 3.seconds) should ===(Set("shard2"))
allocationStrategy.rebalance(allocations3, Set("shard1")).futureValue should ===(Set("shard2"))
}
"rebalance multiple shards if max simultaneous rebalances is not exceeded" in {
@ -44,16 +42,32 @@ class LeastShardAllocationStrategySpec extends AkkaSpec {
regionB Vector("shard2", "shard3", "shard4", "shard5", "shard6"),
regionC Vector.empty)
Await.result(allocationStrategy.rebalance(allocations, Set.empty), 3.seconds) should ===(Set("shard2", "shard3"))
Await.result(allocationStrategy.rebalance(allocations, Set("shard2", "shard3")), 3.seconds) should ===(Set.empty[String])
allocationStrategy.rebalance(allocations, Set.empty).futureValue should ===(Set("shard2", "shard3"))
allocationStrategy.rebalance(allocations, Set("shard2", "shard3")).futureValue should ===(Set.empty[String])
}
"limit number of simultaneous rebalance" in {
val allocations = Map(
regionA Vector("shard1"),
regionB Vector("shard2", "shard3", "shard4", "shard5", "shard6"), regionC Vector.empty)
regionB Vector("shard2", "shard3", "shard4", "shard5", "shard6"),
regionC Vector.empty)
Await.result(allocationStrategy.rebalance(allocations, Set("shard2")), 3.seconds) should ===(Set("shard3"))
Await.result(allocationStrategy.rebalance(allocations, Set("shard2", "shard3")), 3.seconds) should ===(Set.empty[String])
allocationStrategy.rebalance(allocations, Set("shard2")).futureValue should ===(Set("shard3"))
allocationStrategy.rebalance(allocations, Set("shard2", "shard3")).futureValue should ===(Set.empty[String])
}
"don't rebalance excessive shards if maxSimultaneousRebalance > rebalanceThreshold" in {
val allocationStrategy =
new LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 5)
val allocations = Map(
regionA Vector("shard1", "shard2", "shard3", "shard4", "shard5", "shard6", "shard7", "shard8"),
regionB Vector("shard9", "shard10", "shard11", "shard12"))
allocationStrategy.rebalance(allocations, Set("shard2")).futureValue should
===(Set("shard1", "shard3", "shard4"))
allocationStrategy.rebalance(allocations, Set("shard5", "shard6", "shard7", "shard8")).futureValue should
===(Set.empty[String])
}
}
}

View file

@ -5,6 +5,3 @@ akka {
}
}
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster.run-coordinated-shutdown-when-down = off

View file

@ -235,6 +235,8 @@ akka {
# The n oldest nodes in a data center will choose to gossip to another data center with
# this probability. Must be a value between 0.0 and 1.0 where 0.0 means never, 1.0 means always.
# When a data center is first started (nodes < 5) a higher probability is used so other data
# centers find out about the new nodes more quickly
cross-data-center-gossip-probability = 0.2
failure-detector {
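For reference, the new setting can be overridden in an application.conf; the value below is only an example (the multi-node spec later in this diff shows the same path, commented out, under `akka.cluster.multi-data-center`):

```none
akka.cluster.multi-data-center.cross-data-center-gossip-probability = 0.5
```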

View file

@ -792,10 +792,10 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def receiveGossipStatus(status: GossipStatus): Unit = {
val from = status.from
if (!latestGossip.isReachable(selfUniqueAddress, from))
if (!latestGossip.hasMember(from))
logInfo("Ignoring received gossip status from unknown [{}]", from)
else if (!latestGossip.isReachable(selfUniqueAddress, from))
logInfo("Ignoring received gossip status from unreachable [{}] ", from)
else if (latestGossip.members.forall(_.uniqueAddress != from))
log.debug("Cluster Node [{}] - Ignoring received gossip status from unknown [{}]", selfAddress, from)
else {
(status.version compareTo latestGossip.version) match {
case VectorClock.Same ⇒ // same version
@ -830,12 +830,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
} else if (envelope.to != selfUniqueAddress) {
logInfo("Ignoring received gossip intended for someone else, from [{}] to [{}]", from.address, envelope.to)
Ignored
} else if (!localGossip.hasMember(from)) {
logInfo("Ignoring received gossip from unknown [{}]", from)
Ignored
} else if (!localGossip.isReachable(selfUniqueAddress, from)) {
logInfo("Ignoring received gossip from unreachable [{}] ", from)
Ignored
} else if (localGossip.members.forall(_.uniqueAddress != from)) {
log.debug("Cluster Node [{}] - Ignoring received gossip from unknown [{}]", selfAddress, from)
Ignored
} else if (remoteGossip.members.forall(_.uniqueAddress != selfUniqueAddress)) {
logInfo("Ignoring received gossip that does not contain myself, from [{}]", from)
Ignored
@ -945,8 +945,12 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
def gossipSpeedupTick(): Unit =
if (isGossipSpeedupNeeded) gossip()
def isGossipSpeedupNeeded: Boolean =
(latestGossip.overview.seen.size < latestGossip.members.size / 2)
def isGossipSpeedupNeeded: Boolean = {
if (latestGossip.isMultiDc)
latestGossip.overview.seen.count(membershipState.isInSameDc) < latestGossip.members.count(_.dataCenter == cluster.selfDataCenter) / 2
else
(latestGossip.overview.seen.size < latestGossip.members.size / 2)
}
/**
* Sends full gossip to `n` other random members.
@ -970,6 +974,9 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef) extends Actor with
else
gossipTo(peer)
case None ⇒ // nothing to see here
if (cluster.settings.Debug.VerboseGossipLogging)
log.debug("Cluster Node [{}] dc [{}] will not gossip this round", selfAddress, cluster.settings.SelfDataCenter)
}
}

View file

@ -3,7 +3,6 @@
*/
package akka.cluster
import java.util.{ ArrayList, Collections }
import java.util.concurrent.ThreadLocalRandom
import scala.collection.immutable
@ -102,16 +101,24 @@ import scala.util.Random
/**
* @return Up to `crossDcConnections` oldest members for each DC
*/
lazy val ageSortedTopOldestMembersPerDc: Map[DataCenter, SortedSet[Member]] =
// TODO make this recursive and bail early when size reached to make it fast for large clusters
lazy val ageSortedTopOldestMembersPerDc: Map[DataCenter, SortedSet[Member]] = {
latestGossip.members.foldLeft(Map.empty[DataCenter, SortedSet[Member]]) { (acc, member) ⇒
acc.get(member.dataCenter) match {
case Some(set) ⇒
if (set.size < crossDcConnections) acc + (member.dataCenter → (set + member))
else acc
case None ⇒ acc + (member.dataCenter → (SortedSet.empty(Member.ageOrdering) + member))
if (set.size < crossDcConnections) {
acc + (member.dataCenter → (set + member))
} else {
if (set.exists(member.isOlderThan)) {
acc + (member.dataCenter -> (set + member).take(crossDcConnections))
} else {
acc
}
}
case None ⇒
acc + (member.dataCenter → (SortedSet.empty(Member.ageOrdering) + member))
}
}
}
/**
* @return true if toAddress should be reachable from the fromDc in general, within a data center
@ -255,12 +262,12 @@ import scala.util.Random
}
/**
* Choose cross-dc nodes if this one of the N oldest nodes, and if not fall back to gosip locally in the dc
* Choose cross-dc nodes if this is one of the N oldest nodes, and if not fall back to gossip locally in the dc
*/
protected def multiDcGossipTargets(state: MembershipState): Vector[UniqueAddress] = {
val latestGossip = state.latestGossip
// only a fraction of the time across data centers
if (selectDcLocalNodes()) localDcGossipTargets(state)
if (selectDcLocalNodes(state))
localDcGossipTargets(state)
else {
val nodesPerDc = state.ageSortedTopOldestMembersPerDc
@ -321,7 +328,22 @@ import scala.util.Random
}
}
protected def selectDcLocalNodes(): Boolean = ThreadLocalRandom.current.nextDouble() > crossDcGossipProbability
/**
* For small DCs prefer cross DC gossip. This speeds up the bootstrapping of
* new DCs as adding an initial node means it has no local peers.
* Once the DC has 5 members the configured crossDcGossipProbability is used; before
* that a single-node cluster uses 1.0, two nodes use 0.75, and so on.
*/
protected def selectDcLocalNodes(state: MembershipState): Boolean = {
val localMembers = state.dcMembers.size
val probability = if (localMembers > 4)
crossDcGossipProbability
else {
// don't go below the configured probability
math.max((5 - localMembers) * 0.25, crossDcGossipProbability)
}
ThreadLocalRandom.current.nextDouble() > probability
}
protected def preferNodesWithDifferentView(state: MembershipState): Boolean =
ThreadLocalRandom.current.nextDouble() < adjustedGossipDifferentViewProbability(state.latestGossip.members.size)
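As a sanity check of the `selectDcLocalNodes` fallback above, a small sketch that tabulates the effective cross-DC gossip probability per local DC size, assuming the shipped default `crossDcGossipProbability = 0.2`:

```scala
// mirrors the fallback in selectDcLocalNodes above
val crossDcGossipProbability = 0.2
for (localMembers ← 1 to 6) {
  val p =
    if (localMembers > 4) crossDcGossipProbability
    else math.max((5 - localMembers) * 0.25, crossDcGossipProbability)
  println(s"$localMembers local member(s): cross-DC probability $p")
}
// prints 1.0, 0.75, 0.5, 0.25, 0.2, 0.2
```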

View file

@ -0,0 +1,108 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
import akka.remote.testkit._
import akka.testkit.ImplicitSender
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
object InitialMembersOfNewDcSpec extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString(
s"""
akka.actor.provider = cluster
akka.actor.warn-about-java-serializer-usage = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster {
jmx.enabled = off
debug.verbose-gossip-logging = on
}
akka.cluster.multi-data-center {
#cross-data-center-gossip-probability = 0.5
}
akka.loglevel = INFO
akka.log-dead-letters = off
akka.log-dead-letters-during-shutdown = off
akka.loggers = ["akka.testkit.TestEventListener"]
"""))
val one = role("one")
val two = role("two")
val three = role("three")
val four = role("four")
val five = role("five")
nodeConfig(one, two, three) {
ConfigFactory.parseString("akka.cluster.multi-data-center.self-data-center = DC1")
}
nodeConfig(four, five) {
ConfigFactory.parseString("akka.cluster.multi-data-center.self-data-center = DC2")
}
}
class InitialMembersOfNewDcSpecMultiJvmNode1 extends InitialMembersOfNewDcSpec
class InitialMembersOfNewDcSpecMultiJvmNode2 extends InitialMembersOfNewDcSpec
class InitialMembersOfNewDcSpecMultiJvmNode3 extends InitialMembersOfNewDcSpec
class InitialMembersOfNewDcSpecMultiJvmNode4 extends InitialMembersOfNewDcSpec
class InitialMembersOfNewDcSpecMultiJvmNode5 extends InitialMembersOfNewDcSpec
abstract class InitialMembersOfNewDcSpec extends MultiNodeSpec(InitialMembersOfNewDcSpec) with STMultiNodeSpec with ImplicitSender {
import InitialMembersOfNewDcSpec._
def initialParticipants = roles.size
val cluster = Cluster(system)
"Joining a new DC" must {
"join node one" in {
runOn(one) {
cluster.join(node(one).address)
}
enterBarrier("node one up")
}
"see all dc1 nodes join" in {
runOn(two, three) {
cluster.join(node(one).address)
}
}
"see all dc1 nodes see each other as up" in {
runOn(two, three) {
within(20.seconds) {
awaitAssert({
cluster.state.members.filter(_.status == MemberStatus.Up) should have size 3
})
}
}
enterBarrier("dc1 fully up")
}
"join first member of new dc" in {
enterBarrier("Node 4 about to join")
val startTime = System.nanoTime()
runOn(four) {
log.info("Joining cluster")
cluster.join(node(one).address)
}
// Check how long it takes for all other nodes to see every node as up
runOn(one, two, three, four) {
within(20.seconds) {
awaitAssert({
cluster.state.members.filter(_.status == MemberStatus.Up) should have size 4
})
}
val totalTime = System.nanoTime() - startTime
log.info("Can see new node (and all others as up): {}ms", totalTime.nanos.toMillis)
}
enterBarrier("node 4 joined dc and all nodes know it is up")
}
}
}

View file

@ -19,7 +19,8 @@ class MultiDcSpecConfig(crossDcConnections: Int = 5) extends MultiNodeConfig {
commonConfig(ConfigFactory.parseString(
s"""
akka.loglevel = INFO
# DEBUG On for issue #23864
akka.loglevel = DEBUG
akka.cluster.multi-data-center.cross-data-center-connections = $crossDcConnections
""").withFallback(MultiNodeClusterSpec.clusterConfig))
@ -71,7 +72,7 @@ abstract class MultiDcSpec(config: MultiDcSpecConfig)
runOn(first, second, third, fourth) {
within(20.seconds) {
awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size (4))
awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size 4)
}
}
@ -85,6 +86,7 @@ abstract class MultiDcSpec(config: MultiDcSpecConfig)
val dc1 = Set(address(first), address(second))
dc1 should contain(clusterView.leader.get)
}
runOn(third, fourth) {
cluster.settings.SelfDataCenter should ===("dc2")
clusterView.leader shouldBe defined
@ -109,7 +111,7 @@ abstract class MultiDcSpec(config: MultiDcSpecConfig)
// should be able to join and become up since the
// unreachable is between dc1 and dc2,
within(10.seconds) {
awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size (5))
awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size 5)
}
}
@ -120,7 +122,7 @@ abstract class MultiDcSpec(config: MultiDcSpecConfig)
// should be able to join and become up since the
// unreachable is between dc1 and dc2,
within(10.seconds) {
awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size (5))
awaitAssert(clusterView.members.filter(_.status == MemberStatus.Up) should have size 5)
}
enterBarrier("inter-data-center unreachability end")
@ -133,12 +135,13 @@ abstract class MultiDcSpec(config: MultiDcSpecConfig)
enterBarrier("other-data-center-internal-unreachable")
runOn(third) {
// FIXME This is already part of the cluster, is this intended? Joined on line 107
cluster.join(fifth)
// should be able to join and leave
// since the unreachable nodes are inside of dc1
cluster.leave(fourth)
awaitAssert(clusterView.members.map(_.address) should not contain (address(fourth)))
awaitAssert(clusterView.members.map(_.address) should not contain address(fourth))
awaitAssert(clusterView.members.collect { case m if m.status == Up ⇒ m.address } should contain(address(fifth)))
}
@ -147,7 +150,7 @@ abstract class MultiDcSpec(config: MultiDcSpecConfig)
runOn(first) {
testConductor.passThrough(first, second, Direction.Both).await
}
enterBarrier("other-datac-enter-internal-unreachable end")
enterBarrier("other-data-center-internal-unreachable end")
}
"be able to down a member of another data-center" in within(20.seconds) {
@ -156,10 +159,9 @@ abstract class MultiDcSpec(config: MultiDcSpecConfig)
}
runOn(first, third, fifth) {
awaitAssert(clusterView.members.map(_.address) should not contain (address(second)))
awaitAssert(clusterView.members.map(_.address) should not contain address(second))
}
enterBarrier("cross-data-center-downed")
}
}
}

View file

@ -46,6 +46,8 @@ object MultiNodeClusterSpec {
periodic-tasks-initial-delay = 300 ms
publish-stats-interval = 0 s # always, when it happens
failure-detector.heartbeat-interval = 500 ms
run-coordinated-shutdown-when-down = off
}
akka.loglevel = INFO
akka.log-dead-letters = off

View file

@ -3,12 +3,9 @@
*/
package akka.cluster
import scala.collection.immutable.SortedSet
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._
import scala.concurrent.duration._
import akka.actor.Props
import akka.actor.Actor
import akka.cluster.MemberStatus._
@ -62,7 +59,6 @@ abstract class NodeLeavingAndExitingSpec
// Verify that 'second' node is set to EXITING
exitingLatch.await
}
// node that is leaving

View file

@ -3,7 +3,6 @@
*/
package akka.cluster
import com.typesafe.config.ConfigFactory
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.testkit._

View file

@ -5,7 +5,4 @@ akka {
}
}
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster.run-coordinated-shutdown-when-down = off

View file

@ -90,12 +90,12 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
}
"be produced for reachability observations between data centers" in {
val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty, "dc2")
val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty, "dc2")
val dc2BMemberUp = TestMember(Address("akka.tcp", "sys", "dc2B", 2552), Up, Set.empty, "dc2")
val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty[String], "dc2")
val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty[String], "dc2")
val dc2BMemberUp = TestMember(Address("akka.tcp", "sys", "dc2B", 2552), Up, Set.empty[String], "dc2")
val dc3AMemberUp = TestMember(Address("akka.tcp", "sys", "dc3A", 2552), Up, Set.empty, "dc3")
val dc3BMemberUp = TestMember(Address("akka.tcp", "sys", "dc3B", 2552), Up, Set.empty, "dc3")
val dc3AMemberUp = TestMember(Address("akka.tcp", "sys", "dc3A", 2552), Up, Set.empty[String], "dc3")
val dc3BMemberUp = TestMember(Address("akka.tcp", "sys", "dc3B", 2552), Up, Set.empty[String], "dc3")
val reachability1 = Reachability.empty
val g1 = Gossip(members = SortedSet(aUp, bUp, dc2AMemberUp, dc2BMemberUp, dc3AMemberUp, dc3BMemberUp), overview = GossipOverview(reachability = reachability1))
@ -121,8 +121,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
}
"not be produced for same reachability observations between data centers" in {
val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty, "dc2")
val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty, "dc2")
val dc2AMemberUp = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Up, Set.empty[String], "dc2")
val dc2AMemberDown = TestMember(Address("akka.tcp", "sys", "dc2A", 2552), Down, Set.empty[String], "dc2")
val reachability1 = Reachability.empty
val g1 = Gossip(members = SortedSet(aUp, dc2AMemberUp), overview = GossipOverview(reachability = reachability1))
@ -246,8 +246,8 @@ class ClusterDomainEventSpec extends WordSpec with Matchers {
val g2 = Gossip(members = SortedSet(bUp, cUp, dExiting, eJoining))
val s2 = state(g2).copy(selfDc = "dc2")
diffRolesLeader(s0, s1) should ===(Set.empty)
diffRolesLeader(s1, s2) should ===(Set.empty)
diffRolesLeader(s0, s1) should ===(Set.empty[String])
diffRolesLeader(s1, s2) should ===(Set.empty[String])
}
}
}

View file

@ -22,6 +22,8 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
val gDc3 = TestMember(Address("akka.tcp", "sys", "g", 2552), Up, Set.empty, dataCenter = "dc3")
val hDc3 = TestMember(Address("akka.tcp", "sys", "h", 2552), Up, Set.empty, dataCenter = "dc3")
val iDc4 = TestMember(Address("akka.tcp", "sys", "i", 2552), Up, Set.empty, dataCenter = "dc4")
val defaultSelector = new GossipTargetSelector(
reduceGossipDifferentViewProbability = 400,
crossDcGossipProbability = 0.2
@ -29,9 +31,18 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"The gossip target selection" should {
"select remote nodes in a multi dc setting for a single node cluster regardless of probability" in {
val realSelector = new GossipTargetSelector(400, 0.0)
val state = MembershipState(Gossip(SortedSet(iDc4, eDc2, fDc2)), iDc4, iDc4.dataCenter, crossDcConnections = 5)
val gossipTo = realSelector.gossipTargets(state)
gossipTo should ===(Vector[UniqueAddress](eDc2, fDc2))
}
"select local nodes in a multi dc setting when chance says so" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = true
override protected def selectDcLocalNodes(s: MembershipState): Boolean = true
}
val state = MembershipState(Gossip(SortedSet(aDc1, bDc1, eDc2, fDc2)), aDc1, aDc1.dataCenter, crossDcConnections = 5)
@ -43,7 +54,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"select cross dc nodes when chance says so" in {
val alwaysCrossDcSelector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
override protected def selectDcLocalNodes(s: MembershipState): Boolean = false
}
val state = MembershipState(Gossip(SortedSet(aDc1, bDc1, eDc2, fDc2)), aDc1, aDc1.dataCenter, crossDcConnections = 5)
@ -55,7 +66,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"select local nodes that hasn't seen the gossip when chance says so" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = true
override protected def preferNodesWithDifferentView(s: MembershipState): Boolean = true
}
val state = MembershipState(
@ -72,7 +83,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"select among all local nodes regardless if they saw the gossip already when chance says so" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = false
override protected def preferNodesWithDifferentView(s: MembershipState): Boolean = false
}
val state = MembershipState(
@ -89,7 +100,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"not choose unreachable nodes" in {
val alwaysLocalSelector = new GossipTargetSelector(400, 0.2) {
override protected def preferNodesWithDifferentView(state: MembershipState): Boolean = false
override protected def preferNodesWithDifferentView(s: MembershipState): Boolean = false
}
val state = MembershipState(
@ -108,7 +119,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"continue with the next dc when doing cross dc and no node where suitable" in {
val selector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
override protected def selectDcLocalNodes(s: MembershipState): Boolean = false
override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name
}
@ -128,7 +139,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"not care about seen/unseen for cross dc" in {
val selector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
override protected def selectDcLocalNodes(s: MembershipState): Boolean = false
override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name
}
@ -145,7 +156,7 @@ class GossipTargetSelectorSpec extends WordSpec with Matchers {
"limit the numbers of chosen cross dc nodes to the crossDcConnections setting" in {
val selector = new GossipTargetSelector(400, 0.2) {
override protected def selectDcLocalNodes: Boolean = false
override protected def selectDcLocalNodes(s: MembershipState): Boolean = false
override protected def dcsInRandomOrder(dcs: List[DataCenter]): List[DataCenter] = dcs.sorted // sort on name
}

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2017 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.cluster
import akka.actor.Address
import akka.cluster.MemberStatus.Up
import org.scalatest.{ Matchers, WordSpec }
import scala.collection.immutable.SortedSet
class MembershipStateSpec extends WordSpec with Matchers {
// DC-a is in reverse age order
val a1 = TestMember(Address("akka.tcp", "sys", "a4", 2552), Up, 1, "dc-a")
val a2 = TestMember(Address("akka.tcp", "sys", "a3", 2552), Up, 2, "dc-a")
val a3 = TestMember(Address("akka.tcp", "sys", "a2", 2552), Up, 3, "dc-a")
val a4 = TestMember(Address("akka.tcp", "sys", "a1", 2552), Up, 4, "dc-a")
// In DC-b the first and the last are the oldest
val b1 = TestMember(Address("akka.tcp", "sys", "b3", 2552), Up, 1, "dc-b")
val b3 = TestMember(Address("akka.tcp", "sys", "b2", 2552), Up, 3, "dc-b")
// Won't be replaced by b3
val b2 = TestMember(Address("akka.tcp", "sys", "b1", 2552), Up, 2, "dc-b")
// for the case that we don't replace it ever
val bOldest = TestMember(Address("akka.tcp", "sys", "b0", 2552), Up, 0, "dc-b")
"Membership state" must {
"sort by upNumber for oldest top members" in {
val gossip = Gossip(SortedSet(a1, a2, a3, a4, b1, b2, b3, bOldest))
val membershipState = MembershipState(
gossip,
a1.uniqueAddress,
"dc-a",
2
)
membershipState.ageSortedTopOldestMembersPerDc should equal(Map(
"dc-a" -> SortedSet(a1, a2),
"dc-b" -> SortedSet(bOldest, b1)
))
}
}
}

View file

@ -7,11 +7,14 @@ import akka.actor.Address
object TestMember {
def apply(address: Address, status: MemberStatus): Member =
apply(address, status, Set.empty)
apply(address, status, Set.empty[String])
def apply(address: Address, status: MemberStatus, roles: Set[String], dataCenter: ClusterSettings.DataCenter = ClusterSettings.DefaultDataCenter): Member =
withUniqueAddress(UniqueAddress(address, 0L), status, roles, dataCenter)
def apply(address: Address, status: MemberStatus, upNumber: Int, dc: ClusterSettings.DataCenter): Member =
apply(address, status, Set.empty, dc, upNumber)
def withUniqueAddress(uniqueAddress: UniqueAddress, status: MemberStatus, roles: Set[String], dataCenter: ClusterSettings.DataCenter): Member =
new Member(uniqueAddress, Int.MaxValue, status, roles + (ClusterSettings.DcRolePrefix + dataCenter))
def apply(address: Address, status: MemberStatus, roles: Set[String], dataCenter: ClusterSettings.DataCenter = ClusterSettings.DefaultDataCenter, upNumber: Int = Int.MaxValue): Member =
withUniqueAddress(UniqueAddress(address, 0L), status, roles, dataCenter, upNumber)
def withUniqueAddress(uniqueAddress: UniqueAddress, status: MemberStatus, roles: Set[String], dataCenter: ClusterSettings.DataCenter, upNumber: Int = Int.MaxValue): Member =
new Member(uniqueAddress, upNumber, status, roles + (ClusterSettings.DcRolePrefix + dataCenter))
}

View file

@ -35,9 +35,9 @@ class ClusterMessageSerializerSpec extends AkkaSpec(
import MemberStatus._
val a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Joining, Set.empty)
val a1 = TestMember(Address("akka.tcp", "sys", "a", 2552), Joining, Set.empty[String])
val b1 = TestMember(Address("akka.tcp", "sys", "b", 2552), Up, Set("r1"))
val c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Leaving, Set.empty, "foo")
val c1 = TestMember(Address("akka.tcp", "sys", "c", 2552), Leaving, Set.empty[String], "foo")
val d1 = TestMember(Address("akka.tcp", "sys", "d", 2552), Exiting, Set("r1"), "foo")
val e1 = TestMember(Address("akka.tcp", "sys", "e", 2552), Down, Set("r3"))
val f1 = TestMember(Address("akka.tcp", "sys", "f", 2552), Removed, Set("r3"), "foo")

View file

@ -3,6 +3,3 @@ akka.actor {
warn-about-java-serializer-usage = off
}
akka.coordinated-shutdown.run-by-jvm-shutdown-hook = off
akka.coordinated-shutdown.terminate-actor-system = off
akka.cluster.run-coordinated-shutdown-when-down = off

View file

@ -37,15 +37,12 @@ along with the implementation of how the messages should be processed.
@@@ div { .group-java }
Actor classes are implemented by extending the `AbstractActor` class and setting
the “initial behavior” in the constructor by calling the `receive` method in
the `AbstractActor`.
the “initial behavior” in the `createReceive` method.
The argument to the `receive` method is a `PartialFunction<Object,BoxedUnit>`
that defines which messages your Actor can handle, along with the implementation of
how the messages should be processed.
Don't let the type signature scare you. To allow you to easily build up a partial
function there is a builder named `ReceiveBuilder` that you can use.
The `createReceive` method has no arguments and returns `AbstractActor.Receive`. It
defines which messages your Actor can handle, along with the implementation of how
the messages should be processed. You can build such behavior with a builder named
`ReceiveBuilder`. This builder has a convenient factory method in `AbstractActor` called `receiveBuilder`.
@@@
@ -57,8 +54,8 @@ Scala
Java
: @@snip [MyActor.java]($code$/java/jdocs/actor/MyActor.java) { #imports #my-actor }
Please note that the Akka Actor `receive` message loop is exhaustive, which
is different compared to Erlang and the late Scala Actors. This means that you
Please note that the Akka Actor @scala[`receive`] message loop is exhaustive, which
is different compared to Erlang and the late Scala Actors. This means that you
need to provide a pattern match for all messages that it can accept and if you
want to be able to handle unknown messages then you need to have a default case
as in the example above. Otherwise an `akka.actor.UnhandledMessage(message,
@ -69,8 +66,10 @@ Note further that the return type of the behavior defined above is `Unit`; if
the actor shall reply to the received message then this must be done explicitly
as explained below.
The @scala[result of] @java[argument to] the `receive` method is a partial function object, which is
stored within the actor as its “initial behavior”, see [Become/Unbecome](#become-unbecome) for
The result of the @scala[`receive` method is a partial function object, which is]
@java[`createReceive` method is `AbstractActor.Receive`, which is a wrapper around a partial
Scala function object. It is] stored within the actor as its “initial behavior”,
see [Become/Unbecome](#become-unbecome) for
further information on changing the behavior of an actor after its
construction.
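As a minimal Scala sketch of the “initial behavior” being swapped later via `context.become` (the actor and message are made up for illustration):

```scala
import akka.actor.Actor

class Toggler extends Actor {
  // the result of `receive` is stored as the actor's “initial behavior”
  def receive: Receive = off

  def off: Receive = { case "switch" ⇒ context.become(on) }
  def on: Receive = { case "switch" ⇒ context.become(off) }
}
```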
@ -169,12 +168,13 @@ which simultaneously safe-guards against these edge cases.
#### Recommended Practices
It is a good idea to provide factory methods on the companion object of each
`Actor` which help keeping the creation of suitable `Props` as
close to the actor definition as possible. This also avoids the pitfalls
associated with using the @scala[`Props.apply(...)`] @java[ `Props.create(...)`] method which takes a by-name
argument, since within a companion object the given code block will not retain
a reference to its enclosing scope:
It is a good idea to provide @scala[factory methods on the companion object of each
`Actor`] @java[static factory methods for each `Actor`] which help keep the creation of
suitable `Props` as close to the actor definition as possible. This also avoids the pitfalls
associated with using the @scala[`Props.apply(...)` method which takes a by-name
argument, since within a companion object] @java[`Props.create(...)` method which takes
arguments as constructor parameters, since within a static method]
the given code block will not retain a reference to its enclosing scope:
Scala
: @@snip [ActorDocSpec.scala]($code$/scala/docs/actor/ActorDocSpec.scala) { #props-factory }
@ -320,7 +320,7 @@ last line. Watching an actor is quite simple as well:
@scala[The `Actor` trait defines only one abstract method, the above mentioned
`receive`, which implements the behavior of the actor.]
@java[The `AbstractActor` class defines a method called `receive`,
@java[The `AbstractActor` class defines a method called `createReceive`,
that is used to set the “initial behavior” of the actor.]
If the current actor behavior does not match a received message,

View file

@ -336,7 +336,7 @@ stdout logger is `WARNING` and it can be silenced completely by setting
<a id="slf4j"></a>
## SLF4J
Akka provides a logger for [SL4FJ](http://www.slf4j.org/). This module is available in the 'akka-slf4j.jar'.
Akka provides a logger for [SLF4J](http://www.slf4j.org/). This module is available in the 'akka-slf4j.jar'.
It has a single dependency: the slf4j-api jar. In your runtime, you also need a SLF4J backend. We recommend [Logback](http://logback.qos.ch/):
sbt

View file

@ -25,7 +25,7 @@ Here is an example of a @extref[sample project](samples:akka-sample-multi-node-s
## Running tests
The multi-JVM tasks are similar to the normal tasks: `test`, `test-only`,
The multi-JVM tasks are similar to the normal tasks: `test`, `testOnly`,
and `run`, but are under the `multi-jvm` configuration.
So in Akka, to run all the multi-JVM tests in the akka-remote project use (at
@ -43,20 +43,20 @@ project akka-remote-tests
multi-jvm:test
```
To run individual tests use `test-only`:
To run individual tests use `testOnly`:
```none
multi-jvm:test-only akka.remote.RandomRoutedRemoteActor
multi-jvm:testOnly akka.remote.RandomRoutedRemoteActor
```
More than one test name can be listed to run multiple specific
tests. Tab-completion in sbt makes it easy to complete the test names.
It's also possible to specify JVM options with `test-only` by including those
It's also possible to specify JVM options with `testOnly` by including those
options after the test names and `--`. For example:
```none
multi-jvm:test-only akka.remote.RandomRoutedRemoteActor -- -Dsome.option=something
multi-jvm:testOnly akka.remote.RandomRoutedRemoteActor -- -Dsome.option=something
```
## Creating application tests
@ -190,7 +190,7 @@ class SpecMultiJvmNode2 extends WordSpec with MustMatchers {
}
```
To run just these tests you would call `multi-jvm:test-only sample.Spec` at
To run just these tests you would call `multi-jvm:testOnly sample.Spec` at
the sbt prompt.
## Multi Node Additions

View file

@ -121,10 +121,10 @@ Here are some examples of how you define hosts:
### Running the Multi Node Tests
To run all the multi node test in multi-node mode (i.e. distributing the jar files and kicking off the tests
remotely) from inside sbt, use the `multi-node-test` task:
remotely) from inside sbt, use the `multiNodeTest` task:
```none
multi-node-test
multiNodeTest
```
To run all of them in multi-jvm mode (i.e. all JVMs on the local machine) do:
@ -133,16 +133,16 @@ To run all of them in multi-jvm mode (i.e. all JVMs on the local machine) do:
multi-jvm:test
```
To run individual tests use the `multi-node-test-only` task:
To run individual tests use the `multiNodeTestOnly` task:
```none
multi-node-test-only your.MultiNodeTest
multiNodeTestOnly your.MultiNodeTest
```
To run individual tests in the multi-jvm mode do:
```none
multi-jvm:test-only your.MultiNodeTest
multi-jvm:testOnly your.MultiNodeTest
```
More than one test name can be listed to run multiple specific tests. Tab completion in sbt makes it easy to

View file

@ -206,6 +206,20 @@ Scala
Java
: @@snip [RecipeMultiGroupByTest.java]($code$/java/jdocs/stream/javadsl/cookbook/RecipeMultiGroupByTest.java) { #multi-groupby }
### Adhoc source
**Situation:** The idea is that you have a source which you don't want to start until there is demand.
You also want to shut it down when there is no more demand, and start it up again when new demand arrives.
You can achieve this behavior by combining `lazily`, `backpressureTimeout` and `recoverWithRetries` as follows:
Scala
: @@snip [RecipeAdhocSource.scala]($code$/scala/docs/stream/cookbook/RecipeAdhocSource.scala) { #adhoc-source }
Java
: @@snip [RecipeAdhocSourceTest.java]($code$/java/jdocs/stream/javadsl/cookbook/RecipeAdhocSourceTest.java) { #adhoc-source }
## Working with Graphs
In this collection we show recipes that use stream graph elements to achieve various goals.
@ -470,4 +484,4 @@ Scala
: @@snip [RecipeKeepAlive.scala]($code$/scala/docs/stream/cookbook/RecipeKeepAlive.scala) { #inject-keepalive }
Java
: @@snip [RecipeKeepAlive.java]($code$/java/jdocs/stream/javadsl/cookbook/RecipeKeepAlive.java) { #inject-keepalive }
: @@snip [RecipeKeepAlive.java]($code$/java/jdocs/stream/javadsl/cookbook/RecipeKeepAlive.java) { #inject-keepalive }

View file

@ -26,7 +26,7 @@ slowing down the upstream producer to match their consumption speeds.
In the context of Akka Streams back-pressure is always understood as *non-blocking* and *asynchronous*.
Non-Blocking
: Means that a certain operation does not hinder the progress of the calling thread, even if it takes long time to
: Means that a certain operation does not hinder the progress of the calling thread, even if it takes a long time to
finish the requested operation.
Graph
@ -354,4 +354,57 @@ been signalled already thus the ordering in the case of zipping is defined b
If you find yourself in need of fine grained control over order of emitted elements in fan-in
scenarios consider using `MergePreferred`, `MergePrioritized` or `GraphStage` which gives you full control over how the
merge is performed.
merge is performed.
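For instance, a `MergePreferred` junction always drains its preferred input first whenever it has elements available; a minimal sketch (sources and values are illustrative):

```scala
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ClosedShape }
import akka.stream.scaladsl.{ GraphDSL, MergePreferred, RunnableGraph, Sink, Source }

implicit val system = ActorSystem("MergeExample")
implicit val mat = ActorMaterializer()

RunnableGraph.fromGraph(GraphDSL.create() { implicit b ⇒
  import GraphDSL.Implicits._
  val merge = b.add(MergePreferred[Int](secondaryPorts = 1))
  Source(1 to 3) ~> merge.preferred   // drained first whenever it has elements
  Source(10 to 12) ~> merge.in(0)     // the single secondary input
  merge.out ~> Sink.foreach(println)
  ClosedShape
}).run()
```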
# Actor Materializer Lifecycle
An important aspect of working with streams and actors is understanding an `ActorMaterializer`'s life-cycle.
The materializer is bound to the lifecycle of the `ActorRefFactory` it is created from, which in practice will
be either an `ActorSystem` or `ActorContext` (when the materializer is created within an `Actor`).
The usual way of creating an `ActorMaterializer` is to create it next to your `ActorSystem`,
which likely is in a "main" class of your application:
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materializer-from-system }
Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materializer-from-system }
In this case the streams run by the materializer will run until it is shut down. When the materializer is shut down
*before* the streams have run to completion, they will be terminated abruptly. This is a little different from the
usual way to terminate streams, which is by cancelling/completing them. The stream lifecycles are bound to the materializer
like this to prevent leaks; in normal operations you should not rely on this mechanism but rather use a `KillSwitch` or
normal completion signals to manage the lifecycles of your streams.
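A minimal sketch of the `KillSwitch` alternative mentioned above (the stream and names are illustrative):

```scala
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, KillSwitches }
import akka.stream.scaladsl.{ Keep, Sink, Source }

implicit val system = ActorSystem("KillSwitchExample")
implicit val mat = ActorMaterializer()

val (killSwitch, done) = Source.repeat("hello")
  .viaMat(KillSwitches.single)(Keep.right)
  .toMat(Sink.ignore)(Keep.both)
  .run()

killSwitch.shutdown() // completes the stream instead of terminating it abruptly
```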
If we look at the following example, where we create the `ActorMaterializer` within an `Actor`:
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materializer-from-actor-context }
Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materializer-from-actor-context }
In the above example we used the `ActorContext` to create the materializer. This binds its lifecycle to the surrounding `Actor`. In other words, while the stream we started there would under normal circumstances run forever, if we stop the Actor it would terminate the stream as well. We have *bound the streams' lifecycle to the surrounding actor's lifecycle*.
This is a very useful technique if the stream is closely related to the actor, e.g. when the actor represents a user or other entity that we continuously query using the created stream, and it would not make sense to keep the stream alive when the actor has already terminated. The stream's termination will be signalled by an "Abrupt termination exception" from the stream.
You may also cause an `ActorMaterializer` to shut down by explicitly calling `shutdown()` on it, which abruptly terminates all of the streams it is running at that point.
Sometimes, however, you may want to explicitly create a stream that will outlast the actor's life. For example, you may be pushing some large stream of data to an external service via an Akka stream and want to eagerly stop the Actor since it has already performed all of its duties:
Scala
: @@snip [FlowDocSpec.scala]($code$/scala/docs/stream/FlowDocSpec.scala) { #materializer-from-system-in-actor }
Java
: @@snip [FlowDocTest.java]($code$/java/jdocs/stream/FlowDocTest.java) { #materializer-from-system-in-actor }
In the above example we pass in a materializer to the Actor, which results in binding its lifecycle to the entire `ActorSystem` rather than the single enclosing actor. This can be useful if you want to share a materializer or group streams into specific materializers,
for example because of the materializer's settings etc.
@@@ warning
Do not create new actor materializers inside actors by passing the `context.system` to them.
This will cause a new `ActorMaterializer` to be created and potentially leaked (unless you shut it down explicitly) for each such actor.
It is instead recommended to either pass in the Materializer or create one using the actor's `context`.
@@@

View file

@ -11,6 +11,7 @@ import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import akka.NotUsed;
import akka.actor.AbstractActor;
import akka.japi.Pair;
import jdocs.AbstractJavaTest;
import akka.testkit.javadsl.TestKit;
@ -279,4 +280,64 @@ public class FlowDocTest extends AbstractJavaTest {
//#flow-async
}
static {
//#materializer-from-system
ActorSystem system = ActorSystem.create("ExampleSystem");
// created from `system`:
ActorMaterializer mat = ActorMaterializer.create(system);
//#materializer-from-system
}
//#materializer-from-actor-context
final class RunWithMyself extends AbstractActor {
ActorMaterializer mat = ActorMaterializer.create(context());
@Override
public void preStart() throws Exception {
Source
.repeat("hello")
.runWith(Sink.onComplete(tryDone -> {
System.out.println("Terminated stream: " + tryDone);
}), mat);
}
@Override
public Receive createReceive() {
return receiveBuilder().match(String.class, p -> {
// this WILL terminate the above stream as well
context().stop(self());
}).build();
}
}
//#materializer-from-actor-context
//#materializer-from-system-in-actor
final class RunForever extends AbstractActor {
final ActorMaterializer mat;
RunForever(ActorMaterializer mat) {
this.mat = mat;
}
@Override
public void preStart() throws Exception {
Source
.repeat("hello")
.runWith(Sink.onComplete(tryDone -> {
System.out.println("Terminated stream: " + tryDone);
}), mat);
}
@Override
public Receive createReceive() {
return receiveBuilder().match(String.class, p -> {
// will NOT terminate the stream (it's bound to the system!)
context().stop(self());
}).build();
}
//#materializer-from-system-in-actor
}
}

View file

@ -0,0 +1,210 @@
/**
* Copyright (C) 2015-2017 Lightbend Inc. <http://www.lightbend.com/>
*/
package jdocs.stream.javadsl.cookbook;
import akka.Done;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import akka.japi.pf.PFBuilder;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Keep;
import akka.stream.javadsl.Source;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.testkit.javadsl.TestKit;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.Ignore;
import scala.concurrent.Promise;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
public class RecipeAdhocSourceTest extends RecipeTest {
static ActorSystem system;
static Materializer mat;
FiniteDuration duration200mills = Duration.create(200, "milliseconds");
@BeforeClass
public static void setup() {
system = ActorSystem.create("RecipeAdhocSource");
mat = ActorMaterializer.create(system);
}
@AfterClass
public static void tearDown() {
TestKit.shutdownActorSystem(system);
system = null;
mat = null;
}
//#adhoc-source
public <T> Source<T, ?> adhocSource(Source<T, ?> source, FiniteDuration timeout, int maxRetries) {
return Source.lazily(
() -> source.backpressureTimeout(timeout).recoverWithRetries(
maxRetries,
new PFBuilder()
.match(TimeoutException.class, ex -> Source.lazily(() -> source.backpressureTimeout(timeout)))
.build()
)
);
}
//#adhoc-source
@Test
@Ignore
public void noStart() throws Exception {
new TestKit(system) {
{
AtomicBoolean isStarted = new AtomicBoolean();
adhocSource(
Source.empty().mapMaterializedValue(x -> {isStarted.set(true); return x;}), duration200mills, 3);
Thread.sleep(300);
assertEquals(false, isStarted.get());
}
};
}
@Test
@Ignore
public void startStream() throws Exception {
new TestKit(system) {
{
TestSubscriber.Probe<String> probe = adhocSource(Source.repeat("a"), duration200mills, 3)
.toMat(TestSink.probe(system), Keep.right())
.run(mat);
probe.requestNext("a");
}
};
}
@Test
@Ignore
public void shutdownStream() throws Exception {
new TestKit(system) {
{
Promise<Done> shutdown = Futures.promise();
TestSubscriber.Probe<String> probe = adhocSource(
Source.repeat("a").watchTermination((a, term) ->
term.thenRun(() -> shutdown.success(Done.getInstance()))
), duration200mills, 3)
.toMat(TestSink.probe(system), Keep.right())
.run(mat);
probe.requestNext("a");
Thread.sleep(500);
assertEquals(true, shutdown.isCompleted());
}
};
}
@Test
@Ignore
public void notShutDownStream() throws Exception {
new TestKit(system) {
{
Promise<Done> shutdown = Futures.promise();
TestSubscriber.Probe<String> probe =
adhocSource(
Source.repeat("a").watchTermination((a, term) ->
term.thenRun(() -> shutdown.success(Done.getInstance()))
), duration200mills, 3)
.toMat(TestSink.probe(system), Keep.right())
.run(mat);
probe.requestNext("a");
Thread.sleep(100);
probe.requestNext("a");
Thread.sleep(100);
probe.requestNext("a");
Thread.sleep(100);
probe.requestNext("a");
Thread.sleep(100);
probe.requestNext("a");
Thread.sleep(100);
assertEquals(false, shutdown.isCompleted());
}
};
}
@Test
@Ignore
public void restartUponDemand() throws Exception {
new TestKit(system) {
{
Promise<Done> shutdown = Futures.promise();
AtomicInteger startedCount = new AtomicInteger(0);
Source<String, ?> source = Source
.empty().mapMaterializedValue(x -> startedCount.incrementAndGet())
.concat(Source.repeat("a"));
TestSubscriber.Probe<String> probe =
adhocSource(source.watchTermination((a, term) ->
term.thenRun(() -> shutdown.success(Done.getInstance()))
), duration200mills, 3)
.toMat(TestSink.probe(system), Keep.right())
.run(mat);
probe.requestNext("a");
assertEquals(1, startedCount.get());
Thread.sleep(500);
assertEquals(true, shutdown.isCompleted());
}
};
}
@Test
@Ignore
public void restartUptoMaxRetries() throws Exception {
new TestKit(system) {
{
Promise<Done> shutdown = Futures.promise();
AtomicInteger startedCount = new AtomicInteger(0);
Source<String, ?> source = Source
.empty().mapMaterializedValue(x -> startedCount.incrementAndGet())
.concat(Source.repeat("a"));
TestSubscriber.Probe<String> probe =
adhocSource(source.watchTermination((a, term) ->
term.thenRun(() -> shutdown.success(Done.getInstance()))
), duration200mills, 3)
.toMat(TestSink.probe(system), Keep.right())
.run(mat);
probe.requestNext("a");
assertEquals(1, startedCount.get());
Thread.sleep(500);
assertEquals(true, shutdown.isCompleted());
Thread.sleep(500);
probe.requestNext("a");
assertEquals(2, startedCount.get());
Thread.sleep(500);
probe.requestNext("a");
assertEquals(3, startedCount.get());
Thread.sleep(500);
probe.requestNext("a");
assertEquals(4, startedCount.get()); //startCount == 4, which means "re"-tried 3 times
Thread.sleep(500);
assertEquals(TimeoutException.class, probe.expectError().getClass());
probe.request(1); //send demand
probe.expectNoMessage(Duration.create(200, "milliseconds")); //but no more restart
}
};
}
}

View file

@ -3,15 +3,17 @@
*/
package docs.stream
import akka.NotUsed
import akka.actor.Cancellable
import akka.stream.{ ClosedShape, FlowShape }
import akka.{ Done, NotUsed }
import akka.actor.{ Actor, ActorSystem, Cancellable }
import akka.stream.{ ActorMaterializer, ClosedShape, FlowShape, Materializer }
import akka.stream.scaladsl._
import akka.testkit.AkkaSpec
import docs.CompileOnlySpec
import scala.concurrent.{ Promise, Future }
import scala.concurrent.{ Future, Promise }
import scala.util.{ Failure, Success }
class FlowDocSpec extends AkkaSpec {
class FlowDocSpec extends AkkaSpec with CompileOnlySpec {
implicit val ec = system.dispatcher
@ -229,3 +231,48 @@ class FlowDocSpec extends AkkaSpec {
//#flow-async
}
}
object FlowDocSpec {
{
//#materializer-from-system
implicit val system = ActorSystem("ExampleSystem")
implicit val mat = ActorMaterializer() // created from `system`
//#materializer-from-system
}
//#materializer-from-actor-context
final class RunWithMyself extends Actor {
implicit val mat = ActorMaterializer()
Source.maybe
.runWith(Sink.onComplete {
case Success(done) ⇒ println(s"Completed: $done")
case Failure(ex)   ⇒ println(s"Failed: ${ex.getMessage}")
})
def receive = {
case "boom"
context.stop(self) // will also terminate the stream
}
}
//#materializer-from-actor-context
//#materializer-from-system-in-actor
final class RunForever(implicit val mat: Materializer) extends Actor {
Source.maybe
.runWith(Sink.onComplete {
case Success(done) ⇒ println(s"Completed: $done")
case Failure(ex)   ⇒ println(s"Failed: ${ex.getMessage}")
})
def receive = {
case "boom"
context.stop(self) // will NOT terminate the stream (it's bound to the system!)
}
}
//#materializer-from-system-in-actor
}

View file

@ -0,0 +1,132 @@
package docs.stream.cookbook
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
import akka.stream.scaladsl.{ Keep, Sink, Source }
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.{ Graph, SourceShape }
import akka.testkit.TimingTest
import akka.{ Done, NotUsed }
import scala.concurrent._
import scala.concurrent.duration._
class RecipeAdhocSource extends RecipeSpec {
//#adhoc-source
def adhocSource[T](source: Source[T, _], timeout: FiniteDuration, maxRetries: Int): Source[T, _] =
Source.lazily(
() ⇒ source.backpressureTimeout(timeout).recoverWithRetries(maxRetries, {
case t: TimeoutException ⇒
Source.lazily(() ⇒ source.backpressureTimeout(timeout)).mapMaterializedValue(_ ⇒ NotUsed)
})
)
//#adhoc-source
"Recipe for adhoc source" must {
"not start the source if there is no demand" taggedAs TimingTest in {
val isStarted = new AtomicBoolean()
adhocSource(Source.empty.mapMaterializedValue(_ ⇒ isStarted.set(true)), 200.milliseconds, 3)
.runWith(TestSink.probe[Int])
Thread.sleep(300)
isStarted.get() should be(false)
}
"start the source when there is a demand" taggedAs TimingTest in {
val sink = adhocSource(Source.repeat("a"), 200.milliseconds, 3)
.runWith(TestSink.probe[String])
sink.requestNext("a")
}
"shut down the source when the next demand times out" taggedAs TimingTest in {
val shutdown = Promise[Done]()
val sink = adhocSource(
Source.repeat("a").watchTermination() { (_, term)
shutdown.completeWith(term)
}, 200.milliseconds, 3)
.runWith(TestSink.probe[String])
sink.requestNext("a")
Thread.sleep(500)
shutdown.isCompleted should be(true)
}
"not shut down the source when there are still demands" taggedAs TimingTest in {
val shutdown = Promise[Done]()
val sink = adhocSource(
Source.repeat("a").watchTermination() { (_, term)
shutdown.completeWith(term)
}, 200.milliseconds, 3)
.runWith(TestSink.probe[String])
sink.requestNext("a")
Thread.sleep(100)
sink.requestNext("a")
Thread.sleep(100)
sink.requestNext("a")
Thread.sleep(100)
sink.requestNext("a")
Thread.sleep(100)
sink.requestNext("a")
Thread.sleep(100)
shutdown.isCompleted should be(false)
}
"restart upon demand again after timeout" taggedAs TimingTest in {
val shutdown = Promise[Done]()
val startedCount = new AtomicInteger(0)
val source = Source
.empty.mapMaterializedValue(_ ⇒ startedCount.incrementAndGet())
.concat(Source.repeat("a"))
val sink = adhocSource(source.watchTermination() { (_, term) ⇒
shutdown.completeWith(term)
}, 200.milliseconds, 3)
.runWith(TestSink.probe[String])
sink.requestNext("a")
startedCount.get() should be(1)
Thread.sleep(500)
shutdown.isCompleted should be(true)
}
"restart up to specified maxRetries" taggedAs TimingTest in {
val shutdown = Promise[Done]()
val startedCount = new AtomicInteger(0)
val source = Source
.empty.mapMaterializedValue(_ ⇒ startedCount.incrementAndGet())
.concat(Source.repeat("a"))
val sink = adhocSource(source.watchTermination() { (_, term) ⇒
shutdown.completeWith(term)
}, 200.milliseconds, 3)
.runWith(TestSink.probe[String])
sink.requestNext("a")
startedCount.get() should be(1)
Thread.sleep(500)
shutdown.isCompleted should be(true)
Thread.sleep(500)
sink.requestNext("a")
startedCount.get() should be(2)
Thread.sleep(500)
sink.requestNext("a")
startedCount.get() should be(3)
Thread.sleep(500)
sink.requestNext("a")
startedCount.get() should be(4) //startCount == 4, which means "re"-tried 3 times
Thread.sleep(500)
sink.expectError().getClass should be(classOf[TimeoutException])
sink.request(1) //send demand
sink.expectNoMessage(200.milliseconds) //but no more restart
}
}
}

View file

@ -217,6 +217,7 @@ object MultiNodeSpec {
loggers = ["akka.testkit.TestEventListener"]
loglevel = "WARNING"
stdout-loglevel = "WARNING"
coordinated-shutdown.terminate-actor-system = off
coordinated-shutdown.run-by-jvm-shutdown-hook = off
actor {
default-dispatcher {
@ -308,11 +309,11 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
}
}
}
shutdown(system)
shutdown(system, duration = shutdownTimeout)
afterTermination()
}
def shutdownTimeout: FiniteDuration = 5.seconds.dilated
def shutdownTimeout: FiniteDuration = 15.seconds.dilated
/**
* Override this and return `true` to assert that the

View file

@ -25,7 +25,6 @@ class RemoteRoundRobinConfig(artery: Boolean) extends MultiNodeConfig {
commonConfig(debugConfig(on = false).withFallback(
ConfigFactory.parseString(s"""
akka.loglevel = DEBUG
akka.remote.artery.enabled = $artery
""")).withFallback(RemotingMultiNodeSpec.commonConfig))

View file

@ -81,6 +81,9 @@ class RemoteScatterGatherSpec(multiNodeConfig: RemoteScatterGatherConfig) extend
val connectionCount = 3
val iterationCount = 10
// let them start
Thread.sleep(2000)
for (i ← 0 until iterationCount; k ← 0 until connectionCount) {
actor ! "hit"
}
@ -104,7 +107,7 @@ class RemoteScatterGatherSpec(multiNodeConfig: RemoteScatterGatherConfig) extend
enterBarrier("done")
}
enterBarrier("done")
enterBarrier("all-done")
}
}
}

View file

@ -19,6 +19,7 @@ import scala.util.Failure
import scala.util.Success
import scala.util.control.NoStackTrace
import scala.util.control.NonFatal
import akka.Done
import akka.NotUsed
import akka.actor._
@ -530,7 +531,11 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
val maybeDriver = mediaDriver.getAndSet(None)
maybeDriver.foreach { driver ⇒
// this is only for embedded media driver
driver.close()
try driver.close() catch {
case NonFatal(e) ⇒
// don't think driver.close will ever throw, but just in case
log.warning("Couldn't close Aeron embedded media driver due to [{}]", e.getMessage)
}
try {
if (settings.Advanced.DeleteAeronDirectory) {
@ -619,12 +624,14 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
log.debug("Inbound channel is now active")
} else if (status == ChannelEndpointStatus.ERRORED) {
areonErrorLog.logErrors(log, 0L)
stopMediaDriver()
throw new RemoteTransportException("Inbound Aeron channel is in errored state. See Aeron logs for details.")
} else if (status == ChannelEndpointStatus.INITIALIZING && retries > 0) {
Thread.sleep(waitInterval)
retry(retries - 1)
} else {
areonErrorLog.logErrors(log, 0L)
stopMediaDriver()
throw new RemoteTransportException("Timed out waiting for Aeron transport to bind. See Aeoron logs.")
}
}

View file

@ -220,7 +220,7 @@ object TestPublisher {
this
}
def sendError(cause: Exception): Self = {
def sendError(cause: Throwable): Self = {
subscription.sendError(cause)
this
}
@ -795,7 +795,7 @@ private[testkit] object StreamTestKit {
def sendNext(element: I): Unit = subscriber.onNext(element)
def sendComplete(): Unit = subscriber.onComplete()
def sendError(cause: Exception): Unit = subscriber.onError(cause)
def sendError(cause: Throwable): Unit = subscriber.onError(cause)
def sendOnSubscribe(): Unit = subscriber.onSubscribe(this)
}

View file

@ -131,9 +131,9 @@ class StreamTestKitSpec extends AkkaSpec {
"#expectNextWithTimeoutPF should fail after timeout when element delayed" in {
intercept[AssertionError] {
val timeout = 100 millis
val overTimeout = timeout + (10 millis)
Source.tick(overTimeout, 1 millis, 1).runWith(TestSink.probe)
val timeout = 100.millis
val overTimeout = timeout + (10.millis)
Source.tick(overTimeout, 1.millis, 1).runWith(TestSink.probe)
.request(1)
.expectNextWithTimeoutPF(timeout, {
case 1 ⇒
@ -169,9 +169,9 @@ class StreamTestKitSpec extends AkkaSpec {
"#expectNextChainingPF should fail after timeout when element delayed" in {
intercept[AssertionError] {
val timeout = 100 millis
val overTimeout = timeout + (10 millis)
Source.tick(overTimeout, 1 millis, 1).runWith(TestSink.probe)
val timeout = 100.millis
val overTimeout = timeout + (10.millis)
Source.tick(overTimeout, 1.millis, 1).runWith(TestSink.probe)
.request(1)
.expectNextChainingPF(timeout, {
case 1 ⇒

View file

@ -11,6 +11,8 @@ import akka.japi.Pair;
import akka.japi.function.*;
import akka.japi.pf.PFBuilder;
import akka.stream.*;
import akka.stream.testkit.TestSubscriber;
import akka.stream.testkit.javadsl.TestSink;
import akka.util.ConstantFun;
import akka.stream.stage.*;
import akka.testkit.AkkaSpec;
@ -655,6 +657,28 @@ public class SourceTest extends StreamTest {
future.toCompletableFuture().get(3, TimeUnit.SECONDS);
}
@Test
public void mustBeAbleToCombineMat() throws Exception {
final TestKit probe = new TestKit(system);
final Source<Integer, SourceQueueWithComplete<Integer>> source1 = Source.queue(1, OverflowStrategy.dropNew());
final Source<Integer, NotUsed> source2 = Source.from(Arrays.asList(2, 3));
// compiler check: the materialized value of type SourceQueueWithComplete<Integer> is available
final Source<Integer, SourceQueueWithComplete<Integer>> combined = Source.combineMat(
source1, source2, width -> Concat.<Integer> create(width), Keep.left()); //Keep.left() (i.e. preserve queueSource's materialized value)
SourceQueueWithComplete<Integer> queue = combined
.toMat(Sink.foreach(elem -> probe.getRef().tell(elem, ActorRef.noSender())), Keep.left())
.run(materializer);
queue.offer(0);
queue.offer(1);
queue.complete(); //complete queueSource so that the combined `Concat` source can move on to source2
// due to `Concat`, elements from source1 (the first of the combined sources) come first, then source2's elements
probe.expectMsgAllOf(0, 1, 2, 3);
}
@Test
public void mustBeAbleToZipN() throws Exception {
final TestKit probe = new TestKit(system);

View file

@ -1,13 +1,16 @@
package akka.stream
import akka.actor.{ ActorSystem, Props }
import akka.Done
import akka.actor.{ Actor, ActorSystem, PoisonPill, Props }
import akka.stream.ActorMaterializerSpec.ActorWithMaterializer
import akka.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import akka.stream.testkit.{ StreamSpec, TestPublisher }
import akka.testkit.{ ImplicitSender, TestActor }
import akka.testkit.{ ImplicitSender, TestActor, TestProbe }
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.{ Failure, Try }
class ActorMaterializerSpec extends StreamSpec with ImplicitSender {
@ -47,7 +50,18 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender {
m.shutdown()
m.supervisor ! StreamSupervisor.GetChildren
expectNoMsg(1.second)
expectNoMessage(1.second)
}
"terminate if ActorContext it was created from terminates" in {
val p = TestProbe()
val a = system.actorOf(Props(new ActorWithMaterializer(p)).withDispatcher("akka.test.stream-dispatcher"))
p.expectMsg("hello")
p.expectMsg("one")
a ! PoisonPill
val Failure(ex) = p.expectMsgType[Try[Done]]
}
"handle properly broken Props" in {
@ -67,3 +81,19 @@ class ActorMaterializerSpec extends StreamSpec with ImplicitSender {
}
}
object ActorMaterializerSpec {
class ActorWithMaterializer(p: TestProbe) extends Actor {
private val settings: ActorMaterializerSettings = ActorMaterializerSettings(context.system).withDispatcher("akka.test.stream-dispatcher")
implicit val mat = ActorMaterializer(settings)(context)
Source.repeat("hello")
.alsoTo(Flow[String].take(1).to(Sink.actorRef(p.ref, "one")))
.runWith(Sink.onComplete(signal ⇒ {
println(signal)
p.ref ! signal
}))
def receive = Actor.emptyBehavior
}
}

View file

@ -124,10 +124,6 @@ class FlowThrottleSpec extends StreamSpec {
(1 to 5) foreach upstream.sendNext
downstream.receiveWithin(300.millis, 5) should be(1 to 5)
downstream.request(1)
upstream.sendNext(6)
downstream.expectNoMsg(100.millis)
downstream.expectNext(6)
downstream.request(5)
downstream.expectNoMsg(1200.millis)
for (i ← 7 to 11) upstream.sendNext(i)

View file

@ -18,6 +18,8 @@ import scala.collection.immutable
import java.util
import java.util.stream.BaseStream
import akka.stream.testkit.scaladsl.TestSink
class SourceSpec extends StreamSpec with DefaultTimeout {
implicit val materializer = ActorMaterializer()
@ -141,6 +143,45 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
out.expectComplete()
}
"combine from two inputs with combinedMat and take a materialized value" in {
val queueSource = Source.queue[Int](1, OverflowStrategy.dropBuffer)
val intSeqSource = Source(1 to 3)
// compiler check: the materialized value of type SourceQueueWithComplete[Int] is available
val combined1: Source[Int, SourceQueueWithComplete[Int]] =
Source.combineMat(queueSource, intSeqSource)(Concat(_))(Keep.left) //Keep.left (i.e. preserve queueSource's materialized value)
val (queue1, sinkProbe1) = combined1.toMat(TestSink.probe[Int])(Keep.both).run()
sinkProbe1.request(6)
queue1.offer(10)
queue1.offer(20)
queue1.offer(30)
queue1.complete() //complete queueSource so that combined1 with `Concat` then pulls elements from intSeqSource
sinkProbe1.expectNext(10)
sinkProbe1.expectNext(20)
sinkProbe1.expectNext(30)
sinkProbe1.expectNext(1)
sinkProbe1.expectNext(2)
sinkProbe1.expectNext(3)
// compiler check: the materialized value of type SourceQueueWithComplete[Int] is available
val combined2: Source[Int, SourceQueueWithComplete[Int]] =
//queueSource is the second of the two combined sources
Source.combineMat(intSeqSource, queueSource)(Concat(_))(Keep.right) //Keep.right (i.e. preserve queueSource's materialized value)
val (queue2, sinkProbe2) = combined2.toMat(TestSink.probe[Int])(Keep.both).run()
sinkProbe2.request(6)
queue2.offer(10)
queue2.offer(20)
queue2.offer(30)
queue2.complete() //complete queueSource so that combined2 with `Concat` can drain it after intSeqSource
sinkProbe2.expectNext(1) //as intSeqSource is the first in the combined source, its elements come first
sinkProbe2.expectNext(2)
sinkProbe2.expectNext(3)
sinkProbe2.expectNext(10) //after intSeqSource runs out of elements, queueSource's elements follow
sinkProbe2.expectNext(20)
sinkProbe2.expectNext(30)
}
}
"Repeat Source" must {

View file

@ -17,4 +17,7 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$I
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$Initialized$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$NotInitialized$")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper$CallbackState")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper")
ProblemFilters.exclude[MissingClassProblem]("akka.stream.stage.CallbackWrapper")
# Optimize TCP stream writes
ProblemFilters.exclude[Problem]("akka.stream.impl.io.*")

View file

@ -72,6 +72,16 @@ akka {
# of 1 on the corresponding dispatchers.
fuzzing-mode = off
}
io.tcp {
# The outgoing bytes are accumulated in a buffer while waiting for acknowledgment
# of the pending write. This improves throughput for small messages (frames) without
# sacrificing latency. While waiting for the ack the stage will eagerly pull
# from upstream until the buffer exceeds this size. That means that the buffer may hold
# slightly more bytes than this limit (at most one element more). It can be set to 0
# to disable the usage of the buffer.
write-buffer-size = 16 KiB
}
}
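# The same limit can also be configured programmatically via the new IOSettings
# class (a usage sketch; `sys` is assumed to be an ActorSystem in scope):
#
#   val settings = ActorMaterializerSettings(sys)
#     .withIOSettings(IOSettings(tcpWriteBufferSize = 32 * 1024))
#   val materializer = ActorMaterializer(settings)(sys)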
# Fully qualified config path which holds the dispatcher configuration

View file

@ -272,7 +272,8 @@ object ActorMaterializerSettings {
fuzzingMode = config.getBoolean("debug.fuzzing-mode"),
autoFusing = config.getBoolean("auto-fusing"),
maxFixedBufferSize = config.getInt("max-fixed-buffer-size"),
syncProcessingLimit = config.getInt("sync-processing-limit"))
syncProcessingLimit = config.getInt("sync-processing-limit"),
ioSettings = IOSettings(config.getConfig("io")))
/**
* Create [[ActorMaterializerSettings]] from individual settings (Java).
@ -322,7 +323,25 @@ final class ActorMaterializerSettings private (
val fuzzingMode: Boolean,
val autoFusing: Boolean,
val maxFixedBufferSize: Int,
val syncProcessingLimit: Int) {
val syncProcessingLimit: Int,
val ioSettings: IOSettings) {
// backwards compatibility constructor from before IOSettings was added; shouldn't be needed since the constructor is private, but kept to satisfy MiMa
def this(
initialInputBufferSize: Int,
maxInputBufferSize: Int,
dispatcher: String,
supervisionDecider: Supervision.Decider,
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
debugLogging: Boolean,
outputBurstLimit: Int,
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int,
syncProcessingLimit: Int) =
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit,
IOSettings(tcpWriteBufferSize = 16 * 1024))
def this(
initialInputBufferSize: Int,
@ -334,10 +353,9 @@ final class ActorMaterializerSettings private (
outputBurstLimit: Int,
fuzzingMode: Boolean,
autoFusing: Boolean,
maxFixedBufferSize: Int) {
maxFixedBufferSize: Int) =
this(initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, defaultMaxFixedBufferSize)
}
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
require(syncProcessingLimit > 0, "syncProcessingLimit must be > 0")
@ -356,10 +374,11 @@ final class ActorMaterializerSettings private (
fuzzingMode: Boolean = this.fuzzingMode,
autoFusing: Boolean = this.autoFusing,
maxFixedBufferSize: Int = this.maxFixedBufferSize,
syncProcessingLimit: Int = this.syncProcessingLimit) = {
syncProcessingLimit: Int = this.syncProcessingLimit,
ioSettings: IOSettings = this.ioSettings) = {
new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit)
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize, syncProcessingLimit, ioSettings)
}
/**
@ -465,6 +484,10 @@ final class ActorMaterializerSettings private (
if (settings == this.subscriptionTimeoutSettings) this
else copy(subscriptionTimeoutSettings = settings)
def withIOSettings(ioSettings: IOSettings): ActorMaterializerSettings =
if (ioSettings == this.ioSettings) this
else copy(ioSettings = ioSettings)
private def requirePowerOfTwo(n: Integer, name: String): Unit = {
require(n > 0, s"$name must be > 0")
require((n & (n - 1)) == 0, s"$name must be a power of two")
@ -481,11 +504,52 @@ final class ActorMaterializerSettings private (
s.outputBurstLimit == outputBurstLimit &&
s.syncProcessingLimit == syncProcessingLimit &&
s.fuzzingMode == fuzzingMode &&
s.autoFusing == autoFusing
s.autoFusing == autoFusing &&
s.ioSettings == ioSettings
case _ ⇒ false
}
override def toString: String = s"ActorMaterializerSettings($initialInputBufferSize,$maxInputBufferSize,$dispatcher,$supervisionDecider,$subscriptionTimeoutSettings,$debugLogging,$outputBurstLimit,$syncProcessingLimit,$fuzzingMode,$autoFusing)"
override def toString: String = s"ActorMaterializerSettings($initialInputBufferSize,$maxInputBufferSize," +
s"$dispatcher,$supervisionDecider,$subscriptionTimeoutSettings,$debugLogging,$outputBurstLimit," +
s"$syncProcessingLimit,$fuzzingMode,$autoFusing,$ioSettings)"
}
object IOSettings {
def apply(system: ActorSystem): IOSettings =
apply(system.settings.config.getConfig("akka.stream.materializer.io"))
def apply(config: Config): IOSettings =
new IOSettings(
tcpWriteBufferSize = math.min(Int.MaxValue, config.getBytes("tcp.write-buffer-size")).toInt)
def apply(tcpWriteBufferSize: Int): IOSettings =
new IOSettings(tcpWriteBufferSize)
/** Java API */
def create(config: Config) = apply(config)
/** Java API */
def create(system: ActorSystem) = apply(system)
/** Java API */
def create(tcpWriteBufferSize: Int): IOSettings =
apply(tcpWriteBufferSize)
}
final class IOSettings private (val tcpWriteBufferSize: Int) {
def withTcpWriteBufferSize(value: Int): IOSettings = copy(tcpWriteBufferSize = value)
private def copy(tcpWriteBufferSize: Int = tcpWriteBufferSize): IOSettings = new IOSettings(
tcpWriteBufferSize = tcpWriteBufferSize)
override def equals(other: Any): Boolean = other match {
case s: IOSettings ⇒ s.tcpWriteBufferSize == tcpWriteBufferSize
case _ ⇒ false
}
override def toString =
s"""IoSettings(${tcpWriteBufferSize})"""
}
object StreamSubscriptionTimeoutSettings {

View file

@ -801,7 +801,7 @@ private final case class SavedIslandData(islandGlobalOffset: Int, lastVisitedOff
*/
@InternalApi private[akka] final class SinkModulePhase(materializer: PhasedFusingActorMaterializer, islandName: String)
extends PhaseIsland[AnyRef] {
override def name: String = s"SourceModule phase"
override def name: String = s"SinkModule phase"
var subscriberOrVirtualPublisher: AnyRef = _
override def materializeAtomic(mod: AtomicModule[Shape, Any], attributes: Attributes): (AnyRef, Any) = {

View file

@ -606,7 +606,7 @@ import scala.collection.JavaConverters._
extends GraphStage[SinkShape[T]] {
import SubSink._
private val in = Inlet[T]("SubSink.in")
private val in = Inlet[T](s"SubSink($name).in")
override def initialAttributes = Attributes.name(s"SubSink($name)")
override val shape = SinkShape(in)
@ -676,7 +676,7 @@ import scala.collection.JavaConverters._
extends GraphStage[SourceShape[T]] {
import SubSink._
val out: Outlet[T] = Outlet("SubSource.out")
val out: Outlet[T] = Outlet(s"SubSource($name).out")
override def initialAttributes = Attributes.name(s"SubSource($name)")
override val shape: SourceShape[T] = SourceShape(out)

View file

@ -36,7 +36,8 @@ import scala.concurrent.{ Future, Promise }
val options: immutable.Traversable[SocketOption],
val halfClose: Boolean,
val idleTimeout: Duration,
val bindShutdownTimeout: FiniteDuration)
val bindShutdownTimeout: FiniteDuration,
val ioSettings: IOSettings)
extends GraphStageWithMaterializedValue[SourceShape[StreamTcp.IncomingConnection], Future[StreamTcp.ServerBinding]] {
import ConnectionSourceStage._
@ -114,7 +115,7 @@ import scala.concurrent.{ Future, Promise }
connectionFlowsAwaitingInitialization.incrementAndGet()
val tcpFlow =
Flow.fromGraph(new IncomingConnectionStage(connection, connected.remoteAddress, halfClose))
Flow.fromGraph(new IncomingConnectionStage(connection, connected.remoteAddress, halfClose, ioSettings))
.via(detacher[ByteString]) // must read ahead for proper completions
.mapMaterializedValue { m ⇒
connectionFlowsAwaitingInitialization.decrementAndGet()
@ -176,13 +177,16 @@ private[stream] object ConnectionSourceStage {
trait TcpRole {
def halfClose: Boolean
def ioSettings: IOSettings
}
case class Outbound(
manager: ActorRef,
connectCmd: Connect,
localAddressPromise: Promise[InetSocketAddress],
halfClose: Boolean) extends TcpRole
case class Inbound(connection: ActorRef, halfClose: Boolean) extends TcpRole
halfClose: Boolean,
ioSettings: IOSettings) extends TcpRole
case class Inbound(connection: ActorRef, halfClose: Boolean, ioSettings: IOSettings) extends TcpRole
/*
* This is a *non-detached* design, i.e. this does not prefetch itself any of the inputs. It relies on downstream
@ -198,6 +202,11 @@ private[stream] object ConnectionSourceStage {
private def bytesOut = shape.out
private var connection: ActorRef = _
private val writeBufferSize = role.ioSettings.tcpWriteBufferSize
private var writeBuffer = ByteString.empty
private var writeInProgress = false
private var connectionClosePending = false
// No reading until role have been decided
setHandler(bytesOut, new OutHandler {
override def onPull(): Unit = ()
@ -206,13 +215,13 @@ private[stream] object ConnectionSourceStage {
override def preStart(): Unit = {
setKeepGoing(true)
role match {
case Inbound(conn, _) ⇒
case Inbound(conn, _, _) ⇒
setHandler(bytesOut, readHandler)
connection = conn
getStageActor(connected).watch(connection)
connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false)
pull(bytesIn)
case ob @ Outbound(manager, cmd, _, _) ⇒
case ob @ Outbound(manager, cmd, _, _, _) ⇒
getStageActor(connecting(ob)).watch(manager)
manager ! cmd
}
@ -238,9 +247,30 @@ private[stream] object ConnectionSourceStage {
}
private def connected(evt: (ActorRef, Any)): Unit = {
val sender = evt._1
val msg = evt._2
msg match {
case Received(data) ⇒
// Keep on reading even when closed. There is no "close-read-side" in TCP
if (isClosed(bytesOut)) connection ! ResumeReading
else push(bytesOut, data)
case WriteAck ⇒
if (writeBuffer.isEmpty)
writeInProgress = false
else {
connection ! Write(writeBuffer, WriteAck)
writeInProgress = true
writeBuffer = ByteString.empty
}
if (!writeInProgress && connectionClosePending) {
// continue onUpstreamFinish
closeConnection()
}
if (!isClosed(bytesIn) && !hasBeenPulled(bytesIn))
pull(bytesIn)
case Terminated(_) ⇒ failStage(new StreamTcpException("The connection actor has terminated. Stopping now."))
case f @ CommandFailed(cmd) ⇒ failStage(new StreamTcpException(s"Tcp command [$cmd] failed${f.causedByString}"))
case ErrorClosed(cause) ⇒ failStage(new StreamTcpException(s"The connection closed with error: $cause"))
@ -249,15 +279,27 @@ private[stream] object ConnectionSourceStage {
case ConfirmedClosed ⇒ completeStage()
case PeerClosed ⇒ complete(bytesOut)
case Received(data) ⇒
// Keep on reading even when closed. There is no "close-read-side" in TCP
if (isClosed(bytesOut)) connection ! ResumeReading
else push(bytesOut, data)
case WriteAck ⇒ if (!isClosed(bytesIn)) pull(bytesIn)
}
}
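// In short, the new write path batches small writes: while a Write is in flight
// (writeInProgress) freshly grabbed elements are appended to writeBuffer, and the
// WriteAck handler above flushes the accumulated buffer as a single Write, so many
// small frames are coalesced into fewer actor messages and syscalls.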
private def closeConnection(): Unit = {
// Note that if there are pending bytes in the writeBuffer those must be written first.
if (isClosed(bytesOut) || !role.halfClose) {
// Reading has stopped before, either because of cancel, or PeerClosed, so just Close now
// (or half-close is turned off)
if (writeInProgress)
connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained
else
connection ! Close
} else if (connection != null) {
// We still read, so we only close the write side
if (writeInProgress)
connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained
else
connection ! ConfirmedClose
} else completeStage()
}
val readHandler = new OutHandler {
override def onPull(): Unit = {
connection ! ResumeReading
@ -276,17 +318,20 @@ private[stream] object ConnectionSourceStage {
override def onPush(): Unit = {
val elem = grab(bytesIn)
ReactiveStreamsCompliance.requireNonNullElement(elem)
connection ! Write(elem.asInstanceOf[ByteString], WriteAck)
if (writeInProgress) {
writeBuffer = writeBuffer ++ elem
} else {
connection ! Write(writeBuffer ++ elem, WriteAck)
writeInProgress = true
writeBuffer = ByteString.empty
}
if (writeBuffer.size < writeBufferSize)
pull(bytesIn)
}
override def onUpstreamFinish(): Unit = {
// Reading has stopped before, either because of cancel, or PeerClosed, so just Close now
// (or half-close is turned off)
if (isClosed(bytesOut) || !role.halfClose) connection ! Close
// We still read, so we only close the write side
else if (connection != null) connection ! ConfirmedClose
else completeStage()
}
override def onUpstreamFinish(): Unit =
closeConnection()
override def onUpstreamFailure(ex: Throwable): Unit = {
if (connection != null) {
@ -302,18 +347,20 @@ private[stream] object ConnectionSourceStage {
})
override def postStop(): Unit = role match {
case Outbound(_, _, localAddressPromise, _) ⇒
case Outbound(_, _, localAddressPromise, _, _) ⇒
// Fail if has not been completed with an address earlier
localAddressPromise.tryFailure(new StreamTcpException("Connection failed."))
case _ ⇒ // do nothing...
}
writeBuffer = ByteString.empty
}
}
/**
* INTERNAL API
*/
@InternalApi private[akka] class IncomingConnectionStage(connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean)
@InternalApi private[akka] class IncomingConnectionStage(
connection: ActorRef, remoteAddress: InetSocketAddress, halfClose: Boolean, ioSettings: IOSettings)
extends GraphStage[FlowShape[ByteString, ByteString]] {
import TcpConnectionStage._
@ -328,7 +375,7 @@ private[stream] object ConnectionSourceStage {
if (hasBeenCreated.get) throw new IllegalStateException("Cannot materialize an incoming connection Flow twice.")
hasBeenCreated.set(true)
new TcpStreamLogic(shape, Inbound(connection, halfClose), remoteAddress)
new TcpStreamLogic(shape, Inbound(connection, halfClose, ioSettings), remoteAddress)
}
override def toString = s"TCP-from($remoteAddress)"
@ -343,7 +390,8 @@ private[stream] object ConnectionSourceStage {
localAddress: Option[InetSocketAddress] = None,
options: immutable.Traversable[SocketOption] = Nil,
halfClose: Boolean = true,
connectTimeout: Duration = Duration.Inf)
connectTimeout: Duration = Duration.Inf,
ioSettings: IOSettings)
extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[StreamTcp.OutgoingConnection]] {
import TcpConnectionStage._
@ -365,7 +413,8 @@ private[stream] object ConnectionSourceStage {
manager,
Connect(remoteAddress, localAddress, options, connTimeout, pullMode = true),
localAddressPromise,
halfClose),
halfClose,
ioSettings),
remoteAddress)
(logic, localAddressPromise.future.map(OutgoingConnection(remoteAddress, _))(ExecutionContexts.sameThreadExecutionContext))

View file

@ -317,6 +317,15 @@ object Source {
new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num ⇒ strategy.apply(num)))
}
/**
* Combines two sources with fan-in strategy like `Merge` or `Concat` and returns `Source` with a materialized value.
*/
def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2],
strategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]],
combine: function.Function2[M1, M2, M]): Source[U, M] = {
new Source(scaladsl.Source.combineMat(first.asScala, second.asScala)(num ⇒ strategy.apply(num))(combinerToScala(combine)))
}
/**
* Combine the elements of multiple streams into a stream of lists.
*/

View file

@ -448,6 +448,19 @@ object Source {
combineRest(2, rest.iterator)
})
/**
* Combines two sources with fan-in strategy like `Merge` or `Concat` and returns `Source` with a materialized value.
*/
def combineMat[T, U, M1, M2, M](first: Source[T, M1], second: Source[T, M2])(strategy: Int ⇒ Graph[UniformFanInShape[T, U], NotUsed])(matF: (M1, M2) ⇒ M): Source[U, M] = {
val secondPartiallyCombined = GraphDSL.create(second) { implicit b ⇒ secondShape ⇒
import GraphDSL.Implicits._
val c = b.add(strategy(2))
secondShape ~> c.in(1)
FlowShape(c.in(0), c.out)
}
first.viaMat(secondPartiallyCombined)(matF)
}
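// A usage sketch for combineMat (names illustrative): keep the queue's materialized
// value when concatenating a queue-backed source with a plain one:
//
//   val queue: SourceQueueWithComplete[Int] =
//     Source.combineMat(Source.queue[Int](8, OverflowStrategy.dropNew), Source(1 to 3))(Concat(_))(Keep.left)
//       .to(Sink.ignore)
//       .run()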
/**
* Combine the elements of multiple streams into a stream of sequences.
*/

View file

@ -65,8 +65,10 @@ object Tcp extends ExtensionId[Tcp] with ExtensionIdProvider {
final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
import Tcp._
private val settings = ActorMaterializerSettings(system)
// TODO maybe this should be a new setting, like `akka.stream.tcp.bind.timeout` / `shutdown-timeout` instead?
val bindShutdownTimeout = ActorMaterializer()(system).settings.subscriptionTimeoutSettings.timeout
val bindShutdownTimeout = settings.subscriptionTimeoutSettings.timeout
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`.
@ -103,7 +105,8 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
options,
halfClose,
idleTimeout,
bindShutdownTimeout))
bindShutdownTimeout,
settings.ioSettings))
/**
* Creates a [[Tcp.ServerBinding]] instance which represents a prospective TCP server binding on the given `endpoint`
@ -175,7 +178,8 @@ final class Tcp(system: ExtendedActorSystem) extends akka.actor.Extension {
localAddress,
options,
halfClose,
connectTimeout)).via(detacher[ByteString]) // must read ahead for proper completions
connectTimeout,
settings.ioSettings)).via(detacher[ByteString]) // must read ahead for proper completions
idleTimeout match {
case d: FiniteDuration ⇒ tcpFlow.join(TcpIdleTimeout(d, Some(remoteAddress)))

View file

@ -929,7 +929,7 @@ object TestKit {
val msg = "Failed to stop [%s] within [%s] \n%s".format(actorSystem.name, duration,
actorSystem.asInstanceOf[ActorSystemImpl].printTree)
if (verifySystemShutdown) throw new RuntimeException(msg)
else actorSystem.log.warning(msg)
else println(msg)
}
}
}

View file

@ -53,14 +53,14 @@ object ClusterShardingPersistenceSpec {
PersistentActor.persistentEntity[Command, String, String](
persistenceIdFromActorName = name ⇒ "Test-" + name,
initialState = "",
actions = Actions((ctx, cmd, state) ⇒ cmd match {
case Add(s) ⇒ Persist(s)
commandHandler = CommandHandler((ctx, state, cmd) ⇒ cmd match {
case Add(s) ⇒ Effect.persist(s)
case Get(replyTo) ⇒
replyTo ! state
PersistNothing()
case StopPlz ⇒ Stop()
Effect.done
case StopPlz ⇒ Effect.stop
}),
applyEvent = (evt, state) ⇒ if (state.isEmpty) evt else state + "|" + evt)
eventHandler = (state, evt) ⇒ if (state.isEmpty) evt else state + "|" + evt)
val typeKey = EntityTypeKey[Command]("test")

View file

@ -31,12 +31,12 @@ object PersistentActorCompileOnlyTest {
initialState = ExampleState(Nil),
actions = Actions.command {
case Cmd(data) ⇒ Persist(Evt(data))
commandHandler = CommandHandler.command {
case Cmd(data) ⇒ Effect.persist(Evt(data))
},
applyEvent = {
case (Evt(data), state) ⇒ state.copy(data :: state.events)
eventHandler = {
case (state, Evt(data)) ⇒ state.copy(data :: state.events)
})
}
@ -56,14 +56,14 @@ object PersistentActorCompileOnlyTest {
initialState = ExampleState(Nil),
actions = Actions.command {
commandHandler = CommandHandler.command {
case Cmd(data, sender) ⇒
Persist(Evt(data))
Effect.persist(Evt(data))
.andThen { sender ! Ack }
},
applyEvent = {
case (Evt(data), state) ⇒ state.copy(data :: state.events)
eventHandler = {
case (state, Evt(data)) ⇒ state.copy(data :: state.events)
})
}
@ -98,16 +98,16 @@ object PersistentActorCompileOnlyTest {
initialState = EventsInFlight(0, Map.empty),
actions = Actions((ctx, cmd, state) ⇒ cmd match {
commandHandler = CommandHandler((ctx, state, cmd) ⇒ cmd match {
case DoSideEffect(data) ⇒
Persist(IntentRecorded(state.nextCorrelationId, data)).andThen {
Effect.persist(IntentRecorded(state.nextCorrelationId, data)).andThen {
performSideEffect(ctx.self, state.nextCorrelationId, data)
}
case AcknowledgeSideEffect(correlationId) ⇒
Persist(SideEffectAcknowledged(correlationId))
Effect.persist(SideEffectAcknowledged(correlationId))
}),
applyEvent = (evt, state) ⇒ evt match {
eventHandler = (state, evt) ⇒ evt match {
case IntentRecorded(correlationId, data) ⇒
EventsInFlight(
nextCorrelationId = correlationId + 1,
@ -119,7 +119,6 @@ object PersistentActorCompileOnlyTest {
state.dataByCorrelationId.foreach {
case (correlationId, data) ⇒ performSideEffect(ctx.self, correlationId, data)
}
state
}
}
@ -140,22 +139,22 @@ object PersistentActorCompileOnlyTest {
val b: Behavior[Command] = PersistentActor.immutable[Command, Event, Mood](
persistenceId = "myPersistenceId",
initialState = Happy,
actions = Actions.byState {
case Happy ⇒ Actions.command {
commandHandler = CommandHandler.byState {
case Happy ⇒ CommandHandler.command {
case Greet(whom) ⇒
println(s"Super happy to meet you $whom!")
PersistNothing()
case MoodSwing ⇒ Persist(MoodChanged(Sad))
Effect.done
case MoodSwing ⇒ Effect.persist(MoodChanged(Sad))
}
case Sad ⇒ Actions.command {
case Sad ⇒ CommandHandler.command {
case Greet(whom) ⇒
println(s"hi $whom")
PersistNothing()
case MoodSwing ⇒ Persist(MoodChanged(Happy))
Effect.done
case MoodSwing ⇒ Effect.persist(MoodChanged(Happy))
}
},
applyEvent = {
case (MoodChanged(to), _) ⇒ to
eventHandler = {
case (_, MoodChanged(to)) ⇒ to
})
// FIXME this doesn't work, wrapping is not supported
@ -181,11 +180,11 @@ object PersistentActorCompileOnlyTest {
PersistentActor.immutable[Command, Event, State](
persistenceId = "asdf",
initialState = State(Nil),
actions = Actions.command {
case RegisterTask(task) ⇒ Persist(TaskRegistered(task))
case TaskDone(task) ⇒ Persist(TaskRemoved(task))
commandHandler = CommandHandler.command {
case RegisterTask(task) ⇒ Effect.persist(TaskRegistered(task))
case TaskDone(task) ⇒ Effect.persist(TaskRemoved(task))
},
applyEvent = (evt, state) ⇒ evt match {
eventHandler = (state, evt) ⇒ evt match {
case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight)
case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task))
}).snapshotOnState(_.tasksInFlight.isEmpty)
@ -208,16 +207,17 @@ object PersistentActorCompileOnlyTest {
PersistentActor.immutable[Command, Event, State](
persistenceId = "asdf",
initialState = State(Nil),
actions = Actions((ctx, cmd, _) ⇒ cmd match {
case RegisterTask(task) ⇒ Persist(TaskRegistered(task))
.andThen {
val child = ctx.spawn[Nothing](worker(task), task)
// This assumes *any* termination of the child may trigger a `TaskDone`:
ctx.watchWith(child, TaskDone(task))
}
case TaskDone(task) ⇒ Persist(TaskRemoved(task))
commandHandler = CommandHandler((ctx, _, cmd) ⇒ cmd match {
case RegisterTask(task) ⇒
Effect.persist(TaskRegistered(task))
.andThen {
val child = ctx.spawn[Nothing](worker(task), task)
// This assumes *any* termination of the child may trigger a `TaskDone`:
ctx.watchWith(child, TaskDone(task))
}
case TaskDone(task) ⇒ Effect.persist(TaskRemoved(task))
}),
applyEvent = (evt, state) ⇒ evt match {
eventHandler = (state, evt) ⇒ evt match {
case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight)
case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task))
})
@ -239,21 +239,21 @@ object PersistentActorCompileOnlyTest {
persistenceId = "asdf",
initialState = State(Nil),
// The 'onSignal' seems to break type inference here.. not sure if that can be avoided?
actions = Actions[RegisterTask, Event, State]((ctx, cmd, state) ⇒ cmd match {
case RegisterTask(task) ⇒ Persist(TaskRegistered(task))
commandHandler = CommandHandler[RegisterTask, Event, State]((ctx, state, cmd) ⇒ cmd match {
case RegisterTask(task) ⇒ Effect.persist(TaskRegistered(task))
.andThen {
val child = ctx.spawn[Nothing](worker(task), task)
// This assumes *any* termination of the child may trigger a `TaskDone`:
ctx.watch(child)
}
}).onSignal {
case (ctx, Terminated(actorRef), _) ⇒
case (ctx, _, Terminated(actorRef)) ⇒
// watchWith (as in the above example) is nicer because it means we don't have to
// 'manually' associate the task and the child actor, but we wanted to demonstrate
// signals here:
Persist(TaskRemoved(actorRef.path.name))
Effect.persist(TaskRemoved(actorRef.path.name))
},
applyEvent = (evt, state) ⇒ evt match {
eventHandler = (state, evt) ⇒ evt match {
case TaskRegistered(task) ⇒ State(task :: state.tasksInFlight)
case TaskRemoved(task) ⇒ State(state.tasksInFlight.filter(_ != task))
})
@ -305,38 +305,42 @@ object PersistentActorCompileOnlyTest {
PersistentActor.immutable[Command, Event, List[Id]](
persistenceId = "basket-1",
initialState = Nil,
actions =
Actions.byState(state ⇒
if (isFullyHydrated(basket, state)) Actions { (ctx, cmd, state) ⇒
commandHandler =
CommandHandler.byState(state ⇒
if (isFullyHydrated(basket, state)) CommandHandler { (ctx, state, cmd) ⇒
cmd match {
case AddItem(id) ⇒ addItem(id, ctx.self)
case RemoveItem(id) ⇒ Persist(ItemRemoved(id))
case RemoveItem(id) ⇒ Effect.persist(ItemRemoved(id))
case GotMetaData(data) ⇒
basket = basket.updatedWith(data); PersistNothing()
case GetTotalPrice(sender) ⇒ sender ! basket.items.map(_.price).sum; PersistNothing()
basket = basket.updatedWith(data)
Effect.done
case GetTotalPrice(sender) ⇒
sender ! basket.items.map(_.price).sum
Effect.done
}
}
else Actions { (ctx, cmd, state) ⇒
else CommandHandler { (ctx, state, cmd) ⇒
cmd match {
case AddItem(id) ⇒ addItem(id, ctx.self)
case RemoveItem(id) ⇒ Persist(ItemRemoved(id))
case RemoveItem(id) ⇒ Effect.persist(ItemRemoved(id))
case GotMetaData(data) ⇒
basket = basket.updatedWith(data)
if (isFullyHydrated(basket, state)) {
stash.foreach(ctx.self ! _)
stash = Nil
}
PersistNothing()
case cmd: GetTotalPrice ⇒ stash :+= cmd; PersistNothing()
Effect.done
case cmd: GetTotalPrice ⇒
stash :+= cmd
Effect.done
}
}),
applyEvent = (evt, state) ⇒ evt match {
eventHandler = (state, evt) ⇒ evt match {
case ItemAdded(id) ⇒ id +: state
case ItemRemoved(id) ⇒ state.filter(_ != id)
}).onRecoveryCompleted((ctx, state) ⇒ {
val ad = ctx.spawnAdapter((m: MetaData) ⇒ GotMetaData(m))
state.foreach(id ⇒ metadataRegistry ! GetMetaData(id, ad))
state
})
}
}
@ -358,17 +362,17 @@ object PersistentActorCompileOnlyTest {
case class Remembered(memory: String) extends Event
def changeMoodIfNeeded(currentState: Mood, newMood: Mood): Effect[Event, Mood] =
if (currentState == newMood) PersistNothing()
else Persist(MoodChanged(newMood))
if (currentState == newMood) Effect.done
else Effect.persist(MoodChanged(newMood))
PersistentActor.immutable[Command, Event, Mood](
persistenceId = "myPersistenceId",
initialState = Sad,
actions = Actions { (_, cmd, state) ⇒
commandHandler = CommandHandler { (_, state, cmd) ⇒
cmd match {
case Greet(whom) ⇒
println(s"Hi there, I'm $state!")
PersistNothing()
Effect.done
case CheerUp(sender) ⇒
changeMoodIfNeeded(state, Happy)
.andThen { sender ! Ack }
@ -376,15 +380,13 @@ object PersistentActorCompileOnlyTest {
// A more elaborate example to show we still have full control over the effects
// if needed (e.g. when some logic is factored out but you want to add more effects)
val commonEffects = changeMoodIfNeeded(state, Happy)
CompositeEffect(
PersistAll[Event, Mood](commonEffects.events :+ Remembered(memory)),
commonEffects.sideEffects)
Effect.persistAll(commonEffects.events :+ Remembered(memory), commonEffects.sideEffects)
}
},
applyEvent = {
case (MoodChanged(to), _) ⇒ to
case (Remembered(_), state) ⇒ state
eventHandler = {
case (_, MoodChanged(to)) ⇒ to
case (state, Remembered(_)) ⇒ state
})
}
@ -396,22 +398,20 @@ object PersistentActorCompileOnlyTest {
sealed trait Event
case object Done extends Event
type State = Unit
class State
PersistentActor.immutable[Command, Event, State](
persistenceId = "myPersistenceId",
initialState = (),
actions = Actions { (_, cmd, _) ⇒
cmd match {
case Enough ⇒
Persist(Done)
.andThen(
SideEffect(_ ⇒ println("yay")),
Stop())
}
initialState = new State,
commandHandler = CommandHandler.command {
case Enough ⇒
Effect.persist(Done)
.andThen(println("yay"))
.andThenStop
},
applyEvent = {
case (Done, _) ⇒ ()
eventHandler = {
case (state, Done) ⇒ state
})
}

View file

@ -45,12 +45,12 @@ object PersistentActorSpec {
PersistentActor.immutable[Command, Event, State](
persistenceId,
initialState = State(0, Vector.empty),
actions = Actions[Command, Event, State]((ctx, cmd, state) ⇒ cmd match {
commandHandler = CommandHandler[Command, Event, State]((ctx, state, cmd) ⇒ cmd match {
case Increment ⇒
Persist(Incremented(1))
Effect.persist(Incremented(1))
case GetValue(replyTo) ⇒
replyTo ! state
PersistNothing()
Effect.done
case IncrementLater ⇒
// purpose is to test signals
val delay = ctx.spawnAnonymous(Actor.withTimers[Tick.type] { timers ⇒
@ -60,19 +60,19 @@ object PersistentActorSpec {
})
})
ctx.watch(delay)
PersistNothing()
Effect.done
case IncrementAfterReceiveTimeout ⇒
ctx.setReceiveTimeout(10.millis, Timeout)
PersistNothing()
Effect.done
case Timeout ⇒
ctx.cancelReceiveTimeout()
Persist(Incremented(100))
Effect.persist(Incremented(100))
})
.onSignal {
case (_, Terminated(_), _) ⇒
Persist(Incremented(10))
case (_, _, Terminated(_)) ⇒
Effect.persist(Incremented(10))
},
applyEvent = (evt, state) ⇒ evt match {
eventHandler = (state, evt) ⇒ evt match {
case Incremented(delta) ⇒
State(state.value + delta, state.history :+ state.value)
})

View file

@ -29,7 +29,7 @@ trait ActorRef[-T] extends java.lang.Comparable[ActorRef[_]] {
def tell(msg: T): Unit
/**
* Narrow the type of this `ActorRef, which is always a safe operation.
* Narrow the type of this `ActorRef`, which is always a safe operation.
*/
def narrow[U <: T]: ActorRef[U]

View file

@ -50,9 +50,9 @@ import akka.typed.internal.adapter.ActorRefAdapter
private var state: S = behavior.initialState
private val actions: Actions[C, E, S] = behavior.actions
private val commandHandler: CommandHandler[C, E, S] = behavior.commandHandler
private val eventHandler: (E, S) ⇒ S = behavior.applyEvent
private val eventHandler: (S, E) ⇒ S = behavior.eventHandler
private val ctxAdapter = new ActorContextAdapter[C](context)
private val ctx = ctxAdapter.asScala
@ -62,17 +62,17 @@ import akka.typed.internal.adapter.ActorRefAdapter
state = snapshot.asInstanceOf[S]
case RecoveryCompleted ⇒
state = behavior.recoveryCompleted(ctx, state)
behavior.recoveryCompleted(ctx, state)
case event: E @unchecked ⇒
state = applyEvent(state, event)
}
def applyEvent(s: S, event: E): S =
eventHandler.apply(event, s)
eventHandler.apply(s, event)
private val unhandledSignal: PartialFunction[(ActorContext[C], Signal, S), Effect[E, S]] = {
case sig ⇒ Unhandled()
private val unhandledSignal: PartialFunction[(ActorContext[C], S, Signal), Effect[E, S]] = {
case sig ⇒ Effect.unhandled
}
override def receiveCommand: Receive = {
@ -84,13 +84,13 @@ import akka.typed.internal.adapter.ActorRefAdapter
val effects = msg match {
case a.Terminated(ref) ⇒
val sig = Terminated(ActorRefAdapter(ref))(null)
actions.sigHandler(state).applyOrElse((ctx, sig, state), unhandledSignal)
commandHandler.sigHandler(state).applyOrElse((ctx, state, sig), unhandledSignal)
case a.ReceiveTimeout ⇒
actions.commandHandler(ctx, ctxAdapter.receiveTimeoutMsg, state)
commandHandler.commandHandler(ctx, state, ctxAdapter.receiveTimeoutMsg)
// TODO note that PostStop and PreRestart signals are not handled, we wouldn't be able to persist there
case cmd: C @unchecked ⇒
// FIXME we could make it more safe by using ClassTag for C
actions.commandHandler(ctx, cmd, state)
commandHandler.commandHandler(ctx, state, cmd)
}
applyEffects(msg, effects)
@ -121,16 +121,16 @@ import akka.typed.internal.adapter.ActorRefAdapter
persistAll(scala.collection.immutable.Seq(events)) { _ ⇒
sideEffects.foreach(applySideEffect)
}
case PersistNothing() ⇒
case Unhandled() ⇒
case _: PersistNothing.type @unchecked ⇒
case _: Unhandled.type @unchecked ⇒
super.unhandled(msg)
case c: ChainableEffect[_, S] ⇒
applySideEffect(c)
}
def applySideEffect(effect: ChainableEffect[_, S]): Unit = effect match {
case Stop() ⇒ context.stop(self)
case SideEffect(callbacks) ⇒ callbacks.apply(state)
case _: Stop.type @unchecked ⇒ context.stop(self)
case SideEffect(callbacks) ⇒ callbacks.apply(state)
}
}

View file

@ -4,23 +4,23 @@
package akka.typed.persistence.scaladsl
import scala.collection.{ immutable ⇒ im }
import akka.annotation.DoNotInherit
import akka.annotation.InternalApi
import akka.annotation.{ ApiMayChange, DoNotInherit, InternalApi }
import akka.typed.Behavior.UntypedBehavior
import akka.typed.Signal
import akka.typed.persistence.internal.PersistentActorImpl
import akka.typed.scaladsl.ActorContext
object PersistentActor {
/**
* Create a `Behavior` for a persistent actor.
*/
def immutable[Command, Event, State](
persistenceId: String,
initialState: State,
actions: Actions[Command, Event, State],
applyEvent: (Event, State) ⇒ State): PersistentBehavior[Command, Event, State] =
persistentEntity(_ ⇒ persistenceId, initialState, actions, applyEvent)
persistenceId: String,
initialState: State,
commandHandler: CommandHandler[Command, Event, State],
eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] =
persistentEntity(_ ⇒ persistenceId, initialState, commandHandler, eventHandler)
/**
* Create a `Behavior` for a persistent actor in Cluster Sharding, when the persistenceId is not known
@ -32,125 +32,186 @@ object PersistentActor {
def persistentEntity[Command, Event, State](
persistenceIdFromActorName: String ⇒ String,
initialState: State,
actions: Actions[Command, Event, State],
applyEvent: (Event, State) ⇒ State): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceIdFromActorName, initialState, actions, applyEvent,
recoveryCompleted = (_, state) ⇒ state)
commandHandler: CommandHandler[Command, Event, State],
eventHandler: (State, Event) ⇒ State): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler,
recoveryCompleted = (_, _) ⇒ ())
sealed abstract class Effect[+Event, State]() {
/**
* Factories for effects - how a persistent actor reacts on a command
*/
object Effect {
def persist[Event, State](event: Event): Effect[Event, State] =
new Persist[Event, State](event)
def persistAll[Event, State](events: im.Seq[Event]): Effect[Event, State] =
new PersistAll[Event, State](events)
def persistAll[Event, State](events: im.Seq[Event], sideEffects: im.Seq[ChainableEffect[Event, State]]): Effect[Event, State] =
new CompositeEffect[Event, State](Some(new PersistAll[Event, State](events)), sideEffects)
/**
* Do not persist anything
*/
def done[Event, State]: Effect[Event, State] = PersistNothing.asInstanceOf[Effect[Event, State]]
/**
* This command is not handled, but it is not an error that it isn't.
*/
def unhandled[Event, State]: Effect[Event, State] = Unhandled.asInstanceOf[Effect[Event, State]]
/**
* Stop this persistent actor
*/
def stop[Event, State]: ChainableEffect[Event, State] = Stop.asInstanceOf[ChainableEffect[Event, State]]
}
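// A usage sketch of these factories (types illustrative): a command handler that
// persists, does nothing, or stops, built only from the factories above:
//
//   CommandHandler.command[String, String, String] {
//     case "stop" ⇒ Effect.stop
//     case "noop" ⇒ Effect.done
//     case msg    ⇒ Effect.persist(msg).andThen(println("persisted"))
//   }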
/**
* Instances are created through the factories in the [[Effect]] companion object.
*
* Not for user extension.
*/
@DoNotInherit
sealed abstract class Effect[+Event, State] {
/* All events that will be persisted in this effect */
def events: im.Seq[Event] = Nil
/* All side effects that will be performed in this effect */
def sideEffects: im.Seq[ChainableEffect[_, State]] =
if (isInstanceOf[ChainableEffect[_, State]]) im.Seq(asInstanceOf[ChainableEffect[_, State]])
else Nil
def andThen(sideEffects: ChainableEffect[_, State]*): Effect[Event, State] =
CompositeEffect(if (events.isEmpty) None else Some(this), sideEffects.toList)
def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = Nil
/** Convenience method to register a side effect with just a callback function */
def andThen(callback: State ⇒ Unit): Effect[Event, State] =
andThen(SideEffect[Event, State](callback))
CompositeEffect(this, SideEffect[Event, State](callback))
/** Convenience method to register a side effect with just a lazy expression */
def andThen(callback: ⇒ Unit): Effect[Event, State] =
andThen(SideEffect[Event, State]((_: State) ⇒ callback))
CompositeEffect(this, SideEffect[Event, State]((_: State) ⇒ callback))
/** The side effect is to stop the actor */
def andThenStop: Effect[Event, State] =
CompositeEffect(this, Effect.stop[Event, State])
}
case class CompositeEffect[Event, State](persistingEffect: Option[Effect[Event, State]], override val sideEffects: im.Seq[ChainableEffect[_, State]]) extends Effect[Event, State] {
@InternalApi
private[akka] object CompositeEffect {
def apply[Event, State](effect: Effect[Event, State], sideEffects: ChainableEffect[Event, State]): Effect[Event, State] =
CompositeEffect[Event, State](
if (effect.events.isEmpty) None else Some(effect),
sideEffects :: Nil)
}
@InternalApi
private[akka] final case class CompositeEffect[Event, State](
persistingEffect: Option[Effect[Event, State]],
_sideEffects: im.Seq[ChainableEffect[Event, State]]) extends Effect[Event, State] {
override val events = persistingEffect.map(_.events).getOrElse(Nil)
override def andThen(additionalSideEffects: ChainableEffect[_, State]*): CompositeEffect[Event, State] =
copy(sideEffects = sideEffects ++ additionalSideEffects)
}
object CompositeEffect {
def apply[Event, State](persistAll: PersistAll[Event, State], sideEffects: im.Seq[ChainableEffect[_, State]]): CompositeEffect[Event, State] =
CompositeEffect(Some(persistAll), sideEffects)
override def sideEffects[E >: Event]: im.Seq[ChainableEffect[E, State]] = _sideEffects.asInstanceOf[im.Seq[ChainableEffect[E, State]]]
}
case class PersistNothing[Event, State]() extends Effect[Event, State]
@InternalApi
private[akka] case object PersistNothing extends Effect[Nothing, Nothing]
case class Persist[Event, State](event: Event) extends Effect[Event, State] {
@InternalApi
private[akka] case class Persist[Event, State](event: Event) extends Effect[Event, State] {
override val events = event :: Nil
}
case class PersistAll[Event, State](override val events: im.Seq[Event]) extends Effect[Event, State]
trait ChainableEffect[Event, State] {
self: Effect[Event, State]
}
case class SideEffect[Event, State](effect: State ⇒ Unit) extends Effect[Event, State] with ChainableEffect[Event, State]
case class Stop[Event, State]() extends Effect[Event, State] with ChainableEffect[Event, State]()
case class Unhandled[Event, State]() extends Effect[Event, State]
type CommandHandler[Command, Event, State] = Function3[ActorContext[Command], Command, State, Effect[Event, State]]
type SignalHandler[Command, Event, State] = PartialFunction[(ActorContext[Command], Signal, State), Effect[Event, State]]
@InternalApi
private[akka] case class PersistAll[Event, State](override val events: im.Seq[Event]) extends Effect[Event, State]
/**
* `Actions` defines command handlers and partial function for other signals,
* Not for user extension
*/
@DoNotInherit
sealed abstract class ChainableEffect[Event, State] extends Effect[Event, State]
@InternalApi
private[akka] case class SideEffect[Event, State](effect: State ⇒ Unit) extends ChainableEffect[Event, State]
@InternalApi
private[akka] case object Stop extends ChainableEffect[Nothing, Nothing]
@InternalApi
private[akka] case object Unhandled extends Effect[Nothing, Nothing]
type CommandToEffect[Command, Event, State] = (ActorContext[Command], State, Command) ⇒ Effect[Event, State]
type SignalHandler[Command, Event, State] = PartialFunction[(ActorContext[Command], State, Signal), Effect[Event, State]]
/**
* The `CommandHandler` defines how to act on commands and partial function for other signals,
* e.g. `Termination` messages if `watch` is used.
*
* Note that you can have different actions based on current state by using
* [[Actions#byState]].
* Note that you can have different command handlers based on current state by using
* [[CommandHandler#byState]].
*/
object Actions {
def apply[Command, Event, State](commandHandler: CommandHandler[Command, Event, State]): Actions[Command, Event, State] =
new Actions(commandHandler, Map.empty)
object CommandHandler {
/**
* Create a command handler that will be applied for commands.
*
* @see [[Effect]] for possible effects of a command.
*/
// Note: using full parameter type instead of type alias here to make the API more straightforward to figure out in an IDE
def apply[Command, Event, State](commandHandler: (ActorContext[Command], State, Command) ⇒ Effect[Event, State]): CommandHandler[Command, Event, State] =
new CommandHandler(commandHandler, Map.empty)
/**
* Convenience for simple commands that don't need the state and context.
*
* @see [[Effect]] for possible effects of a command.
*/
def command[Command, Event, State](commandHandler: Command ⇒ Effect[Event, State]): Actions[Command, Event, State] =
apply((_, cmd, _) ⇒ commandHandler(cmd))
def command[Command, Event, State](commandHandler: Command ⇒ Effect[Event, State]): CommandHandler[Command, Event, State] =
apply((_, _, cmd) ⇒ commandHandler(cmd))
/**
* Select different actions based on current state.
* Select different command handlers based on current state.
*/
def byState[Command, Event, State](choice: State ⇒ Actions[Command, Event, State]): Actions[Command, Event, State] =
new ByStateActions(choice, signalHandler = PartialFunction.empty)
def byState[Command, Event, State](choice: State ⇒ CommandHandler[Command, Event, State]): CommandHandler[Command, Event, State] =
new ByStateCommandHandler(choice, signalHandler = PartialFunction.empty)
}
/**
* INTERNAL API
*/
@InternalApi private[akka] final class ByStateActions[Command, Event, State](
choice: State ⇒ Actions[Command, Event, State],
@InternalApi private[akka] final class ByStateCommandHandler[Command, Event, State](
choice: State ⇒ CommandHandler[Command, Event, State],
signalHandler: SignalHandler[Command, Event, State])
extends Actions[Command, Event, State](
commandHandler = (ctx, cmd, state) ⇒ choice(state).commandHandler(ctx, cmd, state),
extends CommandHandler[Command, Event, State](
commandHandler = (ctx, state, cmd) ⇒ choice(state).commandHandler(ctx, state, cmd),
signalHandler) {
// SignalHandler may be registered in the wrapper or in the wrapped
private[akka] override def sigHandler(state: State): SignalHandler[Command, Event, State] =
choice(state).sigHandler(state).orElse(signalHandler)
// override to preserve the ByStateActions
// override to preserve the ByStateCommandHandler
private[akka] override def withSignalHandler(
handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] =
new ByStateActions(choice, handler)
handler: SignalHandler[Command, Event, State]): CommandHandler[Command, Event, State] =
new ByStateCommandHandler(choice, handler)
}
/**
* `Actions` defines command handlers and partial function for other signals,
* `CommandHandler` defines command handlers and partial function for other signals,
* e.g. `Termination` messages if `watch` is used.
* `Actions` is an immutable class.
* `CommandHandler` is an immutable class.
*/
@DoNotInherit class Actions[Command, Event, State] private[akka] (
val commandHandler: CommandHandler[Command, Event, State],
@DoNotInherit class CommandHandler[Command, Event, State] private[akka] (
val commandHandler: CommandToEffect[Command, Event, State],
val signalHandler: SignalHandler[Command, Event, State]) {
@InternalApi private[akka] def sigHandler(state: State): SignalHandler[Command, Event, State] =
signalHandler
def onSignal(handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] =
// Note: using full parameter type instead of type alias here to make API more straight forward to figure out in an IDE
def onSignal(handler: PartialFunction[(ActorContext[Command], State, Signal), Effect[Event, State]]): CommandHandler[Command, Event, State] =
withSignalHandler(signalHandler.orElse(handler))
/** INTERNAL API */
@InternalApi private[akka] def withSignalHandler(
handler: SignalHandler[Command, Event, State]): Actions[Command, Event, State] =
new Actions(commandHandler, handler)
handler: SignalHandler[Command, Event, State]): CommandHandler[Command, Event, State] =
new CommandHandler(commandHandler, handler)
}
@ -159,9 +220,9 @@ object PersistentActor {
class PersistentBehavior[Command, Event, State](
@InternalApi private[akka] val persistenceIdFromActorName: String ⇒ String,
val initialState: State,
val actions: PersistentActor.Actions[Command, Event, State],
val applyEvent: (Event, State) ⇒ State,
val recoveryCompleted: (ActorContext[Command], State) ⇒ State) extends UntypedBehavior[Command] {
val commandHandler: PersistentActor.CommandHandler[Command, Event, State],
val eventHandler: (State, Event) ⇒ State,
val recoveryCompleted: (ActorContext[Command], State) ⇒ Unit) extends UntypedBehavior[Command] {
import PersistentActor._
/** INTERNAL API */
@ -171,7 +232,7 @@ class PersistentBehavior[Command, Event, State](
* The `callback` function is called to notify the actor that the recovery process
* is finished.
*/
def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ State): PersistentBehavior[Command, Event, State] =
def onRecoveryCompleted(callback: (ActorContext[Command], State) ⇒ Unit): PersistentBehavior[Command, Event, State] =
copy(recoveryCompleted = callback)
/**
@ -185,10 +246,10 @@ class PersistentBehavior[Command, Event, State](
def snapshotOn(predicate: (State, Event) ⇒ Boolean): PersistentBehavior[Command, Event, State] = ???
private def copy(
persistenceIdFromActorName: String ⇒ String = persistenceIdFromActorName,
initialState: State = initialState,
actions: Actions[Command, Event, State] = actions,
applyEvent: (Event, State) ⇒ State = applyEvent,
recoveryCompleted: (ActorContext[Command], State) ⇒ State = recoveryCompleted): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceIdFromActorName, initialState, actions, applyEvent, recoveryCompleted)
persistenceIdFromActorName: String ⇒ String = persistenceIdFromActorName,
initialState: State = initialState,
commandHandler: CommandHandler[Command, Event, State] = commandHandler,
eventHandler: (State, Event) ⇒ State = eventHandler,
recoveryCompleted: (ActorContext[Command], State) ⇒ Unit = recoveryCompleted): PersistentBehavior[Command, Event, State] =
new PersistentBehavior(persistenceIdFromActorName, initialState, commandHandler, eventHandler, recoveryCompleted)
}
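// An end-to-end wiring sketch (names illustrative) showing the renamed parameters and
// the new (ActorContext[Command], State) ⇒ Unit recovery callback, which is now a pure
// side effect and no longer returns a state:
//
//   PersistentActor.immutable[String, String, String](
//     persistenceId = "example",
//     initialState = "",
//     commandHandler = CommandHandler.command(cmd ⇒ Effect.persist(cmd)),
//     eventHandler = (state, evt) ⇒ state + evt)
//     .onRecoveryCompleted((ctx, state) ⇒ println(s"recovered: $state"))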

View file

@ -309,6 +309,7 @@ lazy val streamTestkit = akkaModule("akka-stream-testkit")
.dependsOn(stream, testkit % "compile->compile;test->test")
.settings(Dependencies.streamTestkit)
.settings(OSGi.streamTestkit)
.disablePlugins(MimaPlugin)
lazy val streamTests = akkaModule("akka-stream-tests")
.dependsOn(streamTestkit % "test->test", stream)

View file

@ -20,7 +20,7 @@ object Dependencies {
val aeronVersion = "1.3.0"
val Versions = Seq(
crossScalaVersions := Seq("2.11.11", "2.12.3"),
crossScalaVersions := Seq("2.11.11", "2.12.4"),
scalaVersion := System.getProperty("akka.build.scalaVersion", crossScalaVersions.value.head),
scalaStmVersion := sys.props.get("akka.build.scalaStmVersion").getOrElse("0.8"),
scalaCheckVersion := sys.props.get("akka.build.scalaCheckVersion").getOrElse(

View file

@ -18,15 +18,8 @@ object Formatting {
ScalariformKeys.preferences in MultiJvm := setPreferences(ScalariformKeys.preferences.value)
)
lazy val docFormatSettings = Seq(
ScalariformKeys.preferences := setPreferences(ScalariformKeys.preferences.value, rewriteArrowSymbols = false),
ScalariformKeys.preferences in Compile := setPreferences(ScalariformKeys.preferences.value, rewriteArrowSymbols = false),
ScalariformKeys.preferences in Test := setPreferences(ScalariformKeys.preferences.value, rewriteArrowSymbols = false),
ScalariformKeys.preferences in MultiJvm := setPreferences(ScalariformKeys.preferences.value, rewriteArrowSymbols = false)
)
def setPreferences(preferences: IFormattingPreferences, rewriteArrowSymbols: Boolean = true) = preferences
.setPreference(RewriteArrowSymbols, rewriteArrowSymbols)
def setPreferences(preferences: IFormattingPreferences) = preferences
.setPreference(RewriteArrowSymbols, true)
.setPreference(AlignParameters, true)
.setPreference(AlignSingleLineCaseStatements, true)
.setPreference(DoubleIndentConstructorArguments, false)

View file

@ -46,7 +46,7 @@ object MiMa extends AutoPlugin {
case "2.12"
akka24WithScala212 ++ akka25Versions
case "2.13"
case v if v.startsWith("2.13") =>
// no Akka released for 2.13 yet, no jars to check BC against
Seq.empty
}

View file

@ -42,7 +42,7 @@ object MultiNode extends AutoPlugin {
// -DMultiJvm.akka.cluster.Stress.nrOfNodes=15
val MultinodeJvmArgs = "multinode\\.(D|X)(.*)".r
val knownPrefix = Set("multnode.", "akka.", "MultiJvm.")
val akkaProperties = System.getProperties.propertyNames.asScala.toList.collect {
val akkaProperties = System.getProperties.stringPropertyNames.asScala.toList.collect {
case MultinodeJvmArgs(a, b) ⇒
val value = System.getProperty("multinode." + a + b)
"-" + a + b + (if (value == "") "" else "=" + value)

View file

@ -61,11 +61,8 @@ object NoPublish extends AutoPlugin {
override def requires = plugins.JvmPlugin
override def projectSettings = Seq(
publishArtifact := false,
publishArtifact in Compile := false,
publish := {},
skip in publish := true,
publishLocal := {}
sources in (Compile, doc) := Seq.empty
)
}