An event handler, or event aggregator, is a function that takes the current aggregate and an event, and applies the event to produce a new, updated aggregate. One Aggregator is required for each event type.
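As a rough sketch, an aggregator can be thought of as a functional interface of the following shape (the interface and method names here are illustrative assumptions, not necessarily the library's exact definition):

@FunctionalInterface
public interface Aggregator<E, A> {
    // Applies a single event to the current aggregate and returns the new aggregate.
    A applyEvent(A currentAggregate, E event);
}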
Event aggregator DSL
An aggregator builder, AggregatorBuilder, can be used to define an aggregator for multiple event types from a set
of single-event aggregators.
For example:
private static Aggregator<AccountCreated, Optional<Account>> handleAccountCreated() {
    return (currentAggregate, event) ->
        Optional.of(new Account(event.username(), event.initialFunds(), emptyList()));
}

private static Aggregator<AccountUpdated, Optional<Account>> handleAccountUpdated() {
    return (currentAggregate, event) ->
        currentAggregate.map(a -> a.toBuilder().username(event.username()).build());
}

private static Aggregator<FundsAdded, Optional<Account>> handleFundsAdded() {
    return (currentAggregate, event) -> currentAggregate.map(a -> a.addFunds(event.addedFunds()));
}

public static Aggregator<AccountEvent, Optional<Account>> getAggregator() {
    return AggregatorBuilder.<AccountEvent, Optional<Account>>newBuilder()
        .onEvent(AccountCreated.class, handleAccountCreated())
        .onEvent(AccountUpdated.class, handleAccountUpdated())
        .onEvent(FundsAdded.class, handleFundsAdded())
        .build();
}
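Once built, the combined aggregator can be applied to each event in turn to rebuild the aggregate, for example by folding over a list of events. The sketch below is only illustrative; Simple Sourcing normally performs this fold for you, and the applyEvent method name is the same assumption used in the interface sketch above:

public static Optional<Account> replay(List<AccountEvent> events) {
    Aggregator<AccountEvent, Optional<Account>> aggregator = getAggregator();
    Optional<Account> state = Optional.empty();   // no aggregate exists before the first event
    for (AccountEvent event : events) {
        state = aggregator.applyEvent(state, event);
    }
    return state;
}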
Best practices for aggregators
Some best practices need to be followed to implement an aggregator correctly:
Pure function
: The aggregator should have no side effects and must not modify the aggregate state that is passed in. It should instead make a deep copy of the passed aggregate, apply the event to that copy, and return the new aggregate (see the sketch after this list).

Non-blocking
: The event aggregator runs as part of a Kafka transaction, which has a timeout. If the aggregator function takes longer than expected, it can trigger consumer rebalances and destabilise the whole Simple Sourcing application. Any blocking operation should instead be implemented as a separate client that reads from the events topic generated by Simple Sourcing.
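The sketch below illustrates the pure-function guideline for an aggregate that holds a collection. The transactions field (assumed here to hold BigDecimal amounts), its accessor, and the corresponding builder setter are hypothetical; the point is that the aggregator copies the existing list and returns a new Account rather than mutating the one passed in:

private static Aggregator<FundsAdded, Optional<Account>> handleFundsAddedPure() {
    return (currentAggregate, event) -> currentAggregate.map(account -> {
        // Copy the existing collection instead of mutating it in place.
        List<BigDecimal> updated = new ArrayList<>(account.transactions());
        updated.add(event.addedFunds());
        // Return a new aggregate value; currentAggregate is left untouched.
        return account.toBuilder().transactions(updated).build();
    });
}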