Go语言并发处理机制

Goroutine

Go 语言提供了 goroutine 用于实现并发,这是一种轻量级线程,有时也叫做 协程

1
go f(x, y, z)

这行代码启动了一个新协程来执行函数调用:

1
f(x, y, z)

请注意,函数参数 xyz 求值由当前协程负责;而函数 f 的执行由新协程负责。

由于同个进程中的协程跑在同一个地址空间中,访问共享内存区域时需要 同步synchronized )。sync 包提供了很多好用的同步原语,不过使用频率不高,因为 Go 提供了更科学的机制。

下面,我们来看一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
package main

import (
	"fmt"
	"time"
)

func say(s string) {
	for i := 0; i < 5; i++ {
		time.Sleep(100 * time.Millisecond)
		fmt.Println(s)
	}
}

func main() {
	go say("world")
	say("hello")
}

say 函数每隔 100 毫秒输出一个指定字符串,循环 5 次;程序启动一个协程输出单词 world ,主协程输出单词 hello ;最终效果是程序交替输出这两个单词,但顺序是随机的。

Channel

通道channel )是一种带类型的管道,通过操作符 <- 可以向它发送或者接收数据:

1
2
3
4
5
// 向通道ch发送数据v
ch <- v

// 从通道ch接收数据,并赋值给变量v
v := <-ch

通道操作符 <- 很形象,箭头表明数据的流向。

映射表切片 一样,通道创建好之后才能使用:

1
ch := make(chan int)

发送和接收操作默认都会阻塞,直到对方就绪。举个例子,接收协程会一直阻塞,直到其他协程发送数据,反之亦然。因此,Go 协程可以通过通道来实现同步,不用显式加锁或使用条件变量。

我们来看一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import "fmt"

func sum(s []int, c chan int) {
	sum := 0
	for _, v := range s {
		sum += v
	}
	c <- sum // send sum to c
}

func main() {
	s := []int{7, 2, 8, -9, 4, 0}

	c := make(chan int)
	go sum(s[:len(s)/2], c)
	go sum(s[len(s)/2:], c)
	x, y := <-c, <-c // receive from c

	fmt.Println(x, y, x+y)
}

sum 函数计算将 int 切片的值累加,然后将结果写到一个通道。主协程启动两个协程分头计算两个切片的和;然后从通道收集结果并计算总和。

带缓冲通道

通道默认阻塞,当生产者和消费者同时操作时,才能完成通信。如果消费者还在处理上一个数据,那生产者就只能阻塞等待,通信效率便大打折扣。

不过不用担心,通道还能够缓存数据!给 make 加个参数指定缓冲区长度,即可创建带缓冲通道:

1
ch := make(chan int, 100)

这样一来,只有缓冲区满了之后,发送才会阻塞;缓冲区空了之后,接收才会阻塞。

我们来看一个例子:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
package main

import "fmt"

func main() {
	ch := make(chan int, 2)
	ch <- 1
	ch <- 2
	fmt.Println(<-ch)
	fmt.Println(<-ch)
}

试着修改通道的缓冲区大小,观察程序有什么变化?

关闭通道

发送者可以调用 close 函数关闭通道,以此表明数据已经发完。接收者可以判断通道是否已被关闭:

1
v, ok := <-ch

接收表达式赋值额外变量 ok :如果 ok 值为 false ,说明通道已经关闭,而且数据已经读完。

循环接收

我们可以通过 range 不断接收数据,直到通道关闭:

1
for i := range c

只有发送者可以关闭通道,禁止接收者关闭通道!向已关闭通道发送数据会导致 panic 出错!

通道跟文件不一样,不是每个通道都必须关闭。需要告知接收方数据发完的场景才需要关闭通道,比如让 range 接收循环终止。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package main

import (
	"fmt"
)

func fibonacci(n int, c chan int) {
	x, y := 0, 1
	for i := 0; i < n; i++ {
		c <- x
		x, y = y, x+y
	}
	close(c)
}

func main() {
	c := make(chan int, 10)
	go fibonacci(cap(c), c)
	for i := range c {
		fmt.Println(i)
	}
}

这个例子启动一个新协程输出斐波那契数列,数据通过通道传给主协程。当指定个数达到后,子协程关闭通道。主协程则通过 range 将数列逐个输出。

Select

通过 select 语句,Go 协程可以同时等待多个通信操作。select 会一直阻塞,直到有 case 通信操作就绪,执行对应的 case 代码体。如果有多个 case 就绪,执行顺序是随机的。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package main

import "fmt"

func fibonacci(c, quit chan int) {
	x, y := 0, 1
	for {
		select {
		case c <- x:
			x, y = y, x+y
		case <-quit:
			fmt.Println("quit")
			return
		}
	}
}

func main() {
	c := make(chan int)
	quit := make(chan int)
	go func() {
		for i := 0; i < 10; i++ {
			fmt.Println(<-c)
		}
		quit <- 0
	}()
	fibonacci(c, quit)
}

