fix: buffer SSE messages until up-to-date message#2869
Conversation
…pe stream if the SSE connection breaks after processing messages and before receiving the up-to-date message.
Codecov ReportAttention: Patch coverage is
✅ All tests successful. No failed tests found.
Additional details and impacted files@@ Coverage Diff @@
## main #2869 +/- ##
==========================================
+ Coverage 77.79% 79.21% +1.41%
==========================================
Files 157 157
Lines 7495 7523 +28
Branches 284 284
==========================================
+ Hits 5831 5959 +128
+ Misses 1662 1562 -100
Partials 2 2
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
There was a problem hiding this comment.
This is great. Thanks @kevin-dp
As an aside I don't like the existing getOffset method that constructs one from the LSN and a _0 suffix. The offset being constructed from an LSN is a backend implementation detail we should not depend on. I think we had this in my original POC as a work around, but we really should replace this by adding the actual offset on the up-to-date message, at least in SSE mode.
edit: filed a bug #2870
This PR addresses the issue raised in electric-sql#2546 (comment). The crux of the issue is that messages might be processed several times if the SSE connection breaks after processing a message but before receiving the up-to-date message. This is because we only advance the offset when processing the up-to-date message, hence, we would fetch again at the old offset and receive some messages that we already processed. The fix is easy: we batch SSE messages until we receive up-to-date and only process the messages at that point. The hard part was to write a unit test that reproduces this scenario. After a bit of trial and error i managed to do that by creating a custom fetch wrapper that proxies the SSE response but filters out the first up-to-date message and then forces a refresh of the shapestream at that point.
This PR addresses the issue raised in #2546 (comment). The crux of the issue is that messages might be processed several times if the SSE connection breaks after processing a message but before receiving the up-to-date message. This is because we only advance the offset when processing the up-to-date message, hence, we would fetch again at the old offset and receive some messages that we already processed.
The fix is easy: we batch SSE messages until we receive up-to-date and only process the messages at that point. The hard part was to write a unit test that reproduces this scenario. After a bit of trial and error i managed to do that by creating a custom fetch wrapper that proxies the SSE response but filters out the first up-to-date message and then forces a refresh of the shapestream at that point.