Resulting context
The primary benefit of the solution is that unrecoverable events, which are usually caused by bugs, will not delay the processing of legitimate events. Stream processors handle many different types of events. Some are of no interest to a specific processor; they are simply ignored and the checkpoint advances. Other events follow a different processing path that does not encounter the issue, while still others follow the same path but are themselves valid and process successfully. This increases stability and resilience because stream processors are able to continue making forward progress in the face of failures and potentially self-heal.
One drawback is that a large number of fault events can be produced that will require manual intervention and resubmission. In some cases, this may be the best course of action, while in other cases a retry may be the better option. Unfortunately, it is not always obvious in advance how errors will manifest themselves, so these retry-versus-fault decisions need to be tuned over time. Initially, it is best to take a conservative approach and have every error produce a fault. Early on, while the processing logic is still evolving and unstable, many, if not most, errors will be unrecoverable bugs. The fault events will also be a good source of information for investigating and learning which types of errors should be retried instead.
Monitoring must also be set up to send out alerts when faults occur. These alerts are a critical tool that helps self-sufficient, full-stack teams respond to errors quickly and minimize the time needed to recover from them. We will discuss this in more detail in Chapter 8, Monitoring. In addition to monitoring for fault events, it is necessary to monitor the iterator age of each stream processor. For any errors that are not handled, the stream processor will retry until the error corrects itself or the event expires. When we properly choose to retry a transient error, the error should recover within a short period of time. However, if we make the wrong decision, or if recovery is taking abnormally long, then we need a way to be alerted to this condition. The iterator age is the metric that provides this information. It is a measure of how long a processor is leaving events on the stream before successfully processing them. This metric serves two main purposes. First, it may be an indicator that event volumes are too high, causing the processor to fall behind, and thus additional shards need to be added. Alternatively, as in this context, it could be an indication that there is a significant error and the stream processor is stuck in a retry loop. In either case, it is important to be alerted to the fact so that timely corrective measures can be taken.
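For example, assuming the stream processor is an AWS Lambda function triggered by a Kinesis stream, an iterator age alarm might be created with the AWS SDK for JavaScript (v2) along the following lines; the function name, alarm name, and alert topic are placeholders, and in practice the same alarm is often declared in infrastructure-as-code instead:

```typescript
import { CloudWatch } from 'aws-sdk';

const cloudwatch = new CloudWatch();

// Alert when the processor leaves events on the stream for more than
// 15 minutes (the IteratorAge metric is reported in milliseconds).
const createIteratorAgeAlarm = () =>
  cloudwatch.putMetricAlarm({
    AlarmName: 'listener-iterator-age',                       // placeholder name
    Namespace: 'AWS/Lambda',
    MetricName: 'IteratorAge',
    Dimensions: [{ Name: 'FunctionName', Value: 'my-service-listener' }], // placeholder function
    Statistic: 'Maximum',
    Period: 300,                                              // 5-minute windows
    EvaluationPeriods: 3,
    Threshold: 15 * 60 * 1000,
    ComparisonOperator: 'GreaterThanThreshold',
    AlarmActions: ['arn:aws:sns:us-east-1:123456789012:alerts'], // placeholder topic
  }).promise();

createIteratorAgeAlarm().catch(console.error);
```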
Event streaming guarantees that events will be delivered at least once and in the order in which they were received. This is a good start, but it means that we have to account for duplicate and out-of-order delivery. When a stream processor encounters an unhandled error, it will retry, and any events in the batch that successfully processed prior to the error will be processed more than once. When we set aside a fault event and later resubmit the affected events, those events will no longer be in order, as other events will have since been processed. Neither of these is a reason to abandon stream processing, because this behavior is characteristic of all messaging systems and of real life. Our stream processors simply have to handle these anomalous scenarios. Consider the fact that producers, particularly offline-first producers, may not even submit events in the correct order. In these cases, there is nothing that the messaging system can do to compensate. Therefore, we need to implement stream processors to be idempotent and order-agnostic. When we receive the same event more than once, the resulting calculation should not double count the event. When we receive events out of order, older events should not nullify newer events.
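As a minimal sketch of idempotence, assuming events are stored in a DynamoDB table keyed by the event's unique identifier, saving a duplicate simply overwrites the row with identical content instead of double counting; the table and event shape are illustrative:

```typescript
import { DynamoDB } from 'aws-sdk';

type Event = {
  id: string;        // unique event identifier
  type: string;
  timestamp: number;
  partitionKey: string;
  // ...domain payload
};

const db = new DynamoDB.DocumentClient();

// Idempotent: events are immutable, so saving the same event twice
// produces the same row and an at-least-once delivery is harmless.
export const saveEvent = (event: Event) =>
  db.put({
    TableName: process.env.EVENTS_TABLE_NAME || 'events', // placeholder table
    Item: event,
  }).promise();
```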
One solution to the issue is what I refer to as the inverse optimistic lock. Traditional optimistic locks will throw an error when an update is performed with stale data and force the logic to retrieve the latest data and retry. The inverse oplock will just ignore events that are received out of order. This works fine when we only want to record the latest and greatest event. When we need to calculate a result based on the aggregation of all the events, then Event Sourcing and the ACID 2.0 transaction model may be the correct solution. We will cover this in more detail in the Command Query Responsibility Segregation (CQRS) pattern in Chapter 4, Boundary Patterns. For now, keep in mind that events are immutable and have a unique identifier, thus we can save them over and over again and the result is the same. We can also receive and store events in any order and trigger a recalculation based on the new set of events.
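A sketch of the inverse oplock, assuming a DynamoDB table keyed by the entity identifier: the condition only applies the update when the incoming event is newer, and the conditional-check failure raised for an out-of-order event is silently ignored rather than retried.

```typescript
import { DynamoDB, AWSError } from 'aws-sdk';

const db = new DynamoDB.DocumentClient();

// Inverse optimistic lock: instead of failing on stale data and retrying,
// we quietly ignore events that arrive out of order.
export const updateLatest = (event: { id: string; timestamp: number; status: string }) =>
  db.update({
    TableName: process.env.ENTITY_TABLE_NAME || 'entities', // placeholder table
    Key: { id: event.id },
    UpdateExpression: 'SET #status = :status, #timestamp = :timestamp',
    ConditionExpression: 'attribute_not_exists(#timestamp) OR #timestamp < :timestamp',
    ExpressionAttributeNames: { '#status': 'status', '#timestamp': 'timestamp' },
    ExpressionAttributeValues: { ':status': event.status, ':timestamp': event.timestamp },
  }).promise()
    .catch((err: AWSError) => {
      // an older event arrived late; ignore it so it cannot nullify newer data
      if (err.code !== 'ConditionalCheckFailedException') throw err;
    });
```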
Take care to perform validation on the data received in events and raise a fault event for invalid data. In the Trilateral API pattern, we discuss the need to publish a component's asynchronous interfaces and perform contract testing. However, it is prudent not to rely solely on specifications and contract testing. Do not validate all the events in a micro-batch up front, as this can slow down throughput. It is typically more efficient to validate events as they flow through.
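A sketch of in-flow validation, where the helpers are hypothetical stand-ins for the processor's domain logic and for the mechanism that publishes fault events:

```typescript
interface Event {
  id: string;
  type: string;
  timestamp: number;
  order?: { total?: number; customerId?: string };
}

class ValidationError extends Error {}

// hypothetical downstream step and fault publisher, stubbed for illustration
const processEvent = async (event: Event): Promise<void> => { /* domain logic */ };
const publishFault = async (event: Event, err: Error): Promise<void> => { /* emit fault event */ };

// Validate each event as it flows through, rather than scanning the whole
// micro-batch up front.
const validate = (event: Event): Event => {
  if (!event.order || !event.order.customerId || typeof event.order.total !== 'number') {
    throw new ValidationError(`Invalid order data in event ${event.id}`);
  }
  return event;
};

// Unrecoverable validation errors become fault events instead of retries.
export const handle = async (event: Event) => {
  try {
    await processEvent(validate(event));
  } catch (err) {
    if (err instanceof ValidationError) {
      await publishFault(event, err);
    } else {
      throw err; // let transient errors retry
    }
  }
};
```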
Networks are not reliable, thus it is necessary to retry synchronous calls to avoid throwing unnecessary errors. Most libraries have long default timeouts, so it is important to explicitly set the timeout to a reasonably short value, preferably less than 1,000 ms. The logic should retry at least once quickly to account for network hiccups and then perform an exponential backoff for additional retries. Most cloud SDKs already perform this logic by default, and it is only necessary to configure the timeout and the number of retries. Make certain that the cumulative total of the timeouts and retries is less than the timeout set for the stream processor itself. Note that there is a correlation between the machine instance size and network throughput; thus, if you experience frequent timeouts, you may need to move up to a larger instance.
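For example, with the AWS SDK for JavaScript (v2) the timeout and retry behavior can be set on the client; the values below are illustrative and should be tuned so that the worst case stays within the stream processor's own timeout:

```typescript
import { DynamoDB } from 'aws-sdk';

// Short timeouts plus a small number of retries using the SDK's built-in
// exponential backoff. The worst case here is roughly 1,000 ms across
// 4 attempts plus backoff delays, which must remain below the stream
// processor's timeout.
const db = new DynamoDB.DocumentClient({
  httpOptions: {
    connectTimeout: 1000, // fail fast on connection problems
    timeout: 1000,        // keep the per-request socket timeout under 1,000 ms
  },
  maxRetries: 3,          // 1 initial attempt plus 3 retries with backoff
});
```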
Our objective is to limit synchronous communication to the cloud-native resources within a component. These resources have high availability standards, but they also implement throttling. Therefore, it is necessary to exert backpressure on the stream so as not to overwhelm a resource and cause throttling. Backpressure is a natural byproduct of the functional reactive stream programming model: data is pulled through the pipeline by successive steps as they are ready to process more data. Traditional procedural programming, on the other hand, simply loops over the data as fast as possible and ultimately overwhelms the target systems. In addition to natural backpressure, it is possible to add explicit rate-limiting steps to further avoid throttling, and parallel steps to take full advantage of asynchronous non-blocking I/O. Regardless, it is still necessary to handle throttling errors and retry with an exponential backoff, similar to network timeouts. Make certain that the rate-limiting and parallel configurations are compatible with the capacity allocated to a resource. If auto-scaling is configured for a resource, then it may be necessary to fail the stream processor and allow it to retry in order to give the auto-scaling policy time to take effect.
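As an illustration, assuming the Highland.js functional reactive stream library, explicit rate-limiting and parallel steps might look like the following; the numbers are placeholders to be aligned with the capacity allocated to the target resource, and the save function is a hypothetical write step:

```typescript
import _ from 'highland';

// hypothetical write step returning a promise
const save = (event: { id: string }): Promise<void> => Promise.resolve();

export const handler = (events: { id: string }[], callback: (err?: Error, result?: any) => void) =>
  _(events)
    .map(event => _(save(event)))  // wrap each async call in a stream
    .ratelimit(10, 100)            // at most 10 calls per 100 ms window
    .parallel(4)                   // up to 4 non-blocking calls in flight
    .collect()
    .toCallback(callback);         // propagate any unhandled error so the batch retries
```

Because the pipeline pulls events through as downstream steps complete, the rate limit and degree of parallelism, not the size of the incoming batch, determine the pressure placed on the resource.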
Leverage batch or bulk APIs whenever possible to minimize the number of network calls and thus improve throughput. However, these APIs can be more complicated to use if they allow the individual items in the batch to succeed or fail on their own. In these cases, it is necessary to check the status of each item and only retry those that failed. This can significantly increase the complexity of the code. Therefore, it may make sense to iterate towards the use of these APIs to make certain they are worth the additional effort. It may also be possible to batch the processing of events without leveraging a batch API. For example, a micro-batch of events received from the stream may contain related events that should be grouped together to calculate an aggregate result that involves a single database update. In these cases, it is critical to treat the group of events as a unit of work that will succeed or fail as a group. When raising a fault event for a group of events, make certain to include all the events in the fault so that they can be resubmitted together.
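As an example of a bulk API whose individual items can fail on their own, the following sketch retries only the unprocessed items of a DynamoDB batch write with an exponential backoff before giving up and letting the caller raise a fault for the whole group; the table name is a placeholder:

```typescript
import { DynamoDB } from 'aws-sdk';

const db = new DynamoDB.DocumentClient();
const TABLE_NAME = process.env.TABLE_NAME || 'events'; // placeholder table

const delay = (ms: number) => new Promise(resolve => setTimeout(resolve, ms));

// Write a group of up to 25 items as a unit, retrying only the items the
// service reports back as unprocessed.
export const batchSave = async (items: Record<string, any>[], attempt = 0): Promise<void> => {
  const resp = await db.batchWrite({
    RequestItems: {
      [TABLE_NAME]: items.map(item => ({ PutRequest: { Item: item } })),
    },
  }).promise();

  const unprocessed = (resp.UnprocessedItems || {})[TABLE_NAME] || [];
  if (unprocessed.length === 0) return;

  if (attempt >= 3) {
    // give up and let the caller raise a fault event for the whole group
    throw new Error(`${unprocessed.length} items still unprocessed after retries`);
  }

  await delay(100 * 2 ** attempt); // exponential backoff
  await batchSave(unprocessed.map(req => req.PutRequest!.Item), attempt + 1);
};
```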
In some cases, it might make sense to wrap a resource with a traditional circuit breaker. When the circuit is open, a fault event is automatically generated instead of going through the protracted retry flow over and over again. Cloud-native resources are highly available, so the need for this complexity is reduced. However, in the case of a regional disruption, this approach might be advantageous for some workloads. During a regional disruption, the most upstream component, typically a boundary component that initiates a flow of events, would fail over to another region, at which point all new events would flow through the other region. Any event flows that remain in the disrupted region will now take longer to become eventually consistent.
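The following is a deliberately simple, hand-rolled sketch of the idea rather than any particular circuit breaker library: after a threshold of consecutive failures the circuit opens for a cooling-off period, during which calls fail fast.

```typescript
// A minimal circuit breaker sketch; production code would more likely
// lean on an established library.
export class CircuitBreaker {
  private failures = 0;
  private openedAt = 0;

  constructor(
    private readonly threshold = 5,        // consecutive failures before opening
    private readonly resetTimeout = 30000, // ms to stay open before trying again
  ) {}

  isOpen(): boolean {
    return this.failures >= this.threshold &&
      Date.now() - this.openedAt < this.resetTimeout;
  }

  async call<T>(fn: () => Promise<T>): Promise<T> {
    if (this.isOpen()) {
      throw new Error('circuit open'); // caller raises a fault event instead of retrying
    }
    try {
      const result = await fn();
      this.failures = 0; // success closes the circuit
      return result;
    } catch (err) {
      this.failures += 1;
      if (this.failures >= this.threshold) {
        this.openedAt = Date.now(); // open (or re-open) the circuit
      }
      throw err;
    }
  }
}
```

While the circuit is open, the stream processor would raise fault events for the affected events and advance its checkpoint, rather than looping through the protracted retry flow.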
The default behavior would be to let those events continue to retry over and over until the region recovers, with some events trickling through in the meantime as the region comes back. Once the region fully recovers, the stream processors would proceed at full capacity until they catch up with the backlog of events, which should be a reasonable size assuming that the failover occurred properly and promptly. If a processor were not able to catch up, then it would be necessary to replay the dropped events from the data lake. For some workloads, such as those with very high volumes, it may make sense to proactively siphon off the events as faults and simply resubmit those fault events after the region recovers. This is an advanced alternative that you can evolve to when a specific processor warrants it.
Sooner or later your team will have to deal with a stream processor that drops events. Keep in mind that the data lake is the source of record and one of its main purposes is to account for just this situation. Investigate the problem, determine the range of missing events, understand the impacts and implications of replaying those events, and then move forward. Do not treat replay as a crutch, but definitely leverage it as a safety net so that you feel empowered to experiment and evolve your stream processing logic.