+per #18485 Add cluster sharding data removal utility

This commit is contained in:
Patrik Nordwall 2015-09-17 08:36:22 +02:00
parent d2cc69e65a
commit 0cdd9af3c4
7 changed files with 528 additions and 6 deletions


@@ -25,7 +25,7 @@ import scala.util.Random
* routees in the pool are exhausted, or the `within` duration has passed since the first send. If no
* routee sends a response in time, a [[akka.actor.Status.Failure]] wrapping a [[akka.pattern.AskTimeoutException]]
* is sent to the sender.
*
*
* The goal of this routing algorithm is to decrease tail latencies ("chop off the tail latency") in situations
* where multiple routees can perform the same piece of work, and where a routee may occasionally respond
* more slowly than expected. In this case, sending the same work request (also known as a "backup request")
@@ -103,7 +103,7 @@ private[akka] final case class TailChoppingRoutees(
* the `within` duration has passed since the first send. If no routee sends
* a response in time, a [[akka.actor.Status.Failure]] wrapping a [[akka.pattern.AskTimeoutException]]
* is sent to the sender.
*
*
* Refer to [[akka.routing.TailChoppingRoutingLogic]] for comments regarding the goal of this
* routing algorithm.
*
@@ -210,7 +210,7 @@ final case class TailChoppingPool(
*
* Refer to [[akka.routing.TailChoppingRoutingLogic]] for comments regarding the goal of this
* routing algorithm.
*
*
* The configuration parameter trumps the constructor arguments. This means that
* if you provide `paths` during instantiation they will be ignored if
* the router is defined in the configuration file for the actor being used.


@@ -0,0 +1,236 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.sharding
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.Failure
import scala.util.Success
import scala.util.Try
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Deploy
import akka.actor.Props
import akka.actor.Terminated
import akka.persistence._
import akka.persistence.journal.leveldb.SharedLeveldbJournal
import akka.persistence.journal.leveldb.SharedLeveldbStore
/**
* Utility program that removes the internal data stored with Akka Persistence
* by the Cluster Sharding coordinator. The data contains the locations of the
* shards using Akka Persistence and it can safely be removed when restarting
* the whole Akka Cluster. Note that this is not application data.
*
* <b>Never use this program while there are running Akka Cluster nodes that are
* using Cluster Sharding. Stop all Cluster nodes before using this program.</b>
*
* It may be necessary to remove the data if the Cluster Sharding coordinator
* cannot start up because of corrupt data, which may happen if two clusters were
* accidentally running at the same time, e.g. caused by using auto-down
* when there was a network partition.
*
* Use this program as a standalone Java main program:
* {{{
* java -classpath <jar files, including akka-cluster-sharding>
* akka.cluster.sharding.RemoveInternalClusterShardingData
* -2.3 entityType1 entityType2 entityType3
* }}}
*
* The program is included in the `akka-cluster-sharding` jar file. It
* is easiest to run it with the same classpath and configuration as your ordinary
* application. It can be run from sbt or maven in a similar way.
*
* Specify the entity type names (same as you use in the `start` method
* of `ClusterSharding`) as program arguments.
*
* If you specify `-2.3` as the first program argument it will also try
* to remove data that was stored by Cluster Sharding in Akka 2.3.x using
* a different persistenceId.
*/
object RemoveInternalClusterShardingData {
/**
* @see [[RemoveInternalClusterShardingData$ RemoveInternalClusterShardingData companion object]]
*/
def main(args: Array[String]): Unit = {
if (args.isEmpty)
println("Specify the Cluster Sharding type names to remove in program arguments")
else {
val system = ActorSystem("RemoveInternalClusterShardingData")
val remove2dot3Data = (args(0) == "-2.3")
val typeNames = if (remove2dot3Data) args.tail.toSet else args.toSet
if (typeNames.isEmpty)
println("Specify the Cluster Sharding type names to remove in program arguments")
else {
val journalPluginId = system.settings.config.getString("akka.cluster.sharding.journal-plugin-id")
import system.dispatcher
remove(system, journalPluginId, typeNames, terminateSystem = true, remove2dot3Data).onComplete { _ ⇒
system.terminate()
}
}
}
}
/**
* API corresponding to the [[#main]] method as described in the
* [[RemoveInternalClusterShardingData$ RemoveInternalClusterShardingData companion object]]
*/
def remove(system: ActorSystem, journalPluginId: String, typeNames: Set[String],
terminateSystem: Boolean, remove2dot3Data: Boolean): Future[Unit] = {
val resolvedJournalPluginId =
if (journalPluginId == "") system.settings.config.getString("akka.persistence.journal.plugin")
else journalPluginId
if (resolvedJournalPluginId == "akka.persistence.journal.leveldb-shared") {
val store = system.actorOf(Props[SharedLeveldbStore], "store")
SharedLeveldbJournal.setStore(store, system)
}
val completion = Promise[Unit]()
system.actorOf(props(journalPluginId, typeNames, completion, remove2dot3Data),
name = "removeInternalClusterShardingData")
completion.future
}
/**
* INTERNAL API: `Props` for [[RemoveInternalClusterShardingData]] actor.
*/
private[akka] def props(journalPluginId: String, typeNames: Set[String], completion: Promise[Unit], remove2dot3Data: Boolean): Props =
Props(new RemoveInternalClusterShardingData(journalPluginId, typeNames, completion, remove2dot3Data))
.withDeploy(Deploy.local)
/**
* INTERNAL API
*/
private[akka] object RemoveOnePersistenceId {
def props(journalPluginId: String, persistenceId: String, replyTo: ActorRef): Props =
Props(new RemoveOnePersistenceId(journalPluginId, persistenceId: String, replyTo))
case class Result(removals: Try[Removals])
case class Removals(events: Boolean, snapshots: Boolean)
}
/**
* INTERNAL API: Remove all events and snapshots for one specific
* `persistenceId`. It will reply with `RemoveOnePersistenceId.Result`
* when done.
*/
private[akka] class RemoveOnePersistenceId(
override val journalPluginId: String, override val persistenceId: String, replyTo: ActorRef)
extends PersistentActor {
import RemoveInternalClusterShardingData.RemoveOnePersistenceId._
var hasSnapshots = false
override def receiveRecover: Receive = {
case event: ShardCoordinator.Internal.DomainEvent ⇒
case SnapshotOffer(_, _) ⇒
hasSnapshots = true
case RecoveryCompleted ⇒
deleteMessages(Long.MaxValue)
if (hasSnapshots)
deleteSnapshots(SnapshotSelectionCriteria())
else
context.become(waitDeleteMessagesSuccess)
}
override def receiveCommand: Receive = ({
case DeleteSnapshotsSuccess(_) ⇒
context.become(waitDeleteMessagesSuccess)
case DeleteMessagesSuccess(_) ⇒
context.become(waitDeleteSnapshotsSuccess)
}: Receive).orElse(handleFailure)
def waitDeleteSnapshotsSuccess: Receive = ({
case DeleteSnapshotsSuccess(_) ⇒ done()
}: Receive).orElse(handleFailure)
def waitDeleteMessagesSuccess: Receive = ({
case DeleteMessagesSuccess(_) ⇒ done()
}: Receive).orElse(handleFailure)
def handleFailure: Receive = {
case DeleteMessagesFailure(cause, _) ⇒ failure(cause)
case DeleteSnapshotsFailure(_, cause) ⇒ failure(cause)
}
def done(): Unit = {
replyTo ! Result(Success(Removals(lastSequenceNr > 0, hasSnapshots)))
context.stop(self)
}
def failure(cause: Throwable): Unit = {
replyTo ! Result(Failure(cause))
context.stop(self)
}
}
}
/**
* @see [[RemoveInternalClusterShardingData$ RemoveInternalClusterShardingData companion object]]
*/
class RemoveInternalClusterShardingData(journalPluginId: String, typeNames: Set[String], completion: Promise[Unit],
remove2dot3Data: Boolean) extends Actor
with ActorLogging {
import RemoveInternalClusterShardingData._
import RemoveOnePersistenceId.Result
var currentPid: String = _
var currentRef: ActorRef = _
var remainingPids = typeNames.map(persistenceId) ++
(if (remove2dot3Data) typeNames.map(persistenceId2dot3) else Set.empty)
def persistenceId(typeName: String): String = s"/sharding/${typeName}Coordinator"
def persistenceId2dot3(typeName: String): String = s"/user/sharding/${typeName}Coordinator/singleton/coordinator"
override def preStart(): Unit = {
removeNext()
}
def removeNext(): Unit = {
currentPid = remainingPids.head
log.info("Removing data for persistenceId [{}]", currentPid)
currentRef = context.actorOf(RemoveOnePersistenceId.props(journalPluginId, currentPid, self))
context.watch(currentRef)
remainingPids -= currentPid
}
def receive = {
case Result(Success(_)) ⇒
log.info("Removed data for persistenceId [{}]", currentPid)
if (remainingPids.isEmpty) done()
else removeNext()
case Result(Failure(cause)) ⇒
log.error("Failed to remove data for persistenceId [{}]", currentPid)
failure(cause)
case Terminated(ref) ⇒
if (ref == currentRef) {
val msg = s"Failed to remove data for persistenceId [$currentPid], unexpected termination"
log.error(msg)
failure(new IllegalStateException(msg))
}
}
def done(): Unit = {
context.stop(self)
completion.success(())
}
def failure(cause: Throwable): Unit = {
context.stop(self)
completion.failure(cause)
}
}


@@ -0,0 +1,207 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.sharding
import java.io.File
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Success
import akka.actor.ActorRef
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.sharding.RemoveInternalClusterShardingData.RemoveOnePersistenceId.Removals
import akka.cluster.sharding.RemoveInternalClusterShardingData.RemoveOnePersistenceId.Result
import akka.persistence.PersistentActor
import akka.persistence.Recovery
import akka.persistence.RecoveryCompleted
import akka.persistence.SnapshotOffer
import akka.persistence.SnapshotSelectionCriteria
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.testkit.TestActors.EchoActor
import org.apache.commons.io.FileUtils
object RemoveInternalClusterShardingDataSpec {
val config = """
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.netty.tcp.port = 0
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.journal.leveldb {
native = off
dir = "target/journal-RemoveInternalClusterShardingDataSpec"
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-RemoveInternalClusterShardingDataSpec"
akka.cluster.sharding.snapshot-after = 5
"""
val extractEntityId: ShardRegion.ExtractEntityId = {
case msg: Int ⇒ (msg.toString, msg)
}
val extractShardId: ShardRegion.ExtractShardId = {
case msg: Int ⇒ (msg % 10).toString
}
class HasSnapshots(override val persistenceId: String, replyTo: ActorRef) extends PersistentActor {
var hasSnapshots = false
override def receiveRecover: Receive = {
case SnapshotOffer(_, _) ⇒
hasSnapshots = true
case RecoveryCompleted ⇒
replyTo ! hasSnapshots
context.stop(self)
case _ ⇒
}
override def receiveCommand: Receive = {
case _ ⇒
}
}
class HasEvents(override val persistenceId: String, replyTo: ActorRef) extends PersistentActor {
var hasEvents = false
override def recovery: Recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria.None)
override def receiveRecover: Receive = {
case event: ShardCoordinator.Internal.DomainEvent ⇒
hasEvents = true
case RecoveryCompleted ⇒
replyTo ! hasEvents
context.stop(self)
}
override def receiveCommand: Receive = {
case _ ⇒
}
}
}
class RemoveInternalClusterShardingDataSpec extends AkkaSpec(RemoveInternalClusterShardingDataSpec.config)
with ImplicitSender {
import RemoveInternalClusterShardingDataSpec._
val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
override protected def atStartup() {
storageLocations.foreach(dir if (dir.exists) FileUtils.deleteDirectory(dir))
}
override protected def afterTermination() {
storageLocations.foreach(dir if (dir.exists) FileUtils.deleteDirectory(dir))
}
// same persistenceId as is used by ShardCoordinator
def persistenceId(typeName: String): String = s"/sharding/${typeName}Coordinator"
def hasSnapshots(typeName: String): Boolean = {
system.actorOf(Props(classOf[HasSnapshots], persistenceId(typeName), testActor))
expectMsgType[Boolean]
}
def hasEvents(typeName: String): Boolean = {
system.actorOf(Props(classOf[HasEvents], persistenceId(typeName), testActor))
expectMsgType[Boolean]
}
"RemoveOnePersistenceId" must {
"setup sharding" in {
Cluster(system).join(Cluster(system).selfAddress)
val settings = ClusterShardingSettings(system)
ClusterSharding(system).start("type1", Props[EchoActor], settings, extractEntityId, extractShardId)
ClusterSharding(system).start("type2", Props[EchoActor], settings, extractEntityId, extractShardId)
}
"work when no data" in within(10.seconds) {
hasSnapshots("type1") should ===(false)
hasEvents("type1") should ===(false)
val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props(
journalPluginId = "", persistenceId("type1"), testActor))
watch(rm)
expectMsg(Result(Success(Removals(false, false))))
expectTerminated(rm)
}
"remove all events when no snapshot" in within(10.seconds) {
val region = ClusterSharding(system).shardRegion("type1")
(1 to 3).foreach(region ! _)
receiveN(3).toSet should be((1 to 3).toSet)
hasSnapshots("type1") should ===(false)
hasEvents("type1") should ===(true)
val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props(
journalPluginId = "", persistenceId("type1"), testActor))
watch(rm)
expectMsg(Result(Success(Removals(true, false))))
expectTerminated(rm)
hasSnapshots("type1") should ===(false)
hasEvents("type1") should ===(false)
}
"remove all events and snapshots" in within(10.seconds) {
val region = ClusterSharding(system).shardRegion("type2")
(1 to 10).foreach(region ! _)
receiveN(10).toSet should be((1 to 10).toSet)
awaitAssert {
// theoretically it might take a while until snapshot is visible
hasSnapshots("type2") should ===(true)
}
hasEvents("type2") should ===(true)
val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props(
journalPluginId = "", persistenceId("type2"), testActor))
watch(rm)
expectMsg(Result(Success(Removals(true, true))))
expectTerminated(rm)
hasSnapshots("type2") should ===(false)
hasEvents("type2") should ===(false)
}
}
"RemoveInternalClusterShardingData" must {
val typeNames = List("type10", "type20", "type30")
"setup sharding" in {
Cluster(system).join(Cluster(system).selfAddress)
val settings = ClusterShardingSettings(system)
typeNames.foreach { typeName ⇒
ClusterSharding(system).start(typeName, Props[EchoActor], settings, extractEntityId, extractShardId)
}
}
"remove all events and snapshots" in within(10.seconds) {
typeNames.foreach { typeName ⇒
val region = ClusterSharding(system).shardRegion(typeName)
(1 to 10).foreach(region ! _)
receiveN(10).toSet should be((1 to 10).toSet)
awaitAssert {
// theoretically it might take a while until snapshot is visible
hasSnapshots(typeName) should ===(true)
}
hasEvents(typeName) should ===(true)
}
val result = RemoveInternalClusterShardingData.remove(
system, journalPluginId = "", typeNames.toSet,
terminateSystem = false, remove2dot3Data = true)
Await.ready(result, remaining)
typeNames.foreach { typeName ⇒
hasSnapshots(typeName) should ===(false)
hasEvents(typeName) should ===(false)
}
}
}
}


@@ -271,6 +271,46 @@ This is how to do that:
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#graceful-shutdown

.. _RemoveInternalClusterShardingData-java:

Removal of Internal Cluster Sharding Data
-----------------------------------------

The Cluster Sharding coordinator stores the locations of the shards using Akka Persistence.
This data can safely be removed when restarting the whole Akka Cluster.
Note that this is not application data.

There is a utility program ``akka.cluster.sharding.RemoveInternalClusterShardingData``
that removes this data.

.. warning::

  Never use this program while there are running Akka Cluster nodes that are
  using Cluster Sharding. Stop all Cluster nodes before using this program.

It may be necessary to remove the data if the Cluster Sharding coordinator
cannot start up because of corrupt data, which may happen if two clusters were
accidentally running at the same time, e.g. caused by using auto-down
when there was a network partition.

Use this program as a standalone Java main program::

   java -classpath <jar files, including akka-cluster-sharding>
     akka.cluster.sharding.RemoveInternalClusterShardingData
       -2.3 entityType1 entityType2 entityType3

The program is included in the ``akka-cluster-sharding`` jar file. It
is easiest to run it with the same classpath and configuration as your ordinary
application. It can be run from sbt or maven in a similar way.

Specify the entity type names (same as you use in the ``start`` method
of ``ClusterSharding``) as program arguments.

If you specify ``-2.3`` as the first program argument it will also try
to remove data that was stored by Cluster Sharding in Akka 2.3.x using
a different ``persistenceId``.

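The utility can also be triggered from code, for example from a maintenance
task, by calling its ``main`` method directly. A minimal sketch in Scala (the
utility itself is implemented in Scala); ``entityType1`` is just a placeholder
for one of your own entity type names::

   // creates its own ActorSystem, removes the data and terminates the system;
   // the optional "-2.3" argument also removes data stored with the Akka 2.3.x persistenceId
   akka.cluster.sharding.RemoveInternalClusterShardingData.main(
     Array("-2.3", "entityType1"))
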
Dependencies
------------


@@ -309,8 +309,8 @@ it should not load such old data, but it can be a problem if you have used a 2.4
milestone release. In that case you should remove the persistent data that the
Cluster Sharding coordinator stored. Note that this is not application data.
Consult the Journal Plugin and Snapshot Store Plugin documentation or backend
data store documentation for information about how to remove such data.
You can use the :ref:`RemoveInternalClusterShardingData <RemoveInternalClusterShardingData-scala>`
utility program to remove this data.
The new ``persistenceId`` is ``s"/sharding/${typeName}Coordinator"``.
The old ``persistenceId`` is ``s"/user/sharding/${typeName}Coordinator/singleton/coordinator"``.


@@ -274,6 +274,45 @@ This is how to do that:
.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala#graceful-shutdown

.. _RemoveInternalClusterShardingData-scala:

Removal of Internal Cluster Sharding Data
-----------------------------------------

The Cluster Sharding coordinator stores the locations of the shards using Akka Persistence.
This data can safely be removed when restarting the whole Akka Cluster.
Note that this is not application data.

There is a utility program ``akka.cluster.sharding.RemoveInternalClusterShardingData``
that removes this data.

.. warning::

  Never use this program while there are running Akka Cluster nodes that are
  using Cluster Sharding. Stop all Cluster nodes before using this program.

It may be necessary to remove the data if the Cluster Sharding coordinator
cannot start up because of corrupt data, which may happen if two clusters were
accidentally running at the same time, e.g. caused by using auto-down
when there was a network partition.

Use this program as a standalone Java main program::

   java -classpath <jar files, including akka-cluster-sharding>
     akka.cluster.sharding.RemoveInternalClusterShardingData
       -2.3 entityType1 entityType2 entityType3

The program is included in the ``akka-cluster-sharding`` jar file. It
is easiest to run it with the same classpath and configuration as your ordinary
application. It can be run from sbt or maven in a similar way.

Specify the entity type names (same as you use in the ``start`` method
of ``ClusterSharding``) as program arguments.

If you specify ``-2.3`` as the first program argument it will also try
to remove data that was stored by Cluster Sharding in Akka 2.3.x using
a different ``persistenceId``.

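If you prefer to perform the removal programmatically, e.g. from a maintenance
script, you can use the ``remove`` method of the
``RemoveInternalClusterShardingData`` companion object instead of ``main``. It
returns a ``Future`` that is completed when the data has been removed. A
minimal sketch, assuming the hypothetical entity type name ``myEntityType`` and
the default journal plugin::

   import akka.actor.ActorSystem
   import akka.cluster.sharding.RemoveInternalClusterShardingData

   val system = ActorSystem("RemoveInternalClusterShardingData")
   import system.dispatcher
   // an empty journalPluginId means that the default journal configured
   // under akka.persistence.journal.plugin is used
   RemoveInternalClusterShardingData
     .remove(system, "", Set("myEntityType"), terminateSystem = false, remove2dot3Data = false)
     .onComplete(_ ⇒ system.terminate())

As with the standalone program, this must only be done while all Cluster nodes
are stopped.
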
Dependencies
------------


@@ -101,7 +101,7 @@ object Dependencies {
val clusterTools = l ++= Seq(Test.junit, Test.scalatest.value)
val clusterSharding = l ++= Seq(Test.junit, Test.scalatest.value, Test.commonsIo)
val clusterSharding = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.junit, Test.scalatest.value, Test.commonsIo)
val clusterMetrics = l ++= Seq(Provided.sigarLoader, Test.slf4jJul, Test.slf4jLog4j, Test.logback, Test.mockito)