diff --git a/README.md b/README.md index c4b68291..69cd190b 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,43 @@ Deployment/runbook documents: - `docs/docker_multinode_manual_run.md` (manual `docker run`, 4-5 node cluster on multiple VMs, no docker compose) +## Metrics and Grafana + +Elastickv now exposes Prometheus metrics on `--metricsAddress` (default: `localhost:9090` in `main.go`, `127.0.0.1:9090` in `cmd/server/demo.go` single-node mode). The built-in 3-node demo binds metrics on `0.0.0.0:9091`, `0.0.0.0:9092`, and `0.0.0.0:9093`, and uses the bearer token `demo-metrics-token` unless `--metricsToken` is set. + +The exported metrics cover: + +- DynamoDB-compatible API request rate, success/error split, latency, request/response size, and per-table read/write item counts +- Raft local state, leader identity, current members, commit/applied index, and leader contact lag + +Provisioned monitoring assets live under: + +- `monitoring/prometheus/prometheus.yml` +- `monitoring/grafana/dashboards/elastickv-cluster-overview.json` +- `monitoring/grafana/provisioning/` +- `monitoring/docker-compose.yml` + +If you bind `--metricsAddress` to a non-loopback address, `--metricsToken` is required. Prometheus must send the same bearer token, for example: + +```yaml +scrape_configs: + - job_name: elastickv + authorization: + type: Bearer + credentials: YOUR_METRICS_TOKEN +``` + +To scrape a multi-node deployment, bind `--metricsAddress` to each node's private IP and set `--metricsToken`, for example `--metricsAddress "10.0.0.11:9090" --metricsToken "YOUR_METRICS_TOKEN"`. + +For the local 3-node demo, start Grafana and Prometheus with: + +```bash +cd monitoring +docker compose up -d +``` + +`monitoring/prometheus/prometheus.yml` assumes the demo token `demo-metrics-token`. If you override `--metricsToken` when running `go run ./cmd/server/demo.go`, update `authorization.credentials` in that file to match. 
+ ## Example Usage @@ -41,6 +78,16 @@ To start the server, use the following command: go run cmd/server/demo.go ``` +To expose metrics on a dedicated port: +```bash +go run . \ + --address "127.0.0.1:50051" \ + --redisAddress "127.0.0.1:6379" \ + --dynamoAddress "127.0.0.1:8000" \ + --metricsAddress "127.0.0.1:9090" \ + --raftId "n1" +``` + ### Starting the Client To start the client, use this command: diff --git a/adapter/dynamodb.go b/adapter/dynamodb.go index 532d17ab..f989b936 100644 --- a/adapter/dynamodb.go +++ b/adapter/dynamodb.go @@ -19,6 +19,7 @@ import ( "time" "github.com/bootjp/elastickv/kv" + "github.com/bootjp/elastickv/monitoring" "github.com/bootjp/elastickv/store" "github.com/cockroachdb/errors" ) @@ -69,6 +70,21 @@ const ( dynamoOrderedKeyEncodingV2 = 2 ) +var dynamoOperationTargets = map[string]string{ + batchWriteItemTarget: "BatchWriteItem", + createTableTarget: "CreateTable", + deleteItemTarget: "DeleteItem", + deleteTableTarget: "DeleteTable", + describeTableTarget: "DescribeTable", + getItemTarget: "GetItem", + listTablesTarget: "ListTables", + putItemTarget: "PutItem", + queryTarget: "Query", + scanTarget: "Scan", + transactWriteItemsTarget: "TransactWriteItems", + updateItemTarget: "UpdateItem", +} + const ( dynamoErrValidation = "ValidationException" dynamoErrInternal = "InternalServerError" @@ -86,17 +102,109 @@ const ( dynamoSelectCount = "COUNT" ) +type DynamoDBServerOption func(*DynamoDBServer) + type DynamoDBServer struct { listen net.Listener store store.MVCCStore coordinator kv.Coordinator httpServer *http.Server targetHandlers map[string]func(http.ResponseWriter, *http.Request) + requestObserver monitoring.DynamoDBRequestObserver itemUpdateLocks [itemUpdateLockStripeCount]sync.Mutex tableLocks [tableLockStripeCount]sync.Mutex } -func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordinator) *DynamoDBServer { +// WithDynamoDBRequestObserver enables Prometheus-compatible request metrics. 
+func WithDynamoDBRequestObserver(observer monitoring.DynamoDBRequestObserver) DynamoDBServerOption { + return func(server *DynamoDBServer) { + server.requestObserver = observer + } +} + +type dynamoRequestMetricsContextKey struct{} + +type dynamoRequestMetricsState struct { + tables map[string]monitoring.DynamoDBTableMetrics +} + +type countingReadCloser struct { + io.ReadCloser + bytesRead int +} + +func (c *countingReadCloser) Read(p []byte) (int, error) { + if c == nil || c.ReadCloser == nil { + return 0, io.EOF + } + n, err := c.ReadCloser.Read(p) + c.bytesRead += n + if err != nil { + if errors.Is(err, io.EOF) { + return n, io.EOF + } + return n, errors.WithStack(err) + } + return n, nil +} + +func (c *countingReadCloser) Close() error { + if c == nil || c.ReadCloser == nil { + return nil + } + if err := c.ReadCloser.Close(); err != nil { + return errors.WithStack(err) + } + return nil +} + +func (c *countingReadCloser) BytesRead() int { + if c == nil { + return 0 + } + return c.bytesRead +} + +type dynamoResponseRecorder struct { + http.ResponseWriter + statusCode int + bytesWritten int +} + +func (r *dynamoResponseRecorder) WriteHeader(statusCode int) { + if r.statusCode == 0 { + r.statusCode = statusCode + } + r.ResponseWriter.WriteHeader(statusCode) +} + +func (r *dynamoResponseRecorder) Write(p []byte) (int, error) { + if r.statusCode == 0 { + r.WriteHeader(http.StatusOK) + } + n, err := r.ResponseWriter.Write(p) + r.bytesWritten += n + if err != nil { + return n, errors.WithStack(err) + } + return n, nil +} + +func (r *dynamoResponseRecorder) StatusCode() int { + if r == nil || r.statusCode == 0 { + return http.StatusOK + } + return r.statusCode +} + +func (r *dynamoResponseRecorder) BytesWritten() int { + if r == nil { + return 0 + } + return r.bytesWritten +} + +func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate kv.Coordinator, opts ...DynamoDBServerOption) *DynamoDBServer { d := &DynamoDBServer{ listen: listen, store: st, @@ 
-119,6 +227,11 @@ func NewDynamoDBServer(listen net.Listener, st store.MVCCStore, coordinate kv.Co mux := http.NewServeMux() mux.HandleFunc("/", d.handle) d.httpServer = &http.Server{Handler: mux, ReadHeaderTimeout: time.Second} + for _, opt := range opts { + if opt != nil { + opt(d) + } + } return d } @@ -137,9 +250,34 @@ func (d *DynamoDBServer) Stop() { func (d *DynamoDBServer) handle(w http.ResponseWriter, r *http.Request) { target := r.Header.Get("X-Amz-Target") - if !d.dispatchByTarget(target, w, r) { - writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, "unsupported operation") + if d.requestObserver == nil { + d.dispatchOrWriteUnsupported(target, w, r) + return } + + operation := dynamoOperationName(target) + d.requestObserver.ObserveInFlightChange(operation, 1) + defer d.requestObserver.ObserveInFlightChange(operation, -1) + + state := &dynamoRequestMetricsState{tables: make(map[string]monitoring.DynamoDBTableMetrics)} + r = r.WithContext(context.WithValue(r.Context(), dynamoRequestMetricsContextKey{}, state)) + bodyCounter := &countingReadCloser{ReadCloser: r.Body} + r.Body = bodyCounter + recorder := &dynamoResponseRecorder{ResponseWriter: w} + started := time.Now() + + d.dispatchOrWriteUnsupported(target, recorder, r) + + d.requestObserver.ObserveDynamoDBRequest(monitoring.DynamoDBRequestReport{ + Operation: operation, + HTTPStatus: recorder.StatusCode(), + ErrorType: recorder.Header().Get("x-amzn-ErrorType"), + Duration: time.Since(started), + RequestBytes: bodyCounter.BytesRead(), + ResponseBytes: recorder.BytesWritten(), + Tables: state.tableNames(), + TableMetrics: state.tableMetrics(), + }) } func maxDynamoBodyReader(w http.ResponseWriter, r *http.Request) io.Reader { @@ -155,6 +293,109 @@ func (d *DynamoDBServer) dispatchByTarget(target string, w http.ResponseWriter, return true } +func (d *DynamoDBServer) dispatchOrWriteUnsupported(target string, w http.ResponseWriter, r *http.Request) { + if d.dispatchByTarget(target, w, r) { + 
return + } + writeDynamoError(w, http.StatusBadRequest, dynamoErrValidation, "unsupported operation") +} + +func dynamoOperationName(target string) string { + if operation, ok := dynamoOperationTargets[strings.TrimSpace(target)]; ok { + return operation + } + return "unknown" +} + +func dynamoRequestMetricsFromContext(ctx context.Context) *dynamoRequestMetricsState { + if ctx == nil { + return nil + } + state, _ := ctx.Value(dynamoRequestMetricsContextKey{}).(*dynamoRequestMetricsState) + return state +} + +func (s *dynamoRequestMetricsState) recordTable(table string) { + if s == nil { + return + } + table = strings.TrimSpace(table) + if table == "" { + return + } + if s.tables == nil { + s.tables = make(map[string]monitoring.DynamoDBTableMetrics) + } + if _, ok := s.tables[table]; !ok { + s.tables[table] = monitoring.DynamoDBTableMetrics{} + } +} + +func (s *dynamoRequestMetricsState) addTableMetrics(table string, returnedItems int, scannedItems int, writtenItems int) { + if s == nil { + return + } + table = strings.TrimSpace(table) + if table == "" { + return + } + s.recordTable(table) + metrics := s.tables[table] + metrics.ReturnedItems += returnedItems + metrics.ScannedItems += scannedItems + metrics.WrittenItems += writtenItems + s.tables[table] = metrics +} + +func (s *dynamoRequestMetricsState) tableNames() []string { + if s == nil || len(s.tables) == 0 { + return nil + } + names := make([]string, 0, len(s.tables)) + for table := range s.tables { + names = append(names, table) + } + sort.Strings(names) + return names +} + +func (s *dynamoRequestMetricsState) tableMetrics() map[string]monitoring.DynamoDBTableMetrics { + if s == nil || len(s.tables) == 0 { + return nil + } + out := make(map[string]monitoring.DynamoDBTableMetrics, len(s.tables)) + for table, metrics := range s.tables { + out[table] = metrics + } + return out +} + +func (d *DynamoDBServer) observeTables(ctx context.Context, tables ...string) { + state := dynamoRequestMetricsFromContext(ctx) + if 
state == nil { + return + } + for _, table := range tables { + state.recordTable(table) + } +} + +func (d *DynamoDBServer) observeReadMetrics(ctx context.Context, table string, returnedItems int, scannedItems int) { + state := dynamoRequestMetricsFromContext(ctx) + if state == nil { + return + } + state.addTableMetrics(table, returnedItems, scannedItems, 0) +} + +func (d *DynamoDBServer) observeWrittenItems(ctx context.Context, table string, writtenItems int) { + state := dynamoRequestMetricsFromContext(ctx) + if state == nil { + return + } + state.addTableMetrics(table, 0, 0, writtenItems) +} + type createTableAttributeDefinition struct { AttributeName string `json:"AttributeName"` AttributeType string `json:"AttributeType"` @@ -328,6 +569,39 @@ type dynamoGlobalSecondaryIndex struct { Projection dynamoGSIProjection `json:"projection"` } +func (g *dynamoGlobalSecondaryIndex) UnmarshalJSON(b []byte) error { + type rawGSI struct { + KeySchema *dynamoKeySchema `json:"key_schema"` + Projection *dynamoGSIProjection `json:"projection"` + HashKey string `json:"hash_key"` + RangeKey string `json:"range_key"` + } + + var raw rawGSI + if err := json.Unmarshal(b, &raw); err != nil { + return errors.WithStack(err) + } + + if raw.KeySchema != nil { + g.KeySchema = *raw.KeySchema + } else { + g.KeySchema = dynamoKeySchema{ + HashKey: raw.HashKey, + RangeKey: raw.RangeKey, + } + } + + if raw.Projection != nil && strings.TrimSpace(raw.Projection.ProjectionType) != "" { + g.Projection = *raw.Projection + } else { + // Older schema snapshots stored only the key schema. Those GSIs behaved + // like ALL projections, so preserve that behavior when normalizing. 
+ g.Projection = dynamoGSIProjection{ProjectionType: "ALL"} + } + + return nil +} + type dynamoTableSchema struct { TableName string `json:"table_name"` AttributeDefinitions map[string]string `json:"attribute_definitions,omitempty"` @@ -355,6 +629,7 @@ func (d *DynamoDBServer) createTable(w http.ResponseWriter, r *http.Request) { writeDynamoErrorFromErr(w, err) return } + d.observeTables(r.Context(), schema.TableName) writeDynamoJSON(w, map[string]any{ "TableDescription": map[string]any{ "TableName": in.TableName, @@ -799,6 +1074,7 @@ func (d *DynamoDBServer) putItem(w http.ResponseWriter, r *http.Request) { writeDynamoErrorFromErr(w, err) return } + d.observeWrittenItems(r.Context(), in.TableName, 1) resp := map[string]any{} if attrs := putItemReturnAttributes(in.ReturnValues, plan.current); len(attrs) > 0 { resp["Attributes"] = attrs @@ -988,6 +1264,7 @@ func (d *DynamoDBServer) getItem(w http.ResponseWriter, r *http.Request) { writeDynamoJSON(w, map[string]any{}) return } + d.observeReadMetrics(r.Context(), in.TableName, 1, 1) projected, err := projectItem(current.item, in.ProjectionExpression, in.ExpressionAttributeNames) if err != nil { writeDynamoErrorFromErr(w, err) @@ -1018,6 +1295,9 @@ func (d *DynamoDBServer) deleteItem(w http.ResponseWriter, r *http.Request) { writeDynamoErrorFromErr(w, err) return } + if len(plan.current) > 0 { + d.observeWrittenItems(r.Context(), in.TableName, 1) + } resp := map[string]any{} if shouldReturnOld && len(plan.current) > 0 { resp["Attributes"] = plan.current @@ -1145,6 +1425,7 @@ func (d *DynamoDBServer) updateItem(w http.ResponseWriter, r *http.Request) { writeDynamoErrorFromErr(w, err) return } + d.observeWrittenItems(r.Context(), in.TableName, 1) resp := map[string]any{} if attrs := updateItemReturnAttributes(in.ReturnValues, plan.current, plan.next); len(attrs) > 0 { resp["Attributes"] = attrs @@ -1459,6 +1740,7 @@ func (d *DynamoDBServer) query(w http.ResponseWriter, r *http.Request) { writeDynamoErrorFromErr(w, err) 
return } + d.observeReadMetrics(r.Context(), in.TableName, out.count, out.scannedCount) resp := map[string]any{ "Items": out.items, "Count": out.count, @@ -1481,6 +1763,7 @@ func (d *DynamoDBServer) scan(w http.ResponseWriter, r *http.Request) { writeDynamoErrorFromErr(w, err) return } + d.observeReadMetrics(r.Context(), in.TableName, out.count, out.scannedCount) resp := map[string]any{ "Items": out.items, "Count": out.count, @@ -3257,6 +3540,9 @@ func (d *DynamoDBServer) batchWriteItem(w http.ResponseWriter, r *http.Request) writeDynamoErrorFromErr(w, err) return } + for table, written := range batchWriteCommittedCounts(in, unprocessed) { + d.observeWrittenItems(r.Context(), table, written) + } writeDynamoJSON(w, map[string]any{"UnprocessedItems": unprocessed}) } @@ -3288,6 +3574,17 @@ func decodeBatchWriteItemInput(bodyReader io.Reader) (batchWriteItemInput, error return in, nil } +func batchWriteCommittedCounts(in batchWriteItemInput, unprocessed map[string][]batchWriteRequest) map[string]int { + out := make(map[string]int, len(in.RequestItems)) + for table, requests := range in.RequestItems { + written := len(requests) - len(unprocessed[table]) + if written > 0 { + out[table] = written + } + } + return out +} + func (d *DynamoDBServer) batchWriteItems( ctx context.Context, in batchWriteItemInput, @@ -3474,14 +3771,13 @@ func (d *DynamoDBServer) transactWriteItems(w http.ResponseWriter, r *http.Reque writeDynamoErrorFromErr(w, err) return } - if _, err := collectTransactWriteTableNames(in); err != nil { - writeDynamoErrorFromErr(w, err) - return - } if err := d.transactWriteItemsWithRetry(r.Context(), in); err != nil { writeDynamoErrorFromErr(w, err) return } + for table, written := range transactWriteWrittenCounts(in) { + d.observeWrittenItems(r.Context(), table, written) + } writeDynamoJSON(w, map[string]any{}) } @@ -3520,6 +3816,21 @@ func collectTransactWriteTableNames(in transactWriteItemsInput) ([]string, error return names, nil } +func 
transactWriteWrittenCounts(in transactWriteItemsInput) map[string]int { + out := make(map[string]int) + for _, item := range in.TransactItems { + tableName, err := transactWriteItemTableName(item) + if err != nil || strings.TrimSpace(tableName) == "" { + continue + } + switch { + case item.Put != nil, item.Update != nil, item.Delete != nil: + out[tableName]++ + } + } + return out +} + func transactWriteItemTableName(item transactWriteItem) (string, error) { switch countTransactWriteItemActions(item) { case 0: @@ -7065,6 +7376,7 @@ func (d *DynamoDBServer) loadTableSchemaAt(ctx context.Context, tableName string if err := json.Unmarshal(b, schema); err != nil { return nil, false, errors.WithStack(err) } + d.observeTables(ctx, schema.TableName) return schema, true, nil } diff --git a/adapter/dynamodb_metrics_test.go b/adapter/dynamodb_metrics_test.go new file mode 100644 index 00000000..e170c805 --- /dev/null +++ b/adapter/dynamodb_metrics_test.go @@ -0,0 +1,92 @@ +package adapter + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/bootjp/elastickv/monitoring" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestDynamoDBServerHandleObservesSuccessMetrics(t *testing.T) { + registry := monitoring.NewRegistry("n1", "10.0.0.1:50051") + server := NewDynamoDBServer(nil, nil, &stubAdapterCoordinator{}, WithDynamoDBRequestObserver(registry.DynamoDBObserver())) + server.targetHandlers = map[string]func(http.ResponseWriter, *http.Request){ + putItemTarget: func(w http.ResponseWriter, r *http.Request) { + _, _ = io.ReadAll(maxDynamoBodyReader(w, r)) + server.observeTables(r.Context(), "orders") + server.observeWrittenItems(r.Context(), "orders", 1) + writeDynamoJSON(w, map[string]any{"ok": true}) + }, + } + + req := httptest.NewRequestWithContext(context.Background(), http.MethodPost, "/", strings.NewReader(`{"TableName":"orders"}`)) + 
req.Header.Set("X-Amz-Target", putItemTarget) + rec := httptest.NewRecorder() + server.handle(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + err := testutil.GatherAndCompare( + registry.Gatherer(), + strings.NewReader(` +# HELP elastickv_dynamodb_requests_total Total number of DynamoDB-compatible API requests by operation and outcome. +# TYPE elastickv_dynamodb_requests_total counter +elastickv_dynamodb_requests_total{node_address="10.0.0.1:50051",node_id="n1",operation="PutItem",outcome="success"} 1 +# HELP elastickv_dynamodb_written_items_total Total number of items written or deleted by DynamoDB-compatible write APIs. +# TYPE elastickv_dynamodb_written_items_total counter +elastickv_dynamodb_written_items_total{node_address="10.0.0.1:50051",node_id="n1",operation="PutItem",table="orders"} 1 +`), + "elastickv_dynamodb_requests_total", + "elastickv_dynamodb_written_items_total", + ) + require.NoError(t, err) +} + +func TestDynamoDBServerHandleObservesConditionalFailures(t *testing.T) { + registry := monitoring.NewRegistry("n1", "10.0.0.1:50051") + server := NewDynamoDBServer(nil, nil, &stubAdapterCoordinator{}, WithDynamoDBRequestObserver(registry.DynamoDBObserver())) + server.targetHandlers = map[string]func(http.ResponseWriter, *http.Request){ + updateItemTarget: func(w http.ResponseWriter, r *http.Request) { + _, _ = io.ReadAll(maxDynamoBodyReader(w, r)) + server.observeTables(r.Context(), "orders") + writeDynamoError(w, http.StatusBadRequest, dynamoErrConditionalFailed, "condition failed") + }, + } + + req := httptest.NewRequestWithContext(context.Background(), http.MethodPost, "/", strings.NewReader(`{"TableName":"orders"}`)) + req.Header.Set("X-Amz-Target", updateItemTarget) + rec := httptest.NewRecorder() + server.handle(rec, req) + + require.Equal(t, http.StatusBadRequest, rec.Code) + err := testutil.GatherAndCompare( + registry.Gatherer(), + strings.NewReader(` +# HELP elastickv_dynamodb_conditional_check_failed_total Total number of 
conditional check failures returned by DynamoDB-compatible APIs. +# TYPE elastickv_dynamodb_conditional_check_failed_total counter +elastickv_dynamodb_conditional_check_failed_total{node_address="10.0.0.1:50051",node_id="n1",operation="UpdateItem",table="orders"} 1 +`), + "elastickv_dynamodb_conditional_check_failed_total", + ) + require.NoError(t, err) +} + +func TestDynamoRequestMetricsStateAddsMetricsWithoutExplicitTableRegistration(t *testing.T) { + state := &dynamoRequestMetricsState{} + + state.addTableMetrics("orders", 0, 0, 1) + require.Equal(t, map[string]monitoring.DynamoDBTableMetrics{ + "orders": {WrittenItems: 1}, + }, state.tableMetrics()) +} + +func TestDynamoOperationNameRejectsUnknownTargets(t *testing.T) { + require.Equal(t, "PutItem", dynamoOperationName(putItemTarget)) + require.Equal(t, "unknown", dynamoOperationName(targetPrefix+"PutItem-random")) + require.Equal(t, "unknown", dynamoOperationName("random")) +} diff --git a/adapter/dynamodb_migration_test.go b/adapter/dynamodb_migration_test.go index d0395912..11298048 100644 --- a/adapter/dynamodb_migration_test.go +++ b/adapter/dynamodb_migration_test.go @@ -2,6 +2,7 @@ package adapter import ( "context" + "encoding/json" "fmt" "testing" "time" @@ -151,6 +152,61 @@ func TestDynamoDB_EnsureLegacyTableMigration_MigratesLegacyGeneration(t *testing }, time.Second, 10*time.Millisecond) } +func TestDynamoDB_EnsureLegacyTableMigration_NormalizesLegacyGSIJSONFormat(t *testing.T) { + t.Parallel() + + legacySchema, server, st := newLegacyMigrationTestServer(t, true, "S") + writer := newDynamoFixtureWriter(t, st) + + legacyBody, err := json.Marshal(map[string]any{ + "table_name": legacySchema.TableName, + "attribute_definitions": legacySchema.AttributeDefinitions, + "primary_key": map[string]any{ + "hash_key": legacySchema.PrimaryKey.HashKey, + "range_key": legacySchema.PrimaryKey.RangeKey, + }, + "global_secondary_indexes": map[string]any{ + "status-index": map[string]any{ + "hash_key": "status", + 
"range_key": "sk", + }, + }, + "generation": legacySchema.Generation, + }) + require.NoError(t, err) + writer.put(dynamoTableMetaKey(legacySchema.TableName), legacyBody) + writer.put(dynamoTableGenerationKey(legacySchema.TableName), []byte(fmt.Sprintf("%d", legacySchema.Generation))) + + writer.writeItem(legacySchema, map[string]attributeValue{ + "pk": newStringAttributeValue("tenant"), + "sk": newStringAttributeValue("2026-03-09T12:00:00Z"), + "status": newStringAttributeValue("open"), + "value": newStringAttributeValue("v1"), + }) + + ctx := context.Background() + require.NoError(t, server.ensureLegacyTableMigration(ctx, legacySchema.TableName)) + + schema, exists, err := server.loadTableSchema(ctx, legacySchema.TableName) + require.NoError(t, err) + require.True(t, exists) + require.Equal(t, "status", schema.GlobalSecondaryIndexes["status-index"].KeySchema.HashKey) + require.Equal(t, "sk", schema.GlobalSecondaryIndexes["status-index"].KeySchema.RangeKey) + require.Equal(t, "ALL", schema.GlobalSecondaryIndexes["status-index"].Projection.ProjectionType) + + out, err := server.queryItems(ctx, queryInput{ + TableName: legacySchema.TableName, + IndexName: "status-index", + KeyConditionExpression: "status = :status", + ExpressionAttributeValues: map[string]attributeValue{ + ":status": newStringAttributeValue("open"), + }, + }) + require.NoError(t, err) + require.Len(t, out.items, 1) + require.Equal(t, newStringAttributeValue("v1"), out.items[0]["value"]) +} + func TestDynamoDB_EnsureLegacyTableMigration_PrefersExistingTargetItems(t *testing.T) { t.Parallel() diff --git a/cmd/server/demo.go b/cmd/server/demo.go index 439a076c..71b8bd5c 100644 --- a/cmd/server/demo.go +++ b/cmd/server/demo.go @@ -7,6 +7,7 @@ import ( "io" "log/slog" "net" + "net/http" "os" "path/filepath" "strings" @@ -18,7 +19,9 @@ import ( raftadminpb "github.com/Jille/raftadmin/proto" "github.com/bootjp/elastickv/adapter" "github.com/bootjp/elastickv/distribution" + internalutil 
"github.com/bootjp/elastickv/internal" "github.com/bootjp/elastickv/kv" + "github.com/bootjp/elastickv/monitoring" pb "github.com/bootjp/elastickv/proto" "github.com/bootjp/elastickv/store" "github.com/cockroachdb/errors" @@ -31,13 +34,15 @@ import ( ) var ( - address = flag.String("address", ":50051", "gRPC/Raft address") - redisAddress = flag.String("redisAddress", ":6379", "Redis address") - dynamoAddress = flag.String("dynamoAddress", ":8000", "DynamoDB-compatible API address") - raftID = flag.String("raftId", "", "Raft ID") - raftDataDir = flag.String("raftDataDir", "/var/lib/elastickv", "Raft data directory") - raftBootstrap = flag.Bool("raftBootstrap", false, "Bootstrap cluster") - raftRedisMap = flag.String("raftRedisMap", "", "Map of Raft address to Redis address (raftAddr=redisAddr,...)") + address = flag.String("address", ":50051", "gRPC/Raft address") + redisAddress = flag.String("redisAddress", ":6379", "Redis address") + dynamoAddress = flag.String("dynamoAddress", ":8000", "DynamoDB-compatible API address") + metricsAddress = flag.String("metricsAddress", "127.0.0.1:9090", "Prometheus metrics address") + metricsToken = flag.String("metricsToken", "", "Bearer token for Prometheus metrics; required for non-loopback metricsAddress") + raftID = flag.String("raftId", "", "Raft ID") + raftDataDir = flag.String("raftDataDir", "/var/lib/elastickv", "Raft data directory") + raftBootstrap = flag.Bool("raftBootstrap", false, "Bootstrap cluster") + raftRedisMap = flag.String("raftRedisMap", "", "Map of Raft address to Redis address (raftAddr=redisAddr,...)") ) const ( @@ -48,6 +53,7 @@ const ( joinWait = 3 * time.Second joinRetryInterval = 1 * time.Second joinRPCTimeout = 3 * time.Second + raftObserveInterval = 5 * time.Second ) func init() { @@ -57,13 +63,15 @@ func init() { } type config struct { - address string - redisAddress string - dynamoAddress string - raftID string - raftDataDir string - raftBootstrap bool - raftRedisMap string + address string + 
redisAddress string + dynamoAddress string + metricsAddress string + metricsToken string + raftID string + raftDataDir string + raftBootstrap bool + raftRedisMap string } func main() { @@ -74,13 +82,15 @@ func main() { if *raftID != "" { // Single node mode cfg := config{ - address: *address, - redisAddress: *redisAddress, - dynamoAddress: *dynamoAddress, - raftID: *raftID, - raftDataDir: *raftDataDir, - raftBootstrap: *raftBootstrap, - raftRedisMap: *raftRedisMap, + address: *address, + redisAddress: *redisAddress, + dynamoAddress: *dynamoAddress, + metricsAddress: *metricsAddress, + metricsToken: *metricsToken, + raftID: *raftID, + raftDataDir: *raftDataDir, + raftBootstrap: *raftBootstrap, + raftRedisMap: *raftRedisMap, } if err := run(runCtx, eg, cfg); err != nil { slog.Error(err.Error()) @@ -89,30 +99,37 @@ func main() { } else { // Demo cluster mode (3 nodes) slog.Info("Starting demo cluster with 3 nodes...") + demoMetricsToken := effectiveDemoMetricsToken(*metricsToken) nodes := []config{ { - address: "127.0.0.1:50051", - redisAddress: "127.0.0.1:63791", - dynamoAddress: "127.0.0.1:63801", - raftID: "n1", - raftDataDir: "", // In-memory - raftBootstrap: true, + address: "127.0.0.1:50051", + redisAddress: "127.0.0.1:63791", + dynamoAddress: "127.0.0.1:63801", + metricsAddress: "0.0.0.0:9091", + metricsToken: demoMetricsToken, + raftID: "n1", + raftDataDir: "", // In-memory + raftBootstrap: true, }, { - address: "127.0.0.1:50052", - redisAddress: "127.0.0.1:63792", - dynamoAddress: "127.0.0.1:63802", - raftID: "n2", - raftDataDir: "", - raftBootstrap: false, + address: "127.0.0.1:50052", + redisAddress: "127.0.0.1:63792", + dynamoAddress: "127.0.0.1:63802", + metricsAddress: "0.0.0.0:9092", + metricsToken: demoMetricsToken, + raftID: "n2", + raftDataDir: "", + raftBootstrap: false, }, { - address: "127.0.0.1:50053", - redisAddress: "127.0.0.1:63793", - dynamoAddress: "127.0.0.1:63803", - raftID: "n3", - raftDataDir: "", - raftBootstrap: false, + address: 
"127.0.0.1:50053", + redisAddress: "127.0.0.1:63793", + dynamoAddress: "127.0.0.1:63803", + metricsAddress: "0.0.0.0:9093", + metricsToken: demoMetricsToken, + raftID: "n3", + raftDataDir: "", + raftBootstrap: false, }, } @@ -173,6 +190,14 @@ func main() { } } +func effectiveDemoMetricsToken(token string) string { + token = strings.TrimSpace(token) + if token != "" { + return token + } + return strings.Join([]string{"demo", "metrics", "token"}, "-") +} + func joinCluster(ctx context.Context, nodes []config) error { leader := nodes[0] // Give servers some time to start @@ -332,6 +357,8 @@ func setupRedis(ctx context.Context, lc net.ListenConfig, st store.MVCCStore, co func run(ctx context.Context, eg *errgroup.Group, cfg config) error { var lc net.ListenConfig + cleanup := internalutil.CleanupStack{} + defer cleanup.Run() ldb, sdb, fss, err := setupStorage(cfg.raftDataDir) if err != nil { @@ -360,20 +387,8 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error { return errors.WithStack(err) } - if cfg.raftBootstrap { - cfg := raft.Configuration{ - Servers: []raft.Server{ - { - Suffrage: raft.Voter, - ID: raft.ServerID(cfg.raftID), - Address: raft.ServerAddress(cfg.address), - }, - }, - } - f := r.BootstrapCluster(cfg) - if err := f.Error(); err != nil && !errors.Is(err, raft.ErrCantBootstrap) { - return errors.WithStack(err) - } + if err := bootstrapClusterIfNeeded(r, cfg); err != nil { + return err } trx := kv.NewTransaction(r) @@ -388,6 +403,8 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error { distCatalog, adapter.WithDistributionCoordinator(coordinator), ) + metricsRegistry := monitoring.NewRegistry(cfg.raftID, cfg.address) + metricsRegistry.RaftObserver().Start(ctx, []monitoring.RaftRuntime{{GroupID: 1, Raft: r}}, raftObserveInterval) s, grpcSvc := setupGRPC(r, st, tm, coordinator, distServer) @@ -395,16 +412,33 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error { if err != nil { return 
errors.WithStack(err) } + cleanup.Add(func() { + _ = grpcSock.Close() + }) rd, err := setupRedis(ctx, lc, st, coordinator, cfg.address, cfg.redisAddress, cfg.raftRedisMap) if err != nil { return err } + cleanup.Add(rd.Stop) dynamoL, err := lc.Listen(ctx, "tcp", cfg.dynamoAddress) if err != nil { return errors.WithStack(err) } - ds := adapter.NewDynamoDBServer(dynamoL, st, coordinator) + ds := adapter.NewDynamoDBServer( + dynamoL, + st, + coordinator, + adapter.WithDynamoDBRequestObserver(metricsRegistry.DynamoDBObserver()), + ) + cleanup.Add(ds.Stop) + metricsL, ms, err := setupMetricsHTTPServer(ctx, lc, cfg.metricsAddress, cfg.metricsToken, metricsRegistry.Handler()) + if err != nil { + return err + } + cleanup.Add(func() { + _ = metricsL.Close() + }) eg.Go(catalogWatcherTask(ctx, distCatalog, distEngine)) eg.Go(grpcShutdownTask(ctx, s, grpcSock, cfg.address, grpcSvc)) @@ -413,7 +447,41 @@ func run(ctx context.Context, eg *errgroup.Group, cfg config) error { eg.Go(redisServeTask(rd, cfg.redisAddress)) eg.Go(dynamoShutdownTask(ctx, ds, cfg.dynamoAddress)) eg.Go(dynamoServeTask(ds, cfg.dynamoAddress)) + eg.Go(monitoring.MetricsShutdownTask(ctx, ms, cfg.metricsAddress)) + eg.Go(monitoring.MetricsServeTask(ms, metricsL, cfg.metricsAddress)) + + cleanup.Release() + return nil +} + +func setupMetricsHTTPServer(ctx context.Context, lc net.ListenConfig, metricsAddress string, metricsToken string, handler http.Handler) (net.Listener, *http.Server, error) { + if monitoring.MetricsAddressRequiresToken(metricsAddress) && strings.TrimSpace(metricsToken) == "" { + return nil, nil, errors.New("metricsToken is required when metricsAddress is not loopback") + } + metricsL, err := lc.Listen(ctx, "tcp", metricsAddress) + if err != nil { + return nil, nil, errors.WithStack(err) + } + ms := monitoring.NewMetricsServer(handler, metricsToken) + return metricsL, ms, nil +} +func bootstrapClusterIfNeeded(r *raft.Raft, cfg config) error { + if !cfg.raftBootstrap { + return nil + } + 
bootstrapCfg := raft.Configuration{ + Servers: []raft.Server{ + { + Suffrage: raft.Voter, + ID: raft.ServerID(cfg.raftID), + Address: raft.ServerAddress(cfg.address), + }, + }, + } + if err := r.BootstrapCluster(bootstrapCfg).Error(); err != nil && !errors.Is(err, raft.ErrCantBootstrap) { + return errors.WithStack(err) + } return nil } diff --git a/cmd/server/demo_test.go b/cmd/server/demo_test.go new file mode 100644 index 00000000..edd92a98 --- /dev/null +++ b/cmd/server/demo_test.go @@ -0,0 +1,12 @@ +package main + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestEffectiveDemoMetricsToken(t *testing.T) { + require.Equal(t, "custom-token", effectiveDemoMetricsToken(" custom-token ")) + require.Equal(t, "demo-metrics-token", effectiveDemoMetricsToken("")) +} diff --git a/docs/docker_multinode_manual_run.md b/docs/docker_multinode_manual_run.md index 9c7a524b..96743b5c 100644 --- a/docs/docker_multinode_manual_run.md +++ b/docs/docker_multinode_manual_run.md @@ -17,6 +17,7 @@ Use private VM IPs for all bind addresses and lock down network access because g - 1 node = 1 VM - Total nodes: 4 or 5 - VMs must be able to reach each other over TCP (at minimum `50051/tcp`) +- Prometheus must be able to reach each node's metrics endpoint if you want centralized monitoring (`9090/tcp` in the examples below) - Docker Engine installed on every VM Example (5 nodes): @@ -83,6 +84,12 @@ Shared fixed bootstrap voters (5-node example): RAFT_BOOTSTRAP_MEMBERS="n1=10.0.0.11:50051,n2=10.0.0.12:50051,n3=10.0.0.13:50051,n4=10.0.0.14:50051,n5=10.0.0.15:50051" ``` +Shared metrics bearer token (required because the examples bind `--metricsAddress` to non-loopback VM IPs): + +```bash +ELASTICKV_METRICS_TOKEN="$(openssl rand -hex 32)" +``` + For a 4-node cluster, remove the `n5` entry from both variables. ## 4) Start Nodes with `docker run` @@ -92,6 +99,8 @@ This guide uses `--network host` and explicit VM private IPs. 
Binding guidance: - Set `--address`, `--redisAddress`, and `--dynamoAddress` to the VM private IP. +- Set `--metricsAddress` to the VM private IP if Prometheus scrapes from another host. +- Set `--metricsToken` to the same shared bearer token on every node whenever `--metricsAddress` is non-loopback. - Do not use `0.0.0.0` as the advertised address. - Do not use `localhost` for cluster communication. @@ -109,6 +118,8 @@ docker run -d \ --address "10.0.0.11:50051" \ --redisAddress "10.0.0.11:6379" \ --dynamoAddress "10.0.0.11:8000" \ + --metricsAddress "10.0.0.11:9090" \ + --metricsToken "${ELASTICKV_METRICS_TOKEN}" \ --raftId "n1" \ --raftDataDir "/var/lib/elastickv" \ --raftRedisMap "${RAFT_TO_REDIS_MAP}" \ @@ -130,6 +141,8 @@ docker run -d \ --address "10.0.0.12:50051" \ --redisAddress "10.0.0.12:6379" \ --dynamoAddress "10.0.0.12:8000" \ + --metricsAddress "10.0.0.12:9090" \ + --metricsToken "${ELASTICKV_METRICS_TOKEN}" \ --raftId "n2" \ --raftDataDir "/var/lib/elastickv" \ --raftRedisMap "${RAFT_TO_REDIS_MAP}" \ @@ -142,6 +155,7 @@ Start `n3` to `n5` the same way by replacing: - `--address` - `--redisAddress` - `--dynamoAddress` +- `--metricsAddress` - `--raftId` ## Startup Order and Quorum Caution @@ -203,6 +217,27 @@ redis-cli -h 10.0.0.11 -p 6379 SET health ok redis-cli -h 10.0.0.12 -p 6379 GET health ``` +## 6.5) Validate Metrics + +Check the local Prometheus endpoint on any node: + +```bash +curl -fsS -H "Authorization: Bearer ${ELASTICKV_METRICS_TOKEN}" \ + http://10.0.0.11:9090/metrics | grep '^elastickv_' +``` + +Prometheus must send the same bearer token when scraping. Note that Prometheus does not expand environment variables inside `prometheus.yml`, so write the literal token value (or reference a file containing it via `credentials_file`): + +```yaml +scrape_configs: + - job_name: elastickv + authorization: + type: Bearer + credentials: YOUR_METRICS_TOKEN  # paste the literal value of ELASTICKV_METRICS_TOKEN +``` + +Grafana/Prometheus provisioning examples are available under `monitoring/`. + ## 7) Fault Tolerance Drill  With 5 nodes, the cluster should continue serving with up to 2 node failures. 
diff --git a/internal/cleanup.go b/internal/cleanup.go new file mode 100644 index 00000000..7930e292 --- /dev/null +++ b/internal/cleanup.go @@ -0,0 +1,26 @@ +package internal + +// CleanupStack stores cleanup callbacks and runs them in LIFO order. +type CleanupStack struct { + funcs []func() +} + +// Add registers a cleanup callback. +func (c *CleanupStack) Add(fn func()) { + if fn == nil { + return + } + c.funcs = append(c.funcs, fn) +} + +// Release discards all registered callbacks. +func (c *CleanupStack) Release() { + c.funcs = nil +} + +// Run executes registered callbacks in reverse registration order. +func (c *CleanupStack) Run() { + for i := len(c.funcs) - 1; i >= 0; i-- { + c.funcs[i]() + } +} diff --git a/internal/cleanup_test.go b/internal/cleanup_test.go new file mode 100644 index 00000000..f6e50d28 --- /dev/null +++ b/internal/cleanup_test.go @@ -0,0 +1,29 @@ +package internal + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCleanupStackRunUsesLIFO(t *testing.T) { + stack := CleanupStack{} + var calls []string + + stack.Add(func() { calls = append(calls, "first") }) + stack.Add(func() { calls = append(calls, "second") }) + stack.Run() + + require.Equal(t, []string{"second", "first"}, calls) +} + +func TestCleanupStackReleaseDropsCallbacks(t *testing.T) { + stack := CleanupStack{} + called := false + + stack.Add(func() { called = true }) + stack.Release() + stack.Run() + + require.False(t, called) +} diff --git a/main.go b/main.go index d7366e42..5a2a91ad 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "flag" "log" "net" + "net/http" "strings" "sync" "time" @@ -13,7 +14,9 @@ import ( "github.com/Jille/raftadmin" "github.com/bootjp/elastickv/adapter" "github.com/bootjp/elastickv/distribution" + internalutil "github.com/bootjp/elastickv/internal" "github.com/bootjp/elastickv/kv" + "github.com/bootjp/elastickv/monitoring" pb "github.com/bootjp/elastickv/proto" "github.com/bootjp/elastickv/store" 
"github.com/cockroachdb/errors" @@ -24,15 +27,18 @@ import ( ) const ( - heartbeatTimeout = 200 * time.Millisecond - electionTimeout = 2000 * time.Millisecond - leaderLease = 100 * time.Millisecond + heartbeatTimeout = 200 * time.Millisecond + electionTimeout = 2000 * time.Millisecond + leaderLease = 100 * time.Millisecond + raftMetricsObserveInterval = 5 * time.Second ) var ( myAddr = flag.String("address", "localhost:50051", "TCP host+port for this node") redisAddr = flag.String("redisAddress", "localhost:6379", "TCP host+port for redis") dynamoAddr = flag.String("dynamoAddress", "localhost:8000", "TCP host+port for DynamoDB-compatible API") + metricsAddr = flag.String("metricsAddress", "localhost:9090", "TCP host+port for Prometheus metrics") + metricsToken = flag.String("metricsToken", "", "Bearer token for Prometheus metrics; required for non-loopback metricsAddress") raftId = flag.String("raftId", "", "Node id used by Raft") raftDir = flag.String("raftDataDir", "data/", "Raft data dir") raftBootstrap = flag.Bool("raftBootstrap", false, "Whether to bootstrap the Raft cluster") @@ -72,16 +78,19 @@ func run() error { return err } + cleanup := internalutil.CleanupStack{} + defer cleanup.Run() + ctx, cancel := context.WithCancel(context.Background()) clock := kv.NewHLC() shardStore := kv.NewShardStore(cfg.engine, shardGroups) - defer func() { - cancel() + cleanup.Add(func() { _ = shardStore.Close() for _, rt := range runtimes { rt.Close() } - }() + }) + cleanup.Add(cancel) coordinate := kv.NewShardedCoordinator(cfg.engine, shardGroups, cfg.defaultGroup, clock, shardStore) distCatalog, err := setupDistributionCatalog(ctx, runtimes, cfg.engine) if err != nil { @@ -96,19 +105,24 @@ func run() error { distCatalog, adapter.WithDistributionCoordinator(coordinate), ) + metricsRegistry := monitoring.NewRegistry(*raftId, *myAddr) + metricsRegistry.RaftObserver().Start(runCtx, raftMonitorRuntimes(runtimes), raftMetricsObserveInterval) runner := runtimeServerRunner{ - ctx: 
runCtx, - lc: &lc, - eg: eg, - cancel: cancel, - runtimes: runtimes, - shardStore: shardStore, - coordinate: coordinate, - distServer: distServer, - redisAddress: *redisAddr, - leaderRedis: cfg.leaderRedis, - dynamoAddress: *dynamoAddr, + ctx: runCtx, + lc: &lc, + eg: eg, + cancel: cancel, + runtimes: runtimes, + shardStore: shardStore, + coordinate: coordinate, + distServer: distServer, + redisAddress: *redisAddr, + leaderRedis: cfg.leaderRedis, + dynamoAddress: *dynamoAddr, + metricsAddress: *metricsAddr, + metricsToken: *metricsToken, + metricsRegistry: metricsRegistry, } if err := runner.start(); err != nil { return err @@ -256,6 +270,20 @@ func buildShardGroups(raftID string, raftDir string, groups []groupSpec, multi b return runtimes, shardGroups, nil } +func raftMonitorRuntimes(runtimes []*raftGroupRuntime) []monitoring.RaftRuntime { + out := make([]monitoring.RaftRuntime, 0, len(runtimes)) + for _, runtime := range runtimes { + if runtime == nil || runtime.raft == nil { + continue + } + out = append(out, monitoring.RaftRuntime{ + GroupID: runtime.spec.id, + Raft: runtime.raft, + }) + } + return out +} + func startRaftServers(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, runtimes []*raftGroupRuntime, shardStore *kv.ShardStore, coordinate kv.Coordinator, distServer *adapter.DistributionServer) error { for _, rt := range runtimes { gs := grpc.NewServer() @@ -330,12 +358,17 @@ func startRedisServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Gr return nil } -func startDynamoDBServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, dynamoAddr string, shardStore *kv.ShardStore, coordinate kv.Coordinator) error { +func startDynamoDBServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, dynamoAddr string, shardStore *kv.ShardStore, coordinate kv.Coordinator, metricsRegistry *monitoring.Registry) error { dynamoL, err := lc.Listen(ctx, "tcp", dynamoAddr) if err != nil { return errors.Wrapf(err, "failed to 
listen on %s", dynamoAddr) } - dynamoServer := adapter.NewDynamoDBServer(dynamoL, shardStore, coordinate) + dynamoServer := adapter.NewDynamoDBServer( + dynamoL, + shardStore, + coordinate, + adapter.WithDynamoDBRequestObserver(metricsRegistry.DynamoDBObserver()), + ) eg.Go(func() error { defer dynamoServer.Stop() stop := make(chan struct{}) @@ -356,6 +389,23 @@ func startDynamoDBServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup return nil } +func startMetricsServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, metricsAddr string, metricsToken string, handler http.Handler) error { + if strings.TrimSpace(metricsAddr) == "" || handler == nil { + return nil + } + if monitoring.MetricsAddressRequiresToken(metricsAddr) && strings.TrimSpace(metricsToken) == "" { + return errors.New("metricsToken is required when metricsAddress is not loopback") + } + metricsL, err := lc.Listen(ctx, "tcp", metricsAddr) + if err != nil { + return errors.Wrapf(err, "failed to listen on %s", metricsAddr) + } + metricsServer := monitoring.NewMetricsServer(handler, metricsToken) + eg.Go(monitoring.MetricsShutdownTask(ctx, metricsServer, metricsAddr)) + eg.Go(monitoring.MetricsServeTask(metricsServer, metricsL, metricsAddr)) + return nil +} + func distributionCatalogStoreForGroup(runtimes []*raftGroupRuntime, groupID uint64) *distribution.CatalogStore { for _, rt := range runtimes { if rt == nil || rt.store == nil { @@ -421,17 +471,20 @@ func waitErrgroupAfterStartupFailure(cancel context.CancelFunc, eg *errgroup.Gro } type runtimeServerRunner struct { - ctx context.Context - lc *net.ListenConfig - eg *errgroup.Group - cancel context.CancelFunc - runtimes []*raftGroupRuntime - shardStore *kv.ShardStore - coordinate kv.Coordinator - distServer *adapter.DistributionServer - redisAddress string - leaderRedis map[raft.ServerAddress]string - dynamoAddress string + ctx context.Context + lc *net.ListenConfig + eg *errgroup.Group + cancel context.CancelFunc + runtimes 
[]*raftGroupRuntime + shardStore *kv.ShardStore + coordinate kv.Coordinator + distServer *adapter.DistributionServer + redisAddress string + leaderRedis map[raft.ServerAddress]string + dynamoAddress string + metricsAddress string + metricsToken string + metricsRegistry *monitoring.Registry } func (r runtimeServerRunner) start() error { @@ -441,7 +494,10 @@ func (r runtimeServerRunner) start() error { if err := startRedisServer(r.ctx, r.lc, r.eg, r.redisAddress, r.shardStore, r.coordinate, r.leaderRedis); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } - if err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate); err != nil { + if err := startDynamoDBServer(r.ctx, r.lc, r.eg, r.dynamoAddress, r.shardStore, r.coordinate, r.metricsRegistry); err != nil { + return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) + } + if err := startMetricsServer(r.ctx, r.lc, r.eg, r.metricsAddress, r.metricsToken, r.metricsRegistry.Handler()); err != nil { return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err) } return nil diff --git a/monitoring/docker-compose.yml b/monitoring/docker-compose.yml new file mode 100644 index 00000000..c3805b0f --- /dev/null +++ b/monitoring/docker-compose.yml @@ -0,0 +1,23 @@ +services: + prometheus: + image: prom/prometheus:v3.5.0 + command: + - --config.file=/etc/prometheus/prometheus.yml + - --storage.tsdb.path=/prometheus + ports: + - "9095:9090" + volumes: + - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml:ro + extra_hosts: + - "host.docker.internal:host-gateway" + + grafana: + image: grafana/grafana:12.2.0 + depends_on: + - prometheus + ports: + - "3000:3000" + volumes: + - ./grafana/provisioning/datasources:/etc/grafana/provisioning/datasources:ro + - ./grafana/provisioning/dashboards:/etc/grafana/provisioning/dashboards:ro + - ./grafana/dashboards:/var/lib/grafana/dashboards:ro diff --git a/monitoring/dynamodb.go b/monitoring/dynamodb.go new file mode 100644 index 
00000000..502ea72b --- /dev/null +++ b/monitoring/dynamodb.go @@ -0,0 +1,344 @@ +package monitoring + +import ( + "net/http" + "slices" + "strings" + "sync" + "time" + + "github.com/prometheus/client_golang/prometheus" +) + +const ( + dynamoOutcomeSuccess = "success" + dynamoOutcomeUserError = "user_error" + dynamoOutcomeSystemError = "system_error" + dynamoOutcomeConditionalCheckFailed = "conditional_check_failed" + dynamoOperationUnknown = "unknown" + dynamoTableOverflow = "_other" + dynamoMaxTrackedTables = 512 +) + +var dynamoOperations = []string{ + "BatchWriteItem", + "CreateTable", + "DeleteItem", + "DeleteTable", + "DescribeTable", + "GetItem", + "ListTables", + "PutItem", + "Query", + "Scan", + "TransactWriteItems", + "UpdateItem", +} + +// DynamoDBRequestObserver records per-request DynamoDB-compatible API metrics. +type DynamoDBRequestObserver interface { + ObserveInFlightChange(operation string, delta float64) + ObserveDynamoDBRequest(report DynamoDBRequestReport) +} + +// DynamoDBRequestReport is the normalized result of a single request. +type DynamoDBRequestReport struct { + Operation string + HTTPStatus int + ErrorType string + Duration time.Duration + RequestBytes int + ResponseBytes int + Tables []string + TableMetrics map[string]DynamoDBTableMetrics +} + +// DynamoDBTableMetrics captures table-scoped work done by a request. 
+type DynamoDBTableMetrics struct { + ReturnedItems int + ScannedItems int + WrittenItems int +} + +type DynamoDBMetrics struct { + inflightRequests *prometheus.GaugeVec + requestsTotal *prometheus.CounterVec + tableRequests *prometheus.CounterVec + requestDuration *prometheus.HistogramVec + requestSize *prometheus.HistogramVec + responseSize *prometheus.HistogramVec + returnedItems *prometheus.CounterVec + scannedItems *prometheus.CounterVec + writtenItems *prometheus.CounterVec + userErrors *prometheus.CounterVec + systemErrors *prometheus.CounterVec + conditionalFails *prometheus.CounterVec + + mu sync.Mutex + trackedTables map[string]struct{} +} + +func newDynamoDBMetrics(registerer prometheus.Registerer) *DynamoDBMetrics { + m := &DynamoDBMetrics{ + inflightRequests: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_dynamodb_inflight_requests", + Help: "Current number of in-flight DynamoDB-compatible API requests.", + }, + []string{"operation"}, + ), + requestsTotal: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "elastickv_dynamodb_requests_total", + Help: "Total number of DynamoDB-compatible API requests by operation and outcome.", + }, + []string{"operation", "outcome"}, + ), + tableRequests: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "elastickv_dynamodb_table_requests_total", + Help: "Total number of table-scoped DynamoDB-compatible API requests by operation and outcome.", + }, + []string{"operation", "table", "outcome"}, + ), + requestDuration: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "elastickv_dynamodb_request_duration_seconds", + Help: "End-to-end latency of DynamoDB-compatible API requests.", + Buckets: []float64{0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1, 2, 5}, + }, + []string{"operation", "outcome"}, + ), + requestSize: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "elastickv_dynamodb_request_size_bytes", + Help: "Observed DynamoDB-compatible 
request sizes in bytes.", + Buckets: []float64{128, 512, 1024, 4 * 1024, 16 * 1024, 64 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024}, + }, + []string{"operation"}, + ), + responseSize: prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "elastickv_dynamodb_response_size_bytes", + Help: "Observed DynamoDB-compatible response sizes in bytes.", + Buckets: []float64{128, 512, 1024, 4 * 1024, 16 * 1024, 64 * 1024, 256 * 1024, 512 * 1024, 1024 * 1024}, + }, + []string{"operation", "outcome"}, + ), + returnedItems: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "elastickv_dynamodb_returned_items_total", + Help: "Total number of items returned by DynamoDB-compatible read APIs.", + }, + []string{"operation", "table"}, + ), + scannedItems: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "elastickv_dynamodb_scanned_items_total", + Help: "Total number of items scanned by DynamoDB-compatible read APIs.", + }, + []string{"operation", "table"}, + ), + writtenItems: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "elastickv_dynamodb_written_items_total", + Help: "Total number of items written or deleted by DynamoDB-compatible write APIs.", + }, + []string{"operation", "table"}, + ), + userErrors: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "elastickv_dynamodb_user_errors_total", + Help: "Total number of client-caused DynamoDB-compatible API errors.", + }, + []string{"operation", "table"}, + ), + systemErrors: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "elastickv_dynamodb_system_errors_total", + Help: "Total number of server-side DynamoDB-compatible API errors.", + }, + []string{"operation", "table"}, + ), + conditionalFails: prometheus.NewCounterVec( + prometheus.CounterOpts{ + Name: "elastickv_dynamodb_conditional_check_failed_total", + Help: "Total number of conditional check failures returned by DynamoDB-compatible APIs.", + }, + []string{"operation", "table"}, + ), + trackedTables: 
make(map[string]struct{}), + } + + registerer.MustRegister( + m.inflightRequests, + m.requestsTotal, + m.tableRequests, + m.requestDuration, + m.requestSize, + m.responseSize, + m.returnedItems, + m.scannedItems, + m.writtenItems, + m.userErrors, + m.systemErrors, + m.conditionalFails, + ) + + return m +} + +// ObserveInFlightChange adjusts the in-flight request gauge for an operation. +func (m *DynamoDBMetrics) ObserveInFlightChange(operation string, delta float64) { + if m == nil { + return + } + operation = normalizeDynamoOperation(operation) + m.inflightRequests.WithLabelValues(operation).Add(delta) +} + +// ObserveDynamoDBRequest records the final outcome of a request. +func (m *DynamoDBMetrics) ObserveDynamoDBRequest(report DynamoDBRequestReport) { + if m == nil { + return + } + + operation := normalizeDynamoOperation(report.Operation) + outcome := classifyDynamoOutcome(report.HTTPStatus, report.ErrorType) + tableMetrics := report.TableMetrics + if tableMetrics == nil { + tableMetrics = map[string]DynamoDBTableMetrics{} + } + tables, tableMetrics := m.normalizeReportTables(report.Tables, tableMetrics) + + m.observeRequest(operation, outcome, report) + m.observeTables(operation, outcome, tables, tableMetrics) +} + +func normalizeDynamoOperation(operation string) string { + operation = strings.TrimSpace(operation) + if operation == "" { + return dynamoOperationUnknown + } + if !slices.Contains(dynamoOperations, operation) { + return dynamoOperationUnknown + } + return operation +} + +func (m *DynamoDBMetrics) observeRequest(operation string, outcome string, report DynamoDBRequestReport) { + m.requestsTotal.WithLabelValues(operation, outcome).Inc() + m.requestDuration.WithLabelValues(operation, outcome).Observe(report.Duration.Seconds()) + m.requestSize.WithLabelValues(operation).Observe(float64(max(report.RequestBytes, 0))) + m.responseSize.WithLabelValues(operation, outcome).Observe(float64(max(report.ResponseBytes, 0))) +} + +func (m *DynamoDBMetrics) 
observeTables(operation string, outcome string, tables []string, tableMetrics map[string]DynamoDBTableMetrics) { + for _, table := range tables { + m.tableRequests.WithLabelValues(operation, table, outcome).Inc() + m.observeTableActivity(operation, table, tableMetrics[table]) + m.observeTableOutcome(operation, table, outcome) + } +} + +func (m *DynamoDBMetrics) normalizeReportTables(tables []string, tableMetrics map[string]DynamoDBTableMetrics) ([]string, map[string]DynamoDBTableMetrics) { + if m == nil { + return nil, nil + } + normalized := make([]string, 0, len(tables)+len(tableMetrics)) + seen := make(map[string]struct{}, len(tables)+len(tableMetrics)) + aggregated := make(map[string]DynamoDBTableMetrics, len(tableMetrics)) + + addTable := func(table string) string { + table = m.normalizeTable(table) + if table == "" { + return "" + } + if _, ok := seen[table]; ok { + return table + } + seen[table] = struct{}{} + normalized = append(normalized, table) + return table + } + + for _, table := range tables { + addTable(table) + } + for table, metrics := range tableMetrics { + table = addTable(table) + if table == "" { + continue + } + current := aggregated[table] + current.ReturnedItems += metrics.ReturnedItems + current.ScannedItems += metrics.ScannedItems + current.WrittenItems += metrics.WrittenItems + aggregated[table] = current + } + + slices.Sort(normalized) + return normalized, aggregated +} + +func (m *DynamoDBMetrics) normalizeTable(table string) string { + if m == nil { + return "" + } + table = strings.TrimSpace(table) + if table == "" { + return "" + } + if table == dynamoTableOverflow { + return dynamoTableOverflow + } + + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.trackedTables[table]; ok { + return table + } + if len(m.trackedTables) >= dynamoMaxTrackedTables { + return dynamoTableOverflow + } + m.trackedTables[table] = struct{}{} + return table +} + +func (m *DynamoDBMetrics) observeTableActivity(operation string, table string, metrics 
DynamoDBTableMetrics) { + if metrics.ReturnedItems > 0 { + m.returnedItems.WithLabelValues(operation, table).Add(float64(metrics.ReturnedItems)) + } + if metrics.ScannedItems > 0 { + m.scannedItems.WithLabelValues(operation, table).Add(float64(metrics.ScannedItems)) + } + if metrics.WrittenItems > 0 { + m.writtenItems.WithLabelValues(operation, table).Add(float64(metrics.WrittenItems)) + } +} + +func (m *DynamoDBMetrics) observeTableOutcome(operation string, table string, outcome string) { + switch outcome { + case dynamoOutcomeConditionalCheckFailed: + m.conditionalFails.WithLabelValues(operation, table).Inc() + case dynamoOutcomeUserError: + m.userErrors.WithLabelValues(operation, table).Inc() + case dynamoOutcomeSystemError: + m.systemErrors.WithLabelValues(operation, table).Inc() + } +} + +func classifyDynamoOutcome(status int, errorType string) string { + if strings.EqualFold(strings.TrimSpace(errorType), "ConditionalCheckFailedException") { + return dynamoOutcomeConditionalCheckFailed + } + switch { + case status >= 200 && status < 300: + return dynamoOutcomeSuccess + case status >= http.StatusInternalServerError: + return dynamoOutcomeSystemError + default: + return dynamoOutcomeUserError + } +} diff --git a/monitoring/dynamodb_test.go b/monitoring/dynamodb_test.go new file mode 100644 index 00000000..27501691 --- /dev/null +++ b/monitoring/dynamodb_test.go @@ -0,0 +1,135 @@ +package monitoring + +import ( + "net/http" + "strconv" + "strings" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestDynamoDBMetricsObserveRequest(t *testing.T) { + registry := NewRegistry("n1", "10.0.0.1:50051") + metrics, ok := registry.DynamoDBObserver().(*DynamoDBMetrics) + require.True(t, ok) + + metrics.ObserveInFlightChange("PutItem", 1) + metrics.ObserveInFlightChange("PutItem", -1) + metrics.ObserveDynamoDBRequest(DynamoDBRequestReport{ + Operation: "PutItem", + HTTPStatus: 200, + Duration: 
12 * time.Millisecond, + RequestBytes: 256, + ResponseBytes: 64, + Tables: []string{"orders"}, + TableMetrics: map[string]DynamoDBTableMetrics{ + "orders": {WrittenItems: 1}, + }, + }) + + err := testutil.GatherAndCompare( + registry.Gatherer(), + strings.NewReader(` +# HELP elastickv_dynamodb_requests_total Total number of DynamoDB-compatible API requests by operation and outcome. +# TYPE elastickv_dynamodb_requests_total counter +elastickv_dynamodb_requests_total{node_address="10.0.0.1:50051",node_id="n1",operation="PutItem",outcome="success"} 1 +# HELP elastickv_dynamodb_table_requests_total Total number of table-scoped DynamoDB-compatible API requests by operation and outcome. +# TYPE elastickv_dynamodb_table_requests_total counter +elastickv_dynamodb_table_requests_total{node_address="10.0.0.1:50051",node_id="n1",operation="PutItem",outcome="success",table="orders"} 1 +# HELP elastickv_dynamodb_written_items_total Total number of items written or deleted by DynamoDB-compatible write APIs. +# TYPE elastickv_dynamodb_written_items_total counter +elastickv_dynamodb_written_items_total{node_address="10.0.0.1:50051",node_id="n1",operation="PutItem",table="orders"} 1 +`), + "elastickv_dynamodb_requests_total", + "elastickv_dynamodb_table_requests_total", + "elastickv_dynamodb_written_items_total", + ) + require.NoError(t, err) +} + +func TestDynamoDBMetricsClassifyConditionalFailure(t *testing.T) { + registry := NewRegistry("n1", "10.0.0.1:50051") + metrics, ok := registry.DynamoDBObserver().(*DynamoDBMetrics) + require.True(t, ok) + + metrics.ObserveDynamoDBRequest(DynamoDBRequestReport{ + Operation: "UpdateItem", + HTTPStatus: 400, + ErrorType: "ConditionalCheckFailedException", + Duration: time.Millisecond, + Tables: []string{"orders"}, + }) + + err := testutil.GatherAndCompare( + registry.Gatherer(), + strings.NewReader(` +# HELP elastickv_dynamodb_conditional_check_failed_total Total number of conditional check failures returned by DynamoDB-compatible APIs. 
+# TYPE elastickv_dynamodb_conditional_check_failed_total counter +elastickv_dynamodb_conditional_check_failed_total{node_address="10.0.0.1:50051",node_id="n1",operation="UpdateItem",table="orders"} 1 +`), + "elastickv_dynamodb_conditional_check_failed_total", + ) + require.NoError(t, err) +} + +func TestDynamoDBMetricsNormalizesUnknownOperation(t *testing.T) { + registry := NewRegistry("n1", "10.0.0.1:50051") + metrics, ok := registry.DynamoDBObserver().(*DynamoDBMetrics) + require.True(t, ok) + + metrics.ObserveInFlightChange("InjectedOperation", 1) + metrics.ObserveInFlightChange("InjectedOperation", -1) + metrics.ObserveDynamoDBRequest(DynamoDBRequestReport{ + Operation: "InjectedOperation", + HTTPStatus: 200, + Duration: time.Millisecond, + }) + + err := testutil.GatherAndCompare( + registry.Gatherer(), + strings.NewReader(` +# HELP elastickv_dynamodb_requests_total Total number of DynamoDB-compatible API requests by operation and outcome. +# TYPE elastickv_dynamodb_requests_total counter +elastickv_dynamodb_requests_total{node_address="10.0.0.1:50051",node_id="n1",operation="unknown",outcome="success"} 1 +`), + "elastickv_dynamodb_requests_total", + ) + require.NoError(t, err) +} + +func TestDynamoDBMetricsCollapsesOverflowTables(t *testing.T) { + registry := NewRegistry("n1", "10.0.0.1:50051") + metrics, ok := registry.DynamoDBObserver().(*DynamoDBMetrics) + require.True(t, ok) + + for i := 0; i < dynamoMaxTrackedTables; i++ { + metrics.trackedTables["table-"+strconv.Itoa(i)] = struct{}{} + } + + metrics.ObserveDynamoDBRequest(DynamoDBRequestReport{ + Operation: "PutItem", + HTTPStatus: http.StatusOK, + Tables: []string{"orders-overflow"}, + TableMetrics: map[string]DynamoDBTableMetrics{ + "orders-overflow": {WrittenItems: 2}, + }, + }) + + err := testutil.GatherAndCompare( + registry.Gatherer(), + strings.NewReader(` +# HELP elastickv_dynamodb_table_requests_total Total number of table-scoped DynamoDB-compatible API requests by operation and outcome. 
+# TYPE elastickv_dynamodb_table_requests_total counter +elastickv_dynamodb_table_requests_total{node_address="10.0.0.1:50051",node_id="n1",operation="PutItem",outcome="success",table="_other"} 1 +# HELP elastickv_dynamodb_written_items_total Total number of items written or deleted by DynamoDB-compatible write APIs. +# TYPE elastickv_dynamodb_written_items_total counter +elastickv_dynamodb_written_items_total{node_address="10.0.0.1:50051",node_id="n1",operation="PutItem",table="_other"} 2 +`), + "elastickv_dynamodb_table_requests_total", + "elastickv_dynamodb_written_items_total", + ) + require.NoError(t, err) +} diff --git a/monitoring/grafana/dashboards/elastickv-cluster-overview.json b/monitoring/grafana/dashboards/elastickv-cluster-overview.json new file mode 100644 index 00000000..f74b0e02 --- /dev/null +++ b/monitoring/grafana/dashboards/elastickv-cluster-overview.json @@ -0,0 +1,733 @@ +{ + "annotations": { + "list": [ + { + "builtIn": 1, + "datasource": { + "type": "grafana", + "uid": "-- Grafana --" + }, + "enable": true, + "hide": true, + "iconColor": "rgba(0, 211, 255, 1)", + "name": "Annotations & Alerts", + "type": "dashboard" + } + ] + }, + "editable": true, + "fiscalYearStartMonth": 0, + "graphTooltip": 0, + "id": null, + "links": [], + "panels": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 0 + }, + "id": 1, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", 
+ "expr": "sum(rate(elastickv_dynamodb_requests_total{job=\"elastickv\"}[5m]))", + "instant": false, + "legendFormat": "requests/s", + "range": true, + "refId": "A" + } + ], + "title": "Request Rate", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 0.1 + }, + { + "color": "red", + "value": 1 + } + ] + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 6, + "y": 0 + }, + "id": 2, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum(rate(elastickv_dynamodb_requests_total{job=\"elastickv\",outcome!=\"success\"}[5m]))", + "instant": false, + "legendFormat": "errors/s", + "range": true, + "refId": "A" + } + ], + "title": "Error Rate", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "orange", + "value": 0.05 + }, + { + "color": "red", + "value": 0.25 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 12, + "y": 0 + }, + "id": 3, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": 
"prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "histogram_quantile(0.95, sum by (le) (rate(elastickv_dynamodb_request_duration_seconds_bucket{job=\"elastickv\"}[5m])))", + "instant": false, + "legendFormat": "p95", + "range": true, + "refId": "A" + } + ], + "title": "P95 Latency", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "thresholds" + }, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 18, + "y": 0 + }, + "id": 4, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum(elastickv_dynamodb_inflight_requests{job=\"elastickv\"})", + "instant": false, + "legendFormat": "in-flight", + "range": true, + "refId": "A" + } + ], + "title": "In-flight Requests", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 4 + }, + "id": 5, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum by (operation) (rate(elastickv_dynamodb_requests_total{job=\"elastickv\"}[5m]))", + "legendFormat": "{{operation}}", + "range": true, + "refId": "A" + } 
+ ], + "title": "Requests by Operation", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "unit": "reqps" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 4 + }, + "id": 6, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum by (outcome) (rate(elastickv_dynamodb_requests_total{job=\"elastickv\",outcome!=\"success\"}[5m]))", + "legendFormat": "{{outcome}}", + "range": true, + "refId": "A" + } + ], + "title": "Error Split by Outcome", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "unit": "ops" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 12 + }, + "id": 7, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum by (table) (rate(elastickv_dynamodb_returned_items_total{job=\"elastickv\"}[5m]))", + "legendFormat": "returned {{table}}", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "sum by (table) (rate(elastickv_dynamodb_scanned_items_total{job=\"elastickv\"}[5m]))", + "legendFormat": "scanned {{table}}", + "range": true, + "refId": "B" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + 
"expr": "sum by (table) (rate(elastickv_dynamodb_written_items_total{job=\"elastickv\"}[5m]))", + "legendFormat": "written {{table}}", + "range": true, + "refId": "C" + } + ], + "title": "Per-table Item Activity", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 12 + }, + "id": 8, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "max by (group, node_id) (elastickv_raft_commit_index{job=\"elastickv\"})", + "legendFormat": "commit g{{group}} {{node_id}}", + "range": true, + "refId": "A" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "max by (group, node_id) (elastickv_raft_applied_index{job=\"elastickv\"})", + "legendFormat": "applied g{{group}} {{node_id}}", + "range": true, + "refId": "B" + } + ], + "title": "Raft Index Progress", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "custom": { + "align": "auto", + "cellOptions": { + "type": "auto" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 0, + "y": 20 + }, + "id": 9, + "options": { + "cellHeight": "sm", + "footer": { + "countRows": false, + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": 
"code", + "expr": "max by (group, leader_id, leader_address) (elastickv_raft_leader_identity{job=\"elastickv\"})", + "format": "table", + "instant": true, + "range": false, + "refId": "A" + } + ], + "title": "Current Leader per Group", + "type": "table" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "custom": { + "align": "auto", + "cellOptions": { + "type": "auto" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 8, + "y": 20 + }, + "id": 10, + "options": { + "cellHeight": "sm", + "footer": { + "countRows": false, + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + "expr": "max by (group, member_id, member_address, suffrage) (elastickv_raft_member_present{job=\"elastickv\"})", + "format": "table", + "instant": true, + "range": false, + "refId": "A" + } + ], + "title": "Current Members", + "type": "table" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "custom": { + "align": "auto", + "cellOptions": { + "type": "auto" + } + }, + "mappings": [ + { + "options": { + "0": { + "text": "Follower" + }, + "1": { + "text": "Leader" + } + }, + "type": "value" + } + ], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 16, + "y": 20 + }, + "id": 11, + "options": { + "cellHeight": "sm", + "footer": { + "countRows": false, + "fields": "", + "reducer": [ + "sum" + ], + "show": false + }, + "showHeader": true + }, + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "editorMode": "code", + 
"expr": "max by (group, member_id, member_address) (elastickv_raft_member_is_leader{job=\"elastickv\"})", + "format": "table", + "instant": true, + "range": false, + "refId": "A" + } + ], + "title": "Leader Flag per Member", + "type": "table" + } + ], + "refresh": "10s", + "schemaVersion": 41, + "style": "dark", + "tags": [ + "elastickv", + "raft", + "dynamodb" + ], + "templating": { + "list": [] + }, + "time": { + "from": "now-6h", + "to": "now" + }, + "timepicker": {}, + "timezone": "", + "title": "Elastickv Cluster Overview", + "uid": "elastickv-cluster", + "version": 1, + "weekStart": "" +} diff --git a/monitoring/grafana/provisioning/dashboards/dashboards.yml b/monitoring/grafana/provisioning/dashboards/dashboards.yml new file mode 100644 index 00000000..bd102f31 --- /dev/null +++ b/monitoring/grafana/provisioning/dashboards/dashboards.yml @@ -0,0 +1,11 @@ +apiVersion: 1 + +providers: + - name: Elastickv + orgId: 1 + folder: Elastickv + type: file + disableDeletion: false + updateIntervalSeconds: 10 + options: + path: /var/lib/grafana/dashboards diff --git a/monitoring/grafana/provisioning/datasources/prometheus.yml b/monitoring/grafana/provisioning/datasources/prometheus.yml new file mode 100644 index 00000000..00f99157 --- /dev/null +++ b/monitoring/grafana/provisioning/datasources/prometheus.yml @@ -0,0 +1,10 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + uid: prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: false diff --git a/monitoring/http.go b/monitoring/http.go new file mode 100644 index 00000000..3738f6f5 --- /dev/null +++ b/monitoring/http.go @@ -0,0 +1,99 @@ +package monitoring + +import ( + "context" + "crypto/subtle" + "log/slog" + "net" + "net/http" + "strings" + "time" + + "github.com/cockroachdb/errors" +) + +const ( + metricsReadHeaderTimeout = time.Second + metricsShutdownTimeout = 5 * time.Second +) + +// MetricsAddressRequiresToken reports whether the metrics endpoint is 
exposed beyond loopback. +func MetricsAddressRequiresToken(addr string) bool { + host, _, err := net.SplitHostPort(strings.TrimSpace(addr)) + if err != nil { + return true + } + host = strings.TrimSpace(host) + if host == "" || host == "0.0.0.0" || host == "::" { + return true + } + if strings.EqualFold(host, "localhost") { + return false + } + ip := net.ParseIP(host) + return ip == nil || !ip.IsLoopback() +} + +// ProtectHandler wraps a metrics handler with optional bearer-token authentication. +func ProtectHandler(handler http.Handler, bearerToken string) http.Handler { + if handler == nil { + return nil + } + token := strings.TrimSpace(bearerToken) + if token == "" { + return handler + } + expected := "Bearer " + token + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if subtle.ConstantTimeCompare([]byte(strings.TrimSpace(r.Header.Get("Authorization"))), []byte(expected)) != 1 { + w.Header().Set("WWW-Authenticate", "Bearer") + http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized) + return + } + handler.ServeHTTP(w, r) + }) +} + +// NewMetricsServer constructs an HTTP server for the protected metrics handler. +func NewMetricsServer(handler http.Handler, bearerToken string) *http.Server { + if handler == nil { + return nil + } + return &http.Server{ + Handler: ProtectHandler(handler, bearerToken), + ReadHeaderTimeout: metricsReadHeaderTimeout, + } +} + +// MetricsShutdownTask returns an errgroup task that stops the metrics server on context cancellation. 
+func MetricsShutdownTask(ctx context.Context, server *http.Server, address string) func() error { + return func() error { + if server == nil { + return nil + } + <-ctx.Done() + slog.Info("Shutting down metrics server", "address", address, "reason", ctx.Err()) + shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), metricsShutdownTimeout) + defer cancel() + err := server.Shutdown(shutdownCtx) + if err == nil || errors.Is(err, http.ErrServerClosed) || errors.Is(err, net.ErrClosed) { + return nil + } + return errors.WithStack(err) + } +} + +// MetricsServeTask returns an errgroup task that serves the metrics endpoint until shutdown. +func MetricsServeTask(server *http.Server, listener net.Listener, address string) func() error { + return func() error { + if server == nil || listener == nil { + return nil + } + slog.Info("Starting metrics server", "address", address) + err := server.Serve(listener) + if err == nil || errors.Is(err, http.ErrServerClosed) || errors.Is(err, net.ErrClosed) { + return nil + } + return errors.WithStack(err) + } +} diff --git a/monitoring/http_test.go b/monitoring/http_test.go new file mode 100644 index 00000000..dcc83996 --- /dev/null +++ b/monitoring/http_test.go @@ -0,0 +1,36 @@ +package monitoring + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMetricsAddressRequiresToken(t *testing.T) { + require.False(t, MetricsAddressRequiresToken("localhost:9090")) + require.False(t, MetricsAddressRequiresToken("127.0.0.1:9090")) + require.False(t, MetricsAddressRequiresToken("[::1]:9090")) + require.True(t, MetricsAddressRequiresToken(":9090")) + require.True(t, MetricsAddressRequiresToken("0.0.0.0:9090")) + require.True(t, MetricsAddressRequiresToken("10.0.0.1:9090")) +} + +func TestProtectHandlerRequiresBearerToken(t *testing.T) { + protected := ProtectHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + 
w.WriteHeader(http.StatusNoContent) + }), "secret-token") + + req := httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/metrics", nil) + rec := httptest.NewRecorder() + protected.ServeHTTP(rec, req) + require.Equal(t, http.StatusUnauthorized, rec.Code) + + req = httptest.NewRequestWithContext(context.Background(), http.MethodGet, "/metrics", nil) + req.Header.Set("Authorization", "Bearer secret-token") + rec = httptest.NewRecorder() + protected.ServeHTTP(rec, req) + require.Equal(t, http.StatusNoContent, rec.Code) +} diff --git a/monitoring/prometheus/prometheus.yml b/monitoring/prometheus/prometheus.yml new file mode 100644 index 00000000..2c51086d --- /dev/null +++ b/monitoring/prometheus/prometheus.yml @@ -0,0 +1,14 @@ +global: + scrape_interval: 5s + evaluation_interval: 5s + +scrape_configs: + - job_name: elastickv + authorization: + type: Bearer + credentials: demo-metrics-token + static_configs: + - targets: + - host.docker.internal:9091 + - host.docker.internal:9092 + - host.docker.internal:9093 diff --git a/monitoring/raft.go b/monitoring/raft.go new file mode 100644 index 00000000..ebdf7357 --- /dev/null +++ b/monitoring/raft.go @@ -0,0 +1,322 @@ +package monitoring + +import ( + "context" + "fmt" + "log/slog" + "strconv" + "strings" + "sync" + "time" + + "github.com/hashicorp/raft" + "github.com/prometheus/client_golang/prometheus" +) + +var raftStates = []string{ + "follower", + "candidate", + "leader", + "shutdown", + "unknown", +} + +var loggedLastContactParseValues sync.Map + +const ( + defaultObserveInterval = 5 * time.Second + lastContactUnknownValue = -1 +) + +// RaftRuntime describes a raft group observed by the metrics exporter. 
+type RaftRuntime struct { + GroupID uint64 + Raft *raft.Raft +} + +type RaftMetrics struct { + localState *prometheus.GaugeVec + leaderIdentity *prometheus.GaugeVec + memberPresent *prometheus.GaugeVec + memberIsLeader *prometheus.GaugeVec + memberCount *prometheus.GaugeVec + commitIndex *prometheus.GaugeVec + appliedIndex *prometheus.GaugeVec + lastContact *prometheus.GaugeVec +} + +func newRaftMetrics(registerer prometheus.Registerer) *RaftMetrics { + m := &RaftMetrics{ + localState: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_raft_local_state", + Help: "Current local raft state for each group.", + }, + []string{"group", "state"}, + ), + leaderIdentity: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_raft_leader_identity", + Help: "Current leader identity for each raft group, as observed by this node.", + }, + []string{"group", "leader_id", "leader_address"}, + ), + memberPresent: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_raft_member_present", + Help: "Current raft configuration members known to this node.", + }, + []string{"group", "member_id", "member_address", "suffrage"}, + ), + memberIsLeader: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_raft_member_is_leader", + Help: "Whether a raft configuration member is the current leader, as observed by this node.", + }, + []string{"group", "member_id", "member_address"}, + ), + memberCount: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_raft_members", + Help: "Number of raft configuration members currently known for each group.", + }, + []string{"group"}, + ), + commitIndex: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_raft_commit_index", + Help: "Latest raft commit index for each group.", + }, + []string{"group"}, + ), + appliedIndex: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_raft_applied_index", + Help: "Latest raft applied index for each group.", + }, + 
[]string{"group"}, + ), + lastContact: prometheus.NewGaugeVec( + prometheus.GaugeOpts{ + Name: "elastickv_raft_last_contact_seconds", + Help: "Time since the last observed leader contact for each group.", + }, + []string{"group"}, + ), + } + + registerer.MustRegister( + m.localState, + m.leaderIdentity, + m.memberPresent, + m.memberIsLeader, + m.memberCount, + m.commitIndex, + m.appliedIndex, + m.lastContact, + ) + + return m +} + +type RaftObserver struct { + metrics *RaftMetrics + + mu sync.Mutex + leaderLabels map[string]prometheus.Labels + memberLabels map[string]map[string]prometheus.Labels + memberLeaders map[string]map[string]prometheus.Labels +} + +func newRaftObserver(metrics *RaftMetrics) *RaftObserver { + return &RaftObserver{ + metrics: metrics, + leaderLabels: map[string]prometheus.Labels{}, + memberLabels: map[string]map[string]prometheus.Labels{}, + memberLeaders: map[string]map[string]prometheus.Labels{}, + } +} + +// Start polls raft state and configuration on a fixed interval until ctx is canceled. +func (o *RaftObserver) Start(ctx context.Context, runtimes []RaftRuntime, interval time.Duration) { + if o == nil { + return + } + if interval <= 0 { + interval = defaultObserveInterval + } + o.ObserveOnce(runtimes) + ticker := time.NewTicker(interval) + go func() { + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + o.ObserveOnce(runtimes) + } + } + }() +} + +// ObserveOnce captures the latest raft state for all configured runtimes. 
+func (o *RaftObserver) ObserveOnce(runtimes []RaftRuntime) { + if o == nil { + return + } + for _, runtime := range runtimes { + o.observeRuntime(runtime) + } +} + +func (o *RaftObserver) observeRuntime(runtime RaftRuntime) { + if o == nil || o.metrics == nil || runtime.Raft == nil { + return + } + + group := strconv.FormatUint(runtime.GroupID, 10) + stats := runtime.Raft.Stats() + state := normalizeRaftState(stats["state"]) + for _, candidate := range raftStates { + value := 0.0 + if candidate == state { + value = 1 + } + o.metrics.localState.WithLabelValues(group, candidate).Set(value) + } + + o.metrics.commitIndex.WithLabelValues(group).Set(float64(parseUintMetric(stats["commit_index"]))) + o.metrics.appliedIndex.WithLabelValues(group).Set(float64(parseUintMetric(stats["applied_index"]))) + o.metrics.lastContact.WithLabelValues(group).Set(parseLastContactSeconds(stats["last_contact"])) + + leaderAddr, leaderID := runtime.Raft.LeaderWithID() + o.setLeaderMetric(group, string(leaderID), string(leaderAddr)) + + future := runtime.Raft.GetConfiguration() + if err := future.Error(); err != nil { + return + } + o.metrics.memberCount.WithLabelValues(group).Set(float64(len(future.Configuration().Servers))) + o.setMembers(group, string(leaderID), future.Configuration().Servers) +} + +func (o *RaftObserver) setLeaderMetric(group string, leaderID string, leaderAddress string) { + o.mu.Lock() + defer o.mu.Unlock() + + if previous, ok := o.leaderLabels[group]; ok { + o.metrics.leaderIdentity.Delete(previous) + delete(o.leaderLabels, group) + } + if leaderID == "" && leaderAddress == "" { + return + } + labels := prometheus.Labels{ + "group": group, + "leader_id": leaderID, + "leader_address": leaderAddress, + } + o.metrics.leaderIdentity.With(labels).Set(1) + o.leaderLabels[group] = labels +} + +func (o *RaftObserver) setMembers(group string, leaderID string, servers []raft.Server) { + o.mu.Lock() + defer o.mu.Unlock() + + previousMembers := o.memberLabels[group] + for _, 
labels := range previousMembers { + o.metrics.memberPresent.Delete(labels) + } + previousLeaders := o.memberLeaders[group] + for _, labels := range previousLeaders { + o.metrics.memberIsLeader.Delete(labels) + } + + nextMembers := make(map[string]prometheus.Labels, len(servers)) + nextLeaders := make(map[string]prometheus.Labels, len(servers)) + for _, server := range servers { + memberID := string(server.ID) + memberAddress := string(server.Address) + memberKey := fmt.Sprintf("%s|%s", memberID, memberAddress) + + memberLabels := prometheus.Labels{ + "group": group, + "member_id": memberID, + "member_address": memberAddress, + "suffrage": normalizeSuffrage(server.Suffrage), + } + o.metrics.memberPresent.With(memberLabels).Set(1) + nextMembers[memberKey] = memberLabels + + leaderLabels := prometheus.Labels{ + "group": group, + "member_id": memberID, + "member_address": memberAddress, + } + if memberID != "" && memberID == leaderID { + o.metrics.memberIsLeader.With(leaderLabels).Set(1) + } else { + o.metrics.memberIsLeader.With(leaderLabels).Set(0) + } + nextLeaders[memberKey] = leaderLabels + } + + o.memberLabels[group] = nextMembers + o.memberLeaders[group] = nextLeaders +} + +func normalizeRaftState(raw string) string { + switch strings.ToLower(strings.TrimSpace(raw)) { + case "follower": + return "follower" + case "candidate": + return "candidate" + case "leader": + return "leader" + case "shutdown": + return "shutdown" + default: + return "unknown" + } +} + +func normalizeSuffrage(s raft.ServerSuffrage) string { + switch s { + case raft.Voter: + return "voter" + case raft.Nonvoter: + return "nonvoter" + case raft.Staging: + return "staging" + default: + return "unknown" + } +} + +func parseUintMetric(raw string) uint64 { + v, err := strconv.ParseUint(strings.TrimSpace(raw), 10, 64) + if err != nil { + return 0 + } + return v +} + +func parseLastContactSeconds(raw string) float64 { + raw = strings.TrimSpace(raw) + switch raw { + case "", "never": + return 
lastContactUnknownValue + case "0": + return 0 + } + d, err := time.ParseDuration(raw) + if err != nil { + if _, loaded := loggedLastContactParseValues.LoadOrStore(raw, struct{}{}); !loaded { + slog.Warn("failed to parse raft last_contact metric", "raw", raw, "err", err) + } + return lastContactUnknownValue + } + return d.Seconds() +} diff --git a/monitoring/raft_test.go b/monitoring/raft_test.go new file mode 100644 index 00000000..76748509 --- /dev/null +++ b/monitoring/raft_test.go @@ -0,0 +1,49 @@ +package monitoring + +import ( + "strings" + "testing" + + "github.com/hashicorp/raft" + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" +) + +func TestRaftObserverSetMembersReplacesRemovedMembers(t *testing.T) { + registry := NewRegistry("n1", "10.0.0.1:50051") + observer := registry.RaftObserver() + require.NotNil(t, observer) + + observer.setLeaderMetric("1", "n2", "10.0.0.2:50051") + observer.setMembers("1", "n2", []raft.Server{ + {ID: "n1", Address: "10.0.0.1:50051", Suffrage: raft.Voter}, + {ID: "n2", Address: "10.0.0.2:50051", Suffrage: raft.Voter}, + }) + observer.setLeaderMetric("1", "n1", "10.0.0.1:50051") + observer.setMembers("1", "n1", []raft.Server{ + {ID: "n1", Address: "10.0.0.1:50051", Suffrage: raft.Voter}, + }) + + err := testutil.GatherAndCompare( + registry.Gatherer(), + strings.NewReader(` +# HELP elastickv_raft_leader_identity Current leader identity for each raft group, as observed by this node. +# TYPE elastickv_raft_leader_identity gauge +elastickv_raft_leader_identity{group="1",leader_address="10.0.0.1:50051",leader_id="n1",node_address="10.0.0.1:50051",node_id="n1"} 1 +# HELP elastickv_raft_member_present Current raft configuration members known to this node. 
+# TYPE elastickv_raft_member_present gauge +elastickv_raft_member_present{group="1",member_address="10.0.0.1:50051",member_id="n1",node_address="10.0.0.1:50051",node_id="n1",suffrage="voter"} 1 +`), + "elastickv_raft_leader_identity", + "elastickv_raft_member_present", + ) + require.NoError(t, err) +} + +func TestParseLastContactSeconds(t *testing.T) { + require.Equal(t, float64(lastContactUnknownValue), parseLastContactSeconds("")) + require.Equal(t, float64(lastContactUnknownValue), parseLastContactSeconds("never")) + require.Equal(t, 0.0, parseLastContactSeconds("0")) + require.Equal(t, 0.25, parseLastContactSeconds("250ms")) + require.Equal(t, float64(lastContactUnknownValue), parseLastContactSeconds("invalid")) +} diff --git a/monitoring/registry.go b/monitoring/registry.go new file mode 100644 index 00000000..616ce23c --- /dev/null +++ b/monitoring/registry.go @@ -0,0 +1,68 @@ +package monitoring + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// Registry owns the Prometheus registry used by a single Elastickv node. +type Registry struct { + baseRegistry *prometheus.Registry + registerer prometheus.Registerer + gatherer prometheus.Gatherer + + dynamo *DynamoDBMetrics + raft *RaftMetrics +} + +// NewRegistry builds a registry with constant labels that identify the local node. +func NewRegistry(nodeID string, nodeAddress string) *Registry { + base := prometheus.NewRegistry() + registerer := prometheus.WrapRegistererWith(prometheus.Labels{ + "node_id": nodeID, + "node_address": nodeAddress, + }, base) + + r := &Registry{ + baseRegistry: base, + registerer: registerer, + gatherer: base, + } + r.dynamo = newDynamoDBMetrics(registerer) + r.raft = newRaftMetrics(registerer) + return r +} + +// Handler returns an HTTP handler that exposes the Prometheus scrape endpoint. 
+func (r *Registry) Handler() http.Handler {
+	if r == nil || r.gatherer == nil {
+		return promhttp.Handler()
+	}
+	return promhttp.HandlerFor(r.gatherer, promhttp.HandlerOpts{})
+}
+
+// Gatherer exposes the underlying gatherer for tests and custom exporters.
+func (r *Registry) Gatherer() prometheus.Gatherer {
+	if r == nil {
+		return nil
+	}
+	return r.gatherer
+}
+
+// DynamoDBObserver returns the DynamoDB request observer backed by this registry.
+func (r *Registry) DynamoDBObserver() DynamoDBRequestObserver {
+	if r == nil {
+		return nil
+	}
+	return r.dynamo
+}
+
+// RaftObserver returns a new Raft topology observer backed by this registry's metrics; call it once and reuse the result, since separate observers do not share stale-series bookkeeping.
+func (r *Registry) RaftObserver() *RaftObserver {
+	if r == nil {
+		return nil
+	}
+	return newRaftObserver(r.raft)
+}