项目实战:go语言实现redis(四)

背景

本系列文章记录如何基于go语言实现Redis,从整体设计到具体细节实现,不仅可以有效的锻炼自己的编码能力,又能加深对redis的认识。文章主要从整体设计思路入手,记录关键的设计步骤,详细的实现可以参考github上的相关代码。主体上有下面几个部分:

  • TCP服务器(一):支持同时监听多个TCP连接,并进行相关处理
  • Redis协议解析器(一):实现相关Handler,命令解析及响应处理
  • 内存数据库(二):实现数据库,注册相关命令,完成支持对数据库的增删改查
  • Redis持久化(三):实现redis中的持久化功能aof
  • **Redis集群(四)**:本文将通过一致性哈希的方式实现cluster集群

本章的项目目录结构如下,在前一篇的基础上新增了cluster相关文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
├─aof

├─cluster
│ client_pool.go
│ cluster_database.go
│ com.go
│ del.go
│ keys.go
│ ping.go
│ rename.go
│ router.go

├─config

├─database

├─datastruct
│ └─dict

├─interface
│ ├─database
│ │
│ ├─dict
│ │
│ ├─resp
│ │
│ └─tcp

├─lib

├─resp
│ ├─connection
│ │
│ ├─handler
│ │
│ ├─parser
│ │
│ └─reply

├─tcp

│ go.mod
│ main.go
│ redis.conf

实现一致性hash

