Fixing ticket 372

This commit is contained in:
Viktor Klang 2010-08-06 17:13:31 +02:00
parent 8788e7261f
commit 652c1ad8cf
2 changed files with 14 additions and 41 deletions

View file

@ -24,13 +24,13 @@ import org.multiverse.api.exceptions.DeadTransactionException
import java.net.InetSocketAddress
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
import java.util.{Map => JMap}
import java.lang.reflect.Field
import jsr166x.{Deque, ConcurrentLinkedDeque}
import com.google.protobuf.ByteString
import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit}
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -72,7 +72,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
@volatile protected[this] var _isShutDown = false
@volatile protected[akka] var _isBeingRestarted = false
@volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT)
@volatile protected[akka] var _timeoutActor: Option[ActorRef] = None
@volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
@volatile protected[akka] var startOnCreation = false
@volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false
protected[this] val guard = new ReentrantGuard
@ -559,14 +559,16 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef]
cancelReceiveTimeout
receiveTimeout.foreach { time =>
log.debug("Scheduling timeout for %s", this)
_timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, time, TimeUnit.MILLISECONDS))
_futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, time, TimeUnit.MILLISECONDS))
}
}
protected[akka] def cancelReceiveTimeout = _timeoutActor.foreach { timeoutActor =>
if (timeoutActor.isRunning) Scheduler.unschedule(timeoutActor)
_timeoutActor = None
log.debug("Timeout canceled for %s", this)
protected[akka] def cancelReceiveTimeout = {
if(_futureTimeout.isDefined) {
_futureTimeout.get.cancel(true)
_futureTimeout = None
log.debug("Timeout canceled for %s", this)
}
}
}

View file

@ -23,59 +23,39 @@ import se.scalablesolutions.akka.util.Logging
object Scheduler extends Logging {
import Actor._
case object UnSchedule
case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e)
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef]
log.info("Starting up Scheduler")
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ActorRef = {
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
log.trace(
"Schedule scheduled event\n\tevent = [%s]\n\treceiver = [%s]\n\tinitialDelay = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
message, receiver, initialDelay, delay, timeUnit)
try {
val future = service.scheduleAtFixedRate(
service.scheduleAtFixedRate(
new Runnable { def run = receiver ! message },
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
createAndStoreScheduleActorForFuture(future)
val scheduler = actorOf(new ScheduleActor(future)).start
schedulers.put(scheduler, scheduler)
scheduler
} catch {
case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
}
}
def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ActorRef = {
def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
log.trace(
"Schedule one-time event\n\tevent = [%s]\n\treceiver = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
message, receiver, delay, timeUnit)
try {
val future = service.schedule(
service.schedule(
new Runnable { def run = receiver ! message }, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
createAndStoreScheduleActorForFuture(future)
} catch {
case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
}
}
private def createAndStoreScheduleActorForFuture(future: ScheduledFuture[AnyRef]): ActorRef = {
val scheduler = actorOf(new ScheduleActor(future)).start
schedulers.put(scheduler, scheduler)
scheduler
}
def unschedule(scheduleActor: ActorRef) = {
scheduleActor ! UnSchedule
schedulers.remove(scheduleActor)
}
def shutdown = {
log.info("Shutting down Scheduler")
import scala.collection.JavaConversions._
schedulers.values.foreach(_ ! UnSchedule)
schedulers.clear
service.shutdown
}
@ -86,15 +66,6 @@ object Scheduler extends Logging {
}
}
private class ScheduleActor(future: ScheduledFuture[AnyRef]) extends Actor {
def receive = {
case Scheduler.UnSchedule =>
Scheduler.log.trace("Unschedule event handled by scheduleActor\n\tactorRef = [%s]", self.toString)
future.cancel(true)
self.stop
}
}
private object SchedulerThreadFactory extends ThreadFactory {
private var count = 0
val threadFactory = Executors.defaultThreadFactory()