Go 语言实现爬虫(二)

使用gin封装

项目目录结构如下:

  • api:主要业务逻辑实现,结合gin的上下文进行封装

  • config:项目主要配置文件

  • global:全局变量管理

  • model:结构体定义的地方

  • proto:定义proto文件,存放其生成的go文件

  • router:路由控制

  • storage:运行日志存储及临时结果存放

  • utils:常用的工具类封装

    ├─api
    │  │  base.go
    │  │
    │  └─crawler
    │      │  crawl.go
    │      │
    │      └─links
    │              links.go
    │
    ├─config
    │      settings.go
    │
    ├─global
    │      global.go
    │
    ├─model
    │  ├─request
    │  │      request.go
    │  │
    │  └─response
    │          response.go
    │
    ├─proto
    │      crawler.proto
    │
    ├─router
    │      router.go
    │
    ├─storage
    │  ├─logs
    │  └─tasks
    │
    ├─utils
    │  └─writer
    │          write_data.go
    │
    │  go.mod
    │  go.sum
    │  main.go
    │  readme.md

完善各控制流程

任务定义及初始化如下:

    // work is one unit of crawl work: a page URL together with the depth at
    // which it was found (NewWclTask queues the start URLs at depth 1).
    type work struct {
    url string // page URL
    currDepth int // crawl depth of this link
    }

    // WlcTask holds the state of one crawl task: the channels connecting the
    // dispatcher loop (StartHandler), the crawl worker goroutines and the
    // status-recording goroutine (ChangeTaskStatus), plus the task's
    // configuration and progress.
    //
    // NOTE(review): status, currDepth and workList are read and written from
    // different goroutines (e.g. ChangeTaskStatus writes status while
    // StartHandler reads it) without any lock — a data race under -race;
    // consider guarding them with a sync.Mutex.
    type WlcTask struct {
    taskID string // task ID
    workList chan []work // batches of links found one level below a crawled page; set to nil to pause the task
    workListBak chan []work // backup of workList, used to pause and resume the task
    unVisitedLinks chan work // links not visited yet, handed to the crawl worker goroutines
    stopSignalCh chan struct{} // closed to tell every producer/consumer goroutine to exit, preventing goroutine leaks
    quitCh chan struct{} // signals the task to stop; also passed to crawl (per the original note, driven by the crawl depth — confirm in crawl)
    statusCh chan string // status changes, sent as "<status>,<currDepth>" messages
    visited map[string]bool // URLs that have already been dispatched
    startUrl []work // start links
    concurrency int // number of crawl worker goroutines
    timeOut int // timeout, in seconds
    depth int // maximum crawl depth
    currDepth int // current crawl depth
    status string // task status: created running stopped finished pausing resuming
    }

    // NewWclTask builds a WlcTask in the "created" state from a crawl request.
    // Every start URL is queued at depth 1. workList and workListBak are
    // deliberately the same channel: pausing nils out workList, and resuming
    // restores it from workListBak.
    func NewWclTask(wlcReq *request.WlcTaskReq) *WlcTask {
        shared := make(chan []work)
        task := &WlcTask{
            taskID:         wlcReq.TaskID,
            workList:       shared,
            workListBak:    shared,
            unVisitedLinks: make(chan work),
            stopSignalCh:   make(chan struct{}),
            quitCh:         make(chan struct{}),
            statusCh:       make(chan string),
            visited:        make(map[string]bool),
            concurrency:    wlcReq.Concurrency,
            timeOut:        wlcReq.Timeout,
            depth:          wlcReq.Depth,
            currDepth:      1,
            status:         config.TaskStatus.Created,
        }
        for _, rawURL := range wlcReq.Urls {
            task.startUrl = append(task.startUrl, work{url: rawURL, currDepth: 1})
        }
        return task
    }

另外,还针对 WlcTask 封装了一个改变任务状态的方法。它从 statusCh 接收“状态,深度”格式的消息,更新任务状态并写入任务信息文件:

    // ChangeTaskStatus consumes status messages from w.statusCh until
    // w.stopSignalCh is closed (or statusCh itself is closed). Each message is
    // "<status>,<currDepth>", as built with fmt.Sprintf("%s,%d", ...) by the
    // task's handlers. The status is stored on the task and the status/depth
    // pair is persisted through writer.TaskInfoWriter.
    //
    // NOTE(review): w.status is written here and read by other goroutines
    // without synchronization.
    func (w *WlcTask) ChangeTaskStatus() {
        for {
            select {
            case <-w.stopSignalCh:
                log.Printf("[Start] record task(%s) status goroutine exit...", w.taskID)
                return
            case taskStr, ok := <-w.statusCh:
                if !ok {
                    // A closed channel is always ready to receive; without this
                    // return the loop would spin forever at full CPU.
                    log.Printf("[Start] record task(%s) status goroutine exit...", w.taskID)
                    return
                }
                arr := strings.Split(taskStr, ",")
                if len(arr) < 2 {
                    // Indexing arr[1] would panic and take the whole process down.
                    log.Printf("[Start] task(%s) got malformed status message %q, ignored", w.taskID, taskStr)
                    continue
                }
                w.status = arr[0]
                currDepth, err := strconv.Atoi(arr[1])
                if err != nil {
                    log.Println(err)
                }
                log.Printf("[Start] change task(%s) status to %s, depth is, %d", w.taskID, w.status, currDepth)

                // record the task info to a json file
                task := request.TaskInfo{
                    Status:    arr[0],
                    CurrDepth: currDepth,
                }
                writer.TaskInfoWriter(w.taskID, &task)
            }
        }
    }

