项目实战:go语言实现redis(二)

背景

本系列文章记录如何基于go语言实现Redis,从整体设计到具体细节实现,不仅可以有效的锻炼自己的编码能力,又能加深对redis的认识。文章主要从整体设计思路入手,记录关键的设计步骤,详细的实现可以参考github上的相关代码。主体上有下面几个部分:

  • TCP服务器(一):支持同时监听多个TCP连接,并进行相关处理
  • Redis协议解析器(一):实现相关Handler,命令解析及响应处理
  • **内存数据库(二)**:实现数据库,注册相关命令,完成支持对数据库的增删改查
  • Redis持久化(三)
  • Redis集群(四)

本章的项目目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
├─config

├─database
│ command.go
│ database.go
│ db.go
│ echo_database.go
│ keys.go
│ ping.go
│ string.go

├─datastruct
│ └─dict
│ simple_dict.go
│ sync_dict.go

├─interface
│ ├─database
│ │
│ ├─dict
│ │ dict.go
│ │
│ ├─resp
│ │
│ └─tcp

├─lib

├─resp
│ ├─connection
│ │
│ ├─handler
│ │
│ ├─parser
│ │
│ └─reply

├─tcp

│ go.mod
│ main.go
│ redis.conf

数据底层存储

定义及实现Dict接口

interface/dict/dict.go中定义Dict接口,包含下述方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// Consumer is used to traversal dict, if it returns false the traversal will be break
type Consumer func(key string, val interface{}) bool

// Dict is interface of a key-value data structure
type Dict interface {
Get(key string) (val interface{}, exists bool)
Len() int
Put(key string, val interface{}) (result int)
PutIfAbsent(key string, val interface{}) (result int)
PutIfExists(key string, val interface{}) (result int)
Remove(key string) (result int)
ForEach(consumer Consumer)
Keys() []string
RandomKeys(limit int) []string
RandomDistinctKeys(limit int) []string
Clear()
}

然后在datastruct/dict/sync_dict.go中实现一个并发安全的dict。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// SyncDict wraps a map, it is not thread safe
type SyncDict struct {
m sync.Map
}

// MakeSyncDict makes a new map
func MakeSyncDict() *SyncDict {
return &SyncDict{}
}

// Get returns the binding value and whether the key is exist
func (dict *SyncDict) Get(key string) (val interface{}, exists bool) {}

// Len returns the number of dict
func (dict *SyncDict) Len() int {}

// Put puts key value into dict and returns the number of new inserted key-value
func (dict *SyncDict) Put(key string, val interface{}) (result int) {}

// PutIfAbsent puts value if the key is not exists and returns the number of updated key-value
func (dict *SyncDict) PutIfAbsent(key string, val interface{}) (result int) {}

// PutIfExists puts value if the key is exist and returns the number of inserted key-value
func (dict *SyncDict) PutIfExists(key string, val interface{}) (result int) {}

// Remove removes the key and return the number of deleted key-value
func (dict *SyncDict) Remove(key string) (result int) {}

// Keys returns all keys in dict
func (dict *SyncDict) Keys() []string {}

// ForEach traversal the dict
func (dict *SyncDict) ForEach(consumer iDict.Consumer) {}

// RandomKeys randomly returns keys of the given number, may contain duplicated key
func (dict *SyncDict) RandomKeys(limit int) []string {}

// RandomDistinctKeys randomly returns keys of the given number, won't contain duplicated key
func (dict *SyncDict) RandomDistinctKeys(limit int) []string {}

// Clear removes all keys in dict
func (dict *SyncDict) Clear() {}

注:非并发安全的实现可参考datastruct/dict/simple_dict.go

定义DB结构体

database/db.go定义DB结构体,它是对底层Dict相关操作的进一步封装,根据接收到的实际命令找到对应的方法并执行,其核心是Exec方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// DB stores data and execute user's commands
type DB struct {
index int
data dict.Dict // key -> DataEntity
}

// makeDB create DB instance
func makeDB() *DB {
db := &DB{data: dict.MakeSyncDict()}
return db
}

// Exec executes command within one database
func (db *DB) Exec(c resp.Connection, cmdLine [][]byte) resp.Reply {}

// GetEntity returns DataEntity bind to given key
func (db *DB) GetEntity(key string) (*database.DataEntity, bool) {}

// PutEntity a DataEntity into DB
func (db *DB) PutEntity(key string, entity *database.DataEntity) int {}

// PutIfExists edit an existing DataEntity
func (db *DB) PutIfExists(key string, entity *database.DataEntity) int {}

// Remove the given key from db
func (db *DB) Remove(key string) {}

// Removes the given keys from db
func (db *DB) Removes(keys ...string) (deleted int) {}

// Flush clean database
func (db *DB) Flush() {}

func validateArity(arity int, cmdArgs [][]byte) bool {}

命令注册及实现

命令的注册

