diff --git a/akka-actor/src/main/scala/akka/routing/TailChopping.scala b/akka-actor/src/main/scala/akka/routing/TailChopping.scala index 0937abe6c1..88ef3d0aa0 100644 --- a/akka-actor/src/main/scala/akka/routing/TailChopping.scala +++ b/akka-actor/src/main/scala/akka/routing/TailChopping.scala @@ -25,7 +25,7 @@ import scala.util.Random * routees in the pool are exhausted, or the `within` duration has passed since the first send. If no * routee sends a response in time, a [[akka.actor.Status.Failure]] wrapping a [[akka.pattern.AskTimeoutException]] * is sent to the sender. - * + * * The goal of this routing algorithm is to decrease tail latencies ("chop off the tail latency") in situations * where multiple routees can perform the same piece of work, and where a routee may occasionally respond * more slowly than expected. In this case, sending the same work request (also known as a "backup request") @@ -103,7 +103,7 @@ private[akka] final case class TailChoppingRoutees( * the `within` duration has passed since the first send. If no routee sends * a response in time, a [[akka.actor.Status.Failure]] wrapping a [[akka.pattern.AskTimeoutException]] * is sent to the sender. - * + * * Refer to [[akka.routing.TailChoppingRoutingLogic]] for comments regarding the goal of this * routing algorithm. * @@ -210,7 +210,7 @@ final case class TailChoppingPool( * * Refer to [[akka.routing.TailChoppingRoutingLogic]] for comments regarding the goal of this * routing algorithm. - * + * * The configuration parameter trumps the constructor arguments. This means that * if you provide `paths` during instantiation they will be ignored if * the router is defined in the configuration file for the actor being used. diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/RemoveInternalClusterShardingData.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/RemoveInternalClusterShardingData.scala new file mode 100644 index 0000000000..a8e7e8d019 --- /dev/null +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/RemoveInternalClusterShardingData.scala @@ -0,0 +1,236 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.cluster.sharding + +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.ActorSystem +import akka.actor.Deploy +import akka.actor.Props +import akka.actor.Terminated +import akka.persistence._ +import akka.persistence.journal.leveldb.SharedLeveldbJournal +import akka.persistence.journal.leveldb.SharedLeveldbStore + +/** + * Utility program that removes the internal data stored with Akka Persistence + * by the Cluster Sharding coordinator. The data contains the locations of the + * shards using Akka Persistence and it can safely be removed when restarting + * the whole Akka Cluster. Note that this is not application data. + * + * Never use this program while there are running Akka Cluster nodes that are + * using Cluster Sharding. Stop all Cluster nodes before using this program. + * + * It can be needed to remove the data if the Cluster Sharding coordinator + * cannot startup because of corrupt data, which may happen if accidentally + * two clusters were running at the same time, e.g. caused by using auto-down + * and there was a network partition.
+ * + * Use this program as a standalone Java main program: + * {{{ + * java -classpath + * akka.cluster.sharding.RemoveInternalClusterShardingData + * -2.3 entityType1 entityType2 entityType3 + * }}} + * + * The program is included in the `akka-cluster-sharding` jar file. It + * is easiest to run it with same classpath and configuration as your ordinary + * application. It can be run from sbt or maven in similar way. + * + * Specify the entity type names (same as you use in the `start` method + * of `ClusterSharding`) as program arguments. + * + * If you specify `-2.3` as the first program argument it will also try + * to remove data that was stored by Cluster Sharding in Akka 2.3.x using + * different persistenceId. + */ +object RemoveInternalClusterShardingData { + + /** + * @see [[RemoveInternalClusterShardingData$ RemoveInternalClusterShardingData companion object]] + */ + def main(args: Array[String]): Unit = { + if (args.isEmpty) + println("Specify the Cluster Sharding type names to remove in program arguments") + else { + val system = ActorSystem("RemoveInternalClusterShardingData") + val remove2dot3Data = (args(0) == "-2.3") + val typeNames = if (remove2dot3Data) args.tail.toSet else args.toSet + if (typeNames.isEmpty) + println("Specify the Cluster Sharding type names to remove in program arguments") + else { + val journalPluginId = system.settings.config.getString("akka.cluster.sharding.journal-plugin-id") + import system.dispatcher + remove(system, journalPluginId, typeNames, terminateSystem = true, remove2dot3Data).onComplete { _ ⇒ + system.terminate() + } + } + } + } + + /** + * API corresponding to the [[#main]] method as described in the + * [[RemoveInternalClusterShardingData$ RemoveInternalClusterShardingData companion object]] + */ + def remove(system: ActorSystem, journalPluginId: String, typeNames: Set[String], + terminateSystem: Boolean, remove2dot3Data: Boolean): Future[Unit] = { + + val resolvedJournalPluginId = + if (journalPluginId == "") system.settings.config.getString("akka.persistence.journal.plugin") + else journalPluginId + if (resolvedJournalPluginId == "akka.persistence.journal.leveldb-shared") { + val store = system.actorOf(Props[SharedLeveldbStore], "store") + SharedLeveldbJournal.setStore(store, system) + } + + val completion = Promise[Unit]() + system.actorOf(props(journalPluginId, typeNames, completion, remove2dot3Data), + name = "removeInternalClusterShardingData") + completion.future + } + + /** + * INTERNAL API: `Props` for [[RemoveInternalClusterShardingData]] actor. + */ + private[akka] def props(journalPluginId: String, typeNames: Set[String], completion: Promise[Unit], remove2dot3Data: Boolean): Props = + Props(new RemoveInternalClusterShardingData(journalPluginId, typeNames, completion, remove2dot3Data)) + .withDeploy(Deploy.local) + + /** + * INTERNAL API + */ + private[akka] object RemoveOnePersistenceId { + def props(journalPluginId: String, persistenceId: String, replyTo: ActorRef): Props = + Props(new RemoveOnePersistenceId(journalPluginId, persistenceId: String, replyTo)) + + case class Result(removals: Try[Removals]) + case class Removals(events: Boolean, snapshots: Boolean) + } + + /** + * INTERNAL API: Remove all events and snapshots for one specific + * `persistenceId`. It will reply with `RemoveOnePersistenceId.Result` + * when done. 
+ */ + private[akka] class RemoveOnePersistenceId( + override val journalPluginId: String, override val persistenceId: String, replyTo: ActorRef) + extends PersistentActor { + + import RemoveInternalClusterShardingData.RemoveOnePersistenceId._ + + var hasSnapshots = false + + override def receiveRecover: Receive = { + case event: ShardCoordinator.Internal.DomainEvent ⇒ + + case SnapshotOffer(_, _) ⇒ + hasSnapshots = true + + case RecoveryCompleted ⇒ + deleteMessages(Long.MaxValue) + if (hasSnapshots) + deleteSnapshots(SnapshotSelectionCriteria()) + else + context.become(waitDeleteMessagesSuccess) + } + + override def receiveCommand: Receive = ({ + case DeleteSnapshotsSuccess(_) ⇒ + context.become(waitDeleteMessagesSuccess) + case DeleteMessagesSuccess(_) ⇒ + context.become(waitDeleteSnapshotsSuccess) + }: Receive).orElse(handleFailure) + + def waitDeleteSnapshotsSuccess: Receive = ({ + case DeleteSnapshotsSuccess(_) ⇒ done() + }: Receive).orElse(handleFailure) + + def waitDeleteMessagesSuccess: Receive = ({ + case DeleteMessagesSuccess(_) ⇒ done() + }: Receive).orElse(handleFailure) + + def handleFailure: Receive = { + case DeleteMessagesFailure(cause, _) ⇒ failure(cause) + case DeleteSnapshotsFailure(_, cause) ⇒ failure(cause) + } + + def done(): Unit = { + replyTo ! Result(Success(Removals(lastSequenceNr > 0, hasSnapshots))) + context.stop(self) + } + + def failure(cause: Throwable): Unit = { + replyTo ! Result(Failure(cause)) + context.stop(self) + } + + } + +} + +/** + * @see [[RemoveInternalClusterShardingData$ RemoveInternalClusterShardingData companion object]] + */ +class RemoveInternalClusterShardingData(journalPluginId: String, typeNames: Set[String], completion: Promise[Unit], + remove2dot3Data: Boolean) extends Actor + with ActorLogging { + import RemoveInternalClusterShardingData._ + import RemoveOnePersistenceId.Result + + var currentPid: String = _ + var currentRef: ActorRef = _ + var remainingPids = typeNames.map(persistenceId) ++ + (if (remove2dot3Data) typeNames.map(persistenceId2dot3) else Set.empty) + + def persistenceId(typeName: String): String = s"/sharding/${typeName}Coordinator" + + def persistenceId2dot3(typeName: String): String = s"/user/sharding/${typeName}Coordinator/singleton/coordinator" + + override def preStart(): Unit = { + removeNext() + } + + def removeNext(): Unit = { + currentPid = remainingPids.head + log.info("Removing data for persistenceId [{}]", currentPid) + currentRef = context.actorOf(RemoveOnePersistenceId.props(journalPluginId, currentPid, self)) + context.watch(currentRef) + remainingPids -= currentPid + } + + def receive = { + case Result(Success(_)) ⇒ + log.info("Removed data for persistenceId [{}]", currentPid) + if (remainingPids.isEmpty) done() + else removeNext() + + case Result(Failure(cause)) ⇒ + log.error("Failed to remove data for persistenceId [{}]", currentPid) + failure(cause) + + case Terminated(ref) ⇒ + if (ref == currentRef) { + val msg = s"Failed to remove data for persistenceId [$currentPid], unexpected termination" + log.error(msg) + failure(new IllegalStateException(msg)) + } + } + + def done(): Unit = { + context.stop(self) + completion.success(()) + } + + def failure(cause: Throwable): Unit = { + context.stop(self) + completion.failure(cause) + } +} diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala new file mode 100644 index 0000000000..fce61544b2 --- 
/dev/null +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RemoveInternalClusterShardingDataSpec.scala @@ -0,0 +1,207 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.cluster.sharding + +import java.io.File + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.Success + +import akka.actor.ActorRef +import akka.actor.Props +import akka.cluster.Cluster +import akka.cluster.sharding.RemoveInternalClusterShardingData.RemoveOnePersistenceId.Removals +import akka.cluster.sharding.RemoveInternalClusterShardingData.RemoveOnePersistenceId.Result +import akka.persistence.PersistentActor +import akka.persistence.Recovery +import akka.persistence.RecoveryCompleted +import akka.persistence.SnapshotOffer +import akka.persistence.SnapshotSelectionCriteria +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.testkit.TestActors.EchoActor +import org.apache.commons.io.FileUtils + +object RemoveInternalClusterShardingDataSpec { + val config = """ + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.netty.tcp.port = 0 + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb" + akka.persistence.journal.leveldb { + native = off + dir = "target/journal-RemoveInternalClusterShardingDataSpec" + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshots-RemoveInternalClusterShardingDataSpec" + akka.cluster.sharding.snapshot-after = 5 + """ + + val extractEntityId: ShardRegion.ExtractEntityId = { + case msg: Int ⇒ (msg.toString, msg) + } + + val extractShardId: ShardRegion.ExtractShardId = { + case msg: Int ⇒ (msg % 10).toString + } + + class HasSnapshots(override val persistenceId: String, replyTo: ActorRef) extends PersistentActor { + + var hasSnapshots = false + + override def receiveRecover: Receive = { + case SnapshotOffer(_, _) ⇒ + hasSnapshots = true + case RecoveryCompleted ⇒ + replyTo ! hasSnapshots + context.stop(self) + + case _ ⇒ + } + + override def receiveCommand: Receive = { + case _ ⇒ + } + } + + class HasEvents(override val persistenceId: String, replyTo: ActorRef) extends PersistentActor { + + var hasEvents = false + + override def recovery: Recovery = Recovery(fromSnapshot = SnapshotSelectionCriteria.None) + + override def receiveRecover: Receive = { + case event: ShardCoordinator.Internal.DomainEvent ⇒ + hasEvents = true + case RecoveryCompleted ⇒ + replyTo ! 
hasEvents + context.stop(self) + } + + override def receiveCommand: Receive = { + case _ ⇒ + } + } + +} + +class RemoveInternalClusterShardingDataSpec extends AkkaSpec(RemoveInternalClusterShardingDataSpec.config) + with ImplicitSender { + import RemoveInternalClusterShardingDataSpec._ + + val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + override protected def atStartup() { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + + override protected def afterTermination() { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + + // same persistenceId as is used by ShardCoordinator + def persistenceId(typeName: String): String = s"/sharding/${typeName}Coordinator" + + def hasSnapshots(typeName: String): Boolean = { + system.actorOf(Props(classOf[HasSnapshots], persistenceId(typeName), testActor)) + expectMsgType[Boolean] + } + + def hasEvents(typeName: String): Boolean = { + system.actorOf(Props(classOf[HasEvents], persistenceId(typeName), testActor)) + expectMsgType[Boolean] + } + + "RemoveOnePersistenceId" must { + "setup sharding" in { + Cluster(system).join(Cluster(system).selfAddress) + val settings = ClusterShardingSettings(system) + ClusterSharding(system).start("type1", Props[EchoActor], settings, extractEntityId, extractShardId) + ClusterSharding(system).start("type2", Props[EchoActor], settings, extractEntityId, extractShardId) + } + + "work when no data" in within(10.seconds) { + hasSnapshots("type1") should ===(false) + hasEvents("type1") should ===(false) + val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props( + journalPluginId = "", persistenceId("type1"), testActor)) + watch(rm) + expectMsg(Result(Success(Removals(false, false)))) + expectTerminated(rm) + } + + "remove all events when no snapshot" in within(10.seconds) { + val region = ClusterSharding(system).shardRegion("type1") + (1 to 3).foreach(region ! _) + receiveN(3).toSet should be((1 to 3).toSet) + hasSnapshots("type1") should ===(false) + hasEvents("type1") should ===(true) + + val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props( + journalPluginId = "", persistenceId("type1"), testActor)) + watch(rm) + expectMsg(Result(Success(Removals(true, false)))) + expectTerminated(rm) + hasSnapshots("type1") should ===(false) + hasEvents("type1") should ===(false) + } + + "remove all events and snapshots" in within(10.seconds) { + val region = ClusterSharding(system).shardRegion("type2") + (1 to 10).foreach(region ! 
_) + receiveN(10).toSet should be((1 to 10).toSet) + awaitAssert { + // theoretically it might take a while until snapshot is visible + hasSnapshots("type2") should ===(true) + } + hasEvents("type2") should ===(true) + + val rm = system.actorOf(RemoveInternalClusterShardingData.RemoveOnePersistenceId.props( + journalPluginId = "", persistenceId("type2"), testActor)) + watch(rm) + expectMsg(Result(Success(Removals(true, true)))) + expectTerminated(rm) + hasSnapshots("type2") should ===(false) + hasEvents("type2") should ===(false) + } + } + + "RemoveInternalClusterShardingData" must { + val typeNames = List("type10", "type20", "type30") + + "setup sharding" in { + Cluster(system).join(Cluster(system).selfAddress) + val settings = ClusterShardingSettings(system) + typeNames.foreach { typeName ⇒ + ClusterSharding(system).start(typeName, Props[EchoActor], settings, extractEntityId, extractShardId) + } + } + + "remove all events and snapshots" in within(10.seconds) { + typeNames.foreach { typeName ⇒ + val region = ClusterSharding(system).shardRegion(typeName) + (1 to 10).foreach(region ! _) + receiveN(10).toSet should be((1 to 10).toSet) + awaitAssert { + // theoretically it might take a while until snapshot is visible + hasSnapshots(typeName) should ===(true) + } + hasEvents(typeName) should ===(true) + } + + val result = RemoveInternalClusterShardingData.remove( + system, journalPluginId = "", typeNames.toSet, + terminateSystem = false, remove2dot3Data = true) + Await.ready(result, remaining) + + typeNames.foreach { typeName ⇒ + hasSnapshots(typeName) should ===(false) + hasEvents(typeName) should ===(false) + } + } + } +} diff --git a/akka-docs/rst/java/cluster-sharding.rst b/akka-docs/rst/java/cluster-sharding.rst index 528875281a..7f96b0ec06 100644 --- a/akka-docs/rst/java/cluster-sharding.rst +++ b/akka-docs/rst/java/cluster-sharding.rst @@ -271,6 +271,46 @@ This is how to do that: .. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#graceful-shutdown +.. _RemoveInternalClusterShardingData-java: + +Removal of Internal Cluster Sharding Data +----------------------------------------- + +The Cluster Sharding coordinator stores the locations of the shards using Akka Persistence. +This data can safely be removed when restarting the whole Akka Cluster. +Note that this is not application data. + +There is a utility program ``akka.cluster.sharding.RemoveInternalClusterShardingData`` +that removes this data. + +.. warning:: + + Never use this program while there are running Akka Cluster nodes that are + using Cluster Sharding. Stop all Cluster nodes before using this program. + +It can be needed to remove the data if the Cluster Sharding coordinator +cannot startup because of corrupt data, which may happen if accidentally +two clusters were running at the same time, e.g. caused by using auto-down +and there was a network partition. + +Use this program as a standalone Java main program:: + + java -classpath + akka.cluster.sharding.RemoveInternalClusterShardingData + -2.3 entityType1 entityType2 entityType3 + +The program is included in the ``akka-cluster-sharding`` jar file. It +is easiest to run it with same classpath and configuration as your ordinary +application. It can be run from sbt or maven in similar way. + +Specify the entity type names (same as you use in the ``start`` method +of ``ClusterSharding``) as program arguments. 
+ +If you specify ``-2.3`` as the first program argument it will also try +to remove data that was stored by Cluster Sharding in Akka 2.3.x using +different persistenceId. + + Dependencies ------------ diff --git a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst index 5792a16740..a62ba95d20 100644 --- a/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst +++ b/akka-docs/rst/project/migration-guide-2.3.x-2.4.x.rst @@ -309,8 +309,8 @@ it should not load such old data, but it can be a problem if you have used a 2.4 milestone release. In that case you should remove the persistent data that the Cluster Sharding coordinator stored. Note that this is not application data. -Consult the Journal Plugin and Snapshot Store Plugin documentation or backend -data store documentation for information about how to remove such data. +You can use the :ref:`RemoveInternalClusterShardingData ` +utility program to remove this data. The new ``persistenceId`` is ``s"/sharding/${typeName}Coordinator"``. The old ``persistenceId`` is ``s"/user/sharding/${typeName}Coordinator/singleton/coordinator"``. diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst index 0eff4bce12..4647231cf5 100644 --- a/akka-docs/rst/scala/cluster-sharding.rst +++ b/akka-docs/rst/scala/cluster-sharding.rst @@ -274,6 +274,45 @@ This is how to do that: .. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala#graceful-shutdown +.. _RemoveInternalClusterShardingData-scala: + +Removal of Internal Cluster Sharding Data +----------------------------------------- + +The Cluster Sharding coordinator stores the locations of the shards using Akka Persistence. +This data can safely be removed when restarting the whole Akka Cluster. +Note that this is not application data. + +There is a utility program ``akka.cluster.sharding.RemoveInternalClusterShardingData`` +that removes this data. + +.. warning:: + + Never use this program while there are running Akka Cluster nodes that are + using Cluster Sharding. Stop all Cluster nodes before using this program. + +It can be needed to remove the data if the Cluster Sharding coordinator +cannot startup because of corrupt data, which may happen if accidentally +two clusters were running at the same time, e.g. caused by using auto-down +and there was a network partition. + +Use this program as a standalone Java main program:: + + java -classpath + akka.cluster.sharding.RemoveInternalClusterShardingData + -2.3 entityType1 entityType2 entityType3 + +The program is included in the ``akka-cluster-sharding`` jar file. It +is easiest to run it with same classpath and configuration as your ordinary +application. It can be run from sbt or maven in similar way. + +Specify the entity type names (same as you use in the ``start`` method +of ``ClusterSharding``) as program arguments. + +If you specify ``-2.3`` as the first program argument it will also try +to remove data that was stored by Cluster Sharding in Akka 2.3.x using +different persistenceId. 
+ Dependencies ------------ diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c3d0f9b901..645e052439 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -101,7 +101,7 @@ object Dependencies { val clusterTools = l ++= Seq(Test.junit, Test.scalatest.value) - val clusterSharding = l ++= Seq(Test.junit, Test.scalatest.value, Test.commonsIo) + val clusterSharding = l ++= Seq(Provided.levelDB, Provided.levelDBNative, Test.junit, Test.scalatest.value, Test.commonsIo) val clusterMetrics = l ++= Seq(Provided.sigarLoader, Test.slf4jJul, Test.slf4jLog4j, Test.logback, Test.mockito)
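Besides the standalone main program documented above, the patch also exposes a programmatic entry point, `RemoveInternalClusterShardingData.remove(system, journalPluginId, typeNames, terminateSystem, remove2dot3Data)`, which returns a `Future[Unit]` and is what the new test spec calls. Below is a minimal sketch of driving that API from your own code, under the same precondition that all other cluster nodes are stopped; the object name, the entity type names and the 30-second timeout are placeholders, and an empty `journalPluginId` falls back to the configured `akka.persistence.journal.plugin`.

  import scala.concurrent.Await
  import scala.concurrent.duration._

  import akka.actor.ActorSystem
  import akka.cluster.sharding.RemoveInternalClusterShardingData

  // Sketch only: the entity type names and the timeout are placeholders for your own values.
  object RemoveShardingDataManually {
    def main(args: Array[String]): Unit = {
      // Start a system with the same persistence configuration as the application,
      // and only while all cluster nodes running Cluster Sharding are stopped.
      val system = ActorSystem("RemoveInternalClusterShardingData")
      try {
        val done = RemoveInternalClusterShardingData.remove(
          system,
          journalPluginId = "", // empty string resolves akka.persistence.journal.plugin
          typeNames = Set("entityType1", "entityType2"),
          terminateSystem = false,
          remove2dot3Data = true) // also remove the Akka 2.3.x persistenceIds
        Await.ready(done, 30.seconds)
      } finally {
        system.terminate()
      }
    }
  }

The new RemoveInternalClusterShardingDataSpec invokes `remove` the same way, with `terminateSystem = false` so that shutdown stays with the caller, and awaits the returned future before verifying that the coordinator events and snapshots are gone.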