启动

    // StartHandler runs the crawl task and blocks until it ends. It:
    //   - starts the ChangeTaskStatus goroutine and records the running status,
    //   - starts w.concurrency worker goroutines that crawl links taken from
    //     w.unVisitedLinks and publish the links they find,
    //   - dispatches every not-yet-visited link to the workers and writes it
    //     through writer.TaskResultWriter to "<taskID>.txt".
    //
    // The task ends when w.quitCh fires (config.TaskStatus.Stopped). It also
    // ends when no new batch of links arrives within w.timeOut seconds while
    // the task is neither pausing nor resuming (config.TaskStatus.Finished).
    // On exit it closes w.stopSignalCh so the helper goroutines return, and
    // removes the task from globalTask.
    //
    // NOTE(review): globalTask, w.status, w.currDepth and w.workList are shared
    // with other goroutines without a lock; run with -race and add
    // synchronization. Workers blocked inside crawl when the task ends may
    // still be unable to exit, depending on how crawl uses quitCh.
    func (w *WlcTask) StartHandler(globalTask map[string]*WlcTask) {
        // record the task status change
        go w.ChangeTaskStatus()

        // shutdown records the final status, tells every goroutine of this
        // task to exit and unregisters the task.
        shutdown := func(status string) {
            w.statusCh <- fmt.Sprintf("%s,%d", status, w.currDepth)
            close(w.stopSignalCh)
            delete(globalTask, w.taskID)
        }

        // publish hands a batch of links to the dispatcher loop below.
        // It sends on workListBak, which always holds the real channel (it is
        // the same channel as workList, see NewWclTask). Pausing sets
        // w.workList to nil, and a send on a nil channel would block forever
        // and silently drop the batch. The dispatcher still gates consumption
        // through w.workList, so pausing keeps working and pending batches are
        // delivered on resume. stopSignalCh lets the sender exit once the task
        // is over instead of leaking.
        publish := func(list []work) {
            select {
            case w.workListBak <- list:
            case <-w.stopSignalCh:
            }
        }

        go publish(w.startUrl)

        w.statusCh <- fmt.Sprintf("%s,%d", config.TaskStatus.Running, w.currDepth)

        for i := 0; i < w.concurrency; i++ {
            go func(num int) {
                for {
                    select {
                    case <-w.stopSignalCh:
                        log.Printf("[Start] task(%s) goroutine %d exit...", w.taskID, num)
                        return
                    case link := <-w.unVisitedLinks:
                        foundLinks := crawl(link, w.depth, w.quitCh)
                        go publish(foundLinks)
                    }
                }
            }(i)
        }

        for {
            select {
            case list := <-w.workList:
                // change taskStatus from resuming to running; only from
                // resuming, so a pause that is in progress is not clobbered
                if w.status == config.TaskStatus.Resuming {
                    w.status = config.TaskStatus.Running
                }
                for _, link := range list {
                    if w.visited[link.url] {
                        continue
                    }
                    w.visited[link.url] = true
                    // Stay responsive to quitCh while waiting for a free worker.
                    // Otherwise workers signalling quitCh and this loop waiting
                    // on them could block each other forever.
                    select {
                    case w.unVisitedLinks <- link:
                    case <-w.quitCh:
                        log.Printf("[Start] task(%s) over than crawl depth, get quit signal and exit", w.taskID)
                        shutdown(config.TaskStatus.Stopped)
                        return
                    }
                    // write crawl result to file and update currDepth
                    writer.TaskResultWriter(fmt.Sprintf("%s.txt", w.taskID), fmt.Sprintln(link.url))
                    w.currDepth = link.currDepth
                }
            case <-w.quitCh:
                log.Printf("[Start] task(%s) over than crawl depth, get quit signal and exit", w.taskID)
                shutdown(config.TaskStatus.Stopped)
                return
            case <-time.After(time.Duration(w.timeOut) * time.Second):
                if w.status == config.TaskStatus.Pausing || w.status == config.TaskStatus.Resuming {
                    continue
                }
                log.Printf("[Start] task(%s) execute timeout, get timeout signal and exit", w.taskID)
                shutdown(config.TaskStatus.Finished)
                return
            }
        }
    }

