分布式id生成器

在分布式场景下,唯一ID的生成更为重要。

通常在高并发场景下,需要像MySQL自增id一样持续增长且不重复的id,即MySql的主键id。

例如,电商企业在618或双11举办活动时,通常从0点开始,会写入数千万到数亿的订单,每秒大约需要处理10万个订单。

在将订单插入数据库之前,我们需要在业务中给订单一个唯一的ID,即使用唯一的订单号,然后将其插入数据库。 如果生成的ID是随机的、无意义的纯数字,那么在订单量较大的情况下,在数据库增删改查时,将无法提高效率。 因此,这个ID应该包含一些时间信息、机器信息等,这样即使后端系统将消息分库分表,这些消息也能按照时间顺序排序。

比较典型的是 的【雪花算法】,可以看作是上述场景下的更优解。 原理如图:

首先要确定的是id值的长度是64位,类型是int64。 除开头的符号位外,其余可分为四部分:

上述机制生成的ID可以支持一台机器在一毫秒内生成4096条消息。 即每秒总共409.6w条消息。 仅就取值范围而言,是完全足够的。

数据中心ID加实例ID共10位。 每个数据中心可以部署32个实例,建设32个数据中心,所以总共可以部署1024个实例。

41 位时间戳(以毫秒为单位)可持续 69 年。

如何分配

(时间戳)、(数据中心)、(机器ID)和(序列号)这四个字段中,是由程序在运行时生成的。 但需要在部署阶段获取,并且一旦程序启动后就无法更改,因为如果可以随意更改,可能会导致最终生成的ID发生冲突。

不过,一般不同数据中心的机器都会提供相应的API来获取数据中心ID,所以我们在部署阶段就可以轻松获取。 它是我们逻辑上分配给机器的 ID。 一个比较简单的做法是用一个可以提供这种自增ID功能的工具来支持它,比如MySql:

mysql> insert into a (ip) values("10.115.4.66");
Query OK, 1 row affected (0.00 sec)
mysql> select last_insert_id();
+------------------+
| last_insert_id() |
+------------------+
|                2 |
+------------------+
1 row in set (0.00 sec)
复制代码

从MySql获取后,直接持久化到本地,避免每次上线都需要重新获取。 让单实例始终保持不变。

然而,使用MySQL相当于给id生成服务添加了外部依赖。 当然,依赖越多,服务运维成本就会越高。

考虑到集群中即使有单实例的ID生成服务,在一段时间内也会丢失部分ID,所以我们也可以更简单暴力一些,直接写在配置中。 上线后,部署脚本将完成现场替换。 就是这样。

开源示例:标准雪花算法

// 这是一个相对轻量级的 Go 实现。 其文档的定义供大家使用,如下图:

这个库完全符合标准实现,使用起来也比较简单。 只需上传示例代码:

package main
import (
  "fmt"
 "github.com/bwmarrin/snowflake"
)
func main() {
 node, err := snowflake.NewNode(1)
 if err != nil {
  println(err.Error())
  os.Exit(1)
 }
 for i := 0; i < 20; i++ {
  id := node.Generate()
  fmt.Printf("Int64  ID: %d\n", id)
  fmt.Printf("String ID: %s\n", id)
  fmt.Printf("ID Time  : %d\n", id.Time())
  fmt.Printf("ID Node  : %d\n", id.Node())
  fmt.Printf("ID Step  : %d\n", id.Step())
  fmt.Println("--------- end ----------")
 }
}
复制代码

分布式锁

当独立程序并发或并行修改全局共享变量时,需要锁定修改行为。 因为如果没有锁,多个协程就会竞争变量,结果就会不准确,或者不是我们期望的结果,比如下面的例子:

package main
func main() {
 var wg sync.WaitGroup
 var count = 0
 for i := 1; i < 1000; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()
   count++
  }()
 }
 wg.Wait()
 fmt.Println(count)
}
复制代码

多次运行的结果是不同的:

➜  go run main.go
884
➜  go run main.go
957
➜  go run main.go
923
复制代码

预期结果是:999

进程内锁定

而如果想要得到正确(预期)的结果,则需要锁定计数器的操作代码部分:

package main
import (
 "fmt"
 "sync"
)
func main() {
 var wg sync.WaitGroup
 var lock sync.Mutex
 var count = 0
 for i := 1; i < 1000; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()
   lock.Lock()  // 加锁
   count++
   lock.Unlock()  // 释放锁
  }()
 }
 wg.Wait()
 fmt.Println(count)
}
复制代码

这会给你正确的结果:

➜  go run main.go
999
复制代码

尝试锁定

在某些场景中,我们通常只希望一个任务有一个执行器,这与计数器不同,计数器中所有执行器都执行成功。 随后,抢锁失败后wg999,需要放弃执行。 这时候就需要尝试加锁,即实现。

尝试加锁,加锁成功后执行后续流程。 如果失败,则不能阻塞,而是直接返回锁定结果。

在Go语言中,可以用大小1来模拟:

