Fix java leases usage from java and scala #28685
Leases are always stored as a ScalaLease and the Java LeaseProvider unwraps the adapter. It is important that the same lease can be used from java and scala for mixed langugae code bases.
This commit is contained in:
parent
d392e6a7f1
commit
27da0a23a9
6 changed files with 108 additions and 35 deletions
|
|
@ -0,0 +1,2 @@
|
||||||
|
# Wasn't meant for user extension
|
||||||
|
ProblemFilters.exclude[FinalClassProblem]("akka.coordination.lease.scaladsl.LeaseProvider")
|
||||||
|
|
@ -10,18 +10,19 @@ import java.util.function.Consumer
|
||||||
|
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.coordination.lease.LeaseSettings
|
import akka.coordination.lease.LeaseSettings
|
||||||
import akka.coordination.lease.javadsl.Lease
|
|
||||||
import akka.coordination.lease.scaladsl.{ Lease => ScalaLease }
|
import akka.coordination.lease.scaladsl.{ Lease => ScalaLease }
|
||||||
|
import akka.coordination.lease.javadsl.{ Lease => JavaLease }
|
||||||
|
|
||||||
import scala.compat.java8.FutureConverters._
|
import scala.compat.java8.FutureConverters._
|
||||||
import scala.compat.java8.OptionConverters._
|
import scala.compat.java8.OptionConverters._
|
||||||
import scala.concurrent.ExecutionContext
|
import scala.concurrent.ExecutionContext
|
||||||
|
import scala.concurrent.Future
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi
|
@InternalApi
|
||||||
final private[akka] class LeaseAdapter(delegate: ScalaLease)(implicit val ec: ExecutionContext) extends Lease {
|
final private[akka] class LeaseAdapter(delegate: ScalaLease)(implicit val ec: ExecutionContext) extends JavaLease {
|
||||||
|
|
||||||
override def acquire(): CompletionStage[java.lang.Boolean] = delegate.acquire().map(Boolean.box).toJava
|
override def acquire(): CompletionStage[java.lang.Boolean] = delegate.acquire().map(Boolean.box).toJava
|
||||||
|
|
||||||
|
|
@ -33,3 +34,23 @@ final private[akka] class LeaseAdapter(delegate: ScalaLease)(implicit val ec: Ex
|
||||||
override def checkLease(): Boolean = delegate.checkLease()
|
override def checkLease(): Boolean = delegate.checkLease()
|
||||||
override def getSettings(): LeaseSettings = delegate.settings
|
override def getSettings(): LeaseSettings = delegate.settings
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* INTERNAL API
|
||||||
|
*/
|
||||||
|
@InternalApi
|
||||||
|
final private[akka] class LeaseAdapterToScala(val delegate: JavaLease)(implicit val ec: ExecutionContext)
|
||||||
|
extends ScalaLease(delegate.getSettings()) {
|
||||||
|
|
||||||
|
override def acquire(): Future[Boolean] =
|
||||||
|
delegate.acquire().toScala.map(Boolean.unbox)
|
||||||
|
|
||||||
|
override def acquire(leaseLostCallback: Option[Throwable] => Unit): Future[Boolean] =
|
||||||
|
delegate.acquire(o => leaseLostCallback(o.asScala)).toScala.map(Boolean.unbox)
|
||||||
|
|
||||||
|
override def release(): Future[Boolean] =
|
||||||
|
delegate.release().toScala.map(Boolean.unbox)
|
||||||
|
|
||||||
|
override def checkLease(): Boolean =
|
||||||
|
delegate.checkLease()
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -7,6 +7,7 @@ package akka.coordination.lease.javadsl
|
||||||
import akka.actor.ClassicActorSystemProvider
|
import akka.actor.ClassicActorSystemProvider
|
||||||
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
|
import akka.actor.{ ActorSystem, ExtendedActorSystem, Extension, ExtensionId, ExtensionIdProvider }
|
||||||
import akka.coordination.lease.internal.LeaseAdapter
|
import akka.coordination.lease.internal.LeaseAdapter
|
||||||
|
import akka.coordination.lease.internal.LeaseAdapterToScala
|
||||||
import akka.coordination.lease.scaladsl.{ LeaseProvider => ScalaLeaseProvider }
|
import akka.coordination.lease.scaladsl.{ LeaseProvider => ScalaLeaseProvider }
|
||||||
|
|
||||||
object LeaseProvider extends ExtensionId[LeaseProvider] with ExtensionIdProvider {
|
object LeaseProvider extends ExtensionId[LeaseProvider] with ExtensionIdProvider {
|
||||||
|
|
@ -35,6 +36,10 @@ class LeaseProvider(system: ExtendedActorSystem) extends Extension {
|
||||||
*/
|
*/
|
||||||
def getLease(leaseName: String, configPath: String, ownerName: String): Lease = {
|
def getLease(leaseName: String, configPath: String, ownerName: String): Lease = {
|
||||||
val scalaLease = delegate.getLease(leaseName, configPath, ownerName)
|
val scalaLease = delegate.getLease(leaseName, configPath, ownerName)
|
||||||
new LeaseAdapter(scalaLease)(system.dispatchers.internalDispatcher)
|
// unwrap if this is a java implementation
|
||||||
|
scalaLease match {
|
||||||
|
case adapter: LeaseAdapterToScala => adapter.delegate
|
||||||
|
case _ => new LeaseAdapter(scalaLease)(system.dispatchers.internalDispatcher)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,9 @@ import akka.actor.ExtensionId
|
||||||
import akka.actor.ExtensionIdProvider
|
import akka.actor.ExtensionIdProvider
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.coordination.lease.LeaseSettings
|
import akka.coordination.lease.LeaseSettings
|
||||||
|
import akka.coordination.lease.internal.LeaseAdapterToScala
|
||||||
|
|
||||||
|
import scala.reflect.ClassTag
|
||||||
|
|
||||||
object LeaseProvider extends ExtensionId[LeaseProvider] with ExtensionIdProvider {
|
object LeaseProvider extends ExtensionId[LeaseProvider] with ExtensionIdProvider {
|
||||||
override def get(system: ActorSystem): LeaseProvider = super.get(system)
|
override def get(system: ActorSystem): LeaseProvider = super.get(system)
|
||||||
|
|
@ -29,7 +32,7 @@ object LeaseProvider extends ExtensionId[LeaseProvider] with ExtensionIdProvider
|
||||||
private final case class LeaseKey(leaseName: String, configPath: String, clientName: String)
|
private final case class LeaseKey(leaseName: String, configPath: String, clientName: String)
|
||||||
}
|
}
|
||||||
|
|
||||||
class LeaseProvider(system: ExtendedActorSystem) extends Extension {
|
final class LeaseProvider(system: ExtendedActorSystem) extends Extension {
|
||||||
import LeaseProvider.LeaseKey
|
import LeaseProvider.LeaseKey
|
||||||
|
|
||||||
private val log = Logging(system, getClass)
|
private val log = Logging(system, getClass)
|
||||||
|
|
@ -46,6 +49,10 @@ class LeaseProvider(system: ExtendedActorSystem) extends Extension {
|
||||||
* @param ownerName the owner that will `acquire` the lease, e.g. hostname and port of the ActorSystem
|
* @param ownerName the owner that will `acquire` the lease, e.g. hostname and port of the ActorSystem
|
||||||
*/
|
*/
|
||||||
def getLease(leaseName: String, configPath: String, ownerName: String): Lease = {
|
def getLease(leaseName: String, configPath: String, ownerName: String): Lease = {
|
||||||
|
internalGetLease(leaseName, configPath, ownerName)
|
||||||
|
}
|
||||||
|
|
||||||
|
private[akka] def internalGetLease(leaseName: String, configPath: String, ownerName: String): Lease = {
|
||||||
val leaseKey = LeaseKey(leaseName, configPath, ownerName)
|
val leaseKey = LeaseKey(leaseName, configPath, ownerName)
|
||||||
leases.computeIfAbsent(
|
leases.computeIfAbsent(
|
||||||
leaseKey,
|
leaseKey,
|
||||||
|
|
@ -54,39 +61,54 @@ class LeaseProvider(system: ExtendedActorSystem) extends Extension {
|
||||||
val leaseConfig = system.settings.config
|
val leaseConfig = system.settings.config
|
||||||
.getConfig(configPath)
|
.getConfig(configPath)
|
||||||
.withFallback(system.settings.config.getConfig("akka.coordination.lease"))
|
.withFallback(system.settings.config.getConfig("akka.coordination.lease"))
|
||||||
loadLease(LeaseSettings(leaseConfig, leaseName, ownerName), configPath)
|
|
||||||
|
val settings = LeaseSettings(leaseConfig, leaseName, ownerName)
|
||||||
|
|
||||||
|
// Try and load a scala implementation
|
||||||
|
val lease: Try[Lease] =
|
||||||
|
loadLease[Lease](settings).recoverWith {
|
||||||
|
case _: ClassCastException =>
|
||||||
|
// Try and load a java implementation
|
||||||
|
loadLease[akka.coordination.lease.javadsl.Lease](settings).map(javaLease =>
|
||||||
|
new LeaseAdapterToScala(javaLease)(system.dispatchers.internalDispatcher))
|
||||||
|
}
|
||||||
|
|
||||||
|
lease match {
|
||||||
|
case Success(value) => value
|
||||||
|
case Failure(e) =>
|
||||||
|
log.error(
|
||||||
|
e,
|
||||||
|
"Invalid lease configuration for leaseName [{}], configPath [{}] lease-class [{}]. " +
|
||||||
|
"The class must implement scaladsl.Lease or javadsl.Lease and have constructor with LeaseSettings parameter and " +
|
||||||
|
"optionally ActorSystem parameter.",
|
||||||
|
settings.leaseName,
|
||||||
|
configPath,
|
||||||
|
settings.leaseConfig.getString("lease-class"))
|
||||||
|
throw e
|
||||||
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
private def loadLease(leaseSettings: LeaseSettings, configPath: String): Lease = {
|
/**
|
||||||
|
* The Lease types are separate for Java and Scala and A java lease needs to be loadable
|
||||||
|
* from Scala and vice versa as leases can be in libraries and user should not care what
|
||||||
|
* language it is implemented in.
|
||||||
|
*/
|
||||||
|
private def loadLease[T: ClassTag](leaseSettings: LeaseSettings): Try[T] = {
|
||||||
val fqcn = leaseSettings.leaseConfig.getString("lease-class")
|
val fqcn = leaseSettings.leaseConfig.getString("lease-class")
|
||||||
require(fqcn.nonEmpty, "lease-class must not be empty")
|
require(fqcn.nonEmpty, "lease-class must not be empty")
|
||||||
val dynamicAccess = system.dynamicAccess
|
val dynamicAccess = system.dynamicAccess
|
||||||
val instance: Try[Lease] = dynamicAccess.createInstanceFor[Lease](
|
dynamicAccess.createInstanceFor[T](
|
||||||
fqcn,
|
fqcn,
|
||||||
immutable.Seq((classOf[LeaseSettings], leaseSettings), (classOf[ExtendedActorSystem], system))) match {
|
immutable.Seq((classOf[LeaseSettings], leaseSettings), (classOf[ExtendedActorSystem], system))) match {
|
||||||
case s: Success[Lease] =>
|
case s: Success[T] =>
|
||||||
s
|
s
|
||||||
case Failure(_: NoSuchMethodException) =>
|
case Failure(_: NoSuchMethodException) =>
|
||||||
dynamicAccess.createInstanceFor[Lease](fqcn, immutable.Seq((classOf[LeaseSettings], leaseSettings)))
|
dynamicAccess.createInstanceFor[T](fqcn, immutable.Seq((classOf[LeaseSettings], leaseSettings)))
|
||||||
case f: Failure[_] =>
|
case f: Failure[_] =>
|
||||||
f
|
f
|
||||||
}
|
}
|
||||||
instance match {
|
|
||||||
case Success(value) => value
|
|
||||||
case Failure(e) =>
|
|
||||||
log.error(
|
|
||||||
e,
|
|
||||||
"Invalid lease configuration for leaseName [{}], configPath [{}] lease-class [{}]. " +
|
|
||||||
"The class must implement Lease and have constructor with LeaseSettings parameter and " +
|
|
||||||
"optionally ActorSystem parameter.",
|
|
||||||
leaseSettings.leaseName,
|
|
||||||
configPath,
|
|
||||||
fqcn)
|
|
||||||
throw e
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO how to clean up a lease? Not important for this use case as we'll only have one lease
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,6 @@
|
||||||
package jdocs.akka.coordination.lease;
|
package jdocs.akka.coordination.lease;
|
||||||
|
|
||||||
import akka.actor.ActorSystem;
|
import akka.actor.ActorSystem;
|
||||||
import akka.cluster.Cluster;
|
|
||||||
import akka.coordination.lease.LeaseSettings;
|
import akka.coordination.lease.LeaseSettings;
|
||||||
import akka.coordination.lease.javadsl.Lease;
|
import akka.coordination.lease.javadsl.Lease;
|
||||||
import akka.coordination.lease.javadsl.LeaseProvider;
|
import akka.coordination.lease.javadsl.LeaseProvider;
|
||||||
|
|
@ -14,13 +13,15 @@ import docs.akka.coordination.LeaseDocSpec;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.scalatestplus.junit.JUnitSuite;
|
||||||
|
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
import java.util.concurrent.CompletionStage;
|
import java.util.concurrent.CompletionStage;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
public class LeaseDocTest {
|
public class LeaseDocTest extends JUnitSuite {
|
||||||
// #lease-example
|
// #lease-example
|
||||||
static class SampleLease extends Lease {
|
static class SampleLease extends Lease {
|
||||||
|
|
||||||
|
|
@ -37,22 +38,22 @@ public class LeaseDocTest {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletionStage<Boolean> acquire() {
|
public CompletionStage<Boolean> acquire() {
|
||||||
return null;
|
return CompletableFuture.completedFuture(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletionStage<Boolean> acquire(Consumer<Optional<Throwable>> leaseLostCallback) {
|
public CompletionStage<Boolean> acquire(Consumer<Optional<Throwable>> leaseLostCallback) {
|
||||||
return null;
|
return CompletableFuture.completedFuture(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletionStage<Boolean> release() {
|
public CompletionStage<Boolean> release() {
|
||||||
return null;
|
return CompletableFuture.completedFuture(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean checkLease() {
|
public boolean checkLease() {
|
||||||
return false;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// #lease-example
|
// #lease-example
|
||||||
|
|
@ -73,10 +74,10 @@ public class LeaseDocTest {
|
||||||
private void doSomethingImportant(Optional<Throwable> leaseLostReason) {}
|
private void doSomethingImportant(Optional<Throwable> leaseLostReason) {}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void beLoadable() {
|
public void javaLeaseBeLoadableFromJava() {
|
||||||
// #lease-usage
|
// #lease-usage
|
||||||
Lease lease =
|
Lease lease =
|
||||||
LeaseProvider.get(system).getLease("<name of the lease>", "docs-lease", "<owner name>");
|
LeaseProvider.get(system).getLease("<name of the lease>", "jdocs-lease", "<owner name>");
|
||||||
CompletionStage<Boolean> acquired = lease.acquire();
|
CompletionStage<Boolean> acquired = lease.acquire();
|
||||||
boolean stillAcquired = lease.checkLease();
|
boolean stillAcquired = lease.checkLease();
|
||||||
CompletionStage<Boolean> released = lease.release();
|
CompletionStage<Boolean> released = lease.release();
|
||||||
|
|
@ -87,8 +88,18 @@ public class LeaseDocTest {
|
||||||
// #lost-callback
|
// #lost-callback
|
||||||
|
|
||||||
// #cluster-owner
|
// #cluster-owner
|
||||||
String owner = Cluster.get(system).selfAddress().hostPort();
|
// String owner = Cluster.get(system).selfAddress().hostPort();
|
||||||
// #cluster-owner
|
// #cluster-owner
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void scalaLeaseBeLoadableFromJava() {
|
||||||
|
Lease lease =
|
||||||
|
LeaseProvider.get(system).getLease("<name of the lease>", "docs-lease", "<owner name>");
|
||||||
|
CompletionStage<Boolean> acquired = lease.acquire();
|
||||||
|
boolean stillAcquired = lease.checkLease();
|
||||||
|
CompletionStage<Boolean> released = lease.release();
|
||||||
|
lease.acquire(this::doSomethingImportant);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -35,7 +35,9 @@ class SampleLease(settings: LeaseSettings) extends Lease(settings) {
|
||||||
|
|
||||||
object LeaseDocSpec {
|
object LeaseDocSpec {
|
||||||
|
|
||||||
val config = ConfigFactory.parseString("""
|
def config() =
|
||||||
|
ConfigFactory.parseString("""
|
||||||
|
jdocs-lease.lease-class = "jdocs.akka.coordination.lease.LeaseDocTest$SampleLease"
|
||||||
#lease-config
|
#lease-config
|
||||||
akka.actor.provider = cluster
|
akka.actor.provider = cluster
|
||||||
docs-lease {
|
docs-lease {
|
||||||
|
|
@ -62,7 +64,7 @@ class LeaseDocSpec extends AkkaSpec(LeaseDocSpec.config) {
|
||||||
import LeaseDocSpec._
|
import LeaseDocSpec._
|
||||||
|
|
||||||
"A docs lease" should {
|
"A docs lease" should {
|
||||||
"be loadable" in {
|
"scala lease be loadable from scala" in {
|
||||||
|
|
||||||
//#lease-usage
|
//#lease-usage
|
||||||
val lease = LeaseProvider(system).getLease("<name of the lease>", "docs-lease", "owner")
|
val lease = LeaseProvider(system).getLease("<name of the lease>", "docs-lease", "owner")
|
||||||
|
|
@ -83,6 +85,16 @@ class LeaseDocSpec extends AkkaSpec(LeaseDocSpec.config) {
|
||||||
blackhole(acquired, stillAcquired, released, owner)
|
blackhole(acquired, stillAcquired, released, owner)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"java lease be loadable from scala" in {
|
||||||
|
val lease = LeaseProvider(system).getLease("<name of the lease>", "jdocs-lease", "owner")
|
||||||
|
val acquired: Future[Boolean] = lease.acquire()
|
||||||
|
val stillAcquired: Boolean = lease.checkLease()
|
||||||
|
val released: Future[Boolean] = lease.release()
|
||||||
|
lease.acquire(leaseLostReason => doSomethingImportant(leaseLostReason))
|
||||||
|
|
||||||
|
blackhole(acquired, stillAcquired, released)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue