【Go】Sync(一)

Synopsis

  • sync 有幾種類型 : WaitGroup, Mutex, RWMutex, Pool, Once, Cond, Map
  • 而何時需要用到 sync.Mutex?
    • 如果我們不需要溝通怎麼辦? 如果我們只是想確保一次只有一個 goroutine 可以一次訪問一個變量以避免衝突該怎麼辦?
      • 可以用 sync.Mutex
      • 但其實一樣可以把變數塞到 channel 解決相同的問題
  • 關於 channel 的應用模式: [Lock/TryLock 模式, Or Channel 模式, Or-Done-Channel模式, Fan-in, Tee模式, 分布模式, eapache]
    • channel 的用途就是在於『讓多個 goroutine 互相溝通,不會產生重複取物件的問題』
    • Do not communicate by sharing memory; instead, share memory by communicating.
    • goroutine + channel 可以做到同步/異步的功能, 透過 channel 來讓多個 goroutine 互相溝通(也就是不會取到相同的值)

sync.Mutex{} example

  • 使用 sync.Mutex 模擬 Lock, Unlock 使用行為
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package main

import (
	"fmt"
	"math/rand"
	"sort"
	"sync"
	"time"
)

var mutex = sync.Mutex{}

type mySlice []int

var myslice = mySlice{}
var j = 0

func main() {

	go process_1()
	go process_2()

	var stop bool
	fmt.Scanln(&stop)
	sort.Ints(myslice)
	fmt.Println(myslice)
}

func process_1() {
	for i := 0; i < 50; i++ {
		func() {
			mutex.Lock()
			defer mutex.Unlock()
			fmt.Printf("(%d)鎖住了: %d\n", i, j)
			j++
			fmt.Printf("解鎖(%d): %d\n", i, j)
			myslice = append(myslice, j)
			time.Sleep(time.Microsecond * time.Duration(rand.Intn(1000)))
		}()
	}
}

func process_2() {
	for i := 0; i < 50; i++ {
		func() {
			mutex.Lock()
			defer mutex.Unlock()
			fmt.Printf("(process_2)鎖住了: j=%d\n", j)
			j++
			fmt.Printf("(process_2)解鎖: j=%d\n", j)
			myslice = append(myslice, j)
			time.Sleep(time.Microsecond * time.Duration(rand.Intn(1000)))
		}()
	}
}

channel example(一)

  • 透過 channel 模擬 lock, unlock 行為
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package main

import (
	"fmt"
	"math/rand"
	"sort"
	"time"
)

var c = make(chan int, 1)
var i = 0
var myslice []int

var plus_1_times int
var plus_2_times int

func main() {

	go plus()
	go plus2()
	c <- 0

	var stop bool
	fmt.Scanln(&stop)

	fmt.Println(<-c)

	sort.Ints(myslice)
	fmt.Println(myslice)
	fmt.Println(plus_1_times, plus_2_times)
}

func plus() {
ForEnd1:
	for {
		select {
		case x := <-c:
			if x >= 100 {
				c <- x
				break ForEnd1
			}
			x++
			fmt.Println(x)
			myslice = append(myslice, x)
			c <- x
			plus_1_times++
			time.Sleep(time.Microsecond * time.Duration(rand.Intn(1000)))
		}
	}
}

func plus2() {
ForEnd2:
	for {
		select {
		case x := <-c:
			if x >= 100 {
				c <- x
				break ForEnd2
			}
			x++
			fmt.Println(x)
			myslice = append(myslice, x)
			c <- x
			plus_2_times++
			time.Sleep(time.Microsecond * time.Duration(rand.Intn(1000)))
		}
	}
}

channel example(二)

  • 規則
    • 模擬阻塞跟非組塞的狀態
    • 建立一個 ping 單位, 透過 pingTimeMillisecond 來模擬要求的速度
    • 建立五個 pong 單位, 透過 pongTimeMillisecond 來模擬回應的速度
    • 透過 c:=make(chan int, 5), 來暫存接收且暫存的任務(當處理的 goroutine 來不及處理時)
  • 流程說明
    • 也就是當 ping 的速度過快,讓 c 來不及暫存的時候,就會產生組塞的動作
    • 當阻塞的狀況發生,在 web 上就會發生請求逾時
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package main

import (
	"fmt"
	"time"
)

func main() {

	// 模擬設定 ping 的忙碌速度
	go simulateBusyAndNobusy()

	// 模擬一個 ping 
	go pinger()

	// 模擬 5  pong 
	for index := 0; index < 5; index++ {
		go ponger(index)
	}

	var stop string
	fmt.Scanln(&stop)
}

type State int

const (
	nobusy State = iota
	busy
)

var pingTimeMillisecond time.Duration = 1000
var pongTimeMillisecond time.Duration = 1000
var (
	busyTimeMillisecond   = (pongTimeMillisecond - pingTimeMillisecond) / chanLength / 2
	NobusyTimeMillisecond = pongTimeMillisecond
)

func simulateBusyAndNobusy() {

	var state = nobusy
	for {
		switch state {
		case busy:
			fmt.Println("============>阻塞狀態")
			pingTimeMillisecond = busyTimeMillisecond
			state = nobusy
		case nobusy:
			fmt.Println("============>非阻塞狀態")
			pingTimeMillisecond = NobusyTimeMillisecond
			state = busy
		}

		// 等待 5 秒後, 切換狀態
		time.Sleep(time.Second * 3)
	}
}

const chanLength = 5

var c chan string = make(chan string, chanLength)

func ponger(id int) {
	for i := 0; ; i++ {
		x := <-c
		fmt.Printf("pong id(%d) from : %v\n", id, x)
		time.Sleep(time.Millisecond * pongTimeMillisecond)
	}
}

func pinger() {
	for i := 0; ; i++ {
		isChannelChoke()
		c <- fmt.Sprint(i)
		fmt.Printf("ping:%v , and c(%d)\n", i, len(c))
		time.Sleep(time.Millisecond * pingTimeMillisecond)
	}
}

func isChannelChoke() {
	if len(c) == chanLength {
		fmt.Println("【Task choke .......】")
	}
}