背景
本系列文章记录如何基于go语言实现Redis,从整体设计到具体细节实现,不仅可以有效的锻炼自己的编码能力,又能加深对redis的认识。文章主要从整体设计思路入手,记录关键的设计步骤,详细的实现可以参考github上的相关代码。主体上有下面几个部分:
**TCP服务器(一)**:支持同时监听多个TCP连接,并进行相关处理
**Redis协议解析器(一)**:实现相关Handler,命令解析及响应处理
内存数据库(二)
Redis持久化(三)
Redis集群(四)
实现TCP服务器
项目初始化,主要包括相关配置,日志处理,以及定义相关接口或结构体。当前目录结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16├─config
│ config.go
│
├─interface
│ └─tcp
│ handler.go
│
├─lib
│
├─tcp
│ echo.go
│ server.go
│
| go.mod
| main.go
| redis.conf项目初始化
配置文件解析
在
config/config.go
文件中定义Redis相关服务端属性如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18package config
// ServerProperties defines global config properties
type ServerProperties struct {
Bind string `cfg:"bind"`
Port int `cfg:"port"`
AppendOnly bool `cfg:"appendOnly"`
AppendFilename string `cfg:"appendFilename"`
MaxClients int `cfg:"maxclients"`
RequirePass string `cfg:"requirepass"`
Databases int `cfg:"databases"`
Peers []string `cfg:"peers"`
Self string `cfg:"self"`
}
// Properties holds global config properties
var Properties *ServerProperties另外,还需要实现配置文件解析的相关方法:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17// SetupConfig read config file and store properties into Properties
func SetupConfig(configFilename string) {
file, err := os.Open(configFilename)
if err != nil {
panic(err)
}
defer file.Close()
Properties = parse(file)
}
func parse(src io.Reader) *ServerProperties {
config := &ServerProperties{}
...
...
...
return config
}为了防止没有配置文件的情况下也能正常初始化,可以添加如下代码:
1
2
3
4
5
6
7
8func init() {
// default config
Properties = &ServerProperties{
Bind: "127.0.0.1",
Port: 6379,
AppendOnly: false,
}
}接口定义
在
interface/tcp/handler.go
中需要定义相关接口,以规范化处理tcp的连接:1
2
3
4
5// Handler represents application server over tcp
type Handler interface {
Handle(ctx context.Context, conn net.Conn)
Close() error
}TCP服务实现
并发处理tcp连接
在
tcp/server.go
中实现以下两个函数,用于处理tcp连接,此处主要依赖标准库net
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34// Config stores tcp handler properties
type Config struct {
Address string `yaml:"address"`
MaxConnect uint32 `yaml:"max-connect"`
Timeout time.Duration `yaml:"timeout"`
}
// ListenAndServeWithSignal binds port and handle requests, blocking until receive stop signal
func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
closeChan := make(chan struct{})
...
listener, err := net.Listen("tcp", cfg.Address)
...
ListenAndServe(listener, handler, closeChan)
return nil
}
// ListenAndServe binds port and handle requests, blocking until close
func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) {
go func(){
<-closeChan
listener.Close()
handler.Close()
}
...
for {
conn, err := listener.Accept()
...
go func() {
handler.Handle(ctx, conn)
}()
}
....
}main函数入口实现
此时
main.go
中需要调用的函数已基本实现,后续修改只需传入对应的Handler即可:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48package main
import (
"fmt"
"go-redis/config"
"go-redis/lib/logger"
"go-redis/tcp"
EchoHandler "go-redis/tcp"
"os"
)
const configFile string = "redis.conf"
var defaultProperties = &config.ServerProperties{
Bind: "0.0.0.0",
Port: 6379,
}
func fileExists(filename string) bool {
info, err := os.Stat(filename)
return err == nil && !info.IsDir()
}
func main() {
logger.Setup(&logger.Settings{
Path: "logs",
Name: "godis",
Ext: "log",
TimeFormat: "2006-01-02",
})
if fileExists(configFile) {
config.SetupConfig(configFile)
} else {
config.Properties = defaultProperties
}
err := tcp.ListenAndServeWithSignal(
&tcp.Config{
Address: fmt.Sprintf("%s:%d",
config.Properties.Bind,
config.Properties.Port),
},
EchoHandler.MakeHandler())
if err != nil {
logger.Error(err)
}
}echoHandler示例
新建文件
tcp/echo.go
,定义结构体EchoHandler
与EchoClient
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// EchoHandler echos received line to client, using for test
type EchoHandler struct {
activeConn sync.Map
closing atomic.Boolean
}
// MakeHandler creates EchoHandler
func MakeHandler() *EchoHandler {
return &EchoHandler{}
}
// EchoClient is client for EchoHandler, using for test
type EchoClient struct {
Conn net.Conn
Waiting wait.Wait
}
// Close close connection
func (c *EchoClient) Close() error {
c.Waiting.WaitWithTimeout(10 * time.Second)
c.Conn.Close()
return nil
}EchoHandler
实现了接口Handler
,主要是将接收到的数据原样返回,详情如下:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// Handle echos received line to client
func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) {
...
reader := bufio.NewReader(conn)
for {
// may occurs: client EOF, client timeout, server early close
msg, err := reader.ReadString('\n')
....
b := []byte(msg)
_, _ = conn.Write(b)
....
}
}
// Close stops echo handler
func (h *EchoHandler) Close() error {
h.closing.Set(true)
h.activeConn.Range(func(key interface{}, val interface{}) bool {
client := key.(*EchoClient)
_ = client.Close()
return true
})
return nil
}实现Redis协议解析器
本章节主要新增了redis相关命令的解析,具体执行,以及响应处理等,相关目录结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41├─config
│ config.go
│
├─database
│ echo_database.go
│
├─interface
│ ├─database
│ │ database.go
│ │
│ ├─resp
│ │ conn.go
│ │ reply.go
│ │
│ └─tcp
│ handler.go
│
├─lib
│
├─resp
│ ├─connection
│ │ conn.go
│ │
│ ├─handler
│ │ handler.go
│ │
│ ├─parser
│ │ parser.go
│ │
│ └─reply
│ consts.go
│ errors.go
│ reply.go
│
└─tcp
│ echo.go
│ server.go
│
| go.mod
| main.go
| redis.confredis网络协议认识
正常回复:以”+”开头,以”\r\n”结尾的字符串形式
错误回复:以”-“开头,以”\r\n”结尾的字符串形式
整数:以”:”开头,以”\r\n”结尾的字符串形式
多行字符串:以”$”开头,后面跟实际发送的字节数,以”\r\n”结尾
数组:以”*”开头,后面跟成员个数
接口定义及实现
Connection
在
interface/resp/conn.go
中定义连接,主要包含三个方法,写数据、获取当前所在数据库的索引,以及选择数据库:1
2
3
4
5
6// Connection represents a connection with redis client
type Connection interface {
Write([]byte) error
GetDBIndex() int // used for multi database
SelectDB(int)
}在
resp/connection/conn.go
中创建Connection结构体并实现Connection接口1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26// Connection represents a connection with a redis-cli
type Connection struct {
conn net.Conn
waitingReply wait.Wait // waiting until reply finished
mu sync.Mutex // lock while handler sending response
selectedDB int // selected db
}
func NewConn(conn net.Conn) *Connection {
return &Connection{conn: conn,}
}
// RemoteAddr returns the remote network address
func (c *Connection) RemoteAddr() net.Addr {}
// Close disconnect with the client
func (c *Connection) Close() error {}
// Write sends response to client over tcp connection
func (c *Connection) Write(b []byte) error {}
// GetDBIndex returns selected db
func (c *Connection) GetDBIndex() int {}
// SelectDB selects a database
func (c *Connection) SelectDB(dbNum int) {}Reply
在
interface/resp/reply.go
中定义接口Reply,用于处理基于resp协议的响应,主要包含ToBytes
方法:1
2
3
4
5
6
7
8
9
10// Reply is the interface of redis serialization protocol message
type Reply interface {
ToBytes() []byte
}
// ErrorReply is an error and redis.Reply
type ErrorReply interface {
Error() string
ToBytes() []byte
}在
resp/reply
下创建以下三个文件,分别用于实现具体的相关处理:consts.go
:定义一些固定不变的响应reply.go
:定义正常执行命令时的响应errors.go
:定义发生错误时的响应
consts.go
中的文件内容如下,其中的结构体都需要实现Reply接口:
1 | // PongReply is +PONG |
reply.go
中的文件内容如下,其中的结构体都需要实现Reply接口:
1 | // BulkReply stores a binary-safe string |
errors.go
中的文件内容如下,其中的结构体都需要实现Reply接口:
1 | // UnknownErrReply represents UnknownErr |
Database
在interface/database/database.go
定义接口Database,规范不同的数据库实现:
1 | // Database is the interface for redis style storage engine |
命令解析实现
在文件resp/parser/parser.go
实现下面的功能:
响应流式处理
Payload
:包含正常响应或错误readState
:解析器状态ParseStream
:返回数据类型为Payload
的channel1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24// Payload stores redis.Reply or error
type Payload struct {
Data resp.Reply
Err error
}
type readState struct {
readingMultiLine bool
expectedArgsCount int
msgType byte
args [][]byte
bulkLen int64
}
func (s *readState) finished() bool {
return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
}
// ParseStream reads data from io.Reader and send payloads through channel
func ParseStream(reader io.Reader) <-chan *Payload {
ch := make(chan *Payload)
go parse0(reader, ch)
return ch
}命令读取及具体解析
readLine
:精确读取一行数据parseMultiBulkHeader
:处理多行数据(“*”号开始),改变解析器状态parseBulkHeader
:处理单个数据(“$”号开始),改变解析器状态parseSingleLineReply
:处理客户端发送+ok -err :5的情况1
2
3
4
5
6
7func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {}
func parseMultiBulkHeader(msg []byte, state *readState) error {}
func parseBulkHeader(msg []byte, state *readState) error {}
func parseSingleLineReply(msg []byte) (resp.Reply, error) {}解析实现
readBody
:去除其他字符,解析命令内容parse0
:根据读取到的数据调用不同的解析方法,并将结果返回Payload的channel1
2
3
4// read the non-first lines of multi bulk reply or bulk reply
func readBody(msg []byte, state *readState) error {}
func parse0(reader io.Reader, ch chan<- *Payload) {}实现RespHandler
在TCP层面处理用户发过来的数据,并调用resp协议解析器进行解析,然后根据对应的db去实际执行相关的命令,主要在Handle方法中实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23// RespHandler implements tcp.Handler and serves as a redis handler
type RespHandler struct {
activeConn sync.Map // *client -> placeholder
db databaseface.Database
closing atomic.Boolean // refusing new client and new request
}
// MakeHandler creates a RespHandler instance
func MakeHandler() *RespHandler {
var db databaseface.Database
db = database.NewEchoDatabase()
return &RespHandler{
db: db,
}
}
func (h *RespHandler) closeClient(client *connection.Connection) {}
// Handle receives and executes redis commands
func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {}
// Close stops handler
func (h *RespHandler) Close() error {}EchoDatabase示例
在
database/echo_database.go
中简单实现一个数据库,其实现了Database接口。这里只是方便做简单测试,下一篇文章将记录如何实现redis的内存数据库:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18type EchoDatabase struct {}
func NewEchoDatabase() *EchoDatabase {
return &EchoDatabase{}
}
func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
return reply.MakeMultiBulkReply(args)
}
func (e EchoDatabase) AfterClientClose(c resp.Connection) {
logger.Info("EchoDatabase AfterClientClose")
}
func (e EchoDatabase) Close() {
logger.Info("EchoDatabase Close")
}