diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index b83816f4d2..83ac1fc924 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -42,8 +42,9 @@ final class ActiveObjectConfiguration { private[akka] var _host: Option[InetSocketAddress] = None private[akka] var _messageDispatcher: Option[MessageDispatcher] = None - def timeout(timeout: Long) : ActiveObjectConfiguration = { - _timeout = timeout + def timeout = _timeout + def timeout(timeout: Duration) : ActiveObjectConfiguration = { + _timeout = timeout.toMillis this } @@ -181,7 +182,7 @@ object ActiveObject extends Logging { if (config._messageDispatcher.isDefined) { actor.dispatcher = config._messageDispatcher.get } - newInstance(target, actor, config._host, config._timeout) + newInstance(target, actor, config._host, config.timeout) } def newInstance[T](intf: Class[T], target: AnyRef, config: ActiveObjectConfiguration): T = { @@ -189,7 +190,7 @@ object ActiveObject extends Logging { if (config._messageDispatcher.isDefined) { actor.dispatcher = config._messageDispatcher.get } - newInstance(intf, target, actor, config._host, config._timeout) + newInstance(intf, target, actor, config._host, config.timeout) } @deprecated("use newInstance(target: Class[T], config: ActiveObjectConfiguration) instead") diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 244fd8fbc7..b99ce22a93 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -9,9 +9,10 @@ import se.scalablesolutions.akka.config.Config._ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.util.Helpers.{narrow, narrowSilently} -import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.util.{Logging, Duration} import com.google.protobuf.Message + import java.util.concurrent.TimeUnit /** @@ -62,7 +63,7 @@ class ActorInitializationException private[akka](message: String) extends Runtim * @author Jonas Bonér */ object Actor extends Logging { - val TIMEOUT = config.getInt("akka.actor.timeout", 5000) + val TIMEOUT = Duration(config.getInt("akka.actor.timeout", 5), TIME_UNIT).toMillis val SERIALIZE_MESSAGES = config.getBool("akka.actor.serialize-messages", false) /** diff --git a/akka-core/src/main/scala/config/Config.scala b/akka-core/src/main/scala/config/Config.scala index 68842ad1e3..ec44453e59 100644 --- a/akka-core/src/main/scala/config/Config.scala +++ b/akka-core/src/main/scala/config/Config.scala @@ -82,6 +82,8 @@ object Config extends Logging { if (VERSION != CONFIG_VERSION) throw new ConfigurationException( "Akka JAR version [" + VERSION + "] is different than the provided config ('akka.conf') version [" + CONFIG_VERSION + "]") + val TIME_UNIT = config.getString("akka.time-unit", "seconds") + val startTime = System.currentTimeMillis def uptime = (System.currentTimeMillis - startTime) / 1000 } diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index 4c18dcc6c8..2936b73b67 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -7,8 +7,8 @@ package se.scalablesolutions.akka.remote import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ import se.scalablesolutions.akka.actor.{Exit, Actor, ActorRef, RemoteActorRef, IllegalActorStateException} import se.scalablesolutions.akka.dispatch.{DefaultCompletableFuture, CompletableFuture} -import se.scalablesolutions.akka.util.{UUID, Logging} -import se.scalablesolutions.akka.config.Config.config +import se.scalablesolutions.akka.util.{UUID, Logging, Duration} +import se.scalablesolutions.akka.config.Config._ import org.jboss.netty.channel._ import group.DefaultChannelGroup @@ -50,8 +50,8 @@ case class RemoteClientConnected(host: String, port: Int) extends RemoteClientLi * @author Jonas Bonér */ object RemoteClient extends Logging { - val READ_TIMEOUT = config.getInt("akka.remote.client.read-timeout", 10000) - val RECONNECT_DELAY = config.getInt("akka.remote.client.reconnect-delay", 5000) + val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT) + val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT) private val remoteClients = new HashMap[String, RemoteClient] private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]] @@ -249,7 +249,7 @@ class RemoteClientPipelineFactory(name: String, timer: HashedWheelTimer, client: RemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { - val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT) + val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt) val lenDec = new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(RemoteReplyProtocol.getDefaultInstance) @@ -338,7 +338,7 @@ class RemoteClientHandler(val name: String, log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) } } - }, RemoteClient.RECONNECT_DELAY, TimeUnit.MILLISECONDS) + }, RemoteClient.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 4283945de9..9ad33e9f3e 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -13,7 +13,7 @@ import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.actor.Actor._ import se.scalablesolutions.akka.util._ import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._ -import se.scalablesolutions.akka.config.Config.config +import se.scalablesolutions.akka.config.Config._ import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel._ @@ -63,7 +63,7 @@ object RemoteServer { val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val PORT = config.getInt("akka.remote.server.port", 9999) - val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000) + val CONNECTION_TIMEOUT_MILLIS = Duration(config.getInt("akka.remote.server.connection-timeout", 1), TIME_UNIT) val COMPRESSION_SCHEME = config.getString("akka.remote.compression-scheme", "zlib") val ZLIB_COMPRESSION_LEVEL = { @@ -202,7 +202,7 @@ class RemoteServer extends Logging { bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("child.reuseAddress", true) - bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS) + bootstrap.setOption("child.connectTimeoutMillis", RemoteServer.CONNECTION_TIMEOUT_MILLIS.toMillis) openChannels.add(bootstrap.bind(new InetSocketAddress(hostname, port))) _isRunning = true Cluster.registerLocalNode(hostname, port) diff --git a/akka-core/src/main/scala/stm/TransactionFactory.scala b/akka-core/src/main/scala/stm/TransactionFactory.scala index 24c4712509..329928c011 100644 --- a/akka-core/src/main/scala/stm/TransactionFactory.scala +++ b/akka-core/src/main/scala/stm/TransactionFactory.scala @@ -23,7 +23,6 @@ object TransactionConfig { val READONLY = null.asInstanceOf[JBoolean] val MAX_RETRIES = config.getInt("akka.stm.max-retries", 1000) val TIMEOUT = config.getLong("akka.stm.timeout", 10) - val TIME_UNIT = config.getString("akka.stm.time-unit", "seconds") val TRACK_READS = null.asInstanceOf[JBoolean] val WRITE_SKEW = config.getBool("akka.stm.write-skew", true) val EXPLICIT_RETRIES = config.getBool("akka.stm.explicit-retries", false) diff --git a/akka-core/src/main/scala/util/Duration.scala b/akka-core/src/main/scala/util/Duration.scala index f49e1ae04b..0dee2fc139 100644 --- a/akka-core/src/main/scala/util/Duration.scala +++ b/akka-core/src/main/scala/util/Duration.scala @@ -20,8 +20,21 @@ object Duration { /** * Utility for working with java.util.concurrent.TimeUnit durations. + * *

- * Example: + * Examples of usage from Java: + *

+ * import se.scalablesolutions.akka.util.Duration;
+ * import java.util.concurrent.TimeUnit;
+ *
+ * Duration duration = new Duration(100, TimeUnit.MILLISECONDS);
+ * Duration duration = new Duration(5, "seconds");
+ *
+ * duration.toNanos();
+ * 
+ * + *

+ * Examples of usage from Scala: *

  * import se.scalablesolutions.akka.util.Duration
  * import java.util.concurrent.TimeUnit
@@ -31,6 +44,7 @@ object Duration {
  *
  * duration.toNanos
  * 
+ * *

* Implicits are also provided for Int and Long. Example usage: *

@@ -40,6 +54,7 @@ object Duration {
  * 
*/ class Duration(val length: Long, val unit: TimeUnit) { + def this(length: Long, unit: String) = this(length, Duration.timeUnit(unit)) def toNanos = unit.toNanos(length) def toMicros = unit.toMicros(length) def toMillis = unit.toMillis(length) diff --git a/akka-jta/src/main/scala/AtomikosTransactionService.scala b/akka-jta/src/main/scala/AtomikosTransactionService.scala index 937f31a54e..f85ff56e6a 100644 --- a/akka-jta/src/main/scala/AtomikosTransactionService.scala +++ b/akka-jta/src/main/scala/AtomikosTransactionService.scala @@ -10,6 +10,7 @@ import com.atomikos.icatch.jta.{J2eeTransactionManager, J2eeUserTransaction} import com.atomikos.icatch.config.{TSInitInfo, UserTransactionService, UserTransactionServiceImp} import se.scalablesolutions.akka.config.Config._ +import se.scalablesolutions.akka.util.Duration import se.scalablesolutions.akka.stm.{TransactionService, TransactionContainer} object AtomikosTransactionService extends AtomikosTransactionService @@ -20,8 +21,8 @@ object AtomikosTransactionService extends AtomikosTransactionService * @author Jonas Bonér */ class AtomikosTransactionService extends TransactionService with TransactionProtocol { + val JTA_TRANSACTION_TIMEOUT = Duration(config.getInt("akka.jta.timeout", 60), TIME_UNIT) - val JTA_TRANSACTION_TIMEOUT: Int = config.getInt("akka.jta.timeout", 60000) / 1000 private val txService: UserTransactionService = new UserTransactionServiceImp private val info: TSInitInfo = txService.createTSInitInfo @@ -29,10 +30,11 @@ class AtomikosTransactionService extends TransactionService with TransactionProt try { txService.init(info) val tm: TransactionManager = new J2eeTransactionManager - tm.setTransactionTimeout(JTA_TRANSACTION_TIMEOUT) + tm.setTransactionTimeout(JTA_TRANSACTION_TIMEOUT.toSeconds.toInt) tm } catch { - case e => throw new SystemException("Could not create a new Atomikos J2EE Transaction Manager, due to: " + e.toString) + case e => throw new SystemException( + "Could not create a new Atomikos J2EE Transaction Manager, due to: " + e.toString) } ))) // TODO: gracefully shutdown of the TM diff --git a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala index 6f62c5a8c4..7e33d0176a 100644 --- a/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala +++ b/akka-spring/src/main/scala/ActiveObjectFactoryBean.scala @@ -22,7 +22,7 @@ import org.springframework.util.StringUtils import se.scalablesolutions.akka.actor.{ActiveObjectConfiguration, ActiveObject} import se.scalablesolutions.akka.config.ScalaConfig.{ShutdownCallback, RestartCallbacks} import se.scalablesolutions.akka.dispatch.MessageDispatcher -import se.scalablesolutions.akka.util.Logging +import se.scalablesolutions.akka.util.{Logging, Duration} /** * Factory bean for active objects. @@ -167,7 +167,7 @@ class ActiveObjectFactoryBean extends AbstractFactoryBean[AnyRef] with Logging w private[akka] def createConfig: ActiveObjectConfiguration = { - val config = new ActiveObjectConfiguration().timeout(timeout) + val config = new ActiveObjectConfiguration().timeout(Duration(timeout, "millis")) if (hasRestartCallbacks) config.restartCallbacks(pre, post) if (hasShutdownCallback) config.shutdownCallback(shutdown) if (transactional) config.makeTransactionRequired diff --git a/config/akka-reference.conf b/config/akka-reference.conf index c504598e31..a3cb3467b0 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -13,15 +13,17 @@ log { # syslog_host = "" # syslog_server_name = "" - akka { + akka { # example of package level logging settings node = "se.scalablesolutions.akka" - level = "info" + level = "debug" } } akka { version = "0.10" + time-unit = "seconds" # default timeout time unit for all timeout properties throughout the config + # FQN (Fully Qualified Name) to the class doing initial active object/actor # supervisor bootstrap, should be defined in default constructor boot = ["sample.camel.Boot", @@ -30,7 +32,7 @@ akka { "sample.security.Boot"] actor { - timeout = 5000 # default timeout for future based invocations + timeout = 5 # default timeout for future based invocations serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability throughput = 5 # default throughput for ExecutorBasedEventDrivenDispatcher } @@ -41,15 +43,13 @@ akka { # begin (or join), commit or rollback the JTA transaction. Default is 'off'. timeout = 5 # default timeout for blocking transactions and transaction set (in unit defined by # the time-unit property) - # FIXME: use 'time-unit' for all timeouts - time-unit = "seconds" # default timeout time unit } jta { provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI) # "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta', # e.g. you need the akka-jta JARs on classpath). - timeout = 60000 + timeout = 60 } rest { @@ -85,12 +85,12 @@ akka { service = on hostname = "localhost" port = 9999 - connection-timeout = 1000 # in millis (1 sec default) + connection-timeout = 1 } client { - reconnect-delay = 5000 # in millis (5 sec default) - read-timeout = 10000 # in millis (10 sec default) + reconnect-delay = 5 + read-timeout = 10 } }