项目实战:go语言实现redis(一)

背景

本系列文章记录如何基于go语言实现Redis,从整体设计到具体细节实现,不仅可以有效的锻炼自己的编码能力,又能加深对redis的认识。文章主要从整体设计思路入手,记录关键的设计步骤,详细的实现可以参考github上的相关代码。主体上有下面几个部分:

  • **TCP服务器(一)**:支持同时监听多个TCP连接,并进行相关处理

  • **Redis协议解析器(一)**:实现相关Handler,命令解析及响应处理

  • 内存数据库(二)

  • Redis持久化(三)

  • Redis集群(四)

    实现TCP服务器

    项目初始化,主要包括相关配置,日志处理,以及定义相关接口或结构体。当前目录结构如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    ├─config
    │ config.go

    ├─interface
    │ └─tcp
    │ handler.go

    ├─lib

    ├─tcp
    │ echo.go
    │ server.go

    | go.mod
    | main.go
    | redis.conf

    项目初始化

    配置文件解析

    config/config.go文件中定义Redis相关服务端属性如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    package config

    // ServerProperties defines global config properties
    type ServerProperties struct {
    Bind string `cfg:"bind"`
    Port int `cfg:"port"`
    AppendOnly bool `cfg:"appendOnly"`
    AppendFilename string `cfg:"appendFilename"`
    MaxClients int `cfg:"maxclients"`
    RequirePass string `cfg:"requirepass"`
    Databases int `cfg:"databases"`

    Peers []string `cfg:"peers"`
    Self string `cfg:"self"`
    }

    // Properties holds global config properties
    var Properties *ServerProperties

    另外,还需要实现配置文件解析的相关方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    // SetupConfig read config file and store properties into Properties
    func SetupConfig(configFilename string) {
    file, err := os.Open(configFilename)
    if err != nil {
    panic(err)
    }
    defer file.Close()
    Properties = parse(file)
    }

    func parse(src io.Reader) *ServerProperties {
    config := &ServerProperties{}
    ...
    ...
    ...
    return config
    }

    为了防止没有配置文件的情况下也能正常初始化,可以添加如下代码:

    1
    2
    3
    4
    5
    6
    7
    8
    func init() {
    // default config
    Properties = &ServerProperties{
    Bind: "127.0.0.1",
    Port: 6379,
    AppendOnly: false,
    }
    }

    接口定义

    interface/tcp/handler.go中需要定义相关接口,以规范化处理tcp的连接:

    1
    2
    3
    4
    5
    // Handler represents application server over tcp
    type Handler interface {
    Handle(ctx context.Context, conn net.Conn)
    Close() error
    }

    TCP服务实现

    并发处理tcp连接

    tcp/server.go中实现以下两个函数,用于处理tcp连接,此处主要依赖标准库net

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    // Config stores tcp handler properties
    type Config struct {
    Address string `yaml:"address"`
    MaxConnect uint32 `yaml:"max-connect"`
    Timeout time.Duration `yaml:"timeout"`
    }

    // ListenAndServeWithSignal binds port and handle requests, blocking until receive stop signal
    func ListenAndServeWithSignal(cfg *Config, handler tcp.Handler) error {
    closeChan := make(chan struct{})
    ...
    listener, err := net.Listen("tcp", cfg.Address)
    ...
    ListenAndServe(listener, handler, closeChan)
    return nil
    }

    // ListenAndServe binds port and handle requests, blocking until close
    func ListenAndServe(listener net.Listener, handler tcp.Handler, closeChan <-chan struct{}) {
    go func(){
    <-closeChan
    listener.Close()
    handler.Close()
    }
    ...
    for {
    conn, err := listener.Accept()
    ...
    go func() {
    handler.Handle(ctx, conn)
    }()
    }
    ....
    }

    main函数入口实现

    此时main.go中需要调用的函数已基本实现,后续修改只需传入对应的Handler即可:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    package main

    import (
    "fmt"
    "go-redis/config"
    "go-redis/lib/logger"
    "go-redis/tcp"
    EchoHandler "go-redis/tcp"
    "os"
    )

    const configFile string = "redis.conf"

    var defaultProperties = &config.ServerProperties{
    Bind: "0.0.0.0",
    Port: 6379,
    }

    func fileExists(filename string) bool {
    info, err := os.Stat(filename)
    return err == nil && !info.IsDir()
    }

    func main() {
    logger.Setup(&logger.Settings{
    Path: "logs",
    Name: "godis",
    Ext: "log",
    TimeFormat: "2006-01-02",
    })

    if fileExists(configFile) {
    config.SetupConfig(configFile)
    } else {
    config.Properties = defaultProperties
    }

    err := tcp.ListenAndServeWithSignal(
    &tcp.Config{
    Address: fmt.Sprintf("%s:%d",
    config.Properties.Bind,
    config.Properties.Port),
    },
    EchoHandler.MakeHandler())
    if err != nil {
    logger.Error(err)
    }
    }

    echoHandler示例

    新建文件tcp/echo.go,定义结构体EchoHandlerEchoClient

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // EchoHandler echos received line to client, using for test
    type EchoHandler struct {
    activeConn sync.Map
    closing atomic.Boolean
    }

    // MakeHandler creates EchoHandler
    func MakeHandler() *EchoHandler {
    return &EchoHandler{}
    }

    // EchoClient is client for EchoHandler, using for test
    type EchoClient struct {
    Conn net.Conn
    Waiting wait.Wait
    }

    // Close close connection
    func (c *EchoClient) Close() error {
    c.Waiting.WaitWithTimeout(10 * time.Second)
    c.Conn.Close()
    return nil
    }

    EchoHandler实现了接口Handler,主要是将接收到的数据原样返回,详情如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // Handle echos received line to client
    func (h *EchoHandler) Handle(ctx context.Context, conn net.Conn) {
    ...
    reader := bufio.NewReader(conn)
    for {
    // may occurs: client EOF, client timeout, server early close
    msg, err := reader.ReadString('\n')
    ....
    b := []byte(msg)
    _, _ = conn.Write(b)
    ....
    }
    }

    // Close stops echo handler
    func (h *EchoHandler) Close() error {
    h.closing.Set(true)
    h.activeConn.Range(func(key interface{}, val interface{}) bool {
    client := key.(*EchoClient)
    _ = client.Close()
    return true
    })
    return nil
    }

    实现Redis协议解析器

    本章节主要新增了redis相关命令的解析,具体执行,以及响应处理等,相关目录结构如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    ├─config
    │ config.go

    ├─database
    │ echo_database.go

    ├─interface
    │ ├─database
    │ │ database.go
    │ │
    │ ├─resp
    │ │ conn.go
    │ │ reply.go
    │ │
    │ └─tcp
    │ handler.go

    ├─lib

    ├─resp
    │ ├─connection
    │ │ conn.go
    │ │
    │ ├─handler
    │ │ handler.go
    │ │
    │ ├─parser
    │ │ parser.go
    │ │
    │ └─reply
    │ consts.go
    │ errors.go
    │ reply.go

    └─tcp
    │ echo.go
    │ server.go

    | go.mod
    | main.go
    | redis.conf

    redis网络协议认识

  • 正常回复:以”+”开头,以”\r\n”结尾的字符串形式

  • 错误回复:以”-“开头,以”\r\n”结尾的字符串形式

  • 整数:以”:”开头,以”\r\n”结尾的字符串形式

  • 多行字符串:以”$”开头,后面跟实际发送的字节数,以”\r\n”结尾

  • 数组:以”*”开头,后面跟成员个数

    接口定义及实现

    Connection

    interface/resp/conn.go中定义连接,主要包含三个方法,写数据、获取当前所在数据库的索引,以及选择数据库:

    1
    2
    3
    4
    5
    6
    // Connection represents a connection with redis client
    type Connection interface {
    Write([]byte) error
    GetDBIndex() int // used for multi database
    SelectDB(int)
    }

    resp/connection/conn.go中创建Connection结构体并实现Connection接口

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    // Connection represents a connection with a redis-cli
    type Connection struct {
    conn net.Conn
    waitingReply wait.Wait // waiting until reply finished
    mu sync.Mutex // lock while handler sending response
    selectedDB int // selected db
    }

    func NewConn(conn net.Conn) *Connection {
    return &Connection{conn: conn,}
    }

    // RemoteAddr returns the remote network address
    func (c *Connection) RemoteAddr() net.Addr {}

    // Close disconnect with the client
    func (c *Connection) Close() error {}

    // Write sends response to client over tcp connection
    func (c *Connection) Write(b []byte) error {}

    // GetDBIndex returns selected db
    func (c *Connection) GetDBIndex() int {}

    // SelectDB selects a database
    func (c *Connection) SelectDB(dbNum int) {}

    Reply

    interface/resp/reply.go中定义接口Reply,用于处理基于resp协议的响应,主要包含ToBytes方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // Reply is the interface of redis serialization protocol message
    type Reply interface {
    ToBytes() []byte
    }

    // ErrorReply is an error and redis.Reply
    type ErrorReply interface {
    Error() string
    ToBytes() []byte
    }

    resp/reply下创建以下三个文件,分别用于实现具体的相关处理:

  • consts.go:定义一些固定不变的响应

  • reply.go:定义正常执行命令时的响应

  • errors.go:定义发生错误时的响应

