Go并发模式进阶
Go语言的并发模型是其最强大的特性之一。本文将深入探讨Go的高级并发模式,帮助您构建高性能、高可靠性的分布式系统。
一、扇出/扇入模式
1. 基础实现
func fanOutFanIn() {
// 扇出:一个输入channel分发给多个worker
in := genNumbers(10) // 生成1-10的数字
// 启动3个worker处理
c1 := square(in)
c2 := square(in)
c3 := square(in)
// 扇入:合并多个channel的结果
for n := range merge(c1, c2, c3) {
fmt.Println(n) // 输出平方结果
}
}
func genNumbers(n int) <-chan int {
out := make(chan int)
go func() {
for i := 1; i <= n; i++ {
out <- i
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
2. 带限流的实现
func fanOutWithThrottle() {
in := genNumbers(100) // 生成100个数字
// 限制最多5个并发worker
var wg sync.WaitGroup
workerCount := 5
wg.Add(workerCount)
out := make(chan int)
for i := 0; i < workerCount; i++ {
go func() {
defer wg.Done()
for n := range in {
out <- n * n // 处理结果
}
}()
}
go func() {
wg.Wait()
close(out)
}()
for result := range out {
fmt.Println(result)
}
}
二、顺序一致性保证
1. 内存屏障使用
var (
data int
ready bool
)
func sequentialConsistency() {
go func() { // Writer goroutine
data = 42 // 写操作1
// 使用atomic作为内存屏障
atomic.StoreInt32((*int32)(unsafe.Pointer(&ready)), 1) // 写操作2
}()
go func() { // Reader goroutine
// 使用atomic检查ready标志
for atomic.LoadInt32((*int32)(unsafe.Pointer(&ready))) == 0 {
runtime.Gosched() // 让出CPU
}
fmt.Println(data) // 保证会读到42
}()
time.Sleep(time.Second)
}
2. Sync.Map保证一致性
func syncMapExample() {
var m sync.Map
// 存储
m.Store("key", "value")
// 加载,保证原子性
if val, ok := m.Load("key"); ok {
fmt.Println(val) // 输出"value"
}
// 删除
m.Delete("key")
// LoadOrStore
actual, loaded := m.LoadOrStore("key", "new value")
if !loaded {
fmt.Println("inserted new value:", actual)
}
}
三、分布式锁实现
1. 基于Redis的实现
type RedisLock struct {
client *redis.Client
key string
value string // 唯一标识
expiration time.Duration
}
func NewRedisLock(client *redis.Client, key string, exp time.Duration) *RedisLock {
return &RedisLock{
client: client,
key: key,
value: uuid.New().String(),
expiration: exp,
}
}
func (l *RedisLock) Lock(ctx context.Context) (bool, error) {
return l.client.SetNX(ctx, l.key, l.value, l.expiration).Result()
}
func (l *RedisLock) Unlock(ctx context.Context) error {
script := `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end`
_, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
return err
}
2. 基于etcd的实现
type EtcdLock struct {
client *clientv3.Client
lease clientv3.Lease
key string
cancel context.CancelFunc
}
func NewEtcdLock(cli *clientv3.Client, key string) *EtcdLock {
return &EtcdLock{
client: cli,
key: fmt.Sprintf("/locks/%s", key),
}
}
func (l *EtcdLock) Lock(ctx context.Context, ttl int64) error {
// 创建租约
lease := clientv3.NewLease(l.client)
grantResp, err := lease.Grant(ctx, ttl)
if err != nil {
return err
}
// 自动续约
ctx, cancel := context.WithCancel(ctx)
keepAliveChan, err := lease.KeepAlive(ctx, grantResp.ID)
if err != nil {
cancel()
return err
}
// 竞争锁
txn := l.client.Txn(ctx).
If(clientv3.Compare(clientv3.CreateRevision(l.key), "=", 0)).
Then(clientv3.OpPut(l.key, "", clientv3.WithLease(grantResp.ID))).
Else(clientv3.OpGet(l.key))
txnResp, err := txn.Commit()
if err != nil {
cancel()
return err
}
if !txnResp.Succeeded {
return fmt.Errorf("lock is held by others")
}
l.lease = lease
l.cancel = cancel
go func() {
for range keepAliveChan {
// 保持租约活跃
}
}()
return nil
}
func (l *EtcdLock) Unlock() {
if l.cancel != nil {
l.cancel()
}
}
四、Leader选举模式
1. 基于etcd的选举
func etcdLeaderElection() {
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"localhost:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatal(err)
}
defer cli.Close()
// 参与选举
election := concurrency.NewElection(cli, "/my-election/")
ctx := context.Background()
go func() {
if err := election.Campaign(ctx, "node1"); err != nil {
log.Println("campaign error:", err)
}
}()
// 监听leader变更
ch := election.Observe(ctx)
for resp := range ch {
log.Println("new leader:", string(resp.Kvs[0].Value))
}
}
2. 主从切换处理
func handleLeadership(isLeader <-chan bool) {
for leader := range isLeader {
if leader {
log.Println("I am now the leader")
go doLeaderWork()
} else {
log.Println("I am now a follower")
stopLeaderWork()
}
}
}
func doLeaderWork() {
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
log.Println("leader is working...")
}
}
func stopLeaderWork() {
// 清理leader相关资源
}
五、Backpressure控制
1. 滑动窗口限流
type SlidingWindow struct {
windowSize int64 // 窗口大小(毫秒)
limit int // 限制数量
mu sync.Mutex
timestamps []int64 // 请求时间记录
}
func NewSlidingWindow(windowSize time.Duration, limit int) *SlidingWindow {
return &SlidingWindow{
windowSize: windowSize.Milliseconds(),
limit: limit,
}
}
func (sw *SlidingWindow) Allow() bool {
now := time.Now().UnixMilli()
sw.mu.Lock()
defer sw.mu.Unlock()
// 移除窗口外的记录
i := 0
for ; i < len(sw.timestamps); i++ {
if now-sw.timestamps[i] < sw.windowSize {
break
}
}
sw.timestamps = sw.timestamps[i:]
// 检查是否超过限制
if len(sw.timestamps) >= sw.limit {
return false
}
sw.timestamps = append(sw.timestamps, now)
return true
}
2. 自适应限流算法
type AdaptiveLimiter struct {
maxInflight int32 // 最大并发数
currentInflight int32 // 当前并发数
lastErrorTime time.Time // 最后错误时间
lastRefused time.Time // 最后拒绝时间
}
func (al *AdaptiveLimiter) TryAcquire() bool {
current := atomic.LoadInt32(&al.currentInflight)
max := atomic.LoadInt32(&al.maxInflight)
if current >= max {
if time.Since(al.lastRefused) > time.Second {
// 尝试减少并发数
atomic.CompareAndSwapInt32(&al.maxInflight, max, max-1)
al.lastRefused = time.Now()
}
return false
}
atomic.AddInt32(&al.currentInflight, 1)
return true
}
func (al *AdaptiveLimiter) Release(success bool) {
if success {
max := atomic.LoadInt32(&al.maxInflight)
if time.Since(al.lastErrorTime) > 10*time.Second {
// 稳定情况下适度增加并发数
atomic.CompareAndSwapInt32(&al.maxInflight, max, max+1)
}
} else {
al.lastErrorTime = time.Now()
}
atomic.AddInt32(&al.currentInflight, -1)
}
六、容错模式实现
1. Circuit Breaker实现
type CircuitBreaker struct {
failureThreshold int
successThreshold int
timeout time.Duration
mu sync.Mutex
failures int
successes int
lastAttempt time.Time
state string // closed, open, half-open
}
func (cb *CircuitBreaker) Execute(action func() error) error {
cb.mu.Lock()
switch cb.state {
case "open":
if time.Since(cb.lastAttempt) < cb.timeout {
cb.mu.Unlock()
return fmt.Errorf("circuit breaker is open")
}
cb.state = "half-open"
case "closed":
// 正常执行
}
cb.mu.Unlock()
err := action()
cb.mu.Lock()
defer cb.mu.Unlock()
if err != nil {
if cb.state == "half-open" {
// 半开状态下失败直接转回开状态
cb.state = "open"
cb.lastAttempt = time.Now()
return err
}
cb.failures++
if cb.failures >= cb.failureThreshold {
cb.state = "open"
cb.lastAttempt = time.Now()
}
return err
}
// 执行成功
if cb.state == "half-open" {
cb.successes++
if cb.successes >= cb.successThreshold {
cb.state = "closed"
cb.failures = 0
cb.successes = 0
}
} else {
cb.failures = 0
}
return nil
}
预告:Go错误处理最佳实践
在掌握了高级并发模式之后,下一期我们将深入探讨Go语言中错误处理的工程实践:
《Go错误处理最佳实践》内容预告:
- 错误包装与解包:errors.Is与errors.As的使用场景
- 错误分级策略:致命错误与可恢复错误的区分处理
- 错误日志记录:上下文信息的结构化记录方法
- 错误追踪整合:OpenTelemetry与错误追踪的结合
- 错误恢复模式:panic/recover的合理使用边界
- 错误处理工具链:静态分析工具与lint检查
通过这些错误处理技巧的学习,您将能够构建更加健壮、更易维护的Go应用程序!