From a8700ddc934c42223c3533bd858c39202d0072bf Mon Sep 17 00:00:00 2001 From: Jonas Boner Date: Mon, 23 Mar 2009 19:17:10 +0100 Subject: [PATCH] initial draft of transactional actors --- kernel/src/main/scala/Transaction.scala | 93 +++++++++++++++++++++++++ 1 file changed, 93 insertions(+) create mode 100644 kernel/src/main/scala/Transaction.scala diff --git a/kernel/src/main/scala/Transaction.scala b/kernel/src/main/scala/Transaction.scala new file mode 100644 index 0000000000..3e4fed16f3 --- /dev/null +++ b/kernel/src/main/scala/Transaction.scala @@ -0,0 +1,93 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel + +import java.util.concurrent.atomic.AtomicLong +import scala.collection.mutable.HashMap + +sealed abstract case class TransactionStatus +object TransactionStatus { + case object New extends TransactionStatus + case object Active extends TransactionStatus + case object Aborted extends TransactionStatus + case object Completed extends TransactionStatus +} + +/** + * Represents a snapshot of the current invocation. + * + * @author Jonas Bonér + */ +object TransactionIdFactory { + private val currentId = new AtomicLong(0L) + def newId = currentId.getAndIncrement +} + +/** + * Represents a snapshot of the current invocation. + * + * @author Jonas Bonér + */ +class Transaction extends Logging { + val id = TransactionIdFactory.newId + + private[this] var parent: Option[Transaction] = None + private[this] var oldActorVersions = new HashMap[GenericServerContainer, GenericServer] + private[this] var precommitted: List[GenericServerContainer] = Nil + @volatile private[this] var status: TransactionStatus = TransactionStatus.New + + def begin(server: GenericServerContainer) = synchronized { + if (status == TransactionStatus.Aborted) throw new IllegalStateException("Can't begin ABORTED transaction") + if (status == TransactionStatus.Completed) throw new IllegalStateException("Can't begin COMPLETED transaction") + if (status == TransactionStatus.New) log.debug("Actor [%s] is starting NEW transaction", server) + else log.debug("Actor [%s] is participating in transaction", server) + val oldVersion = server.cloneServerAndReturnOldVersion + oldActorVersions.put(server, oldVersion) + status = TransactionStatus.Active + } + + def precommit(server: GenericServerContainer) = synchronized { + ensureIsActive + log.debug("Pre-committing transaction for actor [%s]", server) + precommitted ::= server + } + + def commit(server: GenericServerContainer) = synchronized { + ensureIsActive + log.debug("Committing transaction for actor [%s]", server) + val haveAllPreCommitted = + if (oldActorVersions.size == precommitted.size) {{ + for (server <- oldActorVersions.keys) yield { + if (precommitted.exists(_.id == server.id)) true + else false + }}.exists(_ == false) + } else false + + if (haveAllPreCommitted) status = TransactionStatus.Completed + else rollback(server) + } + + def rollback(server: GenericServerContainer) = synchronized { + ensureIsActive + log.debug("Actor [%s] has initiated transaction rollback, rolling back [%s]" , server, oldActorVersions.keys) + oldActorVersions.foreach(entry => { + val (server, backup) = entry + server.swapServer(backup) + }) + status = TransactionStatus.Aborted + } + + private def ensureIsActive = if (status == TransactionStatus.Active) + throw new IllegalStateException("Expected ACTIVE transaction - current status [" + status + "]") + + override def equals(that: Any): Boolean = + that != null && + that.isInstanceOf[Transaction] && + that.asInstanceOf[Transaction].id == this.id + + override def hashCode(): Int = id.toInt + + override def toString(): String = "Transaction[" + id + ", " + status + "]" +}