Cluster aware routers for typed #26355

This commit is contained in:
Johan Andrén 2019-07-15 15:25:00 +02:00 committed by GitHub
parent d2f5d2daa3
commit f0e42d2b9c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 447 additions and 57 deletions

View file

@ -69,16 +69,15 @@ object ReceptionistApiSpec {
// to cover as much of the API as possible // to cover as much of the API as possible
context.system.receptionist ! Receptionist.Register(key, context.self.narrow, context.self.narrow) context.system.receptionist ! Receptionist.Register(key, context.self.narrow, context.self.narrow)
Behaviors.receive { (context, message) => Behaviors.receiveMessage {
message match { case key.Listing(services) =>
case key.Listing(services) => services.foreach(_ ! "woho")
services.foreach(_ ! "woho") Behaviors.same
Behaviors.same case key.Registered(service) => // ack on Register above
case key.Registered(service) => // ack on Register above service ! "woho"
service ! "woho" Behaviors.same
Behaviors.same
}
} }
} }
} }

View file

@ -8,7 +8,10 @@ import java.util.concurrent.atomic.AtomicInteger
import akka.actor.Dropped import akka.actor.Dropped
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.testkit.typed.scaladsl.TestProbe import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior import akka.actor.typed.Behavior
import akka.actor.typed.internal.routing.GroupRouterImpl
import akka.actor.typed.internal.routing.RoutingLogics
import akka.actor.typed.receptionist.Receptionist import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._
@ -212,6 +215,37 @@ class RoutersSpec extends ScalaTestWithActorTestKit("""
} }
"not route to unreachable when there are reachable" in {
val serviceKey = ServiceKey[String]("group-routing-4")
val router = spawn(Behaviors.setup[String](context =>
new GroupRouterImpl(context, serviceKey, new RoutingLogics.RoundRobinLogic[String], true)))
val reachableProbe = createTestProbe[String]
val unreachableProbe = createTestProbe[String]
router
.unsafeUpcast[Any] ! Receptionist.Listing(serviceKey, Set(reachableProbe.ref), Set(unreachableProbe.ref), false)
router ! "one"
router ! "two"
reachableProbe.expectMessage("one")
reachableProbe.expectMessage("two")
}
"route to unreachable when there are no reachable" in {
val serviceKey = ServiceKey[String]("group-routing-4")
val router = spawn(Behaviors.setup[String](context =>
new GroupRouterImpl(context, serviceKey, new RoutingLogics.RoundRobinLogic[String], true)))
val unreachableProbe = createTestProbe[String]
router.unsafeUpcast[Any] ! Receptionist.Listing(
serviceKey,
Set.empty[ActorRef[String]],
Set(unreachableProbe.ref),
true)
router ! "one"
router ! "two"
unreachableProbe.expectMessage("one")
unreachableProbe.expectMessage("two")
}
} }
} }

View file

@ -80,15 +80,21 @@ private[akka] object LocalReceptionist extends ReceptionistBehaviorProvider {
def notifySubscribersFor[T](key: AbstractServiceKey): Unit = { def notifySubscribersFor[T](key: AbstractServiceKey): Unit = {
val newListing = newRegistry.get(key) val newListing = newRegistry.get(key)
subscriptions.get(key).foreach(_ ! ReceptionistMessages.Listing(key.asServiceKey, newListing)) subscriptions
.get(key)
.foreach(
_ ! ReceptionistMessages
.Listing(key.asServiceKey, newListing, newListing, servicesWereAddedOrRemoved = true))
} }
changedKeysHint.foreach(notifySubscribersFor) changedKeysHint.foreach(notifySubscribersFor)
next(newRegistry = newRegistry) next(newRegistry = newRegistry)
} }
def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing]): Unit = def replyWithListing[T](key: ServiceKey[T], replyTo: ActorRef[Listing]): Unit = {
replyTo ! ReceptionistMessages.Listing(key, serviceRegistry.get(key)) val listing = serviceRegistry.get(key)
replyTo ! ReceptionistMessages.Listing(key, listing, listing, servicesWereAddedOrRemoved = true)
}
def onCommand(ctx: ActorContext[Any], cmd: Command): Behavior[Any] = cmd match { def onCommand(ctx: ActorContext[Any], cmd: Command): Behavior[Any] = cmd match {
case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) => case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) =>

View file

@ -8,7 +8,6 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.receptionist.Receptionist.Command import akka.actor.typed.receptionist.Receptionist.Command
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.util.ccompat.JavaConverters._ import akka.util.ccompat.JavaConverters._
/** /**
@ -43,7 +42,11 @@ private[akka] object ReceptionistMessages {
final case class Find[T] private[akka] (key: ServiceKey[T], replyTo: ActorRef[Receptionist.Listing]) extends Command final case class Find[T] private[akka] (key: ServiceKey[T], replyTo: ActorRef[Receptionist.Listing]) extends Command
final case class Listing[T] private[akka] (key: ServiceKey[T], _serviceInstances: Set[ActorRef[T]]) final case class Listing[T] private[akka] (
key: ServiceKey[T],
_serviceInstances: Set[ActorRef[T]],
_allServiceInstances: Set[ActorRef[T]],
servicesWereAddedOrRemoved: Boolean)
extends Receptionist.Listing { extends Receptionist.Listing {
def isForKey(key: ServiceKey[_]): Boolean = key == this.key def isForKey(key: ServiceKey[_]): Boolean = key == this.key
@ -56,6 +59,15 @@ private[akka] object ReceptionistMessages {
def getServiceInstances[M](key: ServiceKey[M]): java.util.Set[ActorRef[M]] = def getServiceInstances[M](key: ServiceKey[M]): java.util.Set[ActorRef[M]] =
serviceInstances(key).asJava serviceInstances(key).asJava
override def allServiceInstances[M](key: ServiceKey[M]): Set[ActorRef[M]] = {
if (key != this.key)
throw new IllegalArgumentException(s"Wrong key [$key] used, must use listing key [${this.key}]")
_allServiceInstances.asInstanceOf[Set[ActorRef[M]]]
}
override def getAllServiceInstances[M](key: ServiceKey[M]): java.util.Set[ActorRef[M]] =
allServiceInstances(key).asJava
} }
final case class Subscribe[T] private[akka] (key: ServiceKey[T], subscriber: ActorRef[Receptionist.Listing]) final case class Subscribe[T] private[akka] (key: ServiceKey[T], subscriber: ActorRef[Receptionist.Listing])

View file

@ -73,23 +73,26 @@ private final class InitialGroupRouterImpl[T](
* INTERNAL API * INTERNAL API
*/ */
@InternalApi @InternalApi
private final class GroupRouterImpl[T]( private[akka] final class GroupRouterImpl[T](
ctx: ActorContext[T], ctx: ActorContext[T],
serviceKey: ServiceKey[T], serviceKey: ServiceKey[T],
routingLogic: RoutingLogic[T], routingLogic: RoutingLogic[T],
routeesInitiallyEmpty: Boolean) routeesInitiallyEmpty: Boolean)
extends AbstractBehavior[T] { extends AbstractBehavior[T] {
// casting trix to avoid having to wrap incoming messages - note that this will cause problems if intercepting
// messages to a router
ctx.system.receptionist ! Receptionist.Subscribe(serviceKey, ctx.self.unsafeUpcast[Any].narrow[Receptionist.Listing])
private var routeesEmpty = routeesInitiallyEmpty private var routeesEmpty = routeesInitiallyEmpty
def onMessage(msg: T): Behavior[T] = msg match { def onMessage(msg: T): Behavior[T] = msg match {
case serviceKey.Listing(update) => case l @ serviceKey.Listing(update) =>
// we don't need to watch, because receptionist already does that ctx.log.debug("Update from receptionist: [{}]", l)
routingLogic.routeesUpdated(update) val routees =
routeesEmpty = update.isEmpty if (update.nonEmpty) update
else
// empty listing in a cluster context can mean all nodes with registered services
// are unreachable, in that case trying the unreachable ones is better than dropping messages
l.allServiceInstances(serviceKey)
routeesEmpty = routees.isEmpty
routingLogic.routeesUpdated(routees)
this this
case msg: T @unchecked => case msg: T @unchecked =>
import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.adapter._

View file

@ -54,7 +54,19 @@ abstract class ServiceKey[T] extends AbstractServiceKey { key =>
def asServiceKey: ServiceKey[T] = this def asServiceKey: ServiceKey[T] = this
/** /**
* Scala API: Provides a type safe pattern match for listings * Scala API: Provides a type safe pattern match for listings.
*
* Using it for pattern match like this will return the reachable service instances:
*
* ```
* case MyServiceKey.Listing(reachable) =>
* ```
*
* In a non-clustered `ActorSystem` this will always be all registered instances
* for a service key. For a clustered environment services on nodes that have
* been observed unreachable are not among these (note that they could have
* become unreachable between this message being sent and the receiving actor
* processing it).
*/ */
object Listing { object Listing {
def unapply(l: Receptionist.Listing): Option[Set[ActorRef[T]]] = def unapply(l: Receptionist.Listing): Option[Set[ActorRef[T]]] =
@ -258,15 +270,68 @@ object Receptionist extends ExtensionId[Receptionist] {
def isForKey(key: ServiceKey[_]): Boolean def isForKey(key: ServiceKey[_]): Boolean
/** /**
* Scala API * Scala API: Return the reachable service instances.
*
* In a non-clustered `ActorSystem` this will always be all registered instances
* for a service key.
*
* For a clustered `ActorSystem` it only contain services on nodes that
* are not seen as unreachable (note that they could have still have become
* unreachable between this message being sent and the receiving actor processing it).
*
* For a list including both reachable and unreachable instances see [[#allServiceInstances]]
* *
* Also, see [[ServiceKey.Listing]] for more convenient pattern matching * Also, see [[ServiceKey.Listing]] for more convenient pattern matching
*/ */
def serviceInstances[T](key: ServiceKey[T]): Set[ActorRef[T]] def serviceInstances[T](key: ServiceKey[T]): Set[ActorRef[T]]
/** Java API */ /**
* Java API: Return the reachable service instances.
*
* In a non-clustered `ActorSystem` this will always be all registered instances
* for a service key.
*
* For a clustered `ActorSystem` it only contain services on nodes that has
* are not seen as unreachable (note that they could have still have become
* unreachable between this message being sent and the receiving actor processing it).
*
* For a list including both reachable and unreachable instances see [[#getAllServiceInstances]]
*/
def getServiceInstances[T](key: ServiceKey[T]): java.util.Set[ActorRef[T]] def getServiceInstances[T](key: ServiceKey[T]): java.util.Set[ActorRef[T]]
/**
* Scala API: Return both the reachable and the unreachable service instances.
*
* In a non-clustered `ActorSystem` this will always be the same as [[#serviceInstances]].
*
* For a clustered `ActorSystem` this include both services on nodes that are reachable
* and nodes that are unreachable.
*/
def allServiceInstances[T](key: ServiceKey[T]): Set[ActorRef[T]]
/**
* Java API: Return both the reachable and the unreachable service instances.
*
* In a non-clustered `ActorSystem` this will always be the same as [[#getServiceInstances]].
*
* For a clustered `ActorSystem` this include both services on nodes that are reachable
* and nodes that are unreachable.
*/
def getAllServiceInstances[T](key: ServiceKey[T]): java.util.Set[ActorRef[T]]
/**
* Returns `true` only if this `Listing` was sent triggered by new actors added or removed to the receptionist.
* When `false` the event is only about reachability changes - meaning that the full set of actors
* ([[#allServiceInstances]] or [[#getAllServiceInstances]]) is the same as the previous `Listing`.
*
* knowing this is useful for subscribers only concerned with [[#allServiceInstances]] or [[#getAllServiceInstances]]
* that can then ignore `Listing`s related to reachability.
*
* In a non-clustered `ActorSystem` this will be `true` for all listings.
* For `Find` queries and the initial listing for a `Subscribe` this will always be `true`.
*/
def servicesWereAddedOrRemoved: Boolean
} }
/** /**
@ -276,15 +341,32 @@ object Receptionist extends ExtensionId[Receptionist] {
/** Scala API: */ /** Scala API: */
def apply[T](key: ServiceKey[T], serviceInstances: Set[ActorRef[T]]): Listing = def apply[T](key: ServiceKey[T], serviceInstances: Set[ActorRef[T]]): Listing =
new ReceptionistMessages.Listing[T](key, serviceInstances) apply(key, serviceInstances, serviceInstances, servicesWereAddedOrRemoved = true)
/** Scala API: */
def apply[T](
key: ServiceKey[T],
serviceInstances: Set[ActorRef[T]],
allServiceInstances: Set[ActorRef[T]],
servicesWereAddedOrRemoved: Boolean): Listing =
new ReceptionistMessages.Listing[T](key, serviceInstances, allServiceInstances, servicesWereAddedOrRemoved)
} }
/** /**
* Java API: Sent by the receptionist, available here for easier testing * Java API: Sent by the receptionist, available here for easier testing
*/ */
def listing[T](key: ServiceKey[T], serviceInstances: java.util.Set[ActorRef[T]]): Listing = def listing[T](key: ServiceKey[T], serviceInstances: java.util.Set[ActorRef[T]]): Listing =
Listing(key, Set[ActorRef[T]](serviceInstances.asScala.toSeq: _*)) Listing(key, serviceInstances.asScala.toSet)
/**
* Java API: Sent by the receptionist, available here for easier testing
*/
def listing[T](
key: ServiceKey[T],
serviceInstances: java.util.Set[ActorRef[T]],
allServiceInstances: java.util.Set[ActorRef[T]],
servicesWereAddedOrRemoved: Boolean): Listing =
Listing(key, serviceInstances.asScala.toSet, allServiceInstances.asScala.toSet, servicesWereAddedOrRemoved)
} }

View file

@ -16,14 +16,17 @@ import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey, Replicator }
import akka.cluster.{ Cluster, ClusterEvent, UniqueAddress } import akka.cluster.{ Cluster, ClusterEvent, UniqueAddress }
import akka.remote.AddressUidExtension import akka.remote.AddressUidExtension
import akka.util.TypedMultiMap import akka.util.TypedMultiMap
import scala.concurrent.duration._
import scala.concurrent.duration._
import akka.actor.Address import akka.actor.Address
import akka.cluster.ClusterEvent.ClusterDomainEvent import akka.cluster.ClusterEvent.ClusterDomainEvent
import akka.cluster.ClusterEvent.ClusterShuttingDown import akka.cluster.ClusterEvent.ClusterShuttingDown
import akka.cluster.ClusterEvent.MemberJoined import akka.cluster.ClusterEvent.MemberJoined
import akka.cluster.ClusterEvent.MemberUp import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.ClusterEvent.MemberWeaklyUp import akka.cluster.ClusterEvent.MemberWeaklyUp
import akka.cluster.ClusterEvent.ReachabilityEvent
import akka.cluster.ClusterEvent.ReachableMember
import akka.cluster.ClusterEvent.UnreachableMember
import akka.cluster.ddata.SelfUniqueAddress import akka.cluster.ddata.SelfUniqueAddress
// just to provide a log class // just to provide a log class
@ -60,6 +63,8 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
extends InternalCommand extends InternalCommand
private final case class NodeAdded(addresses: UniqueAddress) extends InternalCommand private final case class NodeAdded(addresses: UniqueAddress) extends InternalCommand
private final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand private final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand
private final case class NodeUnreachable(addresses: UniqueAddress) extends InternalCommand
private final case class NodeReachable(addresses: UniqueAddress) extends InternalCommand
private final case class ChangeFromReplicator(key: DDataKey, value: ORMultiMap[ServiceKey[_], Entry]) private final case class ChangeFromReplicator(key: DDataKey, value: ORMultiMap[ServiceKey[_], Entry])
extends InternalCommand extends InternalCommand
private case object RemoveTick extends InternalCommand private case object RemoveTick extends InternalCommand
@ -109,11 +114,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
// remove entries when members are removed // remove entries when members are removed
val clusterEventMessageAdapter: ActorRef[ClusterDomainEvent] = val clusterEventMessageAdapter: ActorRef[ClusterDomainEvent] =
ctx.messageAdapter[ClusterDomainEvent] { ctx.messageAdapter[ClusterDomainEvent] {
case MemberJoined(member) => NodeAdded(member.uniqueAddress) case MemberJoined(member) => NodeAdded(member.uniqueAddress)
case MemberWeaklyUp(member) => NodeAdded(member.uniqueAddress) case MemberWeaklyUp(member) => NodeAdded(member.uniqueAddress)
case MemberUp(member) => NodeAdded(member.uniqueAddress) case MemberUp(member) => NodeAdded(member.uniqueAddress)
case MemberRemoved(member, _) => NodeRemoved(member.uniqueAddress) case MemberRemoved(member, _) => NodeRemoved(member.uniqueAddress)
case ClusterShuttingDown => NodeRemoved(setup.cluster.selfUniqueAddress) case UnreachableMember(member) => NodeUnreachable(member.uniqueAddress)
case ReachableMember(member) => NodeReachable(member.uniqueAddress)
case ClusterShuttingDown => NodeRemoved(setup.cluster.selfUniqueAddress)
case other => case other =>
throw new IllegalStateException(s"Unexpected ClusterDomainEvent $other. Please report bug.") throw new IllegalStateException(s"Unexpected ClusterDomainEvent $other. Please report bug.")
} }
@ -124,6 +131,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
classOf[MemberWeaklyUp], classOf[MemberWeaklyUp],
classOf[MemberUp], classOf[MemberUp],
classOf[MemberRemoved], classOf[MemberRemoved],
classOf[ReachabilityEvent],
ClusterShuttingDown.getClass) ClusterShuttingDown.getClass)
// also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update, // also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update,
@ -211,6 +219,20 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
} }
} }
def reachabilityChanged(keysForNode: Set[AbstractServiceKey], newRegistry: ShardedServiceRegistry): Unit = {
keysForNode.foreach { changedKey =>
val serviceKey = changedKey.asServiceKey
val subscribers = subscriptions.get(changedKey)
if (subscribers.nonEmpty) {
val (reachable, all) = newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress)
val listing =
ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = false)
subscribers.foreach(_ ! listing)
}
}
}
def onCommand(cmd: Command): Behavior[Command] = cmd match { def onCommand(cmd: Command): Behavior[Command] = cmd match {
case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) => case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) =>
if (serviceInstance.path.address.hasLocalScope) { if (serviceInstance.path.address.hasLocalScope) {
@ -231,15 +253,18 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
Behaviors.same Behaviors.same
case ReceptionistMessages.Find(key, replyTo) => case ReceptionistMessages.Find(key, replyTo) =>
replyTo ! ReceptionistMessages.Listing(key.asServiceKey, registry.activeActorRefsFor(key, selfUniqueAddress)) val (reachable, all) = registry.activeActorRefsFor(key, selfUniqueAddress)
replyTo ! ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
Behaviors.same Behaviors.same
case ReceptionistMessages.Subscribe(key, subscriber) => case ReceptionistMessages.Subscribe(key, subscriber) =>
watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber)) watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber))
// immediately reply with initial listings to the new subscriber // immediately reply with initial listings to the new subscriber
val listing = val listing = {
ReceptionistMessages.Listing(key.asServiceKey, registry.activeActorRefsFor(key, selfUniqueAddress)) val (reachable, all) = registry.activeActorRefsFor(key, selfUniqueAddress)
ReceptionistMessages.Listing(key.asServiceKey, reachable, all, servicesWereAddedOrRemoved = true)
}
subscriber ! listing subscriber ! listing
next(newSubscriptions = subscriptions.inserted(key)(subscriber)) next(newSubscriptions = subscriptions.inserted(key)(subscriber))
@ -287,9 +312,9 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
val subscribers = subscriptions.get(changedKey) val subscribers = subscriptions.get(changedKey)
if (subscribers.nonEmpty) { if (subscribers.nonEmpty) {
val (reachable, all) = newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress)
val listing = val listing =
ReceptionistMessages ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = true)
.Listing(serviceKey, newRegistry.activeActorRefsFor(serviceKey, selfUniqueAddress))
subscribers.foreach(_ ! listing) subscribers.foreach(_ ! listing)
} }
@ -340,6 +365,30 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
next(registry.removeNode(uniqueAddress)) next(registry.removeNode(uniqueAddress))
} }
case NodeUnreachable(uniqueAddress) =>
val keysForNode = registry.keysFor(uniqueAddress)
val newRegistry = registry.addUnreachable(uniqueAddress)
if (keysForNode.nonEmpty) {
ctx.log.debug(
"ClusterReceptionist [{}] - Node with registered services unreachable [{}]",
cluster.selfAddress,
uniqueAddress)
reachabilityChanged(keysForNode, newRegistry)
}
next(newRegistry)
case NodeReachable(uniqueAddress) =>
val keysForNode = registry.keysFor(uniqueAddress)
val newRegistry = registry.removeUnreachable(uniqueAddress)
if (keysForNode.nonEmpty) {
ctx.log.debug(
"ClusterReceptionist [{}] - Node with registered services reachable again [{}]",
cluster.selfAddress,
uniqueAddress)
reachabilityChanged(keysForNode, newRegistry)
}
next(newRegistry)
case RemoveTick => case RemoveTick =>
// ok to update from several nodes but more efficient to try to do it from one node // ok to update from several nodes but more efficient to try to do it from one node
if (isLeader) { if (isLeader) {

View file

@ -23,7 +23,7 @@ import scala.concurrent.duration.Deadline
val key = ORMultiMapKey[ServiceKey[_], Entry](s"ReceptionistKey_$n") val key = ORMultiMapKey[ServiceKey[_], Entry](s"ReceptionistKey_$n")
key -> new ServiceRegistry(EmptyORMultiMap) key -> new ServiceRegistry(EmptyORMultiMap)
}.toMap }.toMap
new ShardedServiceRegistry(emptyRegistries, Map.empty, Set.empty) new ShardedServiceRegistry(emptyRegistries, Map.empty, Set.empty, Set.empty)
} }
} }
@ -42,7 +42,8 @@ import scala.concurrent.duration.Deadline
@InternalApi private[akka] final case class ShardedServiceRegistry( @InternalApi private[akka] final case class ShardedServiceRegistry(
serviceRegistries: Map[DDataKey, ServiceRegistry], serviceRegistries: Map[DDataKey, ServiceRegistry],
tombstones: Map[ActorRef[_], Deadline], tombstones: Map[ActorRef[_], Deadline],
nodes: Set[UniqueAddress]) { nodes: Set[UniqueAddress],
unreachable: Set[UniqueAddress]) {
private val keys = serviceRegistries.keySet.toArray private val keys = serviceRegistries.keySet.toArray
@ -63,14 +64,34 @@ import scala.concurrent.duration.Deadline
serviceRegistries(ddataKey).actorRefsFor(key) serviceRegistries(ddataKey).actorRefsFor(key)
} }
def activeActorRefsFor[T](key: ServiceKey[T], selfUniqueAddress: UniqueAddress): Set[ActorRef[T]] = { /**
* @return keys that has a registered service instance on the given `address`
*/
def keysFor(address: UniqueAddress)(implicit node: SelfUniqueAddress): Set[AbstractServiceKey] =
serviceRegistries.valuesIterator.flatMap(_.keysFor(address)).toSet
/**
* @return (reachable-nodes, all)
*/
def activeActorRefsFor[T](
key: ServiceKey[T],
selfUniqueAddress: UniqueAddress): (Set[ActorRef[T]], Set[ActorRef[T]]) = {
val ddataKey = ddataKeyFor(key) val ddataKey = ddataKeyFor(key)
val entries = serviceRegistries(ddataKey).entriesFor(key) val entries = serviceRegistries(ddataKey).entriesFor(key)
val selfAddress = selfUniqueAddress.address val selfAddress = selfUniqueAddress.address
entries.collect { val reachable = Set.newBuilder[ActorRef[T]]
case entry if nodes.contains(entry.uniqueAddress(selfAddress)) && !hasTombstone(entry.ref) => val all = Set.newBuilder[ActorRef[T]]
entry.ref.asInstanceOf[ActorRef[key.Protocol]] entries.foreach { entry =>
val entryAddress = entry.uniqueAddress(selfAddress)
if (nodes.contains(entryAddress) && !hasTombstone(entry.ref)) {
val ref = entry.ref.asInstanceOf[ActorRef[key.Protocol]]
all += ref
if (!unreachable.contains(entryAddress)) {
reachable += ref
}
}
} }
(reachable.result(), all.result())
} }
def withServiceRegistry(ddataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry = def withServiceRegistry(ddataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry =
@ -113,7 +134,13 @@ import scala.concurrent.duration.Deadline
copy(nodes = nodes + node) copy(nodes = nodes + node)
def removeNode(node: UniqueAddress): ShardedServiceRegistry = def removeNode(node: UniqueAddress): ShardedServiceRegistry =
copy(nodes = nodes - node) copy(nodes = nodes - node, unreachable = unreachable - node)
def addUnreachable(uniqueAddress: UniqueAddress): ShardedServiceRegistry =
copy(unreachable = unreachable + uniqueAddress)
def removeUnreachable(uniqueAddress: UniqueAddress): ShardedServiceRegistry =
copy(unreachable = unreachable - uniqueAddress)
} }
@ -129,6 +156,12 @@ import scala.concurrent.duration.Deadline
def entriesFor(key: AbstractServiceKey): Set[Entry] = def entriesFor(key: AbstractServiceKey): Set[Entry] =
entries.getOrElse(key.asServiceKey, Set.empty[Entry]) entries.getOrElse(key.asServiceKey, Set.empty[Entry])
def keysFor(address: UniqueAddress)(implicit node: SelfUniqueAddress): Set[ServiceKey[_]] =
entries.entries.collect {
case (key, entries) if entries.exists(_.uniqueAddress(node.uniqueAddress.address) == address) =>
key
}.toSet
def addBinding[T](key: ServiceKey[T], value: Entry)(implicit node: SelfUniqueAddress): ServiceRegistry = def addBinding[T](key: ServiceKey[T], value: Entry)(implicit node: SelfUniqueAddress): ServiceRegistry =
copy(entries = entries.addBinding(node, key, value)) copy(entries = entries.addBinding(node, key, value))

View file

@ -0,0 +1,152 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.typed.internal
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.SpawnProtocol
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.typed.MultiDcClusterSingletonSpecConfig.first
import akka.cluster.typed.MultiDcClusterSingletonSpecConfig.second
import akka.cluster.typed.MultiDcClusterSingletonSpecConfig.third
import akka.cluster.typed.MultiNodeTypedClusterSpec
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
object ClusterReceptionistUnreachabilitySpecConfig extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
""").withFallback(MultiNodeClusterSpec.clusterConfig))
testTransport(on = true)
}
object ClusterReceptionistUnreachabilitySpec {
val MyServiceKey = ServiceKey[String]("my-service")
}
class ClusterReceptionistUnreachabilityMultiJvmNode1 extends ClusterReceptionistUnreachabilitySpec
class ClusterReceptionistUnreachabilityMultiJvmNode2 extends ClusterReceptionistUnreachabilitySpec
class ClusterReceptionistUnreachabilityMultiJvmNode3 extends ClusterReceptionistUnreachabilitySpec
abstract class ClusterReceptionistUnreachabilitySpec
extends MultiNodeSpec(ClusterReceptionistUnreachabilitySpecConfig)
with MultiNodeTypedClusterSpec {
import ClusterReceptionistUnreachabilitySpec._
val spawnActor = system.actorOf(PropsAdapter(SpawnProtocol.behavior)).toTyped[SpawnProtocol]
def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = {
implicit val timeout: Timeout = 3.seconds
implicit val scheduler = typedSystem.scheduler
val f: Future[ActorRef[T]] = spawnActor.ask(ref => SpawnProtocol.Spawn(behavior, name)(ref))
Await.result(f, 3.seconds)
}
val probe = TestProbe[AnyRef]()
val receptionistProbe = TestProbe[AnyRef]()
"The clustered receptionist" must {
"subscribe to the receptionist" in {
typedSystem.receptionist ! Receptionist.Subscribe(MyServiceKey, receptionistProbe.ref)
val listing = receptionistProbe.expectMessageType[Receptionist.Listing]
listing.serviceInstances(MyServiceKey) should ===(Set.empty)
listing.allServiceInstances(MyServiceKey) should ===(Set.empty)
listing.servicesWereAddedOrRemoved should ===(true)
enterBarrier("all subscribed")
}
"form a cluster" in {
formCluster(first, second, third)
enterBarrier("cluster started")
}
"register a service" in {
val localServiceRef = spawn(Behaviors.receiveMessage[String] {
case msg =>
probe.ref ! msg
Behaviors.same
}, "my-service")
typedSystem.receptionist ! Receptionist.Register(MyServiceKey, localServiceRef)
enterBarrier("all registered")
}
"see registered services" in {
awaitAssert({
val listing = receptionistProbe.expectMessageType[Receptionist.Listing]
listing.serviceInstances(MyServiceKey) should have size (3)
listing.allServiceInstances(MyServiceKey) should have size (3)
listing.servicesWereAddedOrRemoved should ===(true)
}, 20.seconds)
enterBarrier("all seen registered")
}
"remove unreachable from listing" in {
// make second unreachable
runOn(first) {
testConductor.blackhole(first, second, Direction.Both).await
testConductor.blackhole(third, second, Direction.Both).await
}
runOn(first, third) {
// assert service on 2 is not in listing but in all and flag is false
awaitAssert({
val listing = receptionistProbe.expectMessageType[Receptionist.Listing]
listing.serviceInstances(MyServiceKey) should have size (2)
listing.allServiceInstances(MyServiceKey) should have size (3)
listing.servicesWereAddedOrRemoved should ===(false)
}, 20.seconds)
}
runOn(second) {
// assert service on 1 and 3 is not in listing but in all and flag is false
awaitAssert({
val listing = receptionistProbe.expectMessageType[Receptionist.Listing]
listing.serviceInstances(MyServiceKey) should have size (1)
listing.allServiceInstances(MyServiceKey) should have size (3)
listing.servicesWereAddedOrRemoved should ===(false)
}, 20.seconds)
}
enterBarrier("all seen unreachable")
}
"add again-reachable to list again" in {
// make second unreachable
runOn(first) {
testConductor.passThrough(first, second, Direction.Both).await
testConductor.passThrough(third, second, Direction.Both).await
}
awaitAssert({
val listing = receptionistProbe.expectMessageType[Receptionist.Listing]
listing.serviceInstances(MyServiceKey) should have size (3)
listing.allServiceInstances(MyServiceKey) should have size (3)
listing.servicesWereAddedOrRemoved should ===(false)
})
enterBarrier("all seen reachable-again")
}
}
}