consts.go中的文件内容如下,其中的结构体都需要实现Reply接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// PongReply is +PONG
type PongReply struct{}

// OkReply is +OK
type OkReply struct{}

// NullBulkReply is empty string
type NullBulkReply struct{}

// EmptyMultiBulkReply is a empty list
type EmptyMultiBulkReply struct{}

// NoReply respond nothing, for commands like subscribe
type NoReply struct{}

reply.go中的文件内容如下,其中的结构体都需要实现Reply接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// BulkReply stores a binary-safe string
type BulkReply struct {
Arg []byte
}

// MultiBulkReply stores a list of string
type MultiBulkReply struct {
Args [][]byte
}

// StatusReply stores a simple status string
type StatusReply struct {
Status string
}

// IntReply stores an int64 number
type IntReply struct {
Code int64
}

// StandardErrReply represents handler error
type StandardErrReply struct {
Status string
}

errors.go中的文件内容如下,其中的结构体都需要实现Reply接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// UnknownErrReply represents UnknownErr
type UnknownErrReply struct{}

// ArgNumErrReply represents wrong number of arguments for command
type ArgNumErrReply struct {
Cmd string
}

// SyntaxErrReply represents meeting unexpected arguments
type SyntaxErrReply struct{}

// WrongTypeErrReply represents operation against a key holding the wrong kind of value
type WrongTypeErrReply struct{}

