背景
本系列文章记录如何基于go语言实现Redis,从整体设计到具体细节实现,不仅可以有效的锻炼自己的编码能力,又能加深对redis的认识。文章主要从整体设计思路入手,记录关键的设计步骤,详细的实现可以参考github上的相关代码。主体上有下面几个部分:
- TCP服务器(一):支持同时监听多个TCP连接,并进行相关处理
- Redis协议解析器(一):实现相关Handler,命令解析及响应处理
- 内存数据库(二):实现数据库,注册相关命令,完成支持对数据库的增删改查
- Redis持久化(三):实现redis中的持久化功能aof
- **Redis集群(四)**:本文将通过一致性哈希的方式实现cluster集群
本章的项目目录结构如下,在前一篇的基础上新增了cluster相关文件:
1 | ├─aof |
实现一致性hash
结构体定义
在文件lib/comsistenthash/comsistenthash.go
中定义结构体NodeMap
,包含下面几个属性,并进行初始化,此处设置了一个默认的哈希函数crc32.ChecksumIEEE
。
hashFunc
:类型为func,需要指定一个hash函数nodeHashs
:类型为int类型的切片,存储节点哈希后的值,且该切片有序的nodehashMap
:类型为map,key为哈希值,value是节点的地址1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21// HashFunc defines function to generate hash code
type HashFunc func(data []byte) uint32
// NodeMap stores nodes and you can pick node from NodeMap
type NodeMap struct {
hashFunc HashFunc
nodeHashs []int // sorted
nodehashMap map[int]string
}
// NewNodeMap creates a new NodeMap
func NewNodeMap(fn HashFunc) *NodeMap {
m := &NodeMap{
hashFunc: fn,
nodehashMap: make(map[int]string),
}
if m.hashFunc == nil {
m.hashFunc = crc32.ChecksumIEEE
}
return m
}方法实现
需要实现添加节点到哈希环中和从哈希环取出节点的方法:1
2
3
4
5
6
7
8// IsEmpty returns if there is no node in NodeMap
func (m *NodeMap) IsEmpty() bool {}
// AddNode add the given nodes into consistent hash circle
func (m *NodeMap) AddNode(keys ...string) {}
// PickNode gets the closest item in the hash to the provided key.
func (m *NodeMap) PickNode(key string) string {}集群核心架构
连接池
为了支持高并发的连接及能够对连接进行复用,此处引进了一个第三方连接池,通过命令
go get "github.com/jolestar/go-commons-pool/v2"
进行下载。另外,在cluster/client_pool.go
中定义结构体connectionFactory
实现其接口PooledObjectFactory
。1
2
3
4
5
6
7
8
9
10
11
12
13type connectionFactory struct {
Peer string
}
func (f *connectionFactory) MakeObject(ctx context.Context) (*pool.PooledObject, error) {}
func (f *connectionFactory) DestroyObject(ctx context.Context, object *pool.PooledObject) error {}
func (f *connectionFactory) ValidateObject(ctx context.Context, object *pool.PooledObject) bool {}
func (f *connectionFactory) ActivateObject(ctx context.Context, object *pool.PooledObject) error {}
func (f *connectionFactory) PassivateObject(ctx context.Context, object *pool.PooledObject) error {}实现ClusterDatabase
需要对原来单体的Database进行进一步封装,新建文件
cluster/cluster_database.go
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35// ClusterDatabase represents a node of godis cluster
// it holds part of data and coordinates other nodes to finish transactions
type ClusterDatabase struct {
self string
nodes []string
peerPicker *consistenthash.NodeMap
peerConnection map[string]*pool.ObjectPool
db databaseface.Database
}
// MakeClusterDatabase creates and starts a node of cluster
func MakeClusterDatabase() *ClusterDatabase {
cluster := &ClusterDatabase{
self: config.Properties.Self,
db: database.NewStandaloneDatabase(),
peerPicker: consistenthash.NewNodeMap(nil),
peerConnection: make(map[string]*pool.ObjectPool),
}
nodes := make([]string, 0, len(config.Properties.Peers)+1)
for _, peer := range config.Properties.Peers {
nodes = append(nodes, peer)
}
nodes = append(nodes, config.Properties.Self)
cluster.peerPicker.AddNode(nodes...)
ctx := context.Background()
for _, peer := range config.Properties.Peers {
cluster.peerConnection[peer] = pool.NewObjectPoolWithDefaultConfig(ctx, &connectionFactory{
Peer: peer,
})
}
cluster.nodes = nodes
return cluster
}另外还需实现以下几个方法。
1
2
3
4
5
6
7
8// Close stops current node of cluster
func (cluster *ClusterDatabase) Close() {}
// Exec executes command on cluster
func (cluster *ClusterDatabase) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {}
// AfterClientClose does some clean after client close connection
func (cluster *ClusterDatabase) AfterClientClose(c resp.Connection) {}操作连接池
cluster/com.go
1
2
3
4
5
6
7
8
9
10
11func (cluster *ClusterDatabase) getPeerClient(peer string) (*client.Client, error) {}
func (cluster *ClusterDatabase) returnPeerClient(peer string, peerClient *client.Client) error {}
// relay relays command to peer
// select db by c.GetDBIndex()
// cannot call Prepare, Commit, execRollback of self node
func (cluster *ClusterDatabase) relay(peer string, c resp.Connection, args [][]byte) resp.Reply {}
// broadcast broadcasts command to all node in cluster
func (cluster *ClusterDatabase) broadcast(c resp.Connection, args [][]byte) map[string]resp.Reply {}指令路由
需要根据不同的指令,操作对应的节点,这里需要有一个全局的方式,找到执行指令的相关函数,新建文件
cluster/router.go
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27func makeRouter() map[string]CmdFunc {
routerMap := make(map[string]CmdFunc)
routerMap["ping"] = ping
routerMap["del"] = Del
routerMap["exists"] = defaultFunc
routerMap["type"] = defaultFunc
routerMap["rename"] = Rename
routerMap["renamenx"] = Rename
routerMap["set"] = defaultFunc
routerMap["setnx"] = defaultFunc
routerMap["get"] = defaultFunc
routerMap["getset"] = defaultFunc
routerMap["flushdb"] = FlushDB
return routerMap
}
// relay command to responsible peer, and return its reply to client
func defaultFunc(cluster *ClusterDatabase, c resp.Connection, args [][]byte) resp.Reply {
key := string(args[1])
peer := cluster.peerPicker.PickNode(key)
return cluster.relay(peer, c, args)
}修改特殊的指令操作,分别在下述文件中添加对应的函数。
cluster/ping.go
:直接执行即可cluster/del.go
:需要广播至集群的所有节点cluster/rename.go
:需要找到键所在的节点执行,并处理重命名后更换节点的情况cluster/keys.go
:需要广播至集群的所有节点1
2
3
4
5
6
7
8
9
10
11func ping(cluster *ClusterDatabase, c resp.Connection, cmdAndArgs [][]byte) resp.Reply {}
// Del atomically removes given writeKeys from cluster, writeKeys can be distributed on any node
// if the given writeKeys are distributed on different node, Del will use try-commit-catch to remove them
func Del(cluster *ClusterDatabase, c resp.Connection, args [][]byte) resp.Reply {}
// Rename renames a key, the origin and the destination must within the same node
func Rename(cluster *ClusterDatabase, c resp.Connection, args [][]byte) resp.Reply {}
// FlushDB removes all data in current database
func FlushDB(cluster *ClusterDatabase, c resp.Connection, args [][]byte) resp.Reply {}启用集群模式
要使用集群需要修改
resp/handler/handler.go
中初始化Database的逻辑。1
2
3
4
5
6
7
8
9
10
11
12
13
14// MakeHandler creates a RespHandler instance
func MakeHandler() *RespHandler {
var db databaseface.Database
if config.Properties.Self != "" &&
len(config.Properties.Peers) > 0 {
db = cluster.MakeClusterDatabase()
} else {
db = database.NewStandaloneDatabase()
}
return &RespHandler{
db: db,
}
}总结
至此,基于go语言实现redis的项目已基本完成了。项目中有许多值得学习的地方,例如整个架构的设计,整体功能的切分及具体实现。每一步的过程都需要有自己的思考,而不是按部就班,随便记录以下就完了,记录下这个系列的文章,也是为了以后的回顾,温故而知新才能不断的加深影响提高自己。不过项目也依然有很多需要优化的地方,这个可以参考开源的项目godis。