View file

@ -191,7 +191,10 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
clusterNode1.manager ! Leave(clusterNode2.selfMember.address) clusterNode1.manager ! Leave(clusterNode2.selfMember.address)
} }
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set(service1))) regProbe1.awaitAssert({
// we will also potentially get an update that the service was unreachable before the expected one
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set(service1)))
}, 10.seconds)
// register another after removal // register another after removal
val service1b = testKit1.spawn(pingPongBehavior) val service1b = testKit1.spawn(pingPongBehavior)
@ -243,7 +246,10 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
clusterNode2.manager ! Down(clusterNode1.selfMember.address) clusterNode2.manager ! Down(clusterNode1.selfMember.address)
// service1 removed // service1 removed
regProbe2.expectMessage(10.seconds, Listing(PingKey, Set(service2))) regProbe2.awaitAssert({
// we will also potentially get an update that the service was unreachable before the expected one
regProbe2.expectMessage(10.seconds, Listing(PingKey, Set(service2)))
}, 10.seconds)
} finally { } finally {
testKit1.shutdownTestKit() testKit1.shutdownTestKit()
testKit2.shutdownTestKit() testKit2.shutdownTestKit()
@ -298,8 +304,11 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
system2.terminate() system2.terminate()
Await.ready(system2.whenTerminated, 10.seconds) Await.ready(system2.whenTerminated, 10.seconds)
clusterNode1.manager ! Down(clusterNode2.selfMember.address) clusterNode1.manager ! Down(clusterNode2.selfMember.address)
regProbe1.awaitAssert({
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) // we will also potentially get an update that the service was unreachable before the expected one
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
}, 10.seconds)
} finally { } finally {
testKit1.shutdownTestKit() testKit1.shutdownTestKit()
testKit2.shutdownTestKit() testKit2.shutdownTestKit()

View file

@ -73,8 +73,15 @@ sends a `Ping` message and when receiving the `Pong` reply it stops.
## Cluster Receptionist ## Cluster Receptionist
The `Receptionist` also works in a cluster, an actor registered to the receptionist will appear in the receptionist of the other nodes of the cluster. The `Receptionist` also works in a cluster, an actor registered to the receptionist will appear in the receptionist
of the other nodes of the cluster.
The state for the receptionist is propagated via @ref:[distributed data](../distributed-data.md) which means that each node will eventually reach the same set of actors per `ServiceKey`. The state for the receptionist is propagated via @ref:[distributed data](../distributed-data.md) which means that each node
will eventually reach the same set of actors per `ServiceKey`.
One important difference from a local only receptions is the serialisation concerns, all messages sent to and back from an actor on another node must be serializable, see @ref:[clustering](cluster.md#serialization). `Subscription`s and `Find` queries to a clustered receptionist will keep track of cluster reachability and only list
registered actors that are reachable. The full set of actors, including unreachable ones, is available through
@scala[`Listing.allServiceInstances`]@java[`Listing.getAllServiceInstances`].
One important difference from local only receptions are the serialization concerns, all messages sent to and back from
an actor on another node must be serializable, see @ref:[clustering](cluster.md#serialization).

View file

@ -43,12 +43,16 @@ Java
The group router is created with a `ServiceKey` and uses the receptionist (see @ref:[Receptionist](actor-discovery.md#receptionist)) to discover The group router is created with a `ServiceKey` and uses the receptionist (see @ref:[Receptionist](actor-discovery.md#receptionist)) to discover
available actors for that key and routes messages to one of the currently known registered actors for a key. available actors for that key and routes messages to one of the currently known registered actors for a key.
Since the receptionist is used this means the group router is cluster aware out of the box and will pick up routees Since the receptionist is used this means the group router is cluster aware out of the box. The router route
registered on any node in the cluster (there is currently no logic to avoid routing to unreachable nodes, see [#26355](https://github.com/akka/akka/issues/26355)). messages to registered actors on any node in the cluster that is reachable. If no reachable actor exists the router
will fallback and route messages to actors on nodes marked as unreachable.
It also means that the set of routees is eventually consistent, and that immediately when the group router is started That the receptionist is used also means that the set of routees is eventually consistent, and that immediately when
the set of routees it knows about is empty. When the set of routees is empty messages sent to the router is forwarded the group router is started the set of routees it knows about is empty, until it has seen a listing from the receptionist
to dead letters. it stashes incoming messages and forwards them as soon as it gets a listing from the receptionist.
When the router has received a listing from the receptionist and the set of registered actors is empty the router will
drop them (published them to the event stream as `akka.actor.Dropped`).
Scala Scala
: @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #group } : @@snip [RouterSpec.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/RouterSpec.scala) { #group }
@ -88,4 +92,4 @@ it will not give better performance to create more routees than there are thread
Since the router itself is an actor and has a mailbox this means that messages are routed sequentially to the routees Since the router itself is an actor and has a mailbox this means that messages are routed sequentially to the routees
where it can be processed in parallel (depending on the available threads in the dispatcher). where it can be processed in parallel (depending on the available threads in the dispatcher).
In a high throughput use cases the sequential routing could be a bottle neck. Akka Typed does not provide an optimized tool for this. In a high throughput use cases the sequential routing could be a bottle neck. Akka Typed does not provide an optimized tool for this.