-
Notifications
You must be signed in to change notification settings - Fork 1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[DOCS] Add custom lease documentation (#7388)
* Add custom lease documentation * Simplify structure * upgrade unit test
- Loading branch information
Showing
4 changed files
with
346 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
--- | ||
uid: lease | ||
title: Distributed Locks with Akka.Coordination | ||
--- | ||
# Distributed Locks with Akka.Coordination | ||
|
||
## General Definition | ||
|
||
`Akka.Coordination` provides a generalized "distributed lock" implementation called a `Lease` that uses a unique resource identifier inside a backing store (such as Azure Blob Storage or Kubernetes Custom Resource Definitions) to only allow one current "holder" of the lease to perform an action at any given time. | ||
|
||
Akka.NET uses leases internally inside [Split Brain Resolver](../clustering/split-brain-resolver.md), [Cluster.Sharding](../clustering/cluster-sharding.md), and [Cluster Singletons](../clustering/cluster-singleton.md) for this purpose - and in this document you can learn how to call and create leases in your own Akka.NET applications if needed. | ||
|
||
### Officially Supported Lease Implementations | ||
|
||
There are currently two officially supported lease implementations: | ||
|
||
* [Akka.Coordination.KubernetesApi](https://github.com/akkadotnet/Akka.Management/tree/dev/src/coordination/kubernetes/Akka.Coordination.KubernetesApi) | ||
* [Akka.Coordination.Azure](https://github.com/akkadotnet/Akka.Management/tree/dev/src/coordination/azure/Akka.Coordination.Azure) | ||
|
||
All lease implementations in Akka.NET supports automatic expiry or renewal mechanisms. Expiry ensures that leases do not remain active indefinitely, which can prevent resource deadlock or starvation scenarios. | ||
|
||
### Key Characteristics and Components | ||
|
||
* **Lease Name**: A unique identifier for the lease, which specifies the resource to be protected. | ||
* **Owner Name**: A unique identifier for the entity (usually an actor or node) that is attempting to acquire the lease. | ||
* **Lease Timeout**: May also be called "Time To Live" or TTL. A duration parameter that specifies how long the lease should last. Leases may be renewed or revoked depending on the implementation. | ||
|
||
### Public API | ||
|
||
The `Akka.Coordination.Lease` API provides the following methods: | ||
|
||
* **`Task<bool> Acquire()`** | ||
* **`Task<bool> Acquire(Action<Exception> leaseLostCallback)`** | ||
|
||
These asynchronous methods attempts to acquire the lease for the resource. It returns a `Task<bool>`, indicating if the acquisition was successful or not. Parameters may include callback delegate method that will be invoked when a granted lease have been revoked for some reason. | ||
|
||
* **`Task<bool> Release()`** | ||
|
||
This asynchronous method releases the lease, relinquishing the access rights to the resource. It returns a `Task<bool>`, where true indicates successful release. This method is important for ensuring that resources are freed up for other actors or nodes once a task is completed. | ||
|
||
* **`bool CheckLease()`** | ||
|
||
This method checks whether the lease is still valid, typically returning a Boolean. `CheckLease()` is useful for verifying if a lease has expired or been revoked, ensuring that processes do not operate under an invalid lease. | ||
|
||
## Example | ||
|
||
The full code for this example can be seen inside the [Akka.NET repo](https://github.com/akkadotnet/akka.net/blob/dev/src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs) | ||
|
||
### Internal Messages | ||
|
||
The actor using `Lease` will need a few boilerplate internal messages: | ||
|
||
[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs?name=messages)] | ||
|
||
### Obtaining Reference To Lease Implementation | ||
|
||
To obtain a reference to the `Lease` implementation, you will need 4 things: | ||
|
||
* **Lease Name**: A unique identifier for the lease, which specifies the resource to be protected. | ||
* **Owner Name**: A unique identifier for the entity (usually an actor or node) that is attempting to acquire the lease. | ||
* **Configuration Path**: A full HOCON configuration path containing the definition of the lease implementation. | ||
* **Retry Interval**: A time duration needed for failed lease acquisition retry. | ||
|
||
A `Lease` reference is then obtained by calling `LeaseProvider.Get(Context.System).GetLease()` | ||
|
||
[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs?name=constructor)] | ||
|
||
### Actor States | ||
|
||
The actor leverages actor states to separate the lease acquisition and actual working state of the actor. | ||
|
||
* **`AcquiringLease` State** | ||
In this state, the actor will only handle the required internal messages related to lease acquisition. Any other messages not related to lease acquisition will be stashed until the lease is acquired/granted. The actor will automatically retry lease acquisition by calling `AcquireLease()` on a regular basis if it failed to acquire a lease. | ||
* **`Active` State** | ||
In this state, the actor is active and is allowed to process all received messages normally. The only lease related message being processed is the `LeaseLost` internal message that signals lease revocation. | ||
|
||
In the event of a lease revocation, the actor will forcefully shuts down to prevent resource contention. This may be modified to suit user needs. | ||
|
||
[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs?name=actor-states)] | ||
|
||
[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs?name=lease-acquisition)] | ||
|
||
### Lease Lifecycle | ||
|
||
Lease needs to be granted before an actor can perform any of its message handling and the actor needs to stop, forcefully or gracefully, if the lease is revoked. Attention must be taken so that, in the event of revoked lease, there would be no resource contention, or at least with minimal impact. | ||
|
||
In the example code, lease would be acquired inside the `PreStart()` method override by calling `AcquireLease()` and it will be released inside the `PostStop()` method override. | ||
|
||
[!code-csharp[Main](../../../src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs?name=lease-lifecycle)] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
253 changes: 253 additions & 0 deletions
253
src/core/Akka.Docs.Tests/Utilities/LeaseActorDocSpec.cs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,253 @@ | ||
// ----------------------------------------------------------------------- | ||
// <copyright file="LeaseActorDocSpec.cs" company="Akka.NET Project"> | ||
// Copyright (C) 2009-2024 Lightbend Inc. <http://www.lightbend.com> | ||
// Copyright (C) 2013-2024 .NET Foundation <https://github.com/akkadotnet/akka.net> | ||
// </copyright> | ||
// ----------------------------------------------------------------------- | ||
|
||
using System; | ||
using System.Threading.Tasks; | ||
using Akka.Actor; | ||
using Akka.Coordination; | ||
using Akka.Coordination.Tests; | ||
using Akka.Event; | ||
using Akka.TestKit.Xunit2; | ||
using FluentAssertions.Extensions; | ||
using Xunit; | ||
using Xunit.Abstractions; | ||
|
||
#nullable enable | ||
namespace DocsExamples.Utilities.Leases; | ||
|
||
public class LeaseActorDocSpec: TestKit | ||
{ | ||
private class LeaseFailed : Exception | ||
{ | ||
public LeaseFailed(string message) : base(message) | ||
{ | ||
} | ||
|
||
public LeaseFailed(string message, Exception innerEx) | ||
: base(message, innerEx) | ||
{ | ||
} | ||
} | ||
|
||
private const string ResourceId = "protected-resource"; | ||
|
||
private LeaseUsageSettings LeaseSettings => new("test-lease", TimeSpan.FromSeconds(1)); | ||
private string LeaseOwner { get; } = $"leased-actor-{Guid.NewGuid():N}"; | ||
|
||
public LeaseActorDocSpec(ITestOutputHelper output) | ||
: base(TestLease.Configuration, nameof(LeaseActorDocSpec), output) | ||
{ | ||
// start test lease extension | ||
TestLeaseExt.Get(Sys); | ||
} | ||
|
||
private TestLease GetLease() => TestLeaseExt.Get(Sys).GetTestLease(ResourceId); | ||
|
||
[Fact] | ||
public void Actor_with_lease_should_not_be_active_until_lease_is_acquired() | ||
{ | ||
var testActor = Sys.ActorOf(Props.Create(() => new LeaseActor(LeaseSettings, ResourceId, LeaseOwner))); | ||
testActor.Tell("Hi", TestActor); | ||
ExpectNoMsg(200.Milliseconds()); | ||
|
||
var lease = GetLease(); | ||
lease.InitialPromise.SetResult(true); | ||
ExpectMsg("Hi"); | ||
} | ||
|
||
[Fact] | ||
public void Actor_with_lease_should_retry_if_initial_acquire_is_false() | ||
{ | ||
var testActor = Sys.ActorOf(Props.Create(() => new LeaseActor(LeaseSettings, ResourceId, LeaseOwner))); | ||
testActor.Tell("Hi", TestActor); | ||
ExpectNoMsg(200.Milliseconds()); | ||
|
||
var lease = GetLease(); | ||
lease.InitialPromise.SetResult(false); | ||
ExpectNoMsg(200.Milliseconds()); | ||
lease.SetNextAcquireResult(Task.FromResult(true)); | ||
ExpectMsg("Hi"); | ||
} | ||
|
||
[Fact] | ||
public void Actor_with_lease_should_retry_if_initial_acquire_fails() | ||
{ | ||
var testActor = Sys.ActorOf(Props.Create(() => new LeaseActor(LeaseSettings, ResourceId, LeaseOwner))); | ||
testActor.Tell("Hi", TestActor); | ||
ExpectNoMsg(200.Milliseconds()); | ||
|
||
var lease = GetLease(); | ||
lease.InitialPromise.SetException(new LeaseFailed("oh no")); | ||
ExpectNoMsg(200.Milliseconds()); | ||
lease.SetNextAcquireResult(Task.FromResult(true)); | ||
ExpectMsg("Hi"); | ||
} | ||
|
||
[Fact] | ||
public void Actor_with_lease_should_terminate_if_lease_lost() | ||
{ | ||
var testActor = Sys.ActorOf(Props.Create(() => new LeaseActor(LeaseSettings, ResourceId, LeaseOwner))); | ||
testActor.Tell("Hi", TestActor); | ||
ExpectNoMsg(200.Milliseconds()); | ||
|
||
var lease = GetLease(); | ||
lease.InitialPromise.SetResult(true); | ||
ExpectMsg("Hi"); | ||
|
||
Watch(testActor); | ||
lease.GetCurrentCallback()(new LeaseFailed("oh dear")); | ||
ExpectTerminated(testActor); | ||
} | ||
|
||
[Fact] | ||
public void Actor_with_lease_should_release_lease_when_stopped() | ||
{ | ||
var testActor = Sys.ActorOf(Props.Create(() => new LeaseActor(LeaseSettings, ResourceId, LeaseOwner))); | ||
testActor.Tell("Hi", TestActor); | ||
ExpectNoMsg(200.Milliseconds()); | ||
|
||
var lease = GetLease(); | ||
lease.InitialPromise.SetResult(true); | ||
lease.Probe.ExpectMsg(new TestLease.AcquireReq(LeaseOwner)); | ||
ExpectMsg("Hi"); | ||
|
||
Watch(testActor); | ||
lease.GetCurrentCallback()(new LeaseFailed("oh dear")); | ||
ExpectTerminated(testActor); | ||
|
||
lease.Probe.ExpectMsg(new TestLease.ReleaseReq(LeaseOwner)); | ||
} | ||
|
||
} | ||
|
||
public class LeaseActor: ReceiveActor, IWithStash, IWithTimers | ||
{ | ||
#region messages | ||
private sealed record LeaseAcquireResult(bool Acquired, Exception? Reason); | ||
private sealed record LeaseLost(Exception Reason); | ||
private sealed class LeaseRetryTick | ||
{ | ||
public static readonly LeaseRetryTick Instance = new(); | ||
private LeaseRetryTick() { } | ||
} | ||
#endregion | ||
|
||
private const string LeaseRetryTimer = "lease-retry"; | ||
|
||
private readonly string _resourceId; | ||
private readonly Lease _lease; | ||
private readonly TimeSpan _leaseRetryInterval; | ||
private readonly ILoggingAdapter _log; | ||
private readonly string _uniqueId; | ||
|
||
#region constructor | ||
public LeaseActor(LeaseUsageSettings leaseSettings, string resourceId, string actorUniqueId) | ||
{ | ||
_resourceId = resourceId; | ||
_uniqueId = actorUniqueId; | ||
|
||
_lease = LeaseProvider.Get(Context.System).GetLease( | ||
leaseName: _resourceId, | ||
configPath: leaseSettings.LeaseImplementation, | ||
ownerName: _uniqueId); | ||
_leaseRetryInterval = leaseSettings.LeaseRetryInterval; | ||
|
||
_log = Context.GetLogger(); | ||
} | ||
#endregion | ||
|
||
public IStash Stash { get; set; } = null!; | ||
|
||
public ITimerScheduler Timers { get; set; } = null!; | ||
|
||
#region actor-states | ||
private void AcquiringLease() | ||
{ | ||
Receive<LeaseAcquireResult>(lar => | ||
{ | ||
if (lar.Acquired) | ||
{ | ||
_log.Debug("{0}: Lease acquired", _resourceId); | ||
Stash.UnstashAll(); | ||
Become(Active); | ||
} | ||
else | ||
{ | ||
_log.Error(lar.Reason, "{0}: Failed to get lease for unique Id [{1}]. Retry in {2}", | ||
_resourceId, _uniqueId, _leaseRetryInterval); | ||
Timers.StartSingleTimer(LeaseRetryTimer, LeaseRetryTick.Instance, _leaseRetryInterval); | ||
} | ||
}); | ||
|
||
Receive<LeaseRetryTick>(_ => AcquireLease()); | ||
|
||
Receive<LeaseLost>(HandleLeaseLost); | ||
|
||
ReceiveAny(msg => | ||
{ | ||
_log.Debug("{0}: Got msg of type [{1}] from [{2}] while waiting for lease, stashing", | ||
_resourceId, msg.GetType().Name, Sender); | ||
Stash.Stash(); | ||
}); | ||
} | ||
|
||
private void Active() | ||
{ | ||
Receive<LeaseLost>(HandleLeaseLost); | ||
|
||
// TODO: Insert your actor message handlers here | ||
ReceiveAny(msg => Sender.Tell(msg, Self)); | ||
} | ||
|
||
private void HandleLeaseLost(LeaseLost msg) | ||
{ | ||
_log.Error(msg.Reason, "{0}: unique id [{1}] lease lost", _resourceId, _uniqueId); | ||
Context.Stop(Self); | ||
} | ||
#endregion | ||
|
||
#region lease-acquisition | ||
private void AcquireLease() | ||
{ | ||
_log.Info("{0}: Acquiring lease {1}", _resourceId, _lease.Settings); | ||
var self = Self; | ||
Acquire().PipeTo(self); | ||
Become(AcquiringLease); | ||
return; | ||
|
||
async Task<LeaseAcquireResult> Acquire() | ||
{ | ||
try | ||
{ | ||
var result = await _lease.Acquire(reason => { self.Tell(new LeaseLost(reason)); }); | ||
return new LeaseAcquireResult(result, null); | ||
} | ||
catch (Exception ex) | ||
{ | ||
return new LeaseAcquireResult(false, ex); | ||
} | ||
} | ||
} | ||
#endregion | ||
|
||
#region lease-lifecycle | ||
protected override void PreStart() | ||
{ | ||
base.PreStart(); | ||
// Acquire a lease when actor starts | ||
AcquireLease(); | ||
} | ||
|
||
protected override void PostStop() | ||
{ | ||
base.PostStop(); | ||
// Release the lease when actor stops | ||
_lease.Release().GetAwaiter().GetResult(); | ||
} | ||
#endregion | ||
|
||
} |