Preparing transactors for binary compat

This commit is contained in:
Viktor Klang 2012-05-24 01:38:41 +02:00
parent 2198462ed2
commit 568c02d158
3 changed files with 26 additions and 15 deletions

View file

@ -12,19 +12,29 @@ import java.util.concurrent.Callable
/**
* Akka-specific exception for coordinated transactions.
*/
class CoordinatedTransactionException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null);
class CoordinatedTransactionException(message: String, cause: Throwable) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
}
/**
* Coordinated transactions across actors.
*/
object Coordinated {
def apply(message: Any = null)(implicit timeout: Timeout) = new Coordinated(message, createInitialMember(timeout))
/**
* Creates a new Coordinated with the given message and Timeout
* @param message - the message which will be coordinated
* @param timeout - the timeout for the coordination
* @return a new Coordinated
*/
def apply(message: Any = null)(implicit timeout: Timeout): Coordinated =
new Coordinated(message, CommitBarrier(timeout.duration.toMillis).addMember())
/**
* @param c - a Coordinated to be unapplied
* @return the message associated with the given Coordinated
*/
def unapply(c: Coordinated): Option[Any] = Some(c.message)
def createInitialMember(timeout: Timeout) = CommitBarrier(timeout.duration.toMillis).addMember()
}
/**
@ -91,16 +101,15 @@ class Coordinated(val message: Any, member: CommitBarrier.Member) {
// Java API constructors
def this(message: Any, timeout: Timeout) = this(message, Coordinated.createInitialMember(timeout))
def this(message: Any, timeout: Timeout) = this(message, CommitBarrier(timeout.duration.toMillis).addMember())
def this(timeout: Timeout) = this(null, Coordinated.createInitialMember(timeout))
def this(timeout: Timeout) = this(null, timeout)
/**
* Create a new Coordinated object and increment the number of members by one.
* Use this method to ''pass on'' the coordination.
*/
def apply(msg: Any): Coordinated =
new Coordinated(msg, member.commitBarrier.addMember())
def apply(msg: Any): Coordinated = new Coordinated(msg, member.commitBarrier.addMember())
/**
* Create a new Coordinated object but *do not* increment the number of members by one.

View file

@ -176,8 +176,10 @@ trait Transactor extends Actor {
/**
* Default catch-all for the different Receive methods.
*/
def doNothing: Receive = new Receive {
def apply(any: Any) = {}
def isDefinedAt(any: Any) = false
}
def doNothing: Receive = EmptyReceive
}
private[akka] object EmptyReceive extends PartialFunction[Any, Unit] {
def apply(any: Any): Unit = ()
def isDefinedAt(any: Any): Boolean = false
}

View file

@ -15,11 +15,11 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
*/
object TransactorExtension extends ExtensionId[TransactorSettings] with ExtensionIdProvider {
override def get(system: ActorSystem): TransactorSettings = super.get(system)
override def lookup = TransactorExtension
override def lookup: TransactorExtension.type = TransactorExtension
override def createExtension(system: ExtendedActorSystem): TransactorSettings = new TransactorSettings(system.settings.config)
}
class TransactorSettings(val config: Config) extends Extension {
import config._
val CoordinatedTimeout = Timeout(Duration(getMilliseconds("akka.transactor.coordinated-timeout"), MILLISECONDS))
val CoordinatedTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.transactor.coordinated-timeout"), MILLISECONDS))
}