pkg/chipingress/batch: add seqnum CloudEvents extension attribute #1848
✅ API Diff Results - No breaking changes
Pull request overview
Adds a per-(source,type) monotonic seqnum CloudEvents extension attribute to the chipingress batch client to support downstream gap detection and ordered-ingestion observability.
Changes:
- Introduces a per-(source,type) counter map in the batch Client and a seqnumFor() helper for thread-safe increments.
- Stamps/overwrites a "seqnum" CloudEvents extension attribute (as a string) in QueueMessage(), with an option to clone events to avoid mutating caller-owned objects.
- Adds unit tests covering gaps on dropped messages, pointer reuse behavior (clone on/off), per-key independence, and concurrency uniqueness.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| pkg/chipingress/batch/client.go | Adds per-key counters, seqnum stamping in QueueMessage, and WithEventClone option. |
| pkg/chipingress/batch/client_test.go | Adds tests validating seqnum behavior (gaps, independence, concurrency) and event cloning toggle. |
pkg/chipingress/batch/client.go
Outdated
        counters sync.Map // map[string]*atomic.Uint64 for per-(source,type) seqnum
    }
counters is an unbounded sync.Map that grows by one entry per unique (source,type) pair and is never cleaned up. If source/type cardinality can be high in production, this becomes a steady memory growth risk; consider documenting bounded cardinality assumptions, adding a reset/clear mechanism (e.g., on Stop()), or using a bounded/LRU approach.
    for g := 0; g < numGoroutines; g++ {
        go func(goroutineID int) {
            defer wg.Done()
            for i := 0; i < eventsPerGoroutine; i++ {
                event := &chipingress.CloudEventPb{
                    Id:     strconv.Itoa(goroutineID*eventsPerGoroutine + i),
                    Source: "concurrent-domain",
                    Type:   "concurrent-type",
                }
                _ = client.QueueMessage(event, nil)
            }
        }(g)
In the concurrency subtest, QueueMessage errors are ignored, but the test later blocks on reading exactly totalEvents messages from client.messageBuffer. If QueueMessage ever returns an error (e.g., buffer sizing changes, shutdown, future logic), this test can hang rather than fail fast. Capture and assert QueueMessage returned nil (e.g., require.NoError) or collect errors from goroutines and fail the test explicitly.
pkg/chipingress/batch/client.go
Outdated
    // seqnumFor returns the next sequence number for the given source+type pair.
    // Each unique (source, type) pair has its own independent counter starting at 1.
    func (b *Client) seqnumFor(source, typ string) uint64 {
        key := source + "\x00" + typ
seqnumFor uses a concatenated string key (source + "\x00" + typ). This can collide if either source or type contains the separator (e.g. source="a\x00b", type="c" vs source="a", type="b\x00c"), and it allocates a new string on every call. Prefer using a comparable struct key (e.g. struct{source, typ string}) in the sync.Map to avoid collisions and reduce allocations.
    -   key := source + "\x00" + typ
    +   key := struct {
    +       source string
    +       typ    string
    +   }{source: source, typ: typ}
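The collision described in this comment can be reproduced with a few lines. The sketch below is illustrative only: concatKey mirrors the concatenation from the diff, while pairKey is a hypothetical struct key. It compares the two inputs named in the comment under each scheme.

```go
package main

import "fmt"

// pairKey is a hypothetical comparable struct key for a (source, type) pair.
type pairKey struct {
	source    string
	eventType string
}

// concatKey mirrors the concatenated key from the diff above.
func concatKey(source, eventType string) string {
	return source + "\x00" + eventType
}

func main() {
	// Two different pairs whose concatenated keys are byte-for-byte identical.
	fmt.Println(concatKey("a\x00b", "c") == concatKey("a", "b\x00c")) // true: collision

	// The struct key compares field by field, so the pairs stay distinct.
	fmt.Println(pairKey{"a\x00b", "c"} == pairKey{"a", "b\x00c"}) // false
}
```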
pkg/chipingress/batch/client.go
Outdated
    // Stamp seqnum extension attribute
    seq := b.seqnumFor(event.Source, event.Type)
When cloneEvent is enabled, QueueMessage clones into eventToQueue but still derives the seqnum key from the original event (b.seqnumFor(event.Source, event.Type)). To keep behavior consistent with the buffered snapshot (and avoid any accidental coupling to the caller-owned object), derive the key from eventToQueue.Source/eventToQueue.Type instead.
    -   // Stamp seqnum extension attribute
    -   seq := b.seqnumFor(event.Source, event.Type)
    +   // Stamp seqnum extension attribute using the event snapshot being queued.
    +   seq := b.seqnumFor(eventToQueue.Source, eventToQueue.Type)
Addressed in a9408179: seqnum is now derived from the queued snapshot (eventToQueue.Source/eventToQueue.Type) rather than the caller-owned event. Also renamed the key field from typ to eventType for clarity.
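To illustrate the clone-then-stamp ordering discussed in this thread, here is a minimal sketch. It assumes a simplified event struct with a plain string attribute map, which is not the real CloudEvents protobuf type; event, clone, and stamp are hypothetical names, and only the "seqnum" attribute name and its string encoding come from the PR.

```go
package main

import (
	"fmt"
	"strconv"
)

// event is a hypothetical, simplified stand-in for the CloudEvents protobuf message.
type event struct {
	Source     string
	Type       string
	Attributes map[string]string
}

// clone returns a copy whose attribute map is independent of the original.
func (e *event) clone() *event {
	c := &event{
		Source:     e.Source,
		Type:       e.Type,
		Attributes: make(map[string]string, len(e.Attributes)+1),
	}
	for k, v := range e.Attributes {
		c.Attributes[k] = v
	}
	return c
}

// stamp returns the event to queue with "seqnum" set as a decimal string.
// With cloneFirst set, the caller-owned event is left untouched.
func stamp(e *event, seq uint64, cloneFirst bool) *event {
	out := e
	if cloneFirst {
		out = e.clone()
	}
	if out.Attributes == nil {
		out.Attributes = map[string]string{}
	}
	out.Attributes["seqnum"] = strconv.FormatUint(seq, 10)
	return out
}

func main() {
	caller := &event{Source: "s", Type: "t"}
	queued := stamp(caller, 1, true)
	fmt.Println(queued.Attributes["seqnum"], len(caller.Attributes)) // 1 0
}
```

Formatting with strconv.FormatUint keeps every uint64 value representable as text, including the maximum.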
484c619 to 021434a
1b16abc to 37d29f0
Summary
- Adds a seqnum CloudEvents extension attribute in the chip ingress batch client
- Sequence numbers are tracked per (source, type) pair, starting at 1

Changes
- Keeps per-key counters using sync.Map + atomic.Uint64
- Stamps seqnum as a string extension in QueueMessage() (preserves full uint64 range in CloudEvents protobuf)
- Adds optional event cloning (WithEventClone) so queued messages can snapshot caller-owned events safely
- Derives seqnum from the queued snapshot (eventToQueue) rather than caller-owned event pointer
- Uses a struct key (seqnumKey{source,eventType}) instead of concatenated strings (avoids separator collisions and extra key-string allocations)
- Clears counters on Stop() via clearCounters() to avoid unbounded growth across client lifecycle
- Makes the concurrency test fail fast if any QueueMessage call fails, instead of potentially blocking while draining

Jira INFOPLAT-3470