Go语言中的并发编程:从Goroutine到Channel

张开发
2026/4/11 1:13:22 15 分钟阅读

分享文章

Go语言中的并发编程:从Goroutine到Channel
Go语言中的并发编程从Goroutine到Channel1. 引言并发编程是现代软件开发中的重要组成部分Go语言以其简洁而强大的并发模型而闻名。通过goroutine和channelGo语言提供了一种优雅的并发编程方式使得并发代码更加易于编写和理解。本文将从goroutine到channel深入探讨Go语言中的并发编程技术帮助开发者掌握并发编程构建高效、可靠的并发应用。2. Goroutine2.1 什么是GoroutineGoroutine是Go语言中的轻量级线程由Go运行时管理比系统线程更轻量创建和切换成本更低。2.2 创建Goroutine使用go关键字创建goroutine。package main import ( fmt time ) func sayHello() { fmt.Println(Hello, Goroutine!) } func main() { // 创建goroutine go sayHello() // 主线程需要等待goroutine执行完成 time.Sleep(1 * time.Second) fmt.Println(Hello, Main!) }2.3 Goroutine的生命周期Goroutine的生命周期由Go运行时管理当goroutine的函数执行完成时goroutine就会结束。package main import ( fmt time ) func worker(id int) { fmt.Printf(Worker %d starting\n, id) time.Sleep(1 * time.Second) fmt.Printf(Worker %d done\n, id) } func main() { for i : 1; i 5; i { go worker(i) } time.Sleep(2 * time.Second) fmt.Println(All workers completed) }2.4 等待Goroutine完成使用sync.WaitGroup等待goroutine完成。package main import ( fmt sync time ) func worker(id int, wg *sync.WaitGroup) { defer wg.Done() fmt.Printf(Worker %d starting\n, id) time.Sleep(1 * time.Second) fmt.Printf(Worker %d done\n, id) } func main() { var wg sync.WaitGroup for i : 1; i 5; i { wg.Add(1) go worker(i, wg) } wg.Wait() fmt.Println(All workers completed) }3. Channel3.1 什么是ChannelChannel是Go语言中用于goroutine之间通信的管道可以安全地在不同goroutine之间传递数据。3.2 创建Channel使用make函数创建channel。package main import fmt func main() { // 创建无缓冲channel ch : make(chan string) // 创建goroutine发送数据 go func() { ch - Hello, Channel! }() // 接收数据 message : -ch fmt.Println(message) }3.3 缓冲Channelpackage main import fmt func main() { // 创建缓冲channel容量为2 ch : make(chan string, 2) // 发送数据 ch - Message 1 ch - Message 2 // 接收数据 fmt.Println(-ch) fmt.Println(-ch) }3.4 关闭Channel使用close函数关闭channel。package main import fmt func main() { ch : make(chan int, 3) ch - 1 ch - 2 ch - 3 close(ch) // 遍历channel中的数据 for v : range ch { fmt.Println(v) } // 尝试发送到已关闭的channel会导致panic // ch - 4 // 会panic }4. Select语句4.1 基本使用select语句用于在多个channel操作中选择一个执行。package main import ( fmt time ) func main() { ch1 : make(chan string) ch2 : make(chan string) go func() { time.Sleep(1 * time.Second) ch1 - From channel 1 }() go func() { time.Sleep(2 * time.Second) ch2 - From channel 2 }() for i : 0; i 2; i { select { case msg1 : -ch1: fmt.Println(msg1) case msg2 : -ch2: fmt.Println(msg2) case -time.After(3 * time.Second): fmt.Println(Timeout) } } }4.2 默认分支package main import ( fmt time ) func main() { ch : make(chan string) for i : 0; i 5; i { select { case msg : -ch: fmt.Println(msg) default: fmt.Println(No message available) time.Sleep(500 * time.Millisecond) } } }5. 互斥锁5.1 基本使用使用sync.Mutex实现互斥锁。package main import ( fmt sync time ) var ( counter int mutex sync.Mutex ) func increment(wg *sync.WaitGroup) { defer wg.Done() mutex.Lock() defer mutex.Unlock() counter fmt.Printf(Counter: %d\n, counter) } func main() { var wg sync.WaitGroup for i : 0; i 10; i { wg.Add(1) go increment(wg) } wg.Wait() fmt.Printf(Final counter: %d\n, counter) }5.2 读写锁使用sync.RWMutex实现读写锁允许多个读操作同时进行。package main import ( fmt sync time ) var ( data int rwMutex sync.RWMutex ) func readData(wg *sync.WaitGroup) { defer wg.Done() rwMutex.RLock() defer rwMutex.RUnlock() fmt.Printf(Read data: %d\n, data) time.Sleep(100 * time.Millisecond) } func writeData(wg *sync.WaitGroup, value int) { defer wg.Done() rwMutex.Lock() defer rwMutex.Unlock() data value fmt.Printf(Write data: %d\n, data) time.Sleep(200 * time.Millisecond) } func main() { var wg sync.WaitGroup // 启动5个读goroutine for i : 0; i 5; i { wg.Add(1) go readData(wg) } // 启动2个写goroutine for i : 0; i 2; i { wg.Add(1) go writeData(wg, i) } wg.Wait() fmt.Println(All operations completed) }6. 原子操作6.1 基本使用使用sync/atomic包进行原子操作。package main import ( fmt sync sync/atomic ) var counter int64 func increment(wg *sync.WaitGroup) { defer wg.Done() atomic.AddInt64(counter, 1) } func main() { var wg sync.WaitGroup for i : 0; i 1000; i { wg.Add(1) go increment(wg) } wg.Wait() fmt.Printf(Final counter: %d\n, atomic.LoadInt64(counter)) }6.2 原子操作类型package main import ( fmt sync sync/atomic ) var ( counter int64 flag uint32 ) func main() { var wg sync.WaitGroup // 原子加法 atomic.AddInt64(counter, 5) fmt.Printf(Counter after add: %d\n, atomic.LoadInt64(counter)) // 原子比较交换 old : atomic.SwapInt64(counter, 10) fmt.Printf(Old counter: %d, New counter: %d\n, old, atomic.LoadInt64(counter)) // 原子比较并交换 swapped : atomic.CompareAndSwapInt64(counter, 10, 15) fmt.Printf(Swapped: %t, Counter: %d\n, swapped, atomic.LoadInt64(counter)) // 原子布尔操作 atomic.StoreUint32(flag, 1) fmt.Printf(Flag: %t\n, atomic.LoadUint32(flag) 1) swapped atomic.CompareAndSwapUint32(flag, 1, 0) fmt.Printf(Swapped flag: %t, Flag: %t\n, swapped, atomic.LoadUint32(flag) 1) }7. 并发模式7.1 生产者-消费者模式package main import ( fmt sync time ) func producer(ch chan- int, wg *sync.WaitGroup) { defer wg.Done() for i : 1; i 10; i { ch - i fmt.Printf(Produced: %d\n, i) time.Sleep(500 * time.Millisecond) } close(ch) } func consumer(ch -chan int, wg *sync.WaitGroup, id int) { defer wg.Done() for value : range ch { fmt.Printf(Consumer %d received: %d\n, id, value) time.Sleep(1 * time.Second) } } func main() { ch : make(chan int, 5) var wg sync.WaitGroup // 启动生产者 wg.Add(1) go producer(ch, wg) // 启动消费者 for i : 1; i 2; i { wg.Add(1) go consumer(ch, wg, i) } wg.Wait() fmt.Println(All done) }7.2 工作池模式package main import ( fmt sync time ) func worker(id int, jobs -chan int, results chan- int, wg *sync.WaitGroup) { defer wg.Done() for job : range jobs { fmt.Printf(Worker %d processing job %d\n, id, job) time.Sleep(1 * time.Second) results - job * 2 } } func main() { const numJobs 10 const numWorkers 3 jobs : make(chan int, numJobs) results : make(chan int, numJobs) var wg sync.WaitGroup // 启动工作协程 for w : 1; w numWorkers; w { wg.Add(1) go worker(w, jobs, results, wg) } // 发送任务 for j : 1; j numJobs; j { jobs - j } close(jobs) // 收集结果 go func() { wg.Wait() close(results) }() for result : range results { fmt.Printf(Result: %d\n, result) } fmt.Println(All jobs completed) }8. 并发安全8.1 并发安全的数据结构package main import ( fmt sync ) type SafeMap struct { data map[string]int mutex sync.RWMutex } func NewSafeMap() *SafeMap { return SafeMap{ data: make(map[string]int), } } func (sm *SafeMap) Set(key string, value int) { sm.mutex.Lock() defer sm.mutex.Unlock() sm.data[key] value } func (sm *SafeMap) Get(key string) (int, bool) { sm.mutex.RLock() defer sm.mutex.RUnlock() value, ok : sm.data[key] return value, ok } func (sm *SafeMap) Delete(key string) { sm.mutex.Lock() defer sm.mutex.Unlock() delete(sm.data, key) } func main() { sm : NewSafeMap() var wg sync.WaitGroup // 并发写入 for i : 0; i 100; i { wg.Add(1) go func(i int) { defer wg.Done() sm.Set(fmt.Sprintf(key%d, i), i) }(i) } // 并发读取 for i : 0; i 50; i { wg.Add(1) go func(i int) { defer wg.Done() value, ok : sm.Get(fmt.Sprintf(key%d, i)) if ok { fmt.Printf(key%d: %d\n, i, value) } }(i) } wg.Wait() fmt.Println(All operations completed) }8.2 避免竞争条件package main import ( fmt sync ) // 不安全的实现 var counter int func incrementUnsafe(wg *sync.WaitGroup) { defer wg.Done() counter // 非原子操作可能导致竞争条件 } // 安全的实现 var safeCounter int64 var mutex sync.Mutex func incrementSafe(wg *sync.WaitGroup) { defer wg.Done() mutex.Lock() safeCounter mutex.Unlock() } func main() { var wg sync.WaitGroup // 测试不安全的实现 counter 0 for i : 0; i 1000; i { wg.Add(1) go incrementUnsafe(wg) } wg.Wait() fmt.Printf(Unsafe counter: %d\n, counter) // 测试安全的实现 safeCounter 0 for i : 0; i 1000; i { wg.Add(1) go incrementSafe(wg) } wg.Wait() fmt.Printf(Safe counter: %d\n, safeCounter) }9. 并发最佳实践9.1 避免共享状态使用channel传递数据尽量使用channel在goroutine之间传递数据而不是共享内存最小化锁的范围只在必要时使用锁并且锁的范围要尽可能小使用原子操作对于简单的计数操作使用原子操作代替锁9.2 合理控制goroutine数量使用工作池限制并发goroutine的数量避免创建过多goroutine过多的goroutine会消耗系统资源9.3 错误处理使用context使用context控制goroutine的生命周期处理panic在goroutine中使用recover捕获panicpackage main import ( fmt sync ) func safeGoroutine(wg *sync.WaitGroup) { defer wg.Done() defer func() { if r : recover(); r ! nil { fmt.Printf(Recovered from panic: %v\n, r) } }() // 可能会panic的代码 fmt.Println(1 / 0) } func main() { var wg sync.WaitGroup wg.Add(1) go safeGoroutine(wg) wg.Wait() fmt.Println(Main continues) }9.4 性能优化使用缓冲channel对于生产者-消费者模式使用缓冲channel可以提高性能避免goroutine泄漏确保所有goroutine都能正常结束使用runtime.GOMAXPROCS根据CPU核心数调整GOMAXPROCS10. 总结Go语言的并发模型是其最强大的特性之一通过goroutine和channelGo语言提供了一种简洁而强大的并发编程方式。掌握并发编程技术合理使用goroutine、channel、锁和原子操作可以构建高效、可靠的并发应用。同时遵循并发最佳实践避免共享状态、合理控制goroutine数量、正确处理错误都是保证并发代码质量的重要因素。11. 参考资料Go Concurrency PatternsThe Go Programming Language SpecificationGo by Example: ChannelsGo by Example: Goroutines

更多文章