Skip to content

Comments

pkg/chipingress/batch: add seqnum CloudEvents extension attribute#1848

Open
pkcll wants to merge 9 commits intomainfrom
feat/chipingress-batch-seqnum
Open

pkg/chipingress/batch: add seqnum CloudEvents extension attribute#1848
pkcll wants to merge 9 commits intomainfrom
feat/chipingress-batch-seqnum

Conversation

@pkcll
Copy link
Contributor

@pkcll pkcll commented Feb 20, 2026

Summary

  • Adds a monotonic seqnum CloudEvents extension attribute in the chip ingress batch client
  • Maintains independent sequence counters per (source, type) pair, starting at 1
  • Supports downstream gap detection and ordered-ingestion observability

Changes

  • Add per-key counters via sync.Map + atomic.Uint64
  • Stamp seqnum as a string extension in QueueMessage() (preserves full uint64 range in CloudEvents protobuf)
  • Add configurable cloning (WithEventClone) so queued messages can snapshot caller-owned events safely
  • Derive seqnum from the queued snapshot (eventToQueue) rather than caller-owned event pointer
  • Use a struct key for counters (seqnumKey{source,eventType}) instead of concatenated strings (avoids separator collisions and extra key-string allocations)
  • Clear counter state during Stop() via clearCounters() to avoid unbounded growth across client lifecycle
  • Strengthen concurrency test to fail fast if any QueueMessage call fails, instead of potentially blocking while draining

Jira INFOPLAT-3470

@github-actions
Copy link

github-actions bot commented Feb 20, 2026

✅ API Diff Results - No breaking changes


📄 View full apidiff report

@pkcll pkcll marked this pull request as ready for review February 20, 2026 14:58
@pkcll pkcll requested a review from a team as a code owner February 20, 2026 14:58
Copilot AI review requested due to automatic review settings February 20, 2026 14:58
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Adds a per-(source,type) monotonic seqnum CloudEvents extension attribute to the chipingress batch client to support downstream gap detection and ordered-ingestion observability.

Changes:

  • Introduces a per-(source,type) counter map in the batch Client and a seqnumFor() helper for thread-safe increments.
  • Stamps/overwrites a "seqnum" CloudEvents extension attribute (as a string) in QueueMessage(), with an option to clone events to avoid mutating caller-owned objects.
  • Adds unit tests covering gaps on dropped messages, pointer reuse behavior (clone on/off), per-key independence, and concurrency uniqueness.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.

File Description
pkg/chipingress/batch/client.go Adds per-key counters, seqnum stamping in QueueMessage, and WithEventClone option.
pkg/chipingress/batch/client_test.go Adds tests validating seqnum behavior (gaps, independence, concurrency) and event cloning toggle.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines 39 to 40
counters sync.Map // map[string]*atomic.Uint64 for per-(source,type) seqnum
}
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

counters is an unbounded sync.Map that grows by one entry per unique (source,type) pair and is never cleaned up. If source/type cardinality can be high in production, this becomes a steady memory growth risk; consider documenting bounded cardinality assumptions, adding a reset/clear mechanism (e.g., on Stop()), or using a bounded/LRU approach.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in commit 021434a: 021434a5

Added clearCounters() and invoke it in Stop() after shutdown completes, so per-(source,type) seqnum state is released instead of growing unbounded across client lifecycle.

Comment on lines 1010 to 1021
for g := 0; g < numGoroutines; g++ {
go func(goroutineID int) {
defer wg.Done()
for i := 0; i < eventsPerGoroutine; i++ {
event := &chipingress.CloudEventPb{
Id: strconv.Itoa(goroutineID*eventsPerGoroutine + i),
Source: "concurrent-domain",
Type: "concurrent-type",
}
_ = client.QueueMessage(event, nil)
}
}(g)
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the concurrency subtest, QueueMessage errors are ignored, but the test later blocks on reading exactly totalEvents messages from client.messageBuffer. If QueueMessage ever returns an error (e.g., buffer sizing changes, shutdown, future logic), this test can hang rather than fail fast. Capture and assert QueueMessage returned nil (e.g., require.NoError) or collect errors from goroutines and fail the test explicitly.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in commit 37d29f0: 37d29f06

Updated the concurrent access produces unique seqnums subtest to capture QueueMessage errors from goroutines, assert none occurred, and only then drain messageBuffer, so the test fails explicitly instead of potentially hanging.

// seqnumFor returns the next sequence number for the given source+type pair.
// Each unique (source, type) pair has its own independent counter starting at 1.
func (b *Client) seqnumFor(source, typ string) uint64 {
key := source + "\x00" + typ
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seqnumFor uses a concatenated string key (source + "\x00" + typ). This can collide if either source or type contains the separator (e.g. source="a\x00b", type="c" vs source="a", type="b\x00c"), and it allocates a new string on every call. Prefer using a comparable struct key (e.g. struct{source, typ string}) in the sync.Map to avoid collisions and reduce allocations.

Suggested change
key := source + "\x00" + typ
key := struct {
source string
typ string
}{source: source, typ: typ}

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in commit a2f43e7: a2f43e76

Switched seqnumFor to use a comparable struct key (seqnumKey{source, eventType}) instead of a concatenated string, which avoids separator collisions and per-call key string allocation.

Comment on lines 176 to 177
// Stamp seqnum extension attribute
seq := b.seqnumFor(event.Source, event.Type)
Copy link

Copilot AI Feb 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When cloneEvent is enabled, QueueMessage clones into eventToQueue but still derives the seqnum key from the original event (b.seqnumFor(event.Source, event.Type)). To keep behavior consistent with the buffered snapshot (and avoid any accidental coupling to the caller-owned object), derive the key from eventToQueue.Source/eventToQueue.Type instead.

Suggested change
// Stamp seqnum extension attribute
seq := b.seqnumFor(event.Source, event.Type)
// Stamp seqnum extension attribute using the event snapshot being queued.
seq := b.seqnumFor(eventToQueue.Source, eventToQueue.Type)

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in a9408179: seqnum is now derived from the queued snapshot (eventToQueue.Source/eventToQueue.Type) rather than the caller-owned event. Also renamed the key field from typ to eventType for clarity.

@pkcll pkcll requested a review from hendoxc February 20, 2026 20:34
@pkcll pkcll force-pushed the feat/chipingress-batch-seqnum branch from 1b16abc to 37d29f0 Compare February 23, 2026 14:58
@pkcll pkcll marked this pull request as ready for review February 23, 2026 15:02
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants