Merge pull request #15476 from akka/wip-15472-15440-picks-master-patriknw

#15472 #15440 picks to master (for validation)
This commit is contained in:
Patrik Nordwall 2014-06-30 14:35:42 +02:00
commit 5608abdb9a
3 changed files with 58 additions and 25 deletions

View file

@ -1098,21 +1098,29 @@ object ShardCoordinator {
def updated(event: DomainEvent): State = event match { def updated(event: DomainEvent): State = event match {
case ShardRegionRegistered(region) case ShardRegionRegistered(region)
require(!regions.contains(region), s"Region $region already registered: $this")
copy(regions = regions.updated(region, Vector.empty)) copy(regions = regions.updated(region, Vector.empty))
case ShardRegionProxyRegistered(proxy) case ShardRegionProxyRegistered(proxy)
require(!regionProxies.contains(proxy), s"Region proxy $proxy already registered: $this")
copy(regionProxies = regionProxies + proxy) copy(regionProxies = regionProxies + proxy)
case ShardRegionTerminated(region) case ShardRegionTerminated(region)
require(regions.contains(region), s"Terminated region $region not registered: $this")
copy( copy(
regions = regions - region, regions = regions - region,
shards = shards -- regions(region)) shards = shards -- regions(region))
case ShardRegionProxyTerminated(proxy) case ShardRegionProxyTerminated(proxy)
require(regionProxies.contains(proxy), s"Terminated region proxy $proxy not registered: $this")
copy(regionProxies = regionProxies - proxy) copy(regionProxies = regionProxies - proxy)
case ShardHomeAllocated(shard, region) case ShardHomeAllocated(shard, region)
require(regions.contains(region), s"Region $region not registered: $this")
require(!shards.contains(shard), s"Shard [$shard] already allocated: $this")
copy( copy(
shards = shards.updated(shard, region), shards = shards.updated(shard, region),
regions = regions.updated(region, regions(region) :+ shard)) regions = regions.updated(region, regions(region) :+ shard))
case ShardHomeDeallocated(shard) case ShardHomeDeallocated(shard)
require(shards.contains(shard), s"Shard [$shard] not allocated: $this")
val region = shards(shard) val region = shards(shard)
require(regions.contains(region), s"Region $region for shard [$shard] not registered: $this")
copy( copy(
shards = shards - shard, shards = shards - shard,
regions = regions.updated(region, regions(region).filterNot(_ == shard))) regions = regions.updated(region, regions(region).filterNot(_ == shard)))
@ -1206,28 +1214,32 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
} }
override def receiveRecover: Receive = { override def receiveRecover: Receive = {
case evt: DomainEvent evt match { case evt: DomainEvent
case ShardRegionRegistered(region) log.debug("receiveRecover {}", evt)
persistentState = persistentState.updated(evt) evt match {
case ShardRegionProxyRegistered(proxy) case ShardRegionRegistered(region)
persistentState = persistentState.updated(evt)
case ShardRegionTerminated(region)
if (persistentState.regions.contains(region))
persistentState = persistentState.updated(evt) persistentState = persistentState.updated(evt)
else { case ShardRegionProxyRegistered(proxy)
log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " + persistentState = persistentState.updated(evt)
" some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " + case ShardRegionTerminated(region)
"removed by later watch.", region) if (persistentState.regions.contains(region))
} persistentState = persistentState.updated(evt)
case ShardRegionProxyTerminated(proxy) else {
persistentState = persistentState.updated(evt) log.debug("ShardRegionTerminated, but region {} was not registered. This inconsistency is due to that " +
case _: ShardHomeAllocated " some stored ActorRef in Akka v2.3.0 and v2.3.1 did not contain full address information. It will be " +
persistentState = persistentState.updated(evt) "removed by later watch.", region)
case _: ShardHomeDeallocated }
persistentState = persistentState.updated(evt) case ShardRegionProxyTerminated(proxy)
} if (persistentState.regionProxies.contains(proxy))
persistentState = persistentState.updated(evt)
case ShardHomeAllocated(shard, region)
persistentState = persistentState.updated(evt)
case _: ShardHomeDeallocated
persistentState = persistentState.updated(evt)
}
case SnapshotOffer(_, state: State) case SnapshotOffer(_, state: State)
log.debug("receiveRecover SnapshotOffer {}", state)
persistentState = state persistentState = state
} }
@ -1274,6 +1286,8 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
case None case None
if (persistentState.regions.nonEmpty) { if (persistentState.regions.nonEmpty) {
val region = allocationStrategy.allocateShard(sender(), shard, persistentState.regions) val region = allocationStrategy.allocateShard(sender(), shard, persistentState.regions)
require(persistentState.regions.contains(region),
s"Allocated region $region for shard [$shard] must be one of the registered regions: $persistentState")
persist(ShardHomeAllocated(shard, region)) { evt persist(ShardHomeAllocated(shard, region)) { evt
persistentState = persistentState.updated(evt) persistentState = persistentState.updated(evt)
log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region) log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region)
@ -1296,10 +1310,12 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, rebalanceInterval: Finite
case RebalanceDone(shard, ok) case RebalanceDone(shard, ok)
rebalanceInProgress -= shard rebalanceInProgress -= shard
log.debug("Rebalance shard [{}] done [{}]", shard, ok) log.debug("Rebalance shard [{}] done [{}]", shard, ok)
if (ok) persist(ShardHomeDeallocated(shard)) { evt // The shard could have been removed by ShardRegionTerminated
persistentState = persistentState.updated(evt) if (ok && persistentState.shards.contains(shard))
log.debug("Shard [{}] deallocated", evt.shard) persist(ShardHomeDeallocated(shard)) { evt
} persistentState = persistentState.updated(evt)
log.debug("Shard [{}] deallocated", evt.shard)
}
case SnapshotTick case SnapshotTick
log.debug("Saving persistent snapshot") log.debug("Saving persistent snapshot")

View file

@ -369,6 +369,16 @@ private[persistence] trait Eventsourced extends ProcessorImpl {
currentState.aroundReceive(receive, message) currentState.aroundReceive(receive, message)
} }
/**
* INTERNAL API.
*/
override protected[akka] def aroundPreRestart(reason: Throwable, message: Option[Any]): Unit = {
// flushJournalBatch will send outstanding persistAsync and defer events to the journal
// and also prevent those to be unstashed in Processor.aroundPreRestart
flushJournalBatch()
super.aroundPreRestart(reason, message)
}
/** /**
* Calls `super.preRestart` then unstashes all messages from the internal stash. * Calls `super.preRestart` then unstashes all messages from the internal stash.
*/ */

View file

@ -145,8 +145,7 @@ private[akka] trait ProcessorImpl extends Actor with Recovery {
processorBatch.length >= extension.settings.journal.maxMessageBatchSize processorBatch.length >= extension.settings.journal.maxMessageBatchSize
def journalBatch(): Unit = { def journalBatch(): Unit = {
journal ! WriteMessages(processorBatch, self, instanceId) flushJournalBatch()
processorBatch = Vector.empty
batching = true batching = true
} }
} }
@ -257,6 +256,14 @@ private[akka] trait ProcessorImpl extends Actor with Recovery {
journal ! DeleteMessagesTo(persistenceId, toSequenceNr, permanent) journal ! DeleteMessagesTo(persistenceId, toSequenceNr, permanent)
} }
/**
* INTERNAL API
*/
private[akka] def flushJournalBatch(): Unit = {
journal ! WriteMessages(processorBatch, self, instanceId)
processorBatch = Vector.empty
}
/** /**
* INTERNAL API. * INTERNAL API.
*/ */