=con #3975 Check for wrong id in cluster sharding
* Also, a watch leftover from ticket 3882 (cherry picked from commit cbc9dc535c0692a7df00bfb7292e62de1bed7e3f) Conflicts: akka-contrib/src/main/scala/akka/contrib/pattern/DistributedPubSubMediator.scala
This commit is contained in:
parent
d51e06b389
commit
93d069fc8f
2 changed files with 36 additions and 25 deletions
|
|
@ -796,32 +796,40 @@ class ShardRegion(
|
|||
regionByShard.get(shard) match {
|
||||
case Some(ref) if ref == self ⇒
|
||||
val (id, m) = idExtractor(msg)
|
||||
val name = URLEncoder.encode(id, "utf-8")
|
||||
val entry = context.child(name).getOrElse {
|
||||
if (entryProps.isEmpty)
|
||||
throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion")
|
||||
log.debug("Starting entry [{}] in shard [{}]", id, shard)
|
||||
val a = context.watch(context.actorOf(entryProps.get, name))
|
||||
entries = entries.updated(a, shard)
|
||||
entriesByShard = entriesByShard.updated(shard, entriesByShard.getOrElse(shard, Set.empty) + a)
|
||||
a
|
||||
}
|
||||
passivatingBuffers.get(entry) match {
|
||||
case None ⇒
|
||||
log.debug("Message [{}] for shard [{}] sent to entry", m.getClass.getName, shard)
|
||||
entry.tell(m, snd)
|
||||
case Some(buf) ⇒
|
||||
if (totalBufferSize >= bufferSize) {
|
||||
log.debug("Buffer is full, dropping message for passivated entry in shard [{}]", shard)
|
||||
context.system.deadLetters ! msg
|
||||
} else {
|
||||
log.debug("Message for shard [{}] buffered due to entry being passivated", shard)
|
||||
passivatingBuffers = passivatingBuffers.updated(entry, buf :+ ((msg, snd)))
|
||||
}
|
||||
if (id == null || id == "") {
|
||||
log.warning("Id must not be empty, dropping message [{}]", msg.getClass.getName)
|
||||
context.system.deadLetters ! msg
|
||||
} else {
|
||||
val name = URLEncoder.encode(id, "utf-8")
|
||||
val entry = context.child(name).getOrElse {
|
||||
if (entryProps.isEmpty)
|
||||
throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion")
|
||||
log.debug("Starting entry [{}] in shard [{}]", id, shard)
|
||||
val a = context.watch(context.actorOf(entryProps.get, name))
|
||||
entries = entries.updated(a, shard)
|
||||
entriesByShard = entriesByShard.updated(shard, entriesByShard.getOrElse(shard, Set.empty) + a)
|
||||
a
|
||||
}
|
||||
passivatingBuffers.get(entry) match {
|
||||
case None ⇒
|
||||
log.debug("Message [{}] for shard [{}] sent to entry", m.getClass.getName, shard)
|
||||
entry.tell(m, snd)
|
||||
case Some(buf) ⇒
|
||||
if (totalBufferSize >= bufferSize) {
|
||||
log.debug("Buffer is full, dropping message for passivated entry in shard [{}]", shard)
|
||||
context.system.deadLetters ! msg
|
||||
} else {
|
||||
log.debug("Message for shard [{}] buffered due to entry being passivated", shard)
|
||||
passivatingBuffers = passivatingBuffers.updated(entry, buf :+ ((msg, snd)))
|
||||
}
|
||||
}
|
||||
}
|
||||
case Some(ref) ⇒
|
||||
log.debug("Forwarding request for shard [{}] to [{}]", shard, ref)
|
||||
ref.tell(msg, snd)
|
||||
case None if (shard == null || shard == "") ⇒
|
||||
log.warning("Shard must not be empty, dropping message [{}]", msg.getClass.getName)
|
||||
context.system.deadLetters ! msg
|
||||
case None ⇒
|
||||
if (!shardBuffers.contains(shard)) {
|
||||
log.debug("Request shard [{}] home", shard)
|
||||
|
|
@ -1193,7 +1201,6 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
|
|||
override def receiveRecover: Receive = {
|
||||
case evt: DomainEvent ⇒ evt match {
|
||||
case ShardRegionRegistered(region) ⇒
|
||||
context.watch(region)
|
||||
persistentState = persistentState.updated(evt)
|
||||
case ShardRegionProxyRegistered(proxy) ⇒
|
||||
persistentState = persistentState.updated(evt)
|
||||
|
|
|
|||
|
|
@ -65,8 +65,12 @@ object DistributedPubSubMediator {
|
|||
|
||||
@SerialVersionUID(1L) final case class Put(ref: ActorRef)
|
||||
@SerialVersionUID(1L) final case class Remove(path: String)
|
||||
@SerialVersionUID(1L) final case class Subscribe(topic: String, ref: ActorRef)
|
||||
@SerialVersionUID(1L) final case class Unsubscribe(topic: String, ref: ActorRef)
|
||||
@SerialVersionUID(1L) final case class Subscribe(topic: String, ref: ActorRef) {
|
||||
require(topic != null && topic != "", "topic must be defined")
|
||||
}
|
||||
@SerialVersionUID(1L) final case class Unsubscribe(topic: String, ref: ActorRef) {
|
||||
require(topic != null && topic != "", "topic must be defined")
|
||||
}
|
||||
@SerialVersionUID(1L) final case class SubscribeAck(subscribe: Subscribe)
|
||||
@SerialVersionUID(1L) final case class UnsubscribeAck(unsubscribe: Unsubscribe)
|
||||
@SerialVersionUID(1L) final case class Publish(topic: String, msg: Any) extends DistributedPubSubMessage
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue