#3405 - fix race between consumer and producer for MPSC
This commit is contained in:
parent
7c8f3ade69
commit
1f694a3612
2 changed files with 50 additions and 6 deletions
|
|
@ -7,11 +7,12 @@ import language.postfixOps
|
|||
|
||||
import java.util.concurrent.{ ConcurrentLinkedQueue, BlockingQueue }
|
||||
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll }
|
||||
import com.typesafe.config.Config
|
||||
import akka.actor.{ RepointableRef, Props, DeadLetter, ActorSystem, ActorRefWithCell, ActorRef, ActorCell }
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
import akka.actor._
|
||||
import akka.testkit.{ EventFilter, AkkaSpec }
|
||||
import scala.concurrent.{ Future, Promise, Await, ExecutionContext }
|
||||
import scala.concurrent.{ Future, Await, ExecutionContext }
|
||||
import scala.concurrent.duration._
|
||||
import akka.dispatch.{ UnboundedMailbox, BoundedMailbox, SingleConsumerOnlyUnboundedMailbox }
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAndAfterEach {
|
||||
|
|
@ -236,3 +237,41 @@ class SingleConsumerOnlyMailboxSpec extends MailboxSpec {
|
|||
case b: BoundedMailbox ⇒ pending; null
|
||||
}
|
||||
}
|
||||
|
||||
object SingleConsumerOnlyMailboxVerificationSpec {
|
||||
case object Ping
|
||||
val mailboxConf = ConfigFactory.parseString("""
|
||||
test-dispatcher {
|
||||
mailbox-type = "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
|
||||
throughput = 1
|
||||
}""")
|
||||
}
|
||||
|
||||
class SingleConsumerOnlyMailboxVerificationSpec extends AkkaSpec(SingleConsumerOnlyMailboxVerificationSpec.mailboxConf) {
|
||||
import SingleConsumerOnlyMailboxVerificationSpec.Ping
|
||||
"A SingleConsumerOnlyMailbox" should {
|
||||
"support pathological ping-ponging" in within(30.seconds) {
|
||||
val total = 2000000
|
||||
val runner = system.actorOf(Props(new Actor {
|
||||
val a, b = context.watch(
|
||||
context.actorOf(Props(new Actor {
|
||||
var n = total / 2
|
||||
def receive = {
|
||||
case Ping ⇒
|
||||
n -= 1
|
||||
sender ! Ping
|
||||
if (n == 0)
|
||||
context stop self
|
||||
}
|
||||
}).withDispatcher("test-dispatcher")))
|
||||
def receive = {
|
||||
case Ping ⇒ a.tell(Ping, b)
|
||||
case Terminated(`a` | `b`) ⇒ if (context.children.isEmpty) context stop self
|
||||
}
|
||||
}))
|
||||
watch(runner)
|
||||
runner ! Ping
|
||||
expectTerminated(runner)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue