Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changes/unreleased/Added-20260109-223536.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
kind: Added
body: Enabled automatic etcd client ↔ server mode reconfiguration
time: 2026-01-09T22:35:36.887346+05:30
78 changes: 78 additions & 0 deletions server/internal/etcd/embedded.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func (e *EmbeddedEtcd) Start(ctx context.Context) error {
e.mu.Lock()
defer e.mu.Unlock()

if e.etcd != nil {
return nil // already started
}

initialized, err := e.IsInitialized()
if err != nil {
return err
Expand Down Expand Up @@ -292,6 +296,7 @@ func (e *EmbeddedEtcd) Shutdown() error {
}
if e.etcd != nil {
e.etcd.Close()
e.etcd = nil
}
return errors.Join(errs...)
}
Expand Down Expand Up @@ -443,6 +448,79 @@ func (e *EmbeddedEtcd) HealthCheck() common.ComponentStatus {
}
}

// ChangeMode transitions this host from running an embedded etcd server to
// acting as a client of the remaining cluster members. The only supported
// target mode is config.EtcdModeClient; any other mode yields an error.
//
// The steps are, in order:
//  1. Start the embedded server (a no-op when it is already running) and list
//     the cluster members through it.
//  2. Collect the client URLs of every member except this host and persist
//     them as the client endpoints in the generated config.
//  3. Shut the embedded server down and start a RemoteEtcd against the
//     persisted endpoints.
//  4. Remove this host's member from the cluster via the remote client,
//     delete the embedded etcd data directory, and persist the client-mode
//     settings in the generated config.
//
// On success the started RemoteEtcd is returned. On failure the returned Etcd
// is nil; if the remote client had already been started it is shut down
// before returning so that it is not leaked.
func (e *EmbeddedEtcd) ChangeMode(ctx context.Context, mode config.EtcdMode) (Etcd, error) {
	if mode != config.EtcdModeClient {
		return nil, fmt.Errorf("invalid mode transition from %s to %s", config.EtcdModeServer, mode)
	}

	if err := e.Start(ctx); err != nil {
		return nil, err
	}

	cfg := e.cfg.Config()

	embeddedClient, err := e.GetClient()
	if err != nil {
		return nil, err
	}

	// Get the full member list before removing this host.
	resp, err := embeddedClient.MemberList(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to list etcd members for server->client transition: %w", err)
	}

	var endpoints []string
	for _, m := range resp.Members {
		// Skip this host's member; we are about to remove it.
		if m.Name == cfg.HostID {
			continue
		}
		endpoints = append(endpoints, m.ClientURLs...)
	}

	// Without at least one other reachable member there would be nothing for
	// the remote client to connect to after the embedded server stops.
	if len(endpoints) == 0 {
		return nil, fmt.Errorf("cannot demote etcd server on host %s: no remaining cluster members with client URLs", cfg.HostID)
	}

	// Persist the endpoints before starting the remote client, which is
	// constructed from the same config manager.
	generated := e.cfg.GeneratedConfig()
	generated.EtcdClient.Endpoints = endpoints
	if err := e.cfg.UpdateGeneratedConfig(generated); err != nil {
		return nil, fmt.Errorf("failed to update generated config with client endpoints: %w", err)
	}

	if err := e.Shutdown(); err != nil {
		return nil, err
	}

	remote := NewRemoteEtcd(e.cfg, e.logger)
	if err := remote.Start(ctx); err != nil {
		return nil, fmt.Errorf("failed to start remote client: %w", err)
	}

	// abort stops the already-started remote client before reporting err. The
	// caller never receives remote on failure, so nobody else could close it.
	abort := func(err error) (Etcd, error) {
		if stopErr := remote.Shutdown(); stopErr != nil {
			return nil, fmt.Errorf("%w (also failed to shut down remote client: %v)", err, stopErr)
		}
		return nil, err
	}

	remoteClient, err := remote.GetClient()
	if err != nil {
		return abort(fmt.Errorf("failed to get remote client: %w", err))
	}

	if err := RemoveMember(ctx, remoteClient, cfg.HostID); err != nil {
		return abort(fmt.Errorf("failed to remove embedded etcd from cluster: %w", err))
	}

	if err := os.RemoveAll(e.etcdDir()); err != nil {
		return abort(fmt.Errorf("failed to remove embedded etcd data dir: %w", err))
	}

	generated.EtcdMode = config.EtcdModeClient
	generated.EtcdServer = config.EtcdServer{}
	// NOTE(review): this replaces the EtcdClient settings (including the
	// endpoints discovered above) with the values from the config snapshot
	// taken at the top of this function — confirm that is intended.
	generated.EtcdClient = cfg.EtcdClient
	if err := e.cfg.UpdateGeneratedConfig(generated); err != nil {
		return abort(fmt.Errorf("failed to clear out etcd server settings in generated config: %w", err))
	}

	return remote, nil
}

