Go并发模式进阶

Go并发模式进阶

Go语言的并发模型是其最强大的特性之一。本文将深入探讨Go的高级并发模式,帮助您构建高性能、高可靠性的分布式系统。

一、扇出/扇入模式

1. 基础实现

func fanOutFanIn() {
    // 扇出:一个输入channel分发给多个worker
    in := genNumbers(10) // 生成1-10的数字

    // 启动3个worker处理
    c1 := square(in)
    c2 := square(in)
    c3 := square(in)

    // 扇入:合并多个channel的结果
    for n := range merge(c1, c2, c3) {
        fmt.Println(n) // 输出平方结果
    }
}

func genNumbers(n int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 1; i <= n; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }

    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

2. 带限流的实现

func fanOutWithThrottle() {
    in := genNumbers(100) // 生成100个数字

    // 限制最多5个并发worker
    var wg sync.WaitGroup
    workerCount := 5
    wg.Add(workerCount)

    out := make(chan int)
    for i := 0; i < workerCount; i++ {
        go func() {
            defer wg.Done()
            for n := range in {
                out <- n * n // 处理结果
            }
        }()
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    for result := range out {
        fmt.Println(result)
    }
}

二、顺序一致性保证

1. 内存屏障使用

var (
    data  int
    ready bool
)

func sequentialConsistency() {
    go func() { // Writer goroutine
        data = 42 // 写操作1

        // 使用atomic作为内存屏障
        atomic.StoreInt32((*int32)(unsafe.Pointer(&ready)), 1) // 写操作2
    }()

    go func() { // Reader goroutine
        // 使用atomic检查ready标志
        for atomic.LoadInt32((*int32)(unsafe.Pointer(&ready))) == 0 {
            runtime.Gosched() // 让出CPU
        }
        fmt.Println(data) // 保证会读到42
    }()

    time.Sleep(time.Second)
}

2. Sync.Map保证一致性

func syncMapExample() {
    var m sync.Map

    // 存储
    m.Store("key", "value")

    // 加载,保证原子性
    if val, ok := m.Load("key"); ok {
        fmt.Println(val) // 输出"value"
    }

    // 删除
    m.Delete("key")

    // LoadOrStore
    actual, loaded := m.LoadOrStore("key", "new value")
    if !loaded {
        fmt.Println("inserted new value:", actual)
    }
}

三、分布式锁实现

1. 基于Redis的实现

type RedisLock struct {
    client     *redis.Client
    key        string
    value      string // 唯一标识
    expiration time.Duration
}

func NewRedisLock(client *redis.Client, key string, exp time.Duration) *RedisLock {
    return &RedisLock{
        client:     client,
        key:        key,
        value:      uuid.New().String(),
        expiration: exp,
    }
}

func (l *RedisLock) Lock(ctx context.Context) (bool, error) {
    return l.client.SetNX(ctx, l.key, l.value, l.expiration).Result()
}

func (l *RedisLock) Unlock(ctx context.Context) error {
    script := `
    if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
    else
        return 0
    end`

    _, err := l.client.Eval(ctx, script, []string{l.key}, l.value).Result()
    return err
}

2. 基于etcd的实现

type EtcdLock struct {
    client *clientv3.Client
    lease  clientv3.Lease
    key    string
    cancel context.CancelFunc
}

func NewEtcdLock(cli *clientv3.Client, key string) *EtcdLock {
    return &EtcdLock{
        client: cli,
        key:    fmt.Sprintf("/locks/%s", key),
    }
}

func (l *EtcdLock) Lock(ctx context.Context, ttl int64) error {
    // 创建租约
    lease := clientv3.NewLease(l.client)
    grantResp, err := lease.Grant(ctx, ttl)
    if err != nil {
        return err
    }

    // 自动续约
    ctx, cancel := context.WithCancel(ctx)
    keepAliveChan, err := lease.KeepAlive(ctx, grantResp.ID)
    if err != nil {
        cancel()
        return err
    }

    // 竞争锁
    txn := l.client.Txn(ctx).
        If(clientv3.Compare(clientv3.CreateRevision(l.key), "=", 0)).
        Then(clientv3.OpPut(l.key, "", clientv3.WithLease(grantResp.ID))).
        Else(clientv3.OpGet(l.key))

    txnResp, err := txn.Commit()
    if err != nil {
        cancel()
        return err
    }

    if !txnResp.Succeeded {
        return fmt.Errorf("lock is held by others")
    }

    l.lease = lease
    l.cancel = cancel

    go func() {
        for range keepAliveChan {
            // 保持租约活跃
        }
    }()

    return nil
}

func (l *EtcdLock) Unlock() {
    if l.cancel != nil {
        l.cancel()
    }
}

四、Leader选举模式

1. 基于etcd的选举

func etcdLeaderElection() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"localhost:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // 参与选举
    election := concurrency.NewElection(cli, "/my-election/")
    ctx := context.Background()

    go func() {
        if err := election.Campaign(ctx, "node1"); err != nil {
            log.Println("campaign error:", err)
        }
    }()

    // 监听leader变更
    ch := election.Observe(ctx)
    for resp := range ch {
        log.Println("new leader:", string(resp.Kvs[0].Value))
    }
}

2. 主从切换处理

func handleLeadership(isLeader <-chan bool) {
    for leader := range isLeader {
        if leader {
            log.Println("I am now the leader")
            go doLeaderWork()
        } else {
            log.Println("I am now a follower")
            stopLeaderWork()
        }
    }
}

func doLeaderWork() {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        log.Println("leader is working...")
    }
}

func stopLeaderWork() {
    // 清理leader相关资源
}

五、Backpressure控制

1. 滑动窗口限流