database/command.go主要包含以下三个部分:

  • cmdTable:类型为字典,功能为存储命令及对应命令的结构体
  • command:类型为结构体,包含命令对应的实际方法以及参数数量
  • RegisterCommand:类型为一个函数,用于实现命令注册全局表中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    var cmdTable = make(map[string]*command)

    type command struct {
    executor ExecFunc
    arity int // allow number of args, arity < 0 means len(args) >= -arity
    }

    // RegisterCommand registers a new command
    // arity means allowed number of cmdArgs, arity < 0 means len(args) >= -arity.
    // for example: the arity of `get` is 2, `mget` is -2
    func RegisterCommand(name string, executor ExecFunc, arity int) {
    name = strings.ToLower(name)
    cmdTable[name] = &command{
    executor: executor,
    arity: arity,
    }
    }

    命令的实现

    ping的实现与注册

    在文件database/ping.go中的init()方法下将命令及其对应的处理函数注册到全局表中:
    1
    2
    3
    4
    5
    6
    func init() {
    RegisterCommand("ping", Ping, -1)
    }

    // Ping the server
    func Ping(db *DB, args [][]byte) resp.Reply {}

    keys指令集的实现与注册

    在文件database/keys.go中的init()方法下将命令及其对应的处理函数注册到全局表中:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    func init() {
    RegisterCommand("Del", execDel, -2)
    RegisterCommand("Exists", execExists, -2)
    RegisterCommand("Keys", execKeys, 2)
    RegisterCommand("FlushDB", execFlushDB, -1)
    RegisterCommand("Type", execType, 2)
    RegisterCommand("Rename", execRename, 3)
    RegisterCommand("RenameNx", execRenameNx, 3)
    }
    并实现相关函数:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    // execDel removes a key from db
    func execDel(db *DB, args [][]byte) resp.Reply {}

    // execExists checks if a is existed in db
    func execExists(db *DB, args [][]byte) resp.Reply {}

    // execFlushDB removes all data in current db
    func execFlushDB(db *DB, args [][]byte) resp.Reply {}

    // execType returns the type of entity, including: string, list, hash, set and zset
    func execType(db *DB, args [][]byte) resp.Reply {}

    // execRename a key
    func execRename(db *DB, args [][]byte) resp.Reply {}

    // execRenameNx a key, only if the new key does not exist
    func execRenameNx(db *DB, args [][]byte) resp.Reply {}

    // execKeys returns all keys matching the given pattern
    func execKeys(db *DB, args [][]byte) resp.Reply {}

    string指令集的实现与注册

    在文件database/string.go中的init()方法下将命令及其对应的处理函数注册到全局表中:
    1
    2
    3
    4
    5
    6
    7
    func init() {
    RegisterCommand("Get", execGet, 2)
    RegisterCommand("Set", execSet, -3)
    RegisterCommand("SetNx", execSetNX, 3)
    RegisterCommand("GetSet", execGetSet, 3)
    RegisterCommand("StrLen", execStrLen, 2)
    }
    并实现相关函数:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    func (db *DB) getAsString(key string) ([]byte, reply.ErrorReply) {}

    // execGet returns string value bound to the given key
    func execGet(db *DB, args [][]byte) resp.Reply {}

    // execSet sets string value and time to live to the given key
    func execSet(db *DB, args [][]byte) resp.Reply {}

    // execSetNX sets string if not exists
    func execSetNX(db *DB, args [][]byte) resp.Reply {}

    // execGetSet sets value of a string-type key and returns its old value
    func execGetSet(db *DB, args [][]byte) resp.Reply {}

    // execStrLen returns len of string value bound to the given key
    func execStrLen(db *DB, args [][]byte) resp.Reply {}

    实现数据库核心

    database/database.go中定义结构体Database,需实现第一篇文章中定义的Database接口,并进行初始化,创建16个数据库表,另外还需要注意执行命令时所在db的选择,这里封装了一个选择db的函数execSelect
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    // Database is a set of multiple database set
    type Database struct {
    dbSet []*DB
    }

    // NewDatabase creates a redis database,
    func NewDatabase() *Database {
    mdb := &Database{}
    if config.Properties.Databases == 0 {
    config.Properties.Databases = 16
    }
    mdb.dbSet = make([]*DB, config.Properties.Databases)
    for i := range mdb.dbSet {
    singleDB := makeDB()
    singleDB.index = i
    mdb.dbSet[i] = singleDB
    }
    return mdb
    }

    // Exec executes command
    // parameter `cmdLine` contains command and its arguments, for example: "set key value"
    func (mdb *Database) Exec(c resp.Connection, cmdLine [][]byte) (result resp.Reply) {}

    // Close graceful shutdown database
    func (mdb *Database) Close() {}

    func (mdb *Database) AfterClientClose(c resp.Connection) {}

    func execSelect(c resp.Connection, mdb *Database, args [][]byte) resp.Reply {}
    然后将resp/handler/handler.go中的初始化数据库改为上述实现的数据库即可。

    总结

    到目前为止一个单体的redis应用已基本完成,可以暂不关注每个方法的具体实现,但是一定要理解整个调用逻辑,做到融会贯通。首先处理TCP连接,选择对应的handler,由handler初始化database,同时对conn封装,选择对应的数据库执行命令,在项目一开始运行的时候会将代码中已实现的命令注册到全局表中。接下来将实现redis的持久化与集群的相关功能。