Now uses 'Duration' for all time properties in config

This commit is contained in:
Jonas Bonér 2010-07-23 04:54:21 +02:00
parent bc29b0ef2e
commit 7ddc553176
10 changed files with 51 additions and 31 deletions

View file

@ -42,8 +42,9 @@ final class ActiveObjectConfiguration {
private[akka] var _host: Option[InetSocketAddress] = None
private[akka] var _messageDispatcher: Option[MessageDispatcher] = None
def timeout(timeout: Long) : ActiveObjectConfiguration = {
_timeout = timeout
def timeout = _timeout
def timeout(timeout: Duration) : ActiveObjectConfiguration = {
_timeout = timeout.toMillis
this
}
@ -181,7 +182,7 @@ object ActiveObject extends Logging {
if (config._messageDispatcher.isDefined) {
actor.dispatcher = config._messageDispatcher.get
}
newInstance(target, actor, config._host, config._timeout)
newInstance(target, actor, config._host, config.timeout)
}
def newInstance[T](intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration): T = {
@ -189,7 +190,7 @@ object ActiveObject extends Logging {
if (config._messageDispatcher.isDefined) {
actor.dispatcher = config._messageDispatcher.get
}
newInstance(intf, target, actor, config._host, config._timeout)
newInstance(intf, target, actor, config._host, config.timeout)
}
@deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead")

View file

@ -9,9 +9,10 @@ import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.Helpers.{narrow, narrowSilently}
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.util.{Logging, Duration}
import com.google.protobuf.Message
import java.util.concurrent.TimeUnit
/**
@ -62,7 +63,7 @@ class ActorInitializationException private[akka](message: String) extends Runtim
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Actor extends Logging {
val TIMEOUT = config.getInt("akka.actor.timeout", 5000)
val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis
val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false)
/**

View file

@ -82,6 +82,8 @@ object Config extends Logging {
if (VERSION != CONFIG_VERSION) throw new ConfigurationException(
"Akka JAR version [" + VERSION + "] is different than the provided config ('akka.conf') version [" + CONFIG_VERSION + "]")
val TIME_UNIT = config.getString("akka.time-unit", "seconds")
val startTime = System.currentTimeMillis
def uptime = (System.currentTimeMillis - startTime) / 1000
}

View file

@ -7,8 +7,8 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef, IllegalActorStateException}
import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import se.scalablesolutions.akka.util.{UUID, Logging}
import se.scalablesolutions.akka.config.Config.config
import se.scalablesolutions.akka.util.{UUID, Logging, Duration}
import se.scalablesolutions.akka.config.Config._
import org.jboss.netty.channel._
import group.DefaultChannelGroup
@ -50,8 +50,8 @@ case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLi
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteClient extends Logging {
val READ_TIMEOUT = config.getInt("akka.remote.client.read-timeout", 10000)
val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000)
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT)
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT)
private val remoteClients = new HashMap[String, RemoteClient]
private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]
@ -249,7 +249,7 @@ class RemoteClientPipelineFactory(name: String,
timer: HashedWheelTimer,
client: RemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt)
val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance)
@ -338,7 +338,7 @@ class RemoteClientHandler(val name: String,
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
}
}
}, RemoteClient.RECONNECT_DELAY, TimeUnit.MILLISECONDS)
}, RemoteClient.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {

View file

@ -13,7 +13,7 @@ import se.scalablesolutions.akka.actor._
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.util._
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
import se.scalablesolutions.akka.config.Config.config
import se.scalablesolutions.akka.config.Config._
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
@ -63,7 +63,7 @@ object RemoteServer {
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)
val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT)
val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib")
val ZLIB_COMPRESSION_LEVEL = {
@ -202,7 +202,7 @@ class RemoteServer extends Logging {
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS)
bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis)
openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port)))
_isRunning = true
Cluster.registerLocalNode(hostname, port)

View file

@ -23,7 +23,6 @@ object TransactionConfig {
val READONLY = null.asInstanceOf[JBoolean]
val MAX_RETRIES = config.getInt("akka.stm.max-retries", 1000)
val TIMEOUT = config.getLong("akka.stm.timeout", 10)
val TIME_UNIT = config.getString("akka.stm.time-unit", "seconds")
val TRACK_READS = null.asInstanceOf[JBoolean]
val WRITE_SKEW = config.getBool("akka.stm.write-skew", true)
val EXPLICIT_RETRIES = config.getBool("akka.stm.explicit-retries", false)

View file

@ -20,8 +20,21 @@ object Duration {
/**
* Utility for working with java.util.concurrent.TimeUnit durations.
*
* <p/>
* Example:
* Examples of usage from Java:
* <pre>
* import se.scalablesolutions.akka.util.Duration;
* import java.util.concurrent.TimeUnit;
*
* Duration duration = new Duration(100, TimeUnit.MILLISECONDS);
* Duration duration = new Duration(5, "seconds");
*
* duration.toNanos();
* </pre>
*
* <p/>
* Examples of usage from Scala:
* <pre>
* import se.scalablesolutions.akka.util.Duration
* import java.util.concurrent.TimeUnit
@ -31,6 +44,7 @@ object Duration {
*
* duration.toNanos
* </pre>
*
* <p/>
* Implicits are also provided for Int and Long. Example usage:
* <pre>
@ -40,6 +54,7 @@ object Duration {
* </pre>
*/
class Duration(val length: Long, val unit: TimeUnit) {
def this(length: Long, unit: String) = this(length, Duration.timeUnit(unit))
def toNanos = unit.toNanos(length)
def toMicros = unit.toMicros(length)
def toMillis = unit.toMillis(length)

View file

@ -10,6 +10,7 @@ import com.atomikos.icatch.jta.{J2eeTransactionManager, J2eeUserTransaction}
import com.atomikos.icatch.config.{TSInitInfo, UserTransactionService, UserTransactionServiceImp}
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.util.Duration
import se.scalablesolutions.akka.stm.{TransactionService, TransactionContainer}
object AtomikosTransactionService extends AtomikosTransactionService
@ -20,8 +21,8 @@ object AtomikosTransactionService extends AtomikosTransactionService
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class AtomikosTransactionService extends TransactionService with TransactionProtocol {
val JTA_TRANSACTION_TIMEOUT = Duration(config.getInt("akka.jta.timeout", 60), TIME_UNIT)
val JTA_TRANSACTION_TIMEOUT: Int = config.getInt("akka.jta.timeout", 60000) / 1000
private val txService: UserTransactionService = new UserTransactionServiceImp
private val info: TSInitInfo = txService.createTSInitInfo
@ -29,10 +30,11 @@ class AtomikosTransactionService extends TransactionService with TransactionProt
try {
txService.init(info)
val tm: TransactionManager = new J2eeTransactionManager
tm.setTransactionTimeout(JTA_TRANSACTION_TIMEOUT)
tm.setTransactionTimeout(JTA_TRANSACTION_TIMEOUT.toSeconds.toInt)
tm
} catch {
case e => throw new SystemException("Could not create a new Atomikos J2EE Transaction Manager, due to: " + e.toString)
case e => throw new SystemException(
"Could not create a new Atomikos J2EE Transaction Manager, due to: " + e.toString)
}
)))
// TODO: gracefully shutdown of the TM

View file

@ -22,7 +22,7 @@ import org.springframework.util.StringUtils
import se.scalablesolutions.akka.actor.{ActiveObjectConfiguration, ActiveObject}
import se.scalablesolutions.akka.config.ScalaConfig.{ShutdownCallback, RestartCallbacks}
import se.scalablesolutions.akka.dispatch.MessageDispatcher
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.util.{Logging, Duration}
/**
* Factory bean for active objects.
@ -167,7 +167,7 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w
private[akka] def createConfig: ActiveObjectConfiguration = {
val config = new ActiveObjectConfiguration().timeout(timeout)
val config = new ActiveObjectConfiguration().timeout(Duration(timeout, "millis"))
if (hasRestartCallbacks) config.restartCallbacks(pre, post)
if (hasShutdownCallback) config.shutdownCallback(shutdown)
if (transactional) config.makeTransactionRequired

View file

@ -13,15 +13,17 @@ log {
# syslog_host = ""
# syslog_server_name = ""
akka {
akka { # example of package level logging settings
node = "se.scalablesolutions.akka"
level = "info"
level = "debug"
}
}
akka {
version = "0.10"
time-unit = "seconds" # default timeout time unit for all timeout properties throughout the config
# FQN (Fully Qualified Name) to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor
boot = ["sample.camel.Boot",
@ -30,7 +32,7 @@ akka {
"sample.security.Boot"]
actor {
timeout = 5000 # default timeout for future based invocations
timeout = 5 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher
}
@ -41,15 +43,13 @@ akka {
# begin (or join), commit or rollback the JTA transaction. Default is 'off'.
timeout = 5 # default timeout for blocking transactions and transaction set (in unit defined by
# the time-unit property)
# FIXME: use 'time-unit' for all timeouts
time-unit = "seconds" # default timeout time unit
}
jta {
provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI)
# "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta',
# e.g. you need the akka-jta JARs on classpath).
timeout = 60000
timeout = 60
}
rest {
@ -85,12 +85,12 @@ akka {
service = on
hostname = "localhost"
port = 9999
connection-timeout = 1000 # in millis (1 sec default)
connection-timeout = 1
}
client {
reconnect-delay = 5000 # in millis (5 sec default)
read-timeout = 10000 # in millis (10 sec default)
reconnect-delay = 5
read-timeout = 10
}
}