etcd 是一个分布式、高可用的键值存储系统,它被设计为可靠的、安全的、快速的,并具有简单的API。
etcd 使用 Go 语言开发,基于 Raft 算法实现了分布式一致性。它可以用于存储集群中的关键配置信息、服务发现、锁等。
etcd 的数据模型类似于一个简单的文件系统,支持 PUT、GET、DELETE 等操作,每个节点的数据会自动同步到其他节点上,因此可以实现高可用、自动故障转移等功能。etcd 还支持 Watch 机制,可以监控数据变化并触发相应的操作。
PS:本文前大半部分都是在讲一些 etcdctl、etcd server、raft 节点间数据流转与处理相关的内容,如不感兴趣,可直接跳转到 迎接请求到来 部分阅读 Put 操作流程。
下面是一个最简单的 etcd put 操作:
# 启动 etcd server
$ etcd
$ etcdctl put gretting "hello"
$ etcdctl get gretting
gretting
hello
当我们发送执行一个 put 操作时,etcdctl 内部会创建一个 etcd 的客户端,向 etcd 服务端发送相应的 grpc 请求来完成操作。

etcd 的源码位于 github.com/etcd-io/etc… 接下来以 etcdctl 为切入点开始源码解读。为了保持简单,我们只关心示例操作的主要流程,比如这篇里我们只关注与 Put 操作相关的流程。
etcdctl
etcdctl 用于向 etcd server 发送请求,下面是 etcdctl 工具的入口方法。
// file: etcdctl/main.go
func main() {
ctlv3.MustStart()
return
}
入口函数内调用了 ctlv3.MustStart(),在 ctlv3 包下,初始化了命令行参数及命令处理函数,MustStart() 方法最终会调用到命令行工具的 Execute() 方法执行命令。
// file: etcdctl/ctlv3/ctl.go
...
var (
rootCmd = &cobra.Command{
Use: cliName,
Short: cliDescription,
SuggestFor: []string{"etcdctl"},
}
)
func init() {
// 初始化并解析命令行参数
rootCmd.PersistentFlags().StringSliceVar(&globalFlags.Endpoints, "endpoints", []string{"127.0.0.1:2379"}, "gRPC endpoints")
rootCmd.PersistentFlags().BoolVar(&globalFlags.Debug, "debug", false, "enable client-side debug logging")
...
}
// 注册各类操作方法
rootCmd.AddCommand(
command.NewGetCommand(),
command.NewPutCommand(),
command.NewDelCommand(),
...
)
func Start() error {
...
// 执行命令
return rootCmd.Execute()
}
func MustStart() {
if err := Start(); err != nil {
...
}
}
可以看到,etcd 默认的 endpoints 为 127.0.0.1:2379,也就是说我们最开始启动的 etcd 服务默认就监听在这个地址。
由于我们是一次 PUT 操作,在 command.NewPutCommand() 中注册了 PUT 命令,命令的处理函数是 putCommandFunc,该函数中真正执行了 PUT 操作。
// file: etcdctl/ctlv3/command/put_command.go
func NewPutCommand() *cobra.Command {
cmd := &cobra.Command{
...
Run: putCommandFunc,
}
...
return cmd
}
func putCommandFunc(cmd *cobra.Command, args []string) {
...
resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)
...
}
mustClientFromCmd() 实现了将命令行操作转换为 etcd 客户端。在 golang 中,我们通常使用 clientv3.Client 来作为 etcd 客户端。
clientv3.New(clientv3.Config{
Endpoints: conf.AppConf.Etcd.Hosts,
DialTimeout: 5 * time.Second,
...
})
在 mustClientFromCmd() 方法中会先调用 clientConfigFromCmd() 根据命令行参数生成必要的配置信息,然后调用 mustClient 创建客户端。
// file: etcdctl/ctlv3/command/global.go
func mustClientFromCmd(cmd *cobra.Command) *clientv3.Client {
cfg := clientConfigFromCmd(cmd)
return mustClient(cfg)
}
func clientConfigFromCmd(cmd *cobra.Command) *clientv3.ConfigSpec {
...
// 对应 clientv3 的 Endpoints 和 DialTimeout 属性
cfg := &clientv3.ConfigSpec{}
cfg.Endpoints, err = endpointsFromCmd(cmd)
cfg.DialTimeout = dialTimeoutFromCmd(cmd)
...
}
func mustClient(cc *clientv3.ConfigSpec) *clientv3.Client {
...
// 创建客户端
client, err := clientv3.New(*cfg)
...
return client
}
可以看到,etcdctl 工具实际上就是使用 clientv3.New 在内部创建了一个客户端来进行各种操作。我们先来大概了解一下创建客户端时做了些什么。
在 clientv3.Client 结构体中,组合了一些接口,这些接口负责执行各种 etcd 的操作,可以允许我们直接使用 Client 的实例来进行如 Put、Grant、Watch 等操作。
// file: client/v3/client.go
type Client struct {
KV
Lease
Watcher
...
}
// file: client/v3/kv.go
type KV interface {
Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error)
...
}
// file: client/v3/lease.go
type Lease interface {
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
...
}
在调用 clientv3.New() 时,会与 Endpoints 所在 etcd 服务建立连接,接着创建这些接口实现的实例。
// file: client/v3/client.go
func New(cfg Config) (*Client, error) {
...
return newClient(&cfg)
}
func newClient(cfg *Config) (*Client, error) {
...
// 建立连接
conn, err := client.dialWithBalancer()
...
// 创建接口实例
client.KV = NewKV(client)
client.Lease = NewLease(client)
...
}
dialWithBalancer() 方法向 etcd 服务发起了一个 grpc 连接请求,到这里我们知道,etcd 的客户端和服务端使用 grpc 进行通信。注意,客户端选择了 endpoints[0] 的地址发送连接请求,这是因为 Endpoints 列表中的第一个地址通常是 raft 协议的 leader 节点,Raft 协议规定只有 leader 节点才能处理写请求,因此客户端应该优先连接 leader 节点,这里不再细述。
// file: client/v3/client.go
func (c *Client) dialWithBalancer(dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
...
return c.dial(creds, opts...)
}
func (c *Client) dial(creds grpccredentials.TransportCredentials, dopts ...grpc.DialOption) (*grpc.ClientConn, error) {
...
target := fmt.Sprintf("%s://%p/%s", resolver.Schema, c, authority(c.endpoints[0]))
conn, err := grpc.DialContext(dctx, target, opts...)
...
}
再来看创建接口实例部分,由于我们进行的是最普通的 Put 操作,由 KV 接口定义,这里主要看 NewKV() 方法。方法逻辑非常简单,创建了一个 kv 结构体,该结构体实现了 KV 接口定义的所有方法。
// file: client/v3/kv.go
func NewKV(c *Client) KV {
api := &kv{remote: RetryKVClient(c)}
...
return api
}
RetryKVClient() 创建了一个 grpc 客户端,由于是 grpc,我们知道,存在 KVClient 就会存在一个对应的 KVServer,这个我们放到 etcd 服务启动的时候再来回顾。
// file: client/v3/retry.go
func RetryKVClient(c *Client) pb.KVClient {
return &retryKVClient{
kc: pb.NewKVClient(c.conn),
}
}
// file: api/etcdserverpb/rpc.pb.go
func NewKVClient(cc *grpc.ClientConn) KVClient {
return &kVClient{cc}
}
Put
处理完创建完客户端的逻辑之后,紧接着接着调用了 Put() 方法,开始发送 Put 请求。
resp, err := mustClientFromCmd(cmd).Put(ctx, key, value, opts...)
从前面我们知道,Client 调用 Put 方法会执行到 kv 结构体内的 Put() 。Put 方法内调用了 kv.Do(),传入了一个 Put Op 类型。kv.Do() 方法逻辑很简单,根据对应的 Op 类型向 etcd 发送一个 grpc 请求。在前面 NewKv(client) 时我们知道,kv.remote 就是 grpc 的客户端 KVClient,所以这里就是直接发送了一个 grpc put 请求。
// file: client/v3/kv.go
func (kv *kv) Put(ctx context.Context, key, val string, opts ...OpOption) (*PutResponse, error) {
r, err := kv.Do(ctx, OpPut(key, val, opts...))
return r.put, toErr(ctx, err)
}
func (kv *kv) Do(ctx context.Context, op Op) (OpResponse, error) {
var err error
switch op.t {
...
case tPut:
var resp *pb.PutResponse
// 创建请求 Message
r := &pb.PutRequest{Key: op.key, Value: op.val, Lease: int64(op.leaseID), PrevKv: op.prevKV, IgnoreValue: op.ignoreValue, IgnoreLease: op.ignoreLease}
// 调用 KVClient.Put() 方法
resp, err = kv.remote.Put(ctx, r, kv.callOpts...)
if err == nil {
return OpResponse{put: (*PutResponse)(resp)}, nil
}
...
}
return OpResponse{}, toErr(ctx, err)
}
目前为止,我们了解了使用 etcdctl 的一次 put 请求的核心流程,接下来看看 etcd 服务端是如何处理一个 Put 请求的。

etcd 服务启动
etcd server 的入口方法如下:
// file: server/main.go
func main() {
etcdmain.Main(os.Args)
}
由于我们没有给 etcd 传入任何参数,直接进入 startEtcdOrProxyV2() 流程。
// file: server/etcdmain/main.go
func Main(args []string) {
...
startEtcdOrProxyV2(args)
}
先关注最简单的 etcd 命令启动,在 startEtcdOrProxyV2() 中,主要流程就是初始化一些配置、日志,检查部分参数然后启动 etcd,最后阻塞等待 etcd 抛出错误或停止然后退出。
// file: server/etcdmain/etcd.go
func startEtcdOrProxyV2(args []string) {
...
// 配置初始化
cfg := newConfig()
...
// 日志初始化
lg := cfg.ec.GetLogger()
if lg == nil {
...
lg, zapError = logutil.CreateDefaultZapLogger(zap.InfoLevel)
}
...
// 启动
stopped, errc, err = startEtcd(&cfg.ec)
...
// 阻塞进程
select {
case lerr :=