const maxLearnerStallTime = 5 * time.Minute

type learnerProgress struct {
Expand Down
2 changes: 2 additions & 0 deletions server/internal/etcd/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3"

"github.com/pgEdge/control-plane/server/internal/common"
"github.com/pgEdge/control-plane/server/internal/config"
)

type ClusterMember struct {
Expand Down Expand Up @@ -50,4 +51,5 @@ type Etcd interface {
RemoveHost(ctx context.Context, hostID string) error
JoinToken() (string, error)
VerifyJoinToken(in string) error
ChangeMode(ctx context.Context, mode config.EtcdMode) (Etcd, error)
}
64 changes: 58 additions & 6 deletions server/internal/etcd/provide.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package etcd

import (
"context"
"fmt"
"time"

"github.com/rs/zerolog"
"github.com/samber/do"
Expand All @@ -27,6 +29,18 @@ func provideClient(i *do.Injector) {
})
}

// newEtcdForMode returns the Etcd implementation matching mode: an embedded
// etcd for config.EtcdModeServer and a remote etcd for config.EtcdModeClient.
// Any other mode is rejected with an error.
func newEtcdForMode(mode config.EtcdMode, cfg *config.Manager, logger zerolog.Logger) (Etcd, error) {
	if mode == config.EtcdModeServer {
		return NewEmbeddedEtcd(cfg, logger), nil
	}
	if mode == config.EtcdModeClient {
		return NewRemoteEtcd(cfg, logger), nil
	}
	return nil, fmt.Errorf("invalid etcd mode: %s", mode)
}

