diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala index 0e077c9caa..8f4a5f53ad 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/ddata/typed/scaladsl/Replicator.scala @@ -78,12 +78,18 @@ object Replicator { } type GetSuccess[A <: ReplicatedData] = dd.Replicator.GetSuccess[A] type NotFound[A <: ReplicatedData] = dd.Replicator.NotFound[A] + object NotFound { + def unapply[A <: ReplicatedData](rsp: NotFound[A]): Option[(Key[A], Option[Any])] = Some((rsp.key, rsp.request)) + } /** * The [[Get]] request could not be fulfill according to the given * [[ReadConsistency consistency level]] and [[ReadConsistency#timeout timeout]]. */ type GetFailure[A <: ReplicatedData] = dd.Replicator.GetFailure[A] + object GetFailure { + def unapply[A <: ReplicatedData](rsp: GetFailure[A]): Option[(Key[A], Option[Any])] = Some((rsp.key, rsp.request)) + } object Update { @@ -145,7 +151,15 @@ object Replicator { type UpdateResponse[A <: ReplicatedData] = dd.Replicator.UpdateResponse[A] type UpdateSuccess[A <: ReplicatedData] = dd.Replicator.UpdateSuccess[A] + object UpdateSuccess { + def unapply[A <: ReplicatedData](rsp: UpdateSuccess[A]): Option[(Key[A], Option[Any])] = + Some((rsp.key, rsp.request)) + } type UpdateFailure[A <: ReplicatedData] = dd.Replicator.UpdateFailure[A] + object UpdateFailure { + def unapply[A <: ReplicatedData](rsp: UpdateFailure[A]): Option[(Key[A], Option[Any])] = + Some((rsp.key, rsp.request)) + } /** * The direct replication of the [[Update]] could not be fulfill according to @@ -157,12 +171,20 @@ object Replicator { * crashes before it has been able to communicate with other replicas. */ type UpdateTimeout[A <: ReplicatedData] = dd.Replicator.UpdateTimeout[A] + object UpdateTimeout { + def unapply[A <: ReplicatedData](rsp: UpdateTimeout[A]): Option[(Key[A], Option[Any])] = + Some((rsp.key, rsp.request)) + } /** * If the `modify` function of the [[Update]] throws an exception the reply message * will be this `ModifyFailure` message. The original exception is included as `cause`. */ type ModifyFailure[A <: ReplicatedData] = dd.Replicator.ModifyFailure[A] + object ModifyFailure { + def unapply[A <: ReplicatedData](rsp: ModifyFailure[A]): Option[(Key[A], String, Throwable, Option[Any])] = + Some((rsp.key, rsp.errorMessage, rsp.cause, rsp.request)) + } /** * The local store or direct replication of the [[Update]] could not be fulfill according to @@ -175,6 +197,10 @@ object Replicator { * crashes before it has been able to communicate with other replicas. */ type StoreFailure[A <: ReplicatedData] = dd.Replicator.StoreFailure[A] + object StoreFailure { + def unapply[A <: ReplicatedData](rsp: StoreFailure[A]): Option[(Key[A], Option[Any])] = + Some((rsp.key, rsp.request)) + } /** * Register a subscriber that will be notified with a [[Changed]] message @@ -239,8 +265,20 @@ object Replicator { type DeleteResponse[A <: ReplicatedData] = dd.Replicator.DeleteResponse[A] type DeleteSuccess[A <: ReplicatedData] = dd.Replicator.DeleteSuccess[A] + object DeleteSuccess { + def unapply[A <: ReplicatedData](rsp: DeleteSuccess[A]): Option[(Key[A], Option[Any])] = + Some((rsp.key, rsp.request)) + } type ReplicationDeleteFailure[A <: ReplicatedData] = dd.Replicator.ReplicationDeleteFailure[A] + object ReplicationDeleteFailure { + def unapply[A <: ReplicatedData](rsp: ReplicationDeleteFailure[A]): Option[(Key[A], Option[Any])] = + Some((rsp.key, rsp.request)) + } type DataDeleted[A <: ReplicatedData] = dd.Replicator.DataDeleted[A] + object DataDeleted { + def unapply[A <: ReplicatedData](rsp: DataDeleted[A]): Option[(Key[A], Option[Any])] = + Some((rsp.key, rsp.request)) + } object GetReplicaCount { @@ -248,7 +286,7 @@ object Replicator { * Convenience for `ask`. */ def apply(): ActorRef[ReplicaCount] => GetReplicaCount = - (replyTo => GetReplicaCount(replyTo)) + replyTo => GetReplicaCount(replyTo) } /** @@ -261,6 +299,10 @@ object Replicator { * Current number of replicas. Reply to `GetReplicaCount`. */ type ReplicaCount = dd.Replicator.ReplicaCount + object ReplicaCount { + def unapply[A <: ReplicatedData](rsp: ReplicaCount): Option[Int] = + Some(rsp.n) + } /** * Notify subscribers of changes now, otherwise they will be notified periodically diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala index 401f4ca5a1..822e522f5a 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/ddata/typed/scaladsl/ReplicatorSpec.scala @@ -114,6 +114,36 @@ object ReplicatorSpec { // suppress unused compiler warnings println("" + reply1 + reply2 + reply3 + reply4) } + + def shouldHaveUnapplyForResponseTypes(): Unit = { + val getResponse: GetResponse[GCounter] = ??? + getResponse match { + case GetSuccess(Key, Some(_)) => + case GetFailure(Key, Some(_)) => + case NotFound(Key, None) => + } + + val updateResponse: UpdateResponse[GCounter] = ??? + updateResponse match { + case UpdateSuccess(Key, Some(_)) => + case ModifyFailure(Key, _, _, None) => + case UpdateTimeout(Key, None) => + case StoreFailure(Key, None) => + case UpdateFailure(Key, Some(_)) => + } + + val deleteResponse: DeleteResponse[GCounter] = ??? + deleteResponse match { + case DeleteSuccess(Key, Some(_)) => + case ReplicationDeleteFailure(Key, Some(_)) => + case DataDeleted(Key, Some(_)) => + } + + val replicaCount: ReplicaCount = ??? + replicaCount match { + case ReplicaCount(_) => + } + } } }