结构体定义
在文件lib/comsistenthash/comsistenthash.go中定义结构体NodeMap,包含下面几个属性,并进行初始化,此处设置了一个默认的哈希函数crc32.ChecksumIEEE

  • hashFunc:类型为func,需要指定一个hash函数

  • nodeHashs:类型为int类型的切片,存储节点哈希后的值,且该切片有序的

  • nodehashMap:类型为map,key为哈希值,value是节点的地址

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // HashFunc defines function to generate hash code
    type HashFunc func(data []byte) uint32

    // NodeMap stores nodes and you can pick node from NodeMap
    type NodeMap struct {
    hashFunc HashFunc
    nodeHashs []int // sorted
    nodehashMap map[int]string
    }

    // NewNodeMap creates a new NodeMap
    func NewNodeMap(fn HashFunc) *NodeMap {
    m := &NodeMap{
    hashFunc: fn,
    nodehashMap: make(map[int]string),
    }
    if m.hashFunc == nil {
    m.hashFunc = crc32.ChecksumIEEE
    }
    return m
    }

    方法实现
    需要实现添加节点到哈希环中和从哈希环取出节点的方法:

    1
    2
    3
    4
    5
    6
    7
    8
    // IsEmpty returns if there is no node in NodeMap
    func (m *NodeMap) IsEmpty() bool {}

    // AddNode add the given nodes into consistent hash circle
    func (m *NodeMap) AddNode(keys ...string) {}

    // PickNode gets the closest item in the hash to the provided key.
    func (m *NodeMap) PickNode(key string) string {}

    集群核心架构

    连接池

    为了支持高并发的连接及能够对连接进行复用,此处引进了一个第三方连接池,通过命令go get "github.com/jolestar/go-commons-pool/v2"进行下载。另外,在cluster/client_pool.go中定义结构体connectionFactory实现其接口PooledObjectFactory

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    type connectionFactory struct {
    Peer string
    }

    func (f *connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {}

    func (f *connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {}

    func (f *connectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {}

    func (f *connectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {}

    func (f *connectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {}

    实现ClusterDatabase

    需要对原来单体的Database进行进一步封装,新建文件cluster/cluster_database.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    // ClusterDatabase represents a node of godis cluster
    // it holds part of data and coordinates other nodes to finish transactions
    type ClusterDatabase struct {
    self string

    nodes []string
    peerPicker *consistenthash.NodeMap
    peerConnection map[string]*pool.ObjectPool
    db databaseface.Database
    }

    // MakeClusterDatabase creates and starts a node of cluster
    func MakeClusterDatabase() *ClusterDatabase {
    cluster := &ClusterDatabase{
    self: config.Properties.Self,

    db: database.NewStandaloneDatabase(),
    peerPicker: consistenthash.NewNodeMap(nil),
    peerConnection: make(map[string]*pool.ObjectPool),
    }
    nodes := make([]string, 0, len(config.Properties.Peers)+1)
    for _, peer := range config.Properties.Peers {
    nodes = append(nodes, peer)
    }
    nodes = append(nodes, config.Properties.Self)
    cluster.peerPicker.AddNode(nodes...)
    ctx := context.Background()
    for _, peer := range config.Properties.Peers {
    cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{
    Peer: peer,
    })
    }
    cluster.nodes = nodes
    return cluster
    }

    另外还需实现以下几个方法。

    1
    2
    3
    4
    5
    6
    7
    8
    // Close stops current node of cluster
    func (cluster *ClusterDatabase) Close() {}

    // Exec executes command on cluster
    func (cluster *ClusterDatabase) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {}

    // AfterClientClose does some clean after client close connection
    func (cluster *ClusterDatabase) AfterClientClose(c resp.Connection) {}

    操作连接池

    cluster/com.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func (cluster *ClusterDatabase) getPeerClient(peer string) (*client.Client, error) {}

    func (cluster *ClusterDatabase) returnPeerClient(peer string, peerClient *client.Client) error {}

    // relay relays command to peer
    // select db by c.GetDBIndex()
    // cannot call Prepare, Commit, execRollback of self node
    func (cluster *ClusterDatabase) relay(peer string, c resp.Connection, args [][]byte) resp.Reply {}

    // broadcast broadcasts command to all node in cluster
    func (cluster *ClusterDatabase) broadcast(c resp.Connection, args [][]byte) map[string]resp.Reply {}

    指令路由

    需要根据不同的指令,操作对应的节点,这里需要有一个全局的方式,找到执行指令的相关函数,新建文件cluster/router.go

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    func makeRouter() map[string]CmdFunc {
    routerMap := make(map[string]CmdFunc)
    routerMap["ping"] = ping

    routerMap["del"] = Del

    routerMap["exists"] = defaultFunc
    routerMap["type"] = defaultFunc
    routerMap["rename"] = Rename
    routerMap["renamenx"] = Rename

    routerMap["set"] = defaultFunc
    routerMap["setnx"] = defaultFunc
    routerMap["get"] = defaultFunc
    routerMap["getset"] = defaultFunc

    routerMap["flushdb"] = FlushDB

    return routerMap
    }

    // relay command to responsible peer, and return its reply to client
    func defaultFunc(cluster *ClusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
    key := string(args[1])
    peer := cluster.peerPicker.PickNode(key)
    return cluster.relay(peer, c, args)
    }

    修改特殊的指令操作,分别在下述文件中添加对应的函数。

  • cluster/ping.go:直接执行即可

  • cluster/del.go:需要广播至集群的所有节点

  • cluster/rename.go:需要找到键所在的节点执行,并处理重命名后更换节点的情况

  • cluster/keys.go:需要广播至集群的所有节点

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    func ping(cluster *ClusterDatabase, c resp.Connection, cmdAndArgs [][]byte) resp.Reply {}

    // Del atomically removes given writeKeys from cluster, writeKeys can be distributed on any node
    // if the given writeKeys are distributed on different node, Del will use try-commit-catch to remove them
    func Del(cluster *ClusterDatabase, c resp.Connection, args [][]byte) resp.Reply {}

    // Rename renames a key, the origin and the destination must within the same node
    func Rename(cluster *ClusterDatabase, c resp.Connection, args [][]byte) resp.Reply {}

    // FlushDB removes all data in current database
    func FlushDB(cluster *ClusterDatabase, c resp.Connection, args [][]byte) resp.Reply {}

    启用集群模式

    要使用集群需要修改resp/handler/handler.go中初始化Database的逻辑。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    // MakeHandler creates a RespHandler instance
    func MakeHandler() *RespHandler {
    var db databaseface.Database
    if config.Properties.Self != "" &&
    len(config.Properties.Peers) > 0 {
    db = cluster.MakeClusterDatabase()
    } else {
    db = database.NewStandaloneDatabase()
    }

    return &RespHandler{
    db: db,
    }
    }

    总结

    至此,基于go语言实现redis的项目已基本完成了。项目中有许多值得学习的地方,例如整个架构的设计,整体功能的切分及具体实现。每一步的过程都需要有自己的思考,而不是按部就班,随便记录以下就完了,记录下这个系列的文章,也是为了以后的回顾,温故而知新才能不断的加深影响提高自己。不过项目也依然有很多需要优化的地方,这个可以参考开源的项目godis