停止

停止较为简单,直接向退出信号 channel(quitCh)发送消息即可:

    // StopHandler asks a running task to stop by signalling w.quitCh. The
    // task's StartHandler loop then records the stopped status and shuts down.
    //
    // In the code shown, only StartHandler's loop receives from quitCh. Once
    // the task has shut down (w.stopSignalCh closed), a plain send would block
    // the caller forever, so the call returns instead.
    // NOTE(review): a task that was never started still blocks here.
    func (w *WlcTask) StopHandler() {
        select {
        case w.quitCh <- struct{}{}:
        case <-w.stopSignalCh:
            log.Printf("[Stop] task(%s) is already closed", w.taskID)
        }
    }

暂停

暂停的思路是将主循环读取数据所用的 channel(workList)赋值为 nil:在 select 中,从 nil channel 接收的分支永远不会就绪,主循环因此不再分发新链接,整个任务随之暂停。

    // PauseHandler pauses the task. It records the pausing status and sets
    // w.workList to nil, so StartHandler's loop stops receiving new batches.
    // Pausing an already-pausing, stopped or finished task is refused with a
    // log message.
    //
    // The status send is guarded by w.stopSignalCh. If the task shut down
    // after the status check, ChangeTaskStatus no longer receives, and an
    // unguarded send would block the caller forever.
    func (w *WlcTask) PauseHandler() {
        switch w.status {
        case config.TaskStatus.Pausing:
            log.Printf("[Pause] task(%s) is already pause, can't pause again", w.taskID)
            return
        case config.TaskStatus.Stopped, config.TaskStatus.Finished:
            log.Printf("[Pause] task(%s) is closed, can't pause", w.taskID)
            return
        }
        select {
        case w.statusCh <- fmt.Sprintf("%s,%d", config.TaskStatus.Pausing, w.currDepth):
        case <-w.stopSignalCh:
            log.Printf("[Pause] task(%s) is closed, can't pause", w.taskID)
            return
        }
        w.workList = nil
        log.Printf("[Pause] task(%s) is pausing", w.taskID)
    }

恢复

将之前备份的 channel(workListBak)重新赋值给当前使用的 workList,主循环即可重新接收数据,任务接着运行。

    // ResumingHandler resumes a paused task. It restores w.workList from
    // w.workListBak so StartHandler's loop receives batches again, then records
    // the resuming status. Only a task in the pausing status can be resumed.
    //
    // The status send is guarded by w.stopSignalCh. A paused task can still be
    // stopped, and once it has shut down nobody receives on statusCh, so an
    // unguarded send would block the caller forever.
    func (w *WlcTask) ResumingHandler() {
        if w.status != config.TaskStatus.Pausing {
            log.Printf("[Resume] task(%s) is not pausing, can't resume", w.taskID)
            return
        }
        w.workList = w.workListBak
        select {
        case w.statusCh <- fmt.Sprintf("%s,%d", config.TaskStatus.Resuming, w.currDepth):
        case <-w.stopSignalCh:
            log.Printf("[Resume] task(%s) is closed, can't resume", w.taskID)
            return
        }
        log.Printf("[Resume] task(%s) is resuming", w.taskID)
    }

状态及结果获取

状态获取:

    // StatusHandler reports the task's current status and current crawl depth.
    func (w *WlcTask) StatusHandler() (status string, currDepth int) {
        status = w.status
        currDepth = w.currDepth
        return status, currDepth
    }

结果获取:

    // ResultHandler returns the links recorded for taskID, read from the file
    // "<taskID>.txt", which holds one URL per "\n"-terminated line. An empty
    // file yields an empty, non-nil slice. Read errors are returned unchanged
    // (so os.IsNotExist still works on them).
    //
    // NOTE(review): if taskID comes straight from an HTTP request, it is used
    // unsanitized in a file path ("../x" can read other .txt files); consider
    // validating it. ioutil.ReadFile is deprecated since Go 1.16 in favour of
    // os.ReadFile.
    func ResultHandler(taskID string) (links []string, err error) {
        content, err := ioutil.ReadFile(taskID + ".txt")
        if err != nil {
            return nil, err
        }
        // Drop only the final line terminator before splitting. Unconditionally
        // slicing off the last element would lose the last link whenever the
        // file does not end in a newline (e.g. an interrupted write).
        text := strings.TrimSuffix(string(content), "\n")
        if text == "" {
            return []string{}, nil
        }
        return strings.Split(text, "\n"), nil
    }

思考

  • 本文使用的是channel控制整个流程,当换成标准库context时如何编写?

  • 路由的地方有重复性的地方,考虑如何优化?

  • 当前项目未自定义错误,如何完善整个项目的异常处理?

Powered By Valine
v1.5.2