From 12b9af25cfbb1c65ccf2b678afa5942e049e4c8b Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Fri, 1 Jun 2012 21:29:47 +0200 Subject: [PATCH] #2168 - Exposing more Netty options in remtoe config --- .../akka/pattern/CircuitBreakerMTSpec.scala | 72 +++++++++---------- .../scala/akka/pattern/CircuitBreaker.scala | 2 +- .../CircuitBreakerDocSpec.scala | 14 ++-- .../actor/mailbox/DurableMailboxDocSpec.scala | 6 +- akka-remote/src/main/resources/reference.conf | 9 +++ .../main/scala/akka/remote/netty/Client.scala | 6 ++ .../main/scala/akka/remote/netty/Server.scala | 6 ++ .../scala/akka/remote/netty/Settings.scala | 12 ++++ .../scala/akka/remote/RemoteConfigSpec.scala | 3 + 9 files changed, 83 insertions(+), 47 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala index fab1cbab7a..35f55d703d 100644 --- a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerMTSpec.scala @@ -6,7 +6,7 @@ package akka.pattern import akka.testkit._ import akka.util.duration._ import org.scalatest.BeforeAndAfter -import akka.dispatch.{Promise, Await, Future} +import akka.dispatch.{ Promise, Await, Future } class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { @@ -17,8 +17,8 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { val halfOpenLatch = new TestLatch(1) - val breaker = new CircuitBreaker(system.scheduler,5,100.millis.dilated,500.millis.dilated) - .onHalfOpen(halfOpenLatch.countDown()) + val breaker = new CircuitBreaker(system.scheduler, 5, 100.millis.dilated, 500.millis.dilated) + .onHalfOpen(halfOpenLatch.countDown()) } @@ -28,30 +28,30 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { def unreliableCall(param: String) = { param match { - case "fail" => throw new RuntimeException("FAIL") - case _ => param + case "fail" ⇒ throw new RuntimeException("FAIL") + case _ ⇒ param } } def openBreaker: Unit = { - for (i <- 1 to 5) + for (i ← 1 to 5) Await.result(breakers.breaker.withCircuitBreaker(Future(unreliableCall("fail"))) recoverWith { - case _ => Promise.successful("OK") + case _ ⇒ Promise.successful("OK") }, 1.second.dilated) } "A circuit breaker being called by many threads" must { "allow many calls while in closed state with no errors" in { - val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future {Thread.sleep(10); unreliableCall("succeed")}) + val futures = for (i ← 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { Thread.sleep(10); unreliableCall("succeed") }) val futureList = Future.sequence(futures) val result = Await.result(futureList, 1.second.dilated) - result.size must be (100) - result.distinct.size must be (1) - result.distinct must contain ("succeed") + result.size must be(100) + result.distinct.size must be(1) + result.distinct must contain("succeed") } @@ -59,19 +59,19 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { openBreaker - val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { - Thread.sleep(10); unreliableCall("success") - }) recoverWith { - case _: CircuitBreakerOpenException => Promise.successful("CBO") - } + val futures = for (i ← 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { + Thread.sleep(10); unreliableCall("success") + }) recoverWith { + case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO") + } val futureList = Future.sequence(futures) val result = Await.result(futureList, 1.second.dilated) - result.size must be (100) - result.distinct.size must be (1) - result.distinct must contain ("CBO") + result.size must be(100) + result.distinct.size must be(1) + result.distinct must contain("CBO") } "allow a single call through in half-open state" in { @@ -79,20 +79,20 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { Await.ready(breakers.halfOpenLatch, 2.seconds.dilated) - val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { - Thread.sleep(10); unreliableCall("succeed") - }) recoverWith { - case _: CircuitBreakerOpenException => Promise.successful("CBO") - } + val futures = for (i ← 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { + Thread.sleep(10); unreliableCall("succeed") + }) recoverWith { + case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO") + } val futureList = Future.sequence(futures) val result = Await.result(futureList, 1.second.dilated) - result.size must be (100) - result.distinct.size must be (2) - result.distinct must contain ("succeed") - result.distinct must contain ("CBO") + result.size must be(100) + result.distinct.size must be(2) + result.distinct must contain("succeed") + result.distinct must contain("CBO") } "recover and reset the breaker after the reset timeout" in { @@ -102,19 +102,19 @@ class CircuitBreakerMTSpec extends AkkaSpec with BeforeAndAfter { Await.ready(breakers.breaker.withCircuitBreaker(Future(unreliableCall("succeed"))), 1.second.dilated) - val futures = for (i <- 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { - Thread.sleep(10); unreliableCall("succeed") - }) recoverWith { - case _: CircuitBreakerOpenException => Promise.successful("CBO") - } + val futures = for (i ← 1 to 100) yield breakers.breaker.withCircuitBreaker(Future { + Thread.sleep(10); unreliableCall("succeed") + }) recoverWith { + case _: CircuitBreakerOpenException ⇒ Promise.successful("CBO") + } val futureList = Future.sequence(futures) val result = Await.result(futureList, 1.second.dilated) - result.size must be (100) - result.distinct.size must be (1) - result.distinct must contain ("succeed") + result.size must be(100) + result.distinct.size must be(1) + result.distinct must contain("succeed") } } diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index 79eba6aa1b..ac8fd1c5ed 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -142,7 +142,7 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Durati catch { case NonFatal(t) ⇒ Promise.failed(t)(CircuitBreaker.syncExecutionContext) } - }),callTimeout) + }), callTimeout) } /** diff --git a/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala b/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala index bd6c1447ad..c4603017e3 100644 --- a/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala +++ b/akka-docs/common/code/docs/circuitbreaker/CircuitBreakerDocSpec.scala @@ -5,7 +5,7 @@ package docs.circuitbreaker //#imports1 -import akka.util.duration._ // small d is important here +import akka.util.duration._ // small d is important here import akka.pattern.CircuitBreaker import akka.actor.Actor import akka.dispatch.Future @@ -13,7 +13,7 @@ import akka.event.Logging //#imports1 -class CircuitBreakerDocSpec { } +class CircuitBreakerDocSpec {} //#circuit-breaker-initialization class DangerousActor extends Actor { @@ -26,18 +26,18 @@ class DangerousActor extends Actor { def notifyMeOnOpen = log.warning("My CircuitBreaker is now open, and will not close for one minute") -//#circuit-breaker-initialization + //#circuit-breaker-initialization -//#circuit-breaker-usage + //#circuit-breaker-usage def dangerousCall: String = "This really isn't that dangerous of a call after all" def receive = { - case "is my middle name" => + case "is my middle name" ⇒ sender ! breaker.withCircuitBreaker(Future(dangerousCall)) - case "block for me" => + case "block for me" ⇒ sender ! breaker.withSyncCircuitBreaker(dangerousCall) } -//#circuit-breaker-usage + //#circuit-breaker-usage } diff --git a/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala b/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala index 54349f73e0..b51c7bb170 100644 --- a/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala +++ b/akka-docs/modules/code/docs/actor/mailbox/DurableMailboxDocSpec.scala @@ -69,21 +69,21 @@ class MyMessageQueue(_owner: ActorContext) val storage = new QueueStorage // A real-world implmentation would use configuration to set the last // three parameters below - val breaker = CircuitBreaker(_owner.system.scheduler,5,30.seconds,1.minute) + val breaker = CircuitBreaker(_owner.system.scheduler, 5, 30.seconds, 1.minute) def enqueue(receiver: ActorRef, envelope: Envelope): Unit = breaker.withSyncCircuitBreaker { val data: Array[Byte] = serialize(envelope) storage.push(data) } - def dequeue(): Envelope = breaker.withSyncCircuitBreaker { + def dequeue(): Envelope = breaker.withSyncCircuitBreaker { val data: Option[Array[Byte]] = storage.pull() data.map(deserialize).orNull } def hasMessages: Boolean = breaker.withSyncCircuitBreaker { !storage.isEmpty } - def numberOfMessages: Int = breaker.withSyncCircuitBreaker { storage.size } + def numberOfMessages: Int = breaker.withSyncCircuitBreaker { storage.size } /** * Called when the mailbox is disposed. diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 97b85895ed..1ed3a274e9 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -133,6 +133,15 @@ akka { # (I) Maximum total size of all channels, 0 for off max-total-memory-size = 0b + # (I&O) Sets the high water mark for the in and outbound sockets, set to 0b for platform default + write-buffer-high-water-mark = 0b + + # (I&O) Sets the send buffer size of the Sockets, set to 0b for platform default + send-buffer-size = 0b + + # (I&O) Sets the receive buffer size of the Sockets, set to 0b for platform default + receive-buffer-size = 0b + # (O) Time between reconnect attempts for active clients reconnect-delay = 5s diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index c1737831da..cbd49fc202 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -147,6 +147,12 @@ private[akka] class ActiveRemoteClient private[akka] ( b.setOption("tcpNoDelay", true) b.setOption("keepAlive", true) b.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis) + if (settings.ReceiveBufferSize.isDefined) + b.setOption("receiveBufferSize", settings.ReceiveBufferSize.get) + if (settings.SendBufferSize.isDefined) + b.setOption("sendBufferSize", settings.SendBufferSize.get) + if (settings.WriteBufferHighWaterMark.isDefined) + b.setOption("writeBufferHighWaterMark", settings.WriteBufferHighWaterMark.get) settings.OutboundLocalAddress.foreach(s ⇒ b.setOption("localAddress", new InetSocketAddress(s, 0))) bootstrap = b diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index cc3310fada..170f7ebae1 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -45,6 +45,12 @@ private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) { b.setOption("tcpNoDelay", true) b.setOption("child.keepAlive", true) b.setOption("reuseAddress", true) + if (settings.ReceiveBufferSize.isDefined) + b.setOption("receiveBufferSize", settings.ReceiveBufferSize.get) + if (settings.SendBufferSize.isDefined) + b.setOption("sendBufferSize", settings.SendBufferSize.get) + if (settings.WriteBufferHighWaterMark.isDefined) + b.setOption("writeBufferHighWaterMark", settings.WriteBufferHighWaterMark.get) b } diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala index 64bc184408..a78703ebf5 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -37,8 +37,20 @@ private[akka] class NettySettings(config: Config, val systemName: String) { val WriteTimeout: Duration = Duration(getMilliseconds("write-timeout"), MILLISECONDS) val AllTimeout: Duration = Duration(getMilliseconds("all-timeout"), MILLISECONDS) val ReconnectDelay: Duration = Duration(getMilliseconds("reconnect-delay"), MILLISECONDS) + val MessageFrameSize: Int = getBytes("message-frame-size").toInt + private[this] def optionSize(s: String): Option[Int] = getBytes(s).toInt match { + case 0 ⇒ None + case x if x < 0 ⇒ + throw new ConfigurationException("Setting '%s' must be 0 or positive (and fit in an Int)" format s) + case other ⇒ Some(other) + } + + val WriteBufferHighWaterMark: Option[Int] = optionSize("write-buffer-high-water-mark") + val SendBufferSize: Option[Int] = optionSize("send-buffer-size") + val ReceiveBufferSize: Option[Int] = optionSize("receive-buffer-size") + val Hostname: String = getString("hostname") match { case "" ⇒ InetAddress.getLocalHost.getHostAddress case value ⇒ value diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index f1809d42a5..5a9e79d67f 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -56,6 +56,9 @@ class RemoteConfigSpec extends AkkaSpec( WriteTimeout must be(10 seconds) AllTimeout must be(0 millis) ReconnectionTimeWindow must be(10 minutes) + WriteBufferHighWaterMark must be(None) + SendBufferSize must be(None) + ReceiveBufferSize must be(None) } }