type SlidingWindow struct {
    windowSize int64 // 窗口大小(毫秒)
    limit      int   // 限制数量
    mu         sync.Mutex
    timestamps []int64 // 请求时间记录
}

func NewSlidingWindow(windowSize time.Duration, limit int) *SlidingWindow {
    return &SlidingWindow{
        windowSize: windowSize.Milliseconds(),
        limit:      limit,
    }
}

func (sw *SlidingWindow) Allow() bool {
    now := time.Now().UnixMilli()
    sw.mu.Lock()
    defer sw.mu.Unlock()

    // 移除窗口外的记录
    i := 0
    for ; i < len(sw.timestamps); i++ {
        if now-sw.timestamps[i] < sw.windowSize {
            break
        }
    }
    sw.timestamps = sw.timestamps[i:]

    // 检查是否超过限制
    if len(sw.timestamps) >= sw.limit {
        return false
    }

    sw.timestamps = append(sw.timestamps, now)
    return true
}

2. 自适应限流算法

type AdaptiveLimiter struct {
    maxInflight    int32 // 最大并发数
    currentInflight int32 // 当前并发数
    lastErrorTime  time.Time // 最后错误时间
    lastRefused    time.Time // 最后拒绝时间
}

func (al *AdaptiveLimiter) TryAcquire() bool {
    current := atomic.LoadInt32(&al.currentInflight)
    max := atomic.LoadInt32(&al.maxInflight)

    if current >= max {
        if time.Since(al.lastRefused) > time.Second {
            // 尝试减少并发数
            atomic.CompareAndSwapInt32(&al.maxInflight, max, max-1)
            al.lastRefused = time.Now()
        }
        return false
    }

    atomic.AddInt32(&al.currentInflight, 1)
    return true
}

func (al *AdaptiveLimiter) Release(success bool) {
    if success {
        max := atomic.LoadInt32(&al.maxInflight)
        if time.Since(al.lastErrorTime) > 10*time.Second {
            // 稳定情况下适度增加并发数
            atomic.CompareAndSwapInt32(&al.maxInflight, max, max+1)
        }
    } else {
        al.lastErrorTime = time.Now()
    }
    atomic.AddInt32(&al.currentInflight, -1)
}

六、容错模式实现

1. Circuit Breaker实现

type CircuitBreaker struct {
    failureThreshold int
    successThreshold int
    timeout          time.Duration

    mu           sync.Mutex
    failures     int
    successes    int
    lastAttempt  time.Time
    state        string // closed, open, half-open
}

func (cb *CircuitBreaker) Execute(action func() error) error {
    cb.mu.Lock()

    switch cb.state {
    case "open":
        if time.Since(cb.lastAttempt) < cb.timeout {
            cb.mu.Unlock()
            return fmt.Errorf("circuit breaker is open")
        }
        cb.state = "half-open"
    case "closed":
        // 正常执行
    }
    cb.mu.Unlock()

    err := action()

    cb.mu.Lock()
    defer cb.mu.Unlock()

    if err != nil {
        if cb.state == "half-open" {
            // 半开状态下失败直接转回开状态
            cb.state = "open"
            cb.lastAttempt = time.Now()
            return err
        }

        cb.failures++
        if cb.failures >= cb.failureThreshold {
            cb.state = "open"
            cb.lastAttempt = time.Now()
        }
        return err
    }

    // 执行成功
    if cb.state == "half-open" {
        cb.successes++
        if cb.successes >= cb.successThreshold {
            cb.state = "closed"
            cb.failures = 0
            cb.successes = 0
        }
    } else {
        cb.failures = 0
    }

    return nil
}

预告:Go错误处理最佳实践

在掌握了高级并发模式之后,下一期我们将深入探讨Go语言中错误处理的工程实践:

《Go错误处理最佳实践》内容预告:

  • 错误包装与解包:errors.Is与errors.As的使用场景
  • 错误分级策略:致命错误与可恢复错误的区分处理
  • 错误日志记录:上下文信息的结构化记录方法
  • 错误追踪整合:OpenTelemetry与错误追踪的结合
  • 错误恢复模式:panic/recover的合理使用边界
  • 错误处理工具链:静态分析工具与lint检查

通过这些错误处理技巧的学习,您将能够构建更加健壮、更易维护的Go应用程序!

暂无评论

发送评论 编辑评论


				
|´・ω・)ノ
ヾ(≧∇≦*)ゝ
(☆ω☆)
(╯‵□′)╯︵┴─┴
 ̄﹃ ̄
(/ω\)
∠( ᐛ 」∠)_
(๑•̀ㅁ•́ฅ)
→_→
୧(๑•̀⌄•́๑)૭
٩(ˊᗜˋ*)و
(ノ°ο°)ノ
(´இ皿இ`)
⌇●﹏●⌇
(ฅ´ω`ฅ)
(╯°A°)╯︵○○○
φ( ̄∇ ̄o)
ヾ(´・ ・`。)ノ"
( ง ᵒ̌皿ᵒ̌)ง⁼³₌₃
(ó﹏ò。)
Σ(っ °Д °;)っ
( ,,´・ω・)ノ"(´っω・`。)
╮(╯▽╰)╭
o(*////▽////*)q
>﹏<
( ๑´•ω•) "(ㆆᴗㆆ)
😂
😀
😅
😊
🙂
🙃
😌
😍
😘
😜
😝
😏
😒
🙄
😳
😡
😔
😫
😱
😭
💩
👻
🙌
🖕
👍
👫
👬
👭
🌚
🌝
🙈
💊
😶
🙏
🍦
🍉
😣
Source: github.com/k4yt3x/flowerhd
颜文字
Emoji
小恐龙
花!
上一篇
下一篇