Goroutine
Go 语言提供了 goroutine 用于实现并发,这是一种轻量级线程,有时也叫做 协程 。
这行代码启动了一个新协程来执行函数调用:
请注意,函数参数 x 、y 和 z 求值由当前协程负责;而函数 f 的执行由新协程负责。
由于同个进程中的协程跑在同一个地址空间中,访问共享内存区域时需要 同步( synchronized )。sync 包提供了很多好用的同步原语,不过使用频率不高,因为 Go 提供了更科学的机制。
下面,我们来看一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
package main
import (
"fmt"
"time"
)
func say(s string) {
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
fmt.Println(s)
}
}
func main() {
go say("world")
say("hello")
}
|
say 函数每隔 100 毫秒输出一个指定字符串,循环 5 次;程序启动一个协程输出单词 world
,主协程输出单词 hello
;最终效果是程序交替输出这两个单词,但顺序是随机的。
Channel
通道( channel )是一种带类型的管道,通过操作符 <-
可以向它发送或者接收数据:
1
2
3
4
5
|
// 向通道ch发送数据v
ch <- v
// 从通道ch接收数据,并赋值给变量v
v := <-ch
|
通道操作符 <-
很形象,箭头表明数据的流向。
跟 映射表 和 切片 一样,通道创建好之后才能使用:
发送和接收操作默认都会阻塞,直到对方就绪。举个例子,接收协程会一直阻塞,直到其他协程发送数据,反之亦然。因此,Go 协程可以通过通道来实现同步,不用显式加锁或使用条件变量。
我们来看一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package main
import "fmt"
func sum(s []int, c chan int) {
sum := 0
for _, v := range s {
sum += v
}
c <- sum // send sum to c
}
func main() {
s := []int{7, 2, 8, -9, 4, 0}
c := make(chan int)
go sum(s[:len(s)/2], c)
go sum(s[len(s)/2:], c)
x, y := <-c, <-c // receive from c
fmt.Println(x, y, x+y)
}
|
sum 函数计算将 int 切片的值累加,然后将结果写到一个通道。主协程启动两个协程分头计算两个切片的和;然后从通道收集结果并计算总和。
带缓冲通道
通道默认阻塞,当生产者和消费者同时操作时,才能完成通信。如果消费者还在处理上一个数据,那生产者就只能阻塞等待,通信效率便大打折扣。
不过不用担心,通道还能够缓存数据!给 make 加个参数指定缓冲区长度,即可创建带缓冲通道:
1
|
ch := make(chan int, 100)
|
这样一来,只有缓冲区满了之后,发送才会阻塞;缓冲区空了之后,接收才会阻塞。
我们来看一个例子:
1
2
3
4
5
6
7
8
9
10
11
|
package main
import "fmt"
func main() {
ch := make(chan int, 2)
ch <- 1
ch <- 2
fmt.Println(<-ch)
fmt.Println(<-ch)
}
|
试着修改通道的缓冲区大小,观察程序有什么变化?
关闭通道
发送者可以调用 close 函数关闭通道,以此表明数据已经发完。接收者可以判断通道是否已被关闭:
接收表达式赋值额外变量 ok :如果 ok 值为 false
,说明通道已经关闭,而且数据已经读完。
循环接收
我们可以通过 range 不断接收数据,直到通道关闭:
只有发送者可以关闭通道,禁止接收者关闭通道!向已关闭通道发送数据会导致 panic 出错!
通道跟文件不一样,不是每个通道都必须关闭。需要告知接收方数据发完的场景才需要关闭通道,比如让 range 接收循环终止。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
package main
import (
"fmt"
)
func fibonacci(n int, c chan int) {
x, y := 0, 1
for i := 0; i < n; i++ {
c <- x
x, y = y, x+y
}
close(c)
}
func main() {
c := make(chan int, 10)
go fibonacci(cap(c), c)
for i := range c {
fmt.Println(i)
}
}
|
这个例子启动一个新协程输出斐波那契数列,数据通过通道传给主协程。当指定个数达到后,子协程关闭通道。主协程则通过 range 将数列逐个输出。
Select
通过 select 语句,Go 协程可以同时等待多个通信操作。select 会一直阻塞,直到有 case 通信操作就绪,执行对应的 case 代码体。如果有多个 case 就绪,执行顺序是随机的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
|
package main
import "fmt"
func fibonacci(c, quit chan int) {
x, y := 0, 1
for {
select {
case c <- x:
x, y = y, x+y
case <-quit:
fmt.Println("quit")
return
}
}
}
func main() {
c := make(chan int)
quit := make(chan int)
go func() {
for i := 0; i < 10; i++ {
fmt.Println(<-c)
}
quit <- 0
}()
fibonacci(c, quit)
}
|
fibonacci 函数接受两个通道,然后不断向通道 c 发送斐波那契数列,直到 quit 通道发来数据。该函数主体是一个循环,每次循环同时发起并等待两个通信操作:
- 将最新的斐波那契数发到通道 c ,操作成功则计算下一个斐波那契数,并进入下一次循环;
- 从 quit 通道接收数据,操作成功则退出函数;
默认分支
select 语句也可以写 default 分支,如果没一个 case 已就绪,就执行 default 分支。有了 default 分支,我们可以尝试发送或者接收数据,而不会造成阻塞:
1
2
3
4
5
6
|
select {
case i := <-c:
// use i
default:
// receiving from c would block
}
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
|
package main
import (
"fmt"
"time"
)
func main() {
tick := time.Tick(100 * time.Millisecond)
boom := time.After(500 * time.Millisecond)
for {
select {
case <-tick:
fmt.Println("tick.")
case <-boom:
fmt.Println("BOOM!")
return
default:
fmt.Println(" .")
time.Sleep(50 * time.Millisecond)
}
}
}
|
sync.Mutex
Go 协程通过 通道( channel )来进行通信,但如果我们不需要通信呢?如果我们只需保证,同一时间下只有一个协程访问共享变量,避免冲突呢?
这个概念叫做 互斥( mutual exclusion ),提供互斥功能的数据结构习惯称为 互斥锁( mutex )。Go 标准库提供了互斥锁 sync.Mutex ,它有两个方法:
- Lock ,加锁,加锁后无法再次加锁;
- Unlock ,解锁,解锁后可再次加锁;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
|
package main
import (
"fmt"
"sync"
"time"
)
// SafeCounter is safe to use concurrently.
type SafeCounter struct {
mu sync.Mutex
v map[string]int
}
// Inc increments the counter for the given key.
func (c *SafeCounter) Inc(key string) {
c.mu.Lock()
// Lock so only one goroutine at a time can access the map c.v.
c.v[key]++
c.mu.Unlock()
}
// Value returns the current value of the counter for the given key.
func (c *SafeCounter) Value(key string) int {
c.mu.Lock()
// Lock so only one goroutine at a time can access the map c.v.
defer c.mu.Unlock()
return c.v[key]
}
|
来看一个例子,Inc 方法中由 Lock 和 Unlock 包围起来的代码,将被互斥执行。换句话讲,任一时刻内最多只有一个协程在执行这个代码块。当然了,我们也可以通过 defer 关键字确保互斥锁用完后解锁,参考 Value 方法。
练习:Web爬虫
这个练习我们使用 Go 语言的并发机制,为一个网络爬虫实现并发处理。修改 Crawl 函数,实现并发请求 URL ,要求同一个 URL 不能请求两次。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
package main
import (
"fmt"
)
type Fetcher interface {
// Fetch returns the body of URL and
// a slice of URLs found on that page.
Fetch(url string) (body string, urls []string, err error)
}
// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
// TODO: Fetch URLs in parallel.
// TODO: Don't fetch the same URL twice.
// This implementation doesn't do either:
if depth <= 0 {
return
}
body, urls, err := fetcher.Fetch(url)
if err != nil {
fmt.Println(err)
return
}
fmt.Printf("found: %s %q\n", url, body)
for _, u := range urls {
Crawl(u, depth-1, fetcher)
}
return
}
func main() {
Crawl("https://golang.org/", 4, fetcher)
}
// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) (string, []string, error) {
if res, ok := f[url]; ok {
return res.body, res.urls, nil
}
return "", nil, fmt.Errorf("not found: %s", url)
}
// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"https://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"https://golang.org/pkg/",
"https://golang.org/cmd/",
},
},
"https://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"https://golang.org/",
"https://golang.org/cmd/",
"https://golang.org/pkg/fmt/",
"https://golang.org/pkg/os/",
},
},
"https://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
"https://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"https://golang.org/",
"https://golang.org/pkg/",
},
},
}
|
提示:你可以用一个 映射表 来缓存已经请求过的 URL ,但得注意单独映射表不是并发安全的。
答案
【小菜学Go语言】系列文章首发于公众号【小菜学编程】,敬请关注: