A
- a representation of an action command that is shared across all actions in the saga. This is typically a generic type, such as Json, or if using Avro serialization, SpecificRecord or GenericRecordpublic final class SagaApp<A> extends Object
Action execution involves submitting to the action request Kafka topic and waiting for it to finish by listening to the action response topic.
The result of action execution leads to a saga state transition. When this happens the next action(s) can be submitted, or if all actions have completed, finishing the saga and publishing to the saga response topic.
If any of the actions fail, the actions that are already completed are undone, if an undo action is defined.
We can typically create and run an instance of a Saga application with code like this:
SagaApp.of(
sagaSpec,
actionSpec,
sagaTopicBuilder -> sagaTopicBuilder.withDefaultTopicSpec(partitions, replication, retentionInDays)
configBuilder -> configBuilder)
.withActions("event_sourcing_account", "event_sourcing_auction", "event_sourcing_user", "async_payment")
.withRetryStrategy(RetryStrategy.repeat(3, Duration.ofSeconds(10)))
.run(streamAppConfig);
The saga coordinator only needs to know which action types are supported, and how to serialize the action requests.
It does not need to know anything about the action processor implementations. Neither does it need to know the topology of the saga graph. This is left up to the client to define dynamically.
Modifier and Type | Method and Description |
---|---|
static <A> SagaApp<A> |
of(SagaSpec<A> sagaSpec,
ActionSpec<A> actionSpec) |
static <A> SagaApp<A> |
of(SagaSpec<A> sagaSpec,
ActionSpec<A> actionSpec,
TopicConfigBuilder.BuildSteps topicBuildFn) |
static <A> SagaApp<A> |
of(SagaSpec<A> sagaSpec,
ActionSpec<A> actionSpec,
TopicConfigBuilder.BuildSteps topicBuildFn,
PropertiesBuilder.BuildSteps propertiesBuildFn)
Static constructor for a
SagaApp saga coordinator application. |
void |
run()
Run the SagaApp with the given stream app configuration.
|
SagaApp<A> |
withAction(String actionType,
TopicConfigBuilder.BuildSteps buildFn)
Adds a single action, specifying the topic configuration for the request and response topics for that action type
|
SagaApp<A> |
withActions(Collection<String> actionTypes,
TopicConfigBuilder.BuildSteps buildFn)
Adds multiple actions, specifying the topic configuration for the request and response topics for all action types
|
SagaApp<A> |
withActions(String... actionTypes)
Adds multiple actions, using default topic configuration
|
SagaApp<A> |
withActions(TopicConfigBuilder.BuildSteps buildFn,
String... actionTypes)
Adds a variable length argument list of action types, specifying the topic configuration for the request and response topics for all action types
|
SagaApp<A> |
withExecutor(ScheduledExecutorService executor)
Sets an executor for the Saga app.
|
SagaApp<A> |
withRetryStrategy(RetryStrategy strategy)
Sets the retry strategy that applies to all action types, unless specifically overridden
|
SagaApp<A> |
withRetryStrategy(String actionType,
RetryStrategy strategy)
Sets the retry strategy for a specific action type
|
public static <A> SagaApp<A> of(SagaSpec<A> sagaSpec, ActionSpec<A> actionSpec, TopicConfigBuilder.BuildSteps topicBuildFn, PropertiesBuilder.BuildSteps propertiesBuildFn)
SagaApp
saga coordinator application.A
- a representation of an action command that is shared across all actions in the saga. This is typically a generic type, such as Json, or if using Avro serialization, SpecificRecord or GenericRecordsagaSpec
- Information common to all sagas, such as Serdes for the saga topicsactionSpec
- Information common to all saga actions and action processors, such as Serdes for the action topicstopicBuildFn
- a function to set topic configuration details incrementallypropertiesBuildFn
- a function to set kafka configuration properties incrementallypublic static <A> SagaApp<A> of(SagaSpec<A> sagaSpec, ActionSpec<A> actionSpec, TopicConfigBuilder.BuildSteps topicBuildFn)
public static <A> SagaApp<A> of(SagaSpec<A> sagaSpec, ActionSpec<A> actionSpec)
public SagaApp<A> withAction(String actionType, TopicConfigBuilder.BuildSteps buildFn)
actionType
- the action type for the actionbuildFn
- a function to set topic configuration details incrementallypublic SagaApp<A> withActions(Collection<String> actionTypes, TopicConfigBuilder.BuildSteps buildFn)
actionTypes
- the action type to addbuildFn
- a function to set topic configuration details incrementallypublic SagaApp<A> withActions(String... actionTypes)
actionTypes
- the action type to addpublic SagaApp<A> withActions(TopicConfigBuilder.BuildSteps buildFn, String... actionTypes)
actionTypes
- the action type to addbuildFn
- a function to set topic configuration details incrementallypublic SagaApp<A> withExecutor(ScheduledExecutorService executor)
executor
- the executorpublic SagaApp<A> withRetryStrategy(RetryStrategy strategy)
strategy
- the strategypublic SagaApp<A> withRetryStrategy(String actionType, RetryStrategy strategy)
actionType
- the action typestrategy
- the strategypublic void run()
This builds the topology, create the saga and action topic streams, and starts the KStream execution
Copyright © 2019. All rights reserved.