Add Prometheus/Grafana observability for DynamoDB and Raft and normalize legacy GSI snapshots for compatibility #340

Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly enhances the observability of Elastickv by integrating Prometheus and Grafana. It introduces detailed metrics for DynamoDB-compatible API operations and Raft consensus, allowing users to monitor performance, identify bottlenecks, and understand cluster health. Additionally, it includes a critical compatibility fix for legacy GSI schema snapshots, ensuring smooth operation with existing data.
Code Review
This pull request introduces a comprehensive observability layer using Prometheus and Grafana for both DynamoDB and Raft components, which is a significant improvement for monitoring and debugging the system. The implementation is well-structured, with a new monitoring package that cleanly separates the metrics logic, and includes a fix to normalize legacy GSI snapshots. However, the current implementation is vulnerable to Denial of Service (DoS) attacks via Prometheus label cardinality explosion. Specifically, the operation and table labels are derived from untrusted user input without sufficient validation, which an attacker could exploit to cause excessive memory usage. It is recommended to implement strict validation for these labels against known operations and existing tables. Additionally, there are minor suggestions for improving error handling in the new monitoring and server code.
/gemini review
Code Review
This pull request introduces detailed Prometheus metrics for DynamoDB-compatible API and the internal Raft state, significantly enhancing system observability. The implementation is generally solid. The inclusion of a pre-built Grafana dashboard, along with the necessary Docker Compose and configuration files, facilitates easy adoption of the new monitoring capabilities. Furthermore, the change to normalize legacy GSI snapshots improves backward compatibility and data consistency. The code is well-structured, thoroughly tested, and the documentation updates are clear and helpful. Consider the suggestion to simplify the table merging logic.
/gemini review
Code Review
This pull request introduces comprehensive observability features by adding Prometheus metrics for DynamoDB and Raft operations, along with a pre-configured Grafana dashboard. The implementation is well-structured within a new monitoring package, following best practices for metrics instrumentation in Go. Additionally, it includes a backward-compatibility improvement for legacy GSI snapshots. The code quality is high, with good test coverage for the new functionality. I've found one issue related to the server shutdown sequence that should be addressed.
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
/gemini review
Code Review
This pull request introduces a comprehensive monitoring solution using Prometheus and Grafana for DynamoDB and Raft components, which is a great step towards better observability. It also includes a backward-compatibility improvement for legacy GSI snapshots. The changes are extensive, adding a new monitoring package, instrumenting the dynamodb adapter, and updating server entrypoints. The implementation is well-structured, using functional options for configuration and providing new tests for the added functionality. My review identified a potential concurrency issue in the Raft metrics observer and a minor code duplication that could be refactored for better maintainability.
/gemini review
Code Review
This pull request introduces significant observability features by adding Prometheus metrics for DynamoDB and Raft operations, along with a pre-configured Grafana dashboard, and includes a valuable fix for normalizing legacy Global Secondary Index (GSI) snapshots. While the implementation is well-structured, it introduces security risks related to denial of service and unauthorized information disclosure. Specifically, the lack of authentication on the new metrics endpoints and the use of unvalidated user-supplied strings (table names) as Prometheus labels can lead to a label cardinality explosion, causing a denial of service. It is recommended to implement authentication for the metrics server and restrict label values to a known set of authorized resources. Additionally, consider improving the robustness of the metrics collection logic.
```go
func (m *DynamoDBMetrics) observeTables(operation string, outcome string, tables []string, tableMetrics map[string]DynamoDBTableMetrics) {
	for _, table := range tables {
		m.tableRequests.WithLabelValues(operation, table, outcome).Inc()
```
The application uses user-supplied table names as Prometheus label values without any validation or restriction. Since the DynamoDB-compatible API (specifically the CreateTable operation) lacks authentication, an attacker can programmatically create a large number of tables with unique names. This leads to a 'label cardinality explosion' in the monitoring system (Prometheus), which can cause excessive memory consumption and potentially crash the Prometheus server or the application itself, resulting in a denial of service (DoS).
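A common mitigation for this class of issue is to validate every candidate label value against a bounded allow-list before calling `WithLabelValues`, collapsing anything unexpected into a single overflow bucket. The sketch below illustrates the idea with only the standard library; the allow-list contents and the `sanitizeLabel`/`overflowLabel` names are hypothetical, not the PR's actual API:

```go
package main

import "fmt"

// Hypothetical allow-list of DynamoDB-style operation names. Any value
// outside the list collapses into a single overflow bucket, so metric
// cardinality stays bounded no matter what clients send.
var knownOperations = map[string]struct{}{
	"PutItem": {}, "GetItem": {}, "Query": {}, "Scan": {}, "CreateTable": {},
}

const overflowLabel = "_other"

// sanitizeLabel returns the value unchanged if it is allowed, and the
// overflow bucket otherwise.
func sanitizeLabel(value string, allowed map[string]struct{}) string {
	if _, ok := allowed[value]; ok {
		return value
	}
	return overflowLabel
}

func main() {
	fmt.Println(sanitizeLabel("Query", knownOperations))               // Query
	fmt.Println(sanitizeLabel("attacker-table-9999", knownOperations)) // _other
}
```

Table names would need the same treatment, checked against the set of tables that actually exist rather than a static list.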
cmd/server/demo.go (Outdated)
```go
ms := &http.Server{
	Handler:           metricsRegistry.Handler(),
	ReadHeaderTimeout: time.Second,
}
```
The newly introduced metrics server is started without any authentication mechanism. This endpoint exposes internal system metrics to anyone who can reach the network port. While common in isolated environments, exposing metrics without authentication can lead to information disclosure (e.g., table names, traffic patterns) and increases the attack surface for denial-of-service attacks, such as scraping the endpoint at a high frequency or exploiting the label cardinality issue mentioned in other comments.
main.go (Outdated)
```go
if err := startMetricsServer(r.ctx, r.lc, r.eg, r.metricsAddress, r.metricsRegistry.Handler()); err != nil {
	return waitErrgroupAfterStartupFailure(r.cancel, r.eg, err)
}
```
```go
func (s *dynamoRequestMetricsState) addTableMetrics(table string, returnedItems int, scannedItems int, writtenItems int) {
	if s == nil {
		return
	}
	table = strings.TrimSpace(table)
	if table == "" {
		return
	}
	metrics, ok := s.tables[table]
	if !ok {
		return
	}
	metrics.ReturnedItems += returnedItems
	metrics.ScannedItems += scannedItems
	metrics.WrittenItems += writtenItems
	s.tables[table] = metrics
}
```
The current implementation of addTableMetrics will silently drop metrics if recordTable has not been called for a given table first. This makes the metrics collection fragile, as it relies on every call path correctly calling recordTable before addTableMetrics. To make this more robust, addTableMetrics should ensure the table is recorded itself. Calling recordTable within addTableMetrics is idempotent and will prevent metrics from being silently lost if a new code path is added in the future that doesn't explicitly call recordTable first.
```go
func (s *dynamoRequestMetricsState) addTableMetrics(table string, returnedItems int, scannedItems int, writtenItems int) {
	if s == nil {
		return
	}
	table = strings.TrimSpace(table)
	if table == "" {
		return
	}
	s.recordTable(table)
	metrics := s.tables[table]
	metrics.ReturnedItems += returnedItems
	metrics.ScannedItems += scannedItems
	metrics.WrittenItems += writtenItems
	s.tables[table] = metrics
}
```
/gemini review

/gemini review

/gemini review
Code Review
This is an excellent pull request that adds comprehensive observability features for DynamoDB and Raft using Prometheus and Grafana. The implementation is robust, featuring production-ready practices like metric cardinality management and security for the metrics endpoint. The code is well-structured, thoroughly tested, and accompanied by clear documentation and tooling. The addition of backward compatibility for legacy GSI snapshots is also a valuable improvement. I have one suggestion to improve resource handling in the demo application's startup sequence.
```go
if err != nil {
	return err
}
```
There's a potential resource leak in the run function's startup sequence. If this call to setupMetricsHTTPServer fails, the function returns an error, but previously created network listeners (e.g., dynamoL, redisL, grpcSock) are not closed. This can lead to file descriptor leaks.
To address this, you should ensure resources are cleaned up in case of partial startup failure. You could explicitly close the listeners in the error-handling blocks. A more robust solution would be to refactor this run function to adopt the resource management pattern used in main.go with its runtimeServerRunner and start... helper functions.
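The first option can be sketched as follows: record listeners as they are opened and close them all on any subsequent failure. The function and listener names mirror the review comment but the failing step is simulated; this is not the PR's actual `run` function:

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// startListeners opens several listeners in sequence and guarantees that
// none of them leak if a later startup step fails.
func startListeners() ([]net.Listener, error) {
	var opened []net.Listener
	closeAll := func() {
		for _, l := range opened {
			l.Close() // release file descriptors on partial failure
		}
	}

	for _, name := range []string{"dynamoL", "redisL", "grpcSock"} {
		l, err := net.Listen("tcp", "127.0.0.1:0")
		if err != nil {
			closeAll()
			return nil, fmt.Errorf("listen %s: %w", name, err)
		}
		opened = append(opened, l)
	}

	// Simulate setupMetricsHTTPServer failing after the listeners exist:
	// without closeAll() here, all three listeners would leak.
	closeAll()
	return nil, errors.New("setupMetricsHTTPServer: simulated failure")
}

func main() {
	ls, err := startListeners()
	fmt.Println(len(ls) == 0, err != nil) // true true: nothing leaked, error surfaced
}
```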
Code Review
This is an excellent pull request that adds comprehensive observability for DynamoDB and Raft using Prometheus and Grafana. The implementation is very well-thought-out, especially with the careful handling of metric cardinality, which is crucial for a stable monitoring system. The new monitoring package is well-structured, and the addition of tests and documentation is thorough. I also appreciate the backward compatibility consideration for legacy GSI snapshots. I have one minor suggestion to remove a piece of redundant code.
Code Review
This pull request introduces significant observability features by adding Prometheus metrics for both the DynamoDB-compatible API and the underlying Raft consensus layer. It also includes pre-built Grafana dashboards for visualization. The implementation is robust, with careful consideration for metric cardinality and a secure-by-default approach for the new metrics endpoint. Additionally, this PR addresses a compatibility issue by normalizing legacy GSI snapshot formats, ensuring backward compatibility. The changes are well-structured and include comprehensive tests. My review focuses on a couple of areas for improvement regarding code consistency and metric accuracy.
Note: Security Review is unavailable for this PR.
main.go (Outdated)
```go
func startMetricsServer(ctx context.Context, lc *net.ListenConfig, eg *errgroup.Group, metricsAddr string, metricsToken string, handler http.Handler) error {
	if strings.TrimSpace(metricsAddr) == "" || handler == nil {
		return nil
	}
	if monitoring.MetricsAddressRequiresToken(metricsAddr) && strings.TrimSpace(metricsToken) == "" {
		return errors.New("metricsToken is required when metricsAddress is not loopback")
	}
	metricsL, err := lc.Listen(ctx, "tcp", metricsAddr)
	if err != nil {
		return errors.Wrapf(err, "failed to listen on %s", metricsAddr)
	}
	metricsServer := &http.Server{
		Handler:           monitoring.ProtectHandler(handler, metricsToken),
		ReadHeaderTimeout: time.Second,
	}
	eg.Go(func() error {
		return serveMetricsUntilCanceled(ctx, metricsServer, metricsL, metricsAddr)
	})
	return nil
}

func serveMetricsUntilCanceled(ctx context.Context, server *http.Server, listener net.Listener, metricsAddr string) error {
	stop := make(chan struct{})
	go watchMetricsShutdown(ctx, server, stop, metricsAddr)

	err := server.Serve(listener)
	close(stop)
	if errors.Is(err, http.ErrServerClosed) || errors.Is(err, net.ErrClosed) {
		return nil
	}
	return errors.WithStack(err)
}

func watchMetricsShutdown(ctx context.Context, server *http.Server, stop <-chan struct{}, metricsAddr string) {
	select {
	case <-ctx.Done():
		shutdownCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), metricsShutdownTimeout)
		defer cancel()
		if err := server.Shutdown(shutdownCtx); err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, net.ErrClosed) {
			log.Printf("metrics server shutdown error on %s: %v", metricsAddr, err)
		}
	case <-stop:
	}
}
```
The logic for starting and stopping the metrics server here is implemented differently from the one in cmd/server/demo.go. The demo.go file uses errgroup tasks (metricsServeTask, metricsShutdownTask) which is cleaner and more consistent with how other services are managed in this application.
To improve code consistency and reduce duplication, I recommend refactoring this implementation to use the same task-based pattern as in cmd/server/demo.go. You could potentially move the shared task functions into the monitoring package to be reused by both main.go and demo.go.
References
- Centralize common complex logic, such as server startup/shutdown tasks, into shared helper functions or packages to avoid code duplication and improve maintainability, similar to how retry/backoff mechanisms are centralized.
```go
func parseLastContactSeconds(raw string) float64 {
	raw = strings.TrimSpace(raw)
	switch raw {
	case "", "never":
		return 0
	case "0":
		return 0
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		if _, loaded := loggedLastContactParseValues.LoadOrStore(raw, struct{}{}); !loaded {
			slog.Warn("failed to parse raft last_contact metric", "raw", raw, "err", err)
		}
		return 0
	}
	return d.Seconds()
}
```
The function parseLastContactSeconds currently returns 0 in cases of a parse error or when the value is "never". This can be misleading, as a value of 0 for elastickv_raft_last_contact_seconds typically implies that contact was made just now. This could mask issues with Raft stats reporting or lead to incorrect interpretations of the metric.
To make the metric clearer, I suggest returning a sentinel value like -1 for unknown states (e.g., "never" or parse errors). This makes it explicit that the value is not a valid duration, and the Grafana dashboard can be configured to handle this special case appropriately (e.g., by displaying 'N/A' or hiding the value).
Suggested change:

```go
func parseLastContactSeconds(raw string) float64 {
	raw = strings.TrimSpace(raw)
	switch raw {
	case "", "never":
		return -1 // Indicate unknown/never
	case "0":
		return 0
	}
	d, err := time.ParseDuration(raw)
	if err != nil {
		if _, loaded := loggedLastContactParseValues.LoadOrStore(raw, struct{}{}); !loaded {
			slog.Warn("failed to parse raft last_contact metric", "raw", raw, "err", err)
		}
		return -1 // Indicate parse error
	}
	return d.Seconds()
}
```
/gemini review
Code Review
This pull request introduces comprehensive observability for DynamoDB and Raft components using Prometheus and Grafana, which is a significant and valuable addition. The implementation is thorough, including metrics for request rates, latency, errors, and Raft state. It also includes a compatibility layer for legacy GSI snapshots. The code is well-structured, with a new monitoring package that cleanly separates concerns. The inclusion of tests, documentation updates, and pre-configured Grafana dashboards makes this a high-quality contribution. I have one minor suggestion for code simplification.
```go
func (c *countingReadCloser) Read(p []byte) (int, error) {
	if c == nil || c.ReadCloser == nil {
		return 0, io.EOF
	}
	n, err := c.ReadCloser.Read(p)
	c.bytesRead += n
	if err == nil {
		return n, nil
	}
	if errors.Is(err, io.EOF) {
		return n, io.EOF
	}
	return n, errors.WithStack(err)
}
```
This function's implementation is correct, but it can be simplified for better readability by reducing the number of conditional return paths.
```go
func (c *countingReadCloser) Read(p []byte) (int, error) {
	if c == nil || c.ReadCloser == nil {
		return 0, io.EOF
	}
	n, err := c.ReadCloser.Read(p)
	c.bytesRead += n
	if err != nil && !errors.Is(err, io.EOF) {
		return n, errors.WithStack(err)
	}
	return n, err
}
```