项目实战:go语言实现redis(三)

背景

本系列文章记录如何基于go语言实现Redis,从整体设计到具体细节实现,不仅可以有效的锻炼自己的编码能力,又能加深对redis的认识。文章主要从整体设计思路入手,记录关键的设计步骤,详细的实现可以参考github上的相关代码。主体上有下面几个部分:

  • TCP服务器(一):支持同时监听多个TCP连接,并进行相关处理
  • Redis协议解析器(一):实现相关Handler,命令解析及响应处理
  • 内存数据库(二):实现数据库,注册相关命令,完成支持对数据库的增删改查
  • **Redis持久化(三)**:实现redis中的持久化功能aof
  • Redis集群(四)

本章的项目目录结构如下,主要是在前一篇文章新增了aof相关文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
├─aof
│ aof.go

├─config

├─database

├─datastruct
│ └─dict

├─interface
│ ├─database
│ │
│ ├─dict
│ │
│ ├─resp
│ │
│ └─tcp

├─lib

├─resp
│ ├─connection
│ │
│ ├─handler
│ │
│ ├─parser
│ │
│ └─reply

├─tcp

│ go.mod
│ main.go
│ redis.conf

命令记录与恢复

需要实现一个AofHandler结构体,由它封装命令保存及数据恢复的相关方法,然后在初始化Database的时候将AofHandler进行注册。在文件aof/aof.go中的实现如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// CmdLine is alias for [][]byte, represents a command line
type CmdLine = [][]byte

type payload struct {
cmdLine CmdLine
dbIndex int
}

// AofHandler receive msgs from channel and write to AOF file
type AofHandler struct {
db databaseface.Database
aofChan chan *payload
aofFile *os.File
aofFilename string
currentDB int
}

另外初始化AofHandler的时候还需要考虑根据已有的Aof文件进行恢复,以及启用一个协程不断的记录执行过的命令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// NewAOFHandler creates a new aof.AofHandler
func NewAOFHandler(db databaseface.Database) (*AofHandler, error) {
handler := &AofHandler{}
handler.aofFilename = config.Properties.AppendFilename
handler.db = db
handler.LoadAof()
aofFile, err := os.OpenFile(handler.aofFilename, os.O_APPEND|os.O_CREATE|os.O_RDWR, 0600)
if err != nil {
return nil, err
}
handler.aofFile = aofFile
handler.aofChan = make(chan *payload, aofQueueSize)
go func() {
handler.handleAof()
}()
return handler, nil
}

另外还需实现下述方法:

  • AddAof:如果配置中开启Aof的话,将执行的命令发送到channel中
  • handleAof:从channel中读取数据,并写入到文件当中
  • LoadAof:从文件中读取命令,然后执行
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // AddAof send command to aof goroutine through channel
    func (handler *AofHandler) AddAof(dbIndex int, cmdLine CmdLine) {}

    // handleAof listen aof channel and write into file
    func (handler *AofHandler) handleAof() {}

    // LoadAof read aof file
    func (handler *AofHandler) LoadAof() {}

    底层指令集修改

    首先修改database/db.go中的结构体DB,新增属性addAof,以便指令集中的方法能够调用到addAof,需要修改的地方如下,另外初始化该方法的工作将在后续完成。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // DB stores data and execute user's commands
    type DB struct {
    index int
    data iDict.Dict // key -> DataEntity
    addAof func(CmdLine)
    }

    // makeDB create DB instance
    func makeDB() *DB {
    db := &DB{
    data: dict.MakeSyncDict(),
    addAof: func(line CmdLine) {},
    }
    return db
    }
    database/string.go中对应的需要记录该指令的方法中添加Addof
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    // execSet sets string value and time to live to the given key
    func execSet(db *DB, args [][]byte) resp.Reply {
    ...
    db.addAof(utils.ToCmdLine2("set", args...))
    ...
    }

    // execSetNX sets string if not exists
    func execSetNX(db *DB, args [][]byte) resp.Reply {
    ...
    db.addAof(utils.ToCmdLine2("setnx", args...))
    ...
    }

    ...

    注:同样的database/keys.go中的相关方法也许调用Addof方法。

调用AofHandler

初始化database时将AofHandler注册到Database结构体中,然后把方法Addof赋值给DB结构体,这样上述指令集就能调用到该方法了。文件database/databse.go中需要修改的地方如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Database is a set of multiple database set
type Database struct {
dbSet []*DB
aofHandler *aof.AofHandler // handle aof persistence
}

// NewDatabase creates a redis database,
func NewDatabase() *Database {
...
if config.Properties.AppendOnly {
aofHandler, err := aof.NewAOFHandler(mdb)
if err != nil {
panic(err)
}
mdb.aofHandler = aofHandler
for _, db := range mdb.dbSet {
// avoid closure
singleDB := db
singleDB.addAof = func(line CmdLine) {
mdb.aofHandler.AddAof(singleDB.index, line)
}
}
}
return mdb
}

注:上述for循环遍历数据库的切片时,需要暂时把db赋值给一个临时变量,不然拿到的db都是最后一个值,这是go语言中循环变量的作用域导致的,需要注意。

  • 在go语言的for循环中,循环内部创建的函数变量都是共享同一内存地址,for循环总是使用同一块内存去接收循环中的变量的值。不管循环多少次,变量的内存地址都是相同的。
  • 此处使用的解决方法就是用一个临时变量进行赋值保存记录。

总结

redis支持两种持久化的方式,一种是aof,它对数据有修改的相关指令记录到文件中,重新执行这些命令达到数据恢复的效果。另一种是rdb,这种方式是记录了内存快照,在指定的时间间隔内,将内存中的数据写入到磁盘中,就是在指定目录下生产一个dump.rdb文件,通过加载该文件进行恢复数据。本文基于go语言实现了aof持久化功能,通过可插拔的方式集成到之前已基本实现的单体redis当中。接下来将实现redis的集群模式。