From 198fdcf93fbeede17bfefc220bf7cc737707a3ea Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Fri, 8 May 2015 14:02:55 +0200 Subject: [PATCH] =act #17415 fix concurrency bug in CircuitBreaker * transition did not account for concurrent transistions (cherry picked from commit 2a73e77f66202b1bd5a5918329b9dd34448ce7b6) --- .../pattern/CircuitBreakerStressSpec.scala | 83 +++++++++++++++++++ .../scala/akka/pattern/CircuitBreaker.scala | 6 +- 2 files changed, 86 insertions(+), 3 deletions(-) create mode 100644 akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerStressSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerStressSpec.scala b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerStressSpec.scala new file mode 100644 index 0000000000..df14553347 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/pattern/CircuitBreakerStressSpec.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.pattern + +import scala.concurrent.Promise +import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.Props +import akka.actor.Status.Failure +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import java.util.concurrent.TimeoutException + +object CircuitBreakerStressSpec { + case object JobDone + case object GetResult + case class Result(doneCount: Int, timeoutCount: Int, failCount: Int, circCount: Int) + + class StressActor(breaker: CircuitBreaker) extends Actor with ActorLogging with PipeToSupport { + import context.dispatcher + + private var doneCount = 0 + private var timeoutCount = 0 + private var failCount = 0 + private var circCount = 0 + + private def job = { + val promise = Promise[JobDone.type]() + + context.system.scheduler.scheduleOnce(ThreadLocalRandom.current.nextInt(300).millisecond) { + promise.success(JobDone) + } + + promise.future + } + + override def receive = { + case JobDone ⇒ + doneCount += 1 + breaker.withCircuitBreaker(job).pipeTo(self) + case Failure(ex: CircuitBreakerOpenException) ⇒ + circCount += 1 + breaker.withCircuitBreaker(job).pipeTo(self) + case Failure(_: TimeoutException) ⇒ + timeoutCount += 1 + breaker.withCircuitBreaker(job).pipeTo(self) + case Failure(_) ⇒ + failCount += 1 + breaker.withCircuitBreaker(job).pipeTo(self) + case GetResult ⇒ + sender() ! Result(doneCount, timeoutCount, failCount, circCount) + } + } +} + +// reproducer for issue #17415 +class CircuitBreakerStressSpec extends AkkaSpec with ImplicitSender { + import CircuitBreakerStressSpec._ + + muteDeadLetters(classOf[AnyRef])(system) + + "A CircuitBreaker" in { + val breaker = CircuitBreaker(system.scheduler, 5, 200.millisecond, 200.seconds) + val stressActors = Vector.fill(3) { + system.actorOf(Props(classOf[StressActor], breaker)) + } + for (_ ← 0 to 1000; a ← stressActors) { + a ! JobDone + } + // let them work for a while + Thread.sleep(3000) + stressActors.foreach { a ⇒ + a ! GetResult + val result = expectMsgType[Result] + result.failCount should be(0) + } + + } + +} diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala index d6c02f2fee..a0be3ff2f6 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreaker.scala @@ -224,11 +224,11 @@ class CircuitBreaker(scheduler: Scheduler, maxFailures: Int, callTimeout: Finite * @param fromState State being transitioning from * @param toState State being transitioning from */ - private def transition(fromState: State, toState: State): Unit = + private def transition(fromState: State, toState: State): Unit = { if (swapState(fromState, toState)) toState.enter() - else - throw new IllegalStateException("Illegal transition attempted from: " + fromState + " to " + toState) + // else some other thread already swapped state + } /** * Trips breaker to an open state. This is valid from Closed or Half-Open states.