update to Aeron 1.2.5, and fix the SharedMediaDriverSupport
This commit is contained in:
parent
6d22dd1ae2
commit
8e57304c7d
2 changed files with 18 additions and 13 deletions
|
|
@ -19,6 +19,7 @@ import com.typesafe.config.ConfigFactory
|
||||||
import io.aeron.driver.MediaDriver
|
import io.aeron.driver.MediaDriver
|
||||||
import io.aeron.driver.ThreadingMode
|
import io.aeron.driver.ThreadingMode
|
||||||
import org.agrona.IoUtil
|
import org.agrona.IoUtil
|
||||||
|
import io.aeron.CommonContext
|
||||||
|
|
||||||
object SharedMediaDriverSupport {
|
object SharedMediaDriverSupport {
|
||||||
|
|
||||||
|
|
@ -32,16 +33,6 @@ object SharedMediaDriverSupport {
|
||||||
if (arterySettings.Enabled) {
|
if (arterySettings.Enabled) {
|
||||||
val aeronDir = arterySettings.Advanced.AeronDirectoryName
|
val aeronDir = arterySettings.Advanced.AeronDirectoryName
|
||||||
require(aeronDir.nonEmpty, "aeron-dir must be defined")
|
require(aeronDir.nonEmpty, "aeron-dir must be defined")
|
||||||
val driverContext = new MediaDriver.Context
|
|
||||||
driverContext.aeronDirectoryName(aeronDir)
|
|
||||||
driverContext.clientLivenessTimeoutNs(arterySettings.Advanced.ClientLivenessTimeout.toNanos)
|
|
||||||
driverContext.imageLivenessTimeoutNs(arterySettings.Advanced.ImageLivenessTimeout.toNanos)
|
|
||||||
driverContext.driverTimeoutMs(arterySettings.Advanced.DriverTimeout.toMillis)
|
|
||||||
|
|
||||||
val idleCpuLevel = arterySettings.Advanced.IdleCpuLevel
|
|
||||||
driverContext
|
|
||||||
.threadingMode(ThreadingMode.SHARED)
|
|
||||||
.sharedIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
|
|
||||||
|
|
||||||
// Check if the media driver is already started by another multi-node jvm.
|
// Check if the media driver is already started by another multi-node jvm.
|
||||||
// It checks more than one time with a sleep inbetween. The number of checks
|
// It checks more than one time with a sleep inbetween. The number of checks
|
||||||
|
|
@ -49,11 +40,15 @@ object SharedMediaDriverSupport {
|
||||||
@tailrec def isDriverInactive(i: Int): Boolean = {
|
@tailrec def isDriverInactive(i: Int): Boolean = {
|
||||||
if (i < 0) true
|
if (i < 0) true
|
||||||
else {
|
else {
|
||||||
val active = driverContext.isDriverActive(5000, new Consumer[String] {
|
val active = try CommonContext.isDriverActive(new File(aeronDir), 5000, new Consumer[String] {
|
||||||
override def accept(msg: String): Unit = {
|
override def accept(msg: String): Unit = {
|
||||||
println(msg)
|
println(msg)
|
||||||
}
|
}
|
||||||
})
|
}) catch {
|
||||||
|
case NonFatal(e) ⇒
|
||||||
|
println(e.getMessage)
|
||||||
|
false
|
||||||
|
}
|
||||||
if (active) false
|
if (active) false
|
||||||
else {
|
else {
|
||||||
Thread.sleep(500)
|
Thread.sleep(500)
|
||||||
|
|
@ -64,6 +59,16 @@ object SharedMediaDriverSupport {
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (isDriverInactive(MultiNodeSpec.selfIndex)) {
|
if (isDriverInactive(MultiNodeSpec.selfIndex)) {
|
||||||
|
val driverContext = new MediaDriver.Context
|
||||||
|
driverContext.aeronDirectoryName(aeronDir)
|
||||||
|
driverContext.clientLivenessTimeoutNs(arterySettings.Advanced.ClientLivenessTimeout.toNanos)
|
||||||
|
driverContext.imageLivenessTimeoutNs(arterySettings.Advanced.ImageLivenessTimeout.toNanos)
|
||||||
|
driverContext.driverTimeoutMs(arterySettings.Advanced.DriverTimeout.toMillis)
|
||||||
|
val idleCpuLevel = arterySettings.Advanced.IdleCpuLevel
|
||||||
|
driverContext
|
||||||
|
.threadingMode(ThreadingMode.SHARED)
|
||||||
|
.sharedIdleStrategy(TaskRunner.createIdleStrategy(idleCpuLevel))
|
||||||
|
|
||||||
val driver = MediaDriver.launchEmbedded(driverContext)
|
val driver = MediaDriver.launchEmbedded(driverContext)
|
||||||
println(s"Started media driver in directory [${driver.aeronDirectoryName}]")
|
println(s"Started media driver in directory [${driver.aeronDirectoryName}]")
|
||||||
if (!mediaDriver.compareAndSet(None, Some(driver))) {
|
if (!mediaDriver.compareAndSet(None, Some(driver))) {
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ object Dependencies {
|
||||||
val sslConfigVersion = "0.2.1"
|
val sslConfigVersion = "0.2.1"
|
||||||
val slf4jVersion = "1.7.23"
|
val slf4jVersion = "1.7.23"
|
||||||
val scalaXmlVersion = "1.0.6"
|
val scalaXmlVersion = "1.0.6"
|
||||||
val aeronVersion = "1.2.3"
|
val aeronVersion = "1.2.5"
|
||||||
|
|
||||||
val Versions = Seq(
|
val Versions = Seq(
|
||||||
crossScalaVersions := Seq("2.11.8", "2.12.1"),
|
crossScalaVersions := Seq("2.11.8", "2.12.1"),
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue