golang带宽限制
基于golang实现的网络带宽限制器
介绍
很多客户使用多中心部署的时候副中心和主中心之间都是走专线的,专线的带宽是有限的,很多服务共享此带宽。为了防止一个服务把带宽占尽而导致其它服务无法工作,这就要求服务有带宽限制的功能。本文基于golang的time/rate模块实现了一个线程安全的带宽限制器,该带宽限制器本质上还是基于Token Bucket(令牌桶)实现的。下面提供源码以供大家参考。
import (
"context"
"fmt"
"golang.org/x/time/rate"
"sync/atomic"
"unsafe"
)
type BandwidthLimiter struct {
limiter *rate.Limiter
ctx context.Context
cancel context.CancelFunc
bandwidth int
}
func NewBandwidthLimiter(bandwidth int) (*BandwidthLimiter, error) {
if bandwidth < 0 {
return nil, fmt.Errorf("invalid argument bandwidth %d", bandwidth)
}
limiter := rate.NewLimiter(rate.Limit(bandwidth), bandwidth)
ctx, cancel := context.WithCancel(context.Background())
return &BandwidthLimiter{
limiter: limiter,
ctx: ctx,
cancel: cancel,
bandwidth: bandwidth,
}, nil
}
func (bl *BandwidthLimiter) Wait(num int) error {
bandwidth := bl.bandwidth
if bandwidth == 0 || num <= 0 {
return nil
}
if num <= bandwidth {
return bl.limiter.WaitN(bl.ctx, num)
} else {
for i := 0; i < num/bandwidth; i++ {
bl.limiter.WaitN(bl.ctx, bandwidth)
}
return bl.limiter.WaitN(bl.ctx, num%bandwidth)
}
}
func (bl *BandwidthLimiter) UpdateBandwidth(bandwidth int) error {
if bandwidth < 0 {
return fmt.Errorf("invalid argument bandwidth %d", bandwidth)
}
bl.limiter.SetLimit(rate.Limit(bandwidth))
bl.limiter.SetBurst(bandwidth)
atomic.StoreInt32((*int32)(unsafe.Pointer(&bl.bandwidth)), int32(bandwidth))
return nil
}
func (bl *BandwidthLimiter) Stop() {
bl.cancel()
}
使用
使用方法如下,在Write发送数据之前或者Read接收数据之后,调用BandwidthLimiter的Wait方法即可实现限速。虽然此方案在1s内可能会有burst,但是1s内的平均速率不会超过限额,这种精度基本上够用了。
func witeWithLimiter(con net.Conn, limiter *BandwidthLimiter, data []byte) (int, error) {
if limiter != nil {
if err := limiter.Wait(len(data)); err != nil {
return 0, err
}
}
return con.Write(data)
}