fibonacci 函数接受两个通道,然后不断向通道 c 发送斐波那契数列,直到 quit 通道发来数据。该函数主体是一个循环,每次循环同时发起并等待两个通信操作:

  • 将最新的斐波那契数发到通道 c ,操作成功则计算下一个斐波那契数,并进入下一次循环;
  • quit 通道接收数据,操作成功则退出函数;

默认分支

select 语句也可以写 default 分支,如果没一个 case 已就绪,就执行 default 分支。有了 default 分支,我们可以尝试发送或者接收数据,而不会造成阻塞:

1
2
3
4
5
6
select {
case i := <-c:
    // use i
default:
    // receiving from c would block
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package main

import (
	"fmt"
	"time"
)

func main() {
	tick := time.Tick(100 * time.Millisecond)
	boom := time.After(500 * time.Millisecond)
	for {
		select {
		case <-tick:
			fmt.Println("tick.")
		case <-boom:
			fmt.Println("BOOM!")
			return
		default:
			fmt.Println("    .")
			time.Sleep(50 * time.Millisecond)
		}
	}
}

sync.Mutex

Go 协程通过 通道channel )来进行通信,但如果我们不需要通信呢?如果我们只需保证,同一时间下只有一个协程访问共享变量,避免冲突呢?

这个概念叫做 互斥mutual exclusion ),提供互斥功能的数据结构习惯称为 互斥锁mutex )。Go 标准库提供了互斥锁 sync.Mutex ,它有两个方法:

  • Lock ,加锁,加锁后无法再次加锁;
  • Unlock ,解锁,解锁后可再次加锁;
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package main

import (
	"fmt"
	"sync"
	"time"
)

// SafeCounter is safe to use concurrently.
type SafeCounter struct {
	mu sync.Mutex
	v  map[string]int
}

// Inc increments the counter for the given key.
func (c *SafeCounter) Inc(key string) {
	c.mu.Lock()
	// Lock so only one goroutine at a time can access the map c.v.
	c.v[key]++
	c.mu.Unlock()
}

// Value returns the current value of the counter for the given key.
func (c *SafeCounter) Value(key string) int {
	c.mu.Lock()
	// Lock so only one goroutine at a time can access the map c.v.
	defer c.mu.Unlock()
	return c.v[key]
}

来看一个例子,Inc 方法中由 LockUnlock 包围起来的代码,将被互斥执行。换句话讲,任一时刻内最多只有一个协程在执行这个代码块。当然了,我们也可以通过 defer 关键字确保互斥锁用完后解锁,参考 Value 方法。

练习:Web爬虫

这个练习我们使用 Go 语言的并发机制,为一个网络爬虫实现并发处理。修改 Crawl 函数,实现并发请求 URL ,要求同一个 URL 不能请求两次。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package main

import (
	"fmt"
)

type Fetcher interface {
	// Fetch returns the body of URL and
	// a slice of URLs found on that page.
	Fetch(url string) (body string, urls []string, err error)
}

// Crawl uses fetcher to recursively crawl
// pages starting with url, to a maximum of depth.
func Crawl(url string, depth int, fetcher Fetcher) {
	// TODO: Fetch URLs in parallel.
	// TODO: Don't fetch the same URL twice.
	// This implementation doesn't do either:
	if depth <= 0 {
		return
	}
	body, urls, err := fetcher.Fetch(url)
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("found: %s %q\n", url, body)
	for _, u := range urls {
		Crawl(u, depth-1, fetcher)
	}
	return
}

func main() {
	Crawl("https://golang.org/", 4, fetcher)
}

// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult

type fakeResult struct {
	body string
	urls []string
}

func (f fakeFetcher) Fetch(url string) (string, []string, error) {
	if res, ok := f[url]; ok {
		return res.body, res.urls, nil
	}
	return "", nil, fmt.Errorf("not found: %s", url)
}

// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
	"https://golang.org/": &fakeResult{
		"The Go Programming Language",
		[]string{
			"https://golang.org/pkg/",
			"https://golang.org/cmd/",
		},
	},
	"https://golang.org/pkg/": &fakeResult{
		"Packages",
		[]string{
			"https://golang.org/",
			"https://golang.org/cmd/",
			"https://golang.org/pkg/fmt/",
			"https://golang.org/pkg/os/",
		},
	},
	"https://golang.org/pkg/fmt/": &fakeResult{
		"Package fmt",
		[]string{
			"https://golang.org/",
			"https://golang.org/pkg/",
		},
	},
	"https://golang.org/pkg/os/": &fakeResult{
		"Package os",
		[]string{
			"https://golang.org/",
			"https://golang.org/pkg/",
		},
	},
}

提示:你可以用一个 映射表 来缓存已经请求过的 URL ,但得注意单独映射表不是并发安全的。

答案

【小菜学Go语言】系列文章首发于公众号【小菜学编程】,敬请关注:

【小菜学Go语言】系列文章首发于公众号【小菜学编程】,敬请关注: