使用gin封装
项目目录结构如下:
- api:主要业务逻辑实现,结合 gin 的上下文进行封装
- config:项目主要配置文件
- global:全局变量管理
- model:结构体定义的地方
- proto:定义 proto 文件,存放其生成的 go 文件
- router:路由控制
- storage:运行日志存储及临时结果存放
- utils:常用的工具类封装
├─api
│  │  base.go
│  │
│  └─crawler
│      │  crawl.go
│      │
│      └─links
│              links.go
│
├─config
│      settings.go
│
├─global
│      global.go
│
├─model
│  ├─request
│  │      request.go
│  │
│  └─response
│          response.go
│
├─proto
│      crawler.proto
│
├─router
│      router.go
│
├─storage
│  ├─logs
│  └─tasks
│─utils
│  └─writer
│          write_data.go
│  go.mod
│  go.sum
│  main.go
│  readme.md

完善各控制流程
任务定义及初始化如下:
// work is one unit of crawling: a page link together with its depth in the
// crawl.
type work struct {
	url       string // page link
	currDepth int    // current depth of this link
}
// WlcTask holds the state of one link-crawling task: its identity, the
// channels that connect the scheduling loop with the worker goroutines, and
// the crawl limits and progress.
type WlcTask struct {
	// taskID is the task ID.
	taskID string

	// workList carries, for every crawled link, all links found on the next
	// level. workListBak is a backup of workList used to pause and resume
	// the task.
	workList, workListBak chan []work

	// unVisitedLinks carries links that have not been visited yet.
	unVisitedLinks chan work

	// stopSignalCh notifies every producer/consumer goroutine to exit so
	// that none of them leak. quitCh is used to stop the task actively; it
	// is driven by the recursion-depth check.
	stopSignalCh, quitCh chan struct{}

	// statusCh records task status changes.
	statusCh chan string

	// visited marks the links that have already been visited.
	visited map[string]bool

	// startUrl holds the starting links.
	startUrl []work

	// concurrency is the number of concurrent workers, timeOut the timeout
	// setting, depth the recursion depth and currDepth the current
	// recursion depth.
	concurrency, timeOut, depth, currDepth int

	// status is the task status: created, running, stopped, finished,
	// pausing or resuming.
	status string
}
func NewWclTask(wlcReq *request.WlcTaskReq) *WlcTask {
var startUrl []work
for _, url := range wlcReq.Urls {
startUrl = append(startUrl, work{url: url, currDepth: 1})
}
workListCh := make(chan []work)
return &WlcTask{
taskID: wlcReq.TaskID,
workList: workListCh,
workListBak: workListCh,
unVisitedLinks: make(chan work),
stopSignalCh: make(chan struct{}),
quitCh: make(chan struct{}),
statusCh: make(chan string),
visited: make(map[string]bool),
startUrl: startUrl,
concurrency: wlcReq.Concurrency,
timeOut: wlcReq.Timeout,
depth: wlcReq.Depth,
currDepth: 1,
status: config.TaskStatus.Created,
}
}另外,还针对
WlcTask 封装了一个改变任务状态的方法:
26func (w *WlcTask) ChangeTaskStatus() {
for {
select {
case <-w.stopSignalCh:
log.Printf("[Start] record task(%s) status goroutine exit...", w.taskID)
return
case taskStr, ok := <-w.statusCh:
if ok {
arr := strings.Split(taskStr, ",")
w.status = arr[0]
currDepth, err := strconv.Atoi(arr[1])
if err != nil {
log.Println(err)
}
log.Printf("[Start] change task(%s) status to %s, depth is, %d", w.taskID, w.status, currDepth)
// recode the task info to a json file
task := request.TaskInfo{
Status: arr[0],
CurrDepth: currDepth,
}
writer.TaskInfoWriter(w.taskID, &task)
}
}
}
}启动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61func (w *WlcTask) StartHandler(globalTask map[string]*WlcTask) {
// record the task status change
go w.ChangeTaskStatus()
go func() { w.workList <- w.startUrl }()
w.statusCh <- fmt.Sprintf("%s,%d", config.TaskStatus.Running, w.currDepth)
for i := 0; i < w.concurrency; i++ {
go func(num int) {
for {
select {
case <-w.stopSignalCh:
log.Printf("[Start] task(%s) goroutine %d exit...", w.taskID, num)
return
case link, _ := <-w.unVisitedLinks:
foundLinks := crawl(link, w.depth, w.quitCh)
go func() { w.workList <- foundLinks }()
}
}
}(i)
}
for {
select {
case list := <-w.workList:
// change taskStatus from resuming to running
if w.status != config.TaskStatus.Running {
w.status = config.TaskStatus.Running
}
for _, link := range list {
if !w.visited[link.url] {
w.visited[link.url] = true
w.unVisitedLinks <- link
// write crawl result to file and update currDepth
writer.TaskResultWriter(fmt.Sprintf("%s.txt", w.taskID), fmt.Sprintln(link.url))
if link.currDepth != w.currDepth {
w.currDepth = link.currDepth
}
}
}
case <-w.quitCh:
log.Printf("[Start] task(%s) over than crawl depth, get quit signal and exit", w.taskID)
w.statusCh <- fmt.Sprintf("%s,%d", config.TaskStatus.Stopped, w.currDepth)
close(w.stopSignalCh)
delete(globalTask, w.taskID)
return
case <-time.After(time.Duration(w.timeOut) * time.Second):
if w.status == config.TaskStatus.Pausing || w.status == config.TaskStatus.Resuming {
continue
} else {
log.Printf("[Start] task(%s) execute timeout, get timeout signal and exit", w.taskID)
w.statusCh <- fmt.Sprintf("%s,%d", config.TaskStatus.Finished, w.currDepth)
close(w.stopSignalCh)
delete(globalTask, w.taskID)
return
}
}
}
}停止
停止较为简单,直接向退出信号 channel 发送消息即可:
3func (w *WlcTask) StopHandler() {
w.quitCh <- struct{}{}
}暂停
暂停的思路是将传输数据的 channel 赋值为 nil,这样相关的 goroutine 会阻塞,进而使整个执行流程暂停。
13func (w *WlcTask) PauseHandler() {
if w.status == config.TaskStatus.Pausing {
log.Printf("[Pause] task(%s) is already pause, can't pause again", w.taskID)
return
}
if w.status == config.TaskStatus.Stopped || w.status == config.TaskStatus.Finished {
log.Printf("[Pause] task(%s) is closed, can't pause", w.taskID)
return
}
w.statusCh <- fmt.Sprintf("%s,%d", config.TaskStatus.Pausing, w.currDepth)
w.workList = nil
log.Printf("[Pause] task(%s) is pausing", w.taskID)
}恢复
将之前备份好的数据 channel 重新赋值给当前使用的 channel,相关的 goroutine 就会接着运行。
9func (w *WlcTask) ResumingHandler() {
if w.status != config.TaskStatus.Pausing {
log.Printf("[Resume] task(%s) is not pausing, can't resume", w.taskID)
return
}
w.workList = w.workListBak
w.statusCh <- fmt.Sprintf("%s,%d", config.TaskStatus.Resuming, w.currDepth)
log.Printf("[Resume] task(%s) is resuming", w.taskID)
}状态及结果获取
状态获取:
3func (w *WlcTask) StatusHandler() (status string, currDepth int) {
return w.status, w.currDepth
}结果获取:
1
2
3
4
5
6
7
// ResultHandler reads the result file of a task, "<taskID>.txt", and returns
// its lines as the list of crawled links.
//
// A single trailing newline is treated as the terminator of the last line,
// so a file that does not end with a newline still yields its last link. An
// empty file yields an empty, non-nil slice. If the file cannot be read the
// error from ioutil.ReadFile is returned unchanged and links is nil.
func ResultHandler(taskID string) (links []string, err error) {
	f, err := ioutil.ReadFile(taskID + ".txt")
	if err != nil {
		return nil, err
	}
	if len(f) == 0 {
		return []string{}, nil
	}
	// Strip only the final line terminator; dropping the last element
	// unconditionally would lose a link when the terminator is missing.
	content := strings.TrimSuffix(string(f), "\n")
	return strings.Split(content, "\n"), nil
}

// Next section of the article: "思考" (Reflection).
- 本文使用 channel 控制整个流程,如果换成标准库 context,应该如何编写?
- 路由部分存在重复代码,考虑如何优化?
- 当前项目未自定义错误,如何完善整个项目的异常处理?
v1.5.2