func provideEtcd(i *do.Injector) {
do.Provide(i, func(i *do.Injector) (Etcd, error) {
cfg, err := do.Invoke[*config.Manager](i)
Expand All @@ -38,13 +52,51 @@ func provideEtcd(i *do.Injector) {
return nil, err
}

switch storageType := cfg.Config().EtcdMode; storageType {
case config.EtcdModeServer:
return NewEmbeddedEtcd(cfg, logger), nil
case config.EtcdModeClient:
return NewRemoteEtcd(cfg, logger), nil
appCfg := cfg.Config()
generated := cfg.GeneratedConfig()

oldMode := generated.EtcdMode
newMode := appCfg.EtcdMode

logger.Info().
Str("old_mode", string(oldMode)).
Str("new_mode", string(newMode)).
Bool("old_mode_empty", oldMode == "").
Bool("modes_equal", oldMode == newMode).
Msg("checking etcd mode for reconfiguration")

// Mode has changed - perform reconfiguration.
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()

switch {
case oldMode == "" || oldMode == newMode:
etcd, err := newEtcdForMode(newMode, cfg, logger)
if err != nil {
return nil, err
}
initialized, err := etcd.IsInitialized()
if err != nil {
return nil, err
}
if initialized {
generated.EtcdMode = appCfg.EtcdMode
generated.EtcdServer = appCfg.EtcdServer
generated.EtcdClient = appCfg.EtcdClient
if err := cfg.UpdateGeneratedConfig(generated); err != nil {
return nil, fmt.Errorf("failed to persist etcd configuration: %w", err)
}
}

return etcd, nil
case oldMode == config.EtcdModeServer && newMode == config.EtcdModeClient:
embedded := NewEmbeddedEtcd(cfg, logger)
return embedded.ChangeMode(ctx, newMode)
case oldMode == config.EtcdModeClient && newMode == config.EtcdModeServer:
remote := NewRemoteEtcd(cfg, logger)
return remote.ChangeMode(ctx, newMode)
default:
return nil, fmt.Errorf("invalid storage type: %s", storageType)
return nil, fmt.Errorf("unsupported etcd mode transition: %s -> %s", oldMode, newMode)
}
})
}
Expand Down
42 changes: 31 additions & 11 deletions server/internal/etcd/rbac.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,18 +51,9 @@ func CreateHostCredentials(

if opts.EmbeddedEtcdEnabled {
// Create a cert for the peer server
serverPrincipal, err := certSvc.EtcdServer(ctx,
opts.HostID,
opts.Hostname,
[]string{"localhost", opts.Hostname},
[]string{"127.0.0.1", opts.IPv4Address},
)
if err != nil {
return nil, fmt.Errorf("failed to create cert for etcd server: %w", err)
if err := addEtcdServerCredentials(ctx, opts.HostID, opts.Hostname, opts.IPv4Address, certSvc, creds); err != nil {
return nil, err
}

creds.ServerCert = serverPrincipal.CertPEM
creds.ServerKey = serverPrincipal.KeyPEM
}

return creds, nil
Expand Down Expand Up @@ -353,9 +344,38 @@ func writeHostCredentials(creds *HostCredentials, cfg *config.Manager) error {
generatedCfg := cfg.GeneratedConfig()
generatedCfg.EtcdUsername = creds.Username
generatedCfg.EtcdPassword = creds.Password
generatedCfg.EtcdMode = appCfg.EtcdMode
generatedCfg.EtcdClient = appCfg.EtcdClient
generatedCfg.EtcdServer = appCfg.EtcdServer

if err := cfg.UpdateGeneratedConfig(generatedCfg); err != nil {
return fmt.Errorf("failed to update generated config: %w", err)
}

return nil
}

// addEtcdServerCredentials requests an etcd server certificate for the given
// host from certSvc and stores the resulting certificate and key PEM on creds.
// It returns an error, leaving creds untouched, when the certificate cannot be
// created.
func addEtcdServerCredentials(
	ctx context.Context,
	hostID string,
	hostname string,
	ipv4Address string,
	certSvc *certificates.Service,
	creds *HostCredentials,
) error {
	// The server cert is requested for both the loopback identity and the
	// host's own name/address.
	hostNames := []string{"localhost", hostname}
	addresses := []string{"127.0.0.1", ipv4Address}

	// Create a cert for the peer server.
	principal, err := certSvc.EtcdServer(ctx, hostID, hostname, hostNames, addresses)
	if err != nil {
		return fmt.Errorf("failed to create cert for etcd server: %w", err)
	}

	creds.ServerCert, creds.ServerKey = principal.CertPEM, principal.KeyPEM
	return nil
}
62 changes: 62 additions & 0 deletions server/internal/etcd/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,65 @@ func (r *RemoteEtcd) updateEndpointsConfig(ctx context.Context, client *clientv3

return nil
}

// ChangeMode transitions this host from being an etcd client to running an
// embedded etcd server. The only supported target mode is
// config.EtcdModeServer; any other mode yields an error.
//
// While the remote client is still running it gathers the host's client
// credentials, adds server credentials to them, and looks up the current
// cluster leader. It then shuts the remote client down, joins a new embedded
// etcd to the cluster through that leader, and records the server-mode
// settings in the generated config. The joined embedded etcd is returned.
func (r *RemoteEtcd) ChangeMode(ctx context.Context, mode config.EtcdMode) (Etcd, error) {
	if mode != config.EtcdModeServer {
		return nil, fmt.Errorf("invalid mode transition from %s to %s", config.EtcdModeClient, mode)
	}

	err := r.Start(ctx)
	if err != nil {
		return nil, err
	}

	hostCfg := r.cfg.Config()

	// Assemble the credentials the embedded server will join with: the
	// existing client identity first, then the server cert and key.
	principal, err := r.certSvc.HostEtcdUser(ctx, hostCfg.HostID)
	if err != nil {
		return nil, fmt.Errorf("failed to get client principal: %w", err)
	}

	hostCreds := new(HostCredentials)
	hostCreds.Username = hostCfg.EtcdUsername
	hostCreds.Password = hostCfg.EtcdPassword
	hostCreds.CaCert = r.certSvc.CACert()
	hostCreds.ClientCert = principal.CertPEM
	hostCreds.ClientKey = principal.KeyPEM

	err = addEtcdServerCredentials(ctx, hostCfg.HostID, hostCfg.Hostname, hostCfg.IPv4Address, r.certSvc, hostCreds)
	if err != nil {
		return nil, err
	}

	// The leader must be resolved before the remote client is shut down.
	remoteClient, err := r.GetClient()
	if err != nil {
		return nil, err
	}

	leader, err := GetClusterLeader(ctx, remoteClient)
	if err != nil {
		return nil, fmt.Errorf("failed to get cluster leader: %w", err)
	}

	err = r.Shutdown()
	if err != nil {
		return nil, err
	}

	server := NewEmbeddedEtcd(r.cfg, r.logger)
	joinOpts := JoinOptions{
		Leader:      leader,
		Credentials: hostCreds,
	}
	if err := server.Join(ctx, joinOpts); err != nil {
		return nil, fmt.Errorf("failed to join embedded etcd to cluster: %w", err)
	}

	// Record the new mode: client settings are cleared and the server
	// settings are taken from the config snapshot read above.
	genCfg := r.cfg.GeneratedConfig()
	genCfg.EtcdMode = config.EtcdModeServer
	genCfg.EtcdClient = config.EtcdClient{}
	genCfg.EtcdServer = hostCfg.EtcdServer
	if err := r.cfg.UpdateGeneratedConfig(genCfg); err != nil {
		return nil, fmt.Errorf("failed to clear out etcd client settings in generated config: %w", err)
	}

	return server, nil
}