// ProtocolErrReply represents meeting unexpected byte during parse requests
type ProtocolErrReply struct {
Msg string
}

Database

interface/database/database.go定义接口Database,规范不同的数据库实现:

1
2
3
4
5
6
// Database is the interface for redis style storage engine
type Database interface {
Exec(client resp.Connection, args [][]byte) resp.Reply
AfterClientClose(c resp.Connection)
Close()
}

命令解析实现

在文件resp/parser/parser.go实现下面的功能:

响应流式处理

  • Payload:包含正常响应或错误

  • readState:解析器状态

  • ParseStream:返回数据类型为Payload的channel

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    // Payload stores redis.Reply or error
    type Payload struct {
    Data resp.Reply
    Err error
    }

    type readState struct {
    readingMultiLine bool
    expectedArgsCount int
    msgType byte
    args [][]byte
    bulkLen int64
    }

    func (s *readState) finished() bool {
    return s.expectedArgsCount > 0 && len(s.args) == s.expectedArgsCount
    }

    // ParseStream reads data from io.Reader and send payloads through channel
    func ParseStream(reader io.Reader) <-chan *Payload {
    ch := make(chan *Payload)
    go parse0(reader, ch)
    return ch
    }

    命令读取及具体解析

  • readLine:精确读取一行数据

  • parseMultiBulkHeader:处理多行数据(“*”号开始),改变解析器状态

  • parseBulkHeader:处理单个数据(“$”号开始),改变解析器状态

  • parseSingleLineReply:处理客户端发送+ok -err :5的情况

    1
    2
    3
    4
    5
    6
    7
    func readLine(bufReader *bufio.Reader, state *readState) ([]byte, bool, error) {}

    func parseMultiBulkHeader(msg []byte, state *readState) error {}

    func parseBulkHeader(msg []byte, state *readState) error {}

    func parseSingleLineReply(msg []byte) (resp.Reply, error) {}

    解析实现

  • readBody:去除其他字符,解析命令内容

  • parse0:根据读取到的数据调用不同的解析方法,并将结果返回Payload的channel

    1
    2
    3
    4
    // read the non-first lines of multi bulk reply or bulk reply
    func readBody(msg []byte, state *readState) error {}

    func parse0(reader io.Reader, ch chan<- *Payload) {}

    实现RespHandler

    在TCP层面处理用户发过来的数据,并调用resp协议解析器进行解析,然后根据对应的db去实际执行相关的命令,主要在Handle方法中实现。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    // RespHandler implements tcp.Handler and serves as a redis handler
    type RespHandler struct {
    activeConn sync.Map // *client -> placeholder
    db databaseface.Database
    closing atomic.Boolean // refusing new client and new request
    }

    // MakeHandler creates a RespHandler instance
    func MakeHandler() *RespHandler {
    var db databaseface.Database
    db = database.NewEchoDatabase()
    return &RespHandler{
    db: db,
    }
    }

    func (h *RespHandler) closeClient(client *connection.Connection) {}

    // Handle receives and executes redis commands
    func (h *RespHandler) Handle(ctx context.Context, conn net.Conn) {}

    // Close stops handler
    func (h *RespHandler) Close() error {}

    EchoDatabase示例

    database/echo_database.go中简单实现一个数据库,其实现了Database接口。这里只是方便做简单测试,下一篇文章将记录如何实现redis的内存数据库:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    type EchoDatabase struct {}

    func NewEchoDatabase() *EchoDatabase {
    return &EchoDatabase{}
    }

    func (e EchoDatabase) Exec(client resp.Connection, args [][]byte) resp.Reply {
    return reply.MakeMultiBulkReply(args)

    }

    func (e EchoDatabase) AfterClientClose(c resp.Connection) {
    logger.Info("EchoDatabase AfterClientClose")
    }

    func (e EchoDatabase) Close() {
    logger.Info("EchoDatabase Close")
    }