golang带宽限制

基于golang实现的网络带宽限制器

介绍

很多客户使用多中心部署的时候副中心和主中心之间都是走专线的,专线的带宽是有限的,很多服务共享此带宽。为了防止一个服务把带宽占尽而导致其它服务无法工作,这就要求服务有带宽限制的功能。本文基于golang的time/rate模块实现了一个线程安全的带宽限制器,该带宽限制器本质上还是基于Token Bucket(令牌桶)实现的。下面提供源码以供大家参考。

import (
	"context"
	"fmt"
	"golang.org/x/time/rate"
	"sync/atomic"
	"unsafe"
)

type BandwidthLimiter struct {
	limiter   *rate.Limiter
	ctx       context.Context
	cancel    context.CancelFunc
	bandwidth int
}

func NewBandwidthLimiter(bandwidth int) (*BandwidthLimiter, error) {
	if bandwidth < 0 {
		return nil, fmt.Errorf("invalid argument bandwidth %d", bandwidth)
	}

	limiter := rate.NewLimiter(rate.Limit(bandwidth), bandwidth)
	ctx, cancel := context.WithCancel(context.Background())
	return &BandwidthLimiter{
		limiter:   limiter,
		ctx:       ctx,
		cancel:    cancel,
		bandwidth: bandwidth,
	}, nil
}

func (bl *BandwidthLimiter) Wait(num int) error {
	bandwidth := bl.bandwidth
	if bandwidth == 0 || num <= 0 {
		return nil
	}

	if num <= bandwidth {
		return bl.limiter.WaitN(bl.ctx, num)
	} else {
		for i := 0; i < num/bandwidth; i++ {
			bl.limiter.WaitN(bl.ctx, bandwidth)
		}
		return bl.limiter.WaitN(bl.ctx, num%bandwidth)
	}
}

func (bl *BandwidthLimiter) UpdateBandwidth(bandwidth int) error {
	if bandwidth < 0 {
		return fmt.Errorf("invalid argument bandwidth %d", bandwidth)
	}

	bl.limiter.SetLimit(rate.Limit(bandwidth))
	bl.limiter.SetBurst(bandwidth)
	atomic.StoreInt32((*int32)(unsafe.Pointer(&bl.bandwidth)), int32(bandwidth))
	return nil
}

func (bl *BandwidthLimiter) Stop() {
	bl.cancel()
}

使用

使用方法如下,在Write发送数据之前或者Read接收数据之后,调用BandwidthLimiter的Wait方法即可实现限速。虽然此方案在1s内可能会有burst,但是1s内的平均速率不会超过限额,这种精度基本上够用了。

func witeWithLimiter(con net.Conn, limiter *BandwidthLimiter, data []byte) (int, error) {
	if limiter != nil {
		if err := limiter.Wait(len(data)); err != nil {
			return 0, err
		}
	}

	return con.Write(data)
}
Warsky007
Warsky007
程序猿

技术栈为C/Go, 专注于采集,数据库,云原生方面的工作