Implementing Durable Entities in Kotlin - Calling Entities
Introduction
So far, we have built a solid foundation for Durable Entities in Kotlin. We have established stateful entities that persist data and allowed them to interact dynamically through signaling. We have also introduced cross-entity communication, where entities can notify each other of significant events.
However, all interactions so far have been fire-and-forget. Entities can send signals, but they don't receive immediate responses. This approach is sufficient for many scenarios, but real-world applications often require two-way communication.
In this post, we introduce entity calling, which allows entities to invoke operations and receive responses. This unlocks more advanced workflows, such as querying state, making decisions based on results, and chaining interactions in a more structured way.
To demonstrate this, we enhance our Counter
entity to support calls, allowing clients and
orchestrations to retrieve its current value. Along the way, we'll introduce best practices for
handling entity calls safely with Durable Functions constraints.
Constraints in Durable Orchestrations
In a previous post where we introduced state persistence, we demonstrated how we can query the state of an entity using the custom status metadata. However, as we have previously seen, orchestrations replay their executions to ensure reliability. Any non-deterministic operation, such as fetching a new timestamp or making an external API request, could lead to inconsistent results. Because of this, entity calls need a structured, replay-safe mechanism to avoid breaking the orchestration model.
Instead of calling entities directly, we establish a request-response patterns where the caller requests an operation from an entity, the entity processes the request and sends a response, and the caller waits for and retrieves the response.
Another challenge is ensuring that responses are mapped to their corresponding requests. Since each entity call runs within an orchestrator that may replay multiple times, using a traditional random ID to track calls would introduce non-determinism. Instead, we need a deterministic way to generate unique event names to safely map responses to their originating requests.
Building a Request-Response Pattern for Entity Calls
Extending the Data Model
To support entity calls, we extend our data classes. Instead of just sending operation names and input values, we now include metadata for responses.
data class Operation(
val name: String,
val input: Any? = null,
val requesterId: String? = null,
val responseEventName: String? = null,
)
data class SignalEntityInput(
val entityId: String,
val operation: Operation,
val eventName: String? = null,
)
The requesterId
field identifies who made the request, while responseEventName
specifies the
name of the event to which the response should be sent. The eventName
in the SignalEntityInput
ensures the result is mapped correctly to its corresponding request.
Implementing the Call Logic
With our extended data model in place, we define a callEntity
function that facilitates
request-response interactions between orchestrators and entities. Unlike fire-and-forget signals,
this function ensures that an orchestrator can invoke an operation and reliably retrieve a response.
To achieve this, we introduce a structured event-based mechanism that maintains determinism and orchestration safety. When an orchestrator calls an entity, it generates a deterministically unique event name to track its corresponding response. The orchestrator then waits for an external event to receive the result before continuing execution.
fun <T> TaskOrchestrationContext.callEntity(
entityId: String,
operationName: String,
operationInput: Any? = null,
returnType: Class<T>,
): T {
// Generate a deterministically safe and unique response event name
val responseEventName = "ResponseEvent_${this.newUUID()}"
// Set up the listener for the response event
val resultTask = this.waitForExternalEvent(responseEventName, String::class.java)
// Generate the payload
val operation =
Operation(
name = operationName,
input = operationInput,
requesterId = this.instanceId,
responseEventName = responseEventName,
)
val input = SignalEntityInput(entityId = entityId, operation = operation)
// Emit the signal to the entity
this.signalEntity(input)
// Wait for the response event
val resultJson = resultTask.await()
// Parse operation input into requested return type and return it
val resultOperation = objectMapper.readValue<Operation>(resultJson)
return objectMapper.convertValue(resultOperation.input, returnType)
}
To maintain determinism, we generate a unique response event name using newUUID()
, which is safe
for orchestration replays as it remains consistent. Using a non-deterministic value like
UUID.randomUUID()
would break replay behavior, making it unsuitable for Durable Functions.
Once the event name is generated, we register an event listener using waitForExternalEvent
. We
then package the request into SignalEntityInput
and send it to the entity. The entity processes
the request and emits a response event back to the orchestrator using the stored
responseEventName
. WHen the orchestrator receives the response, it deserializes the result and
returns it as the expected data type.
Enhancing Signal Logic
To facilitate structured entity communication, we enhance our signaling logic by introducing an
activity function that serves as an intermediary. The signalEntity
function is now responsible for
routing signals safely while ensuring Durable Functions constraints are followed.
fun TaskOrchestrationContext.signalEntity(input: SignalEntityInput) {
this.callActivity("SignalEntity", input)
}
@FunctionName("SignalEntity")
fun signalEntityActivity(
@DurableActivityTrigger(name = "input") input: SignalEntityInput,
@DurableClientInput(name = "ctx") ctx: DurableClientContext,
) {
ctx.client.signalEntity(input)
}
fun DurableTaskClient.signalEntity(input: SignalEntityInput) {
val operationJSON = objectMapper.writeValueAsString(input.operation)
val eventName = input.eventName ?: "EntityOperation"
this.raiseEvent(input.entityId, eventName, operationJSON)
}
By routing signals through an activity function, we ensure that Durable Task Client from within an
orchestrator. Additionally, the eventName
parameter ensures that responses are mapped correctly to
their corresponding requests.
Adding Support for Entity Calls
With a safe signaling mechanism in place, we modify our counter to support calls rather than just
fire-and-forget signals. Specifically, we introduce a Get
operation that returns the current
counter value to the requester.
@FunctionName("Counter")
fun counter(@DurableOrchestrationTrigger(name = "ctx") ctx: TaskOrchestrationContext) {
var counter = 0
ctx.setCustomStatus(counter)
while (true) {
val operation = ctx.waitForEntityOperation()
when (operation.name) {
"Add" -> counter += (operation.input as? Int) ?: 0
"Get" -> {
if (
operation.requesterId != null && operation.responseEventName != null
) {
val resultOperation = Operation(name = "Result", input = counter)
val signalInput =
SignalEntityInput(
entityId = operation.requesterId,
operation = resultOperation,
eventName = operation.responseEventName,
)
ctx.signalEntity(signalInput)
}
}
"Reset" -> counter = 0
}
ctx.setCustomStatus(counter)
}
}
Instead of immediately returning the value, the counter packages the result as an operation and
sends it to the requester. The requesterId
and responseEventName
ensure that the response is
delivered to the correct waiting orchestrator.
Abstracting the Result Logic
We encapsulate the logic for returning results into an extension function:
fun <T> TaskOrchestrationContext.returnResult(operation: Operation, value: T) {
if (operation.requesterId != null && operation.responseEventName != null) {
val resultOperation = Operation(name = "Result", input = value)
val signalInput =
SignalEntityInput(
entityId = operation.requesterId,
operation = resultOperation,
eventName = operation.responseEventName,
)
this.signalEntity(signalInput)
} else {
throw IllegalArgumentException(
"Requester ID or response event name not provided"
)
}
}
With this, we simplify the Counter
entity further:
@FunctionName("Counter")
fun counter(@DurableOrchestrationTrigger(name = "ctx") ctx: TaskOrchestrationContext) {
var counter = 0
ctx.setCustomStatus(counter)
while (true) {
val operation = ctx.waitForEntityOperation()
when (operation.name) {
"Add" -> counter += (operation.input as? Int) ?: 0
"Get" -> ctx.returnResult(operation, counter)
"Reset" -> counter = 0
}
ctx.setCustomStatus(counter)
}
}
Testing Entity Calls
Creating a CounterCaller Orchestration
We define a CounterCaller
orchestration to test the callEntity
function:
@FunctionName("CounterCaller")
fun counterCaller(
@DurableOrchestrationTrigger(name = "ctx") ctx: TaskOrchestrationContext,
context: ExecutionContext,
) {
// Get the ID of the Counter from the input
val counterId = ctx.getInput(String::class.java)
// For demo: Yield control to ensure the runtime registers everything first
ctx.createTimer(Duration.ofSeconds(5)).await()
// Call the Get operation on the Counter
val counterValue =
ctx.callEntity(
entityId = counterEntityId,
operationName = "Get",
returnType = Int::class.java,
)
// Log the returned counter value
context.logger.info("Received $counterValue")
}
Defining the Orchestration HTTP Trigger
We create an HTTP trigger to initialize both the Counter
and CounterCaller
:
@FunctionName("CreateCounterOrchestration")
fun createCounterOrchestration(
@HttpTrigger(
name = "req",
methods = [HttpMethod.GET],
authLevel = AuthorizationLevel.ANONYMOUS,
)
request: HttpRequestMessage<Optional<String>>,
@DurableClientInput(name = "ctx") ctx: DurableClientContext,
): HttpResponseMessage {
// Create a Counter and a CounterCaller
val counterId = Entity(ctx.client, "Counter")
val callerId = Entity(ctx.client, "CounterCaller", counterId)
// Return an HTTP response containing the IDs
val responseBody =
"""
Created Counter and CounterCaller.
Counter ID: $counterId
Caller ID: $callerId
"""
.trimIndent()
return request.success(responseBody)
}
Validating the Output
We run the setup and check the logs for the retrieved counter value:
❯ curl -s https://durable-app.azurewebsites.net/api/CreateCounterOrchestration
Created Counter and CounterCaller.
Counter ID: 3feb953a-0f90-4aaa-81fa-13a96c3c9a6b
Caller ID: 7005eee2-b644-4f71-9ec7-b7d18b09bc55
2024-11-13T07:17:18Z [Information] CounterCaller: Started
2024-11-13T07:17:18Z [Information] CounterCaller: received 0
What's Next?
With this we have developed the functionalities that we need to use Durable Entities in Kotlin. In the next and final post in the series, we will refine our implementation by introducing abstractions to simplify interactions and align our approach with the native Durable Entities experience.
Read Further
This post is part of the Implementing Durable Entities in Kotlin series.
- Previous: Cross-Communication.
- Next: Final Enhancements.