package main
import (
  "fmt"
  "sync"
)
type MyLock struct {
 lockCh chan struct{}
}
func NewLock() MyLock {
 var myLock MyLock
 myLock = MyLock{
  lockCh:make(chan struct{}, 1),
 }
 myLock.lockCh <- struct{}{}
 return myLock
}
func (l *MyLock) Lock() bool {
 result := false
 select {
 case <-l.lockCh:
  result = true
 default:  // 这里去掉就会阻塞,直到获取到锁
 }
 return result
}
func (l *MyLock) Unlock() {
 l.lockCh <- struct{}{}
}
func main() {
 var wg sync.WaitGroup
 var count int
 l := NewLock()
 for i := 0; i < 10; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()
   if !l.Lock() {
    fmt.Println("get lock failed")
    return
   }
   count++
   fmt.Println("count=", count)
   l.Unlock()
  }()
 }
 wg.Wait()
}
复制代码

只有获取到锁(Lock执行成功),各个程序才会继续执行后续代码。 然后(),就可以保证Lock结构一定为空,这样就不会阻塞或者失败。

在单机系统中,这并不是一个好的选择,因为大量的抢锁会无意义地占用CPU资源。 这就是所谓的活锁,所以不建议使用这种锁。

基于Redis的Setnx分布式锁

在分布式场景中,还需要“抢占”逻辑,可以使用Redis的setnx来实现:

package main
import (
 "github.com/go-redis/redis"
 "sync"
 "time"
)
func setnx() {
 client := redis.NewClient(&redis.Options{})
 var lockKey = "counter_lock"
 var counterKey = "counter"
 // lock
 resp := client.SetNX(lockKey, 1, time.Second*6)
 lockStatus, err := resp.Result()
 if err != nil || !lockStatus {
  println("lock failed")
  return
 }
 // counter++
 getResp := client.Get(counterKey)
 cntValue, err := getResp.Int64()
 if err == nil || err == redis.Nil {
  cntValue++
  resp := client.Set(counterKey, cntValue, 0)
  _, err := resp.Result()
  if err != nil {
   println(err)
  }
 }
 println("current counter is ", cntValue)
 // unlock
 delResp := client.Del(lockKey)
 unlockStatus, err := delResp.Result()
 if err == nil && unlockStatus > 0 {
  println("unlock success")
 } else {
  println("unlock failed", err)
 }
}
func main() {
 var wg sync.WaitGroup
 for i := 0; i < 10; i++ {
  wg.Add(1)
  go func() {
   defer wg.Done()
   setnx()
  }()
 }
 wg.Wait()
}
复制代码

运行结果:

➜  go run main.go
lock failed
lock failed
lock failed
lock failed
lock failed
current counter is  34
lock failed
unlock success
复制代码

从上面的代码和执行结果可以看出,远程调用setnx的运行过程与单机非常相似。 如果获取锁失败,相关任务逻辑将不会继续向后执行。

setnx非常适合在高并发场景下争夺一些“独特”的资源。 例如,对于在商场销售的产品,在某个时间点,会有多个买家下单并竞争。 在这种场景下,我们不能依靠具体的时间来确定顺序,因为不同设备的时间无法保证是相同的,时序也无法保证。

因此,我们需要依赖这些请求到达redis节点的顺序来进行正确的抢锁操作。

如果用户的网络环境比较差的话,可能抢不到。

基于分布式锁

基于Redis的锁和基于Redis的锁有些类似。 不同的是Lock会一直阻塞直到成功,这和单机场景下的mutex.Lock非常相似。

package main
import (
 "github.com/go-zookeeper/zk"
 "time"
)
func main() {
 c, _, err := zk.Connect([]string{"127.0.0.1"}, time.Second)
 if err != nil {
  panic(err)
 }
 l := zk.NewLock(c, "/lock", zk.WorldACL(zk.PermAll))
 err = l.Lock()
 if err != nil {
  panic(err)
 }
 println("lock success, do your business logic")
 time.Sleep(time.Second * 10) // 模拟业务处理
 l.Unlock()
 println("unlock success, finish business logic")
}
复制代码

原理也是基于临时节点和watch API。 例如,我们在这里使用/lock节点。

Lock会将自己的值插入到该节点下的节点列表中。 只要该节点下的子节点发生变化,所有观察该节点的程序都会收到通知。 这时,程序会检查当前节点下最小子节点的ID是否与自己一致。 如果一致,则锁定成功。

这种分布式阻塞锁比较适合分布式任务调度场景,但不太适合高频次、锁持有时间较短的抢锁场景。

一般来说,基于强一致性协议的锁适合粗粒度的锁定操作。 这里的粒度粗是指锁的时间比较长。 我们在使用的时候也要考虑在自己的业务场景中使用是否合适。

总结

本期主要介绍分布式ID的使用场景、分布式ID如何生成、分布式锁及使用。

未经允许不得转载! 作者:admin,转载或复制请以超链接形式并注明出处天心神途传奇手游发布网

原文地址:《wg999 分布式id生成器和分布式锁简介》发布于:2024-03-25

发表评论

表情:
验证码
评论列表 (暂无评论,36人围观)

还没有评论,来说两句吧...