场景
现在有一个场景:
我们需要在pinnacle中跑定时任务来关闭未关闭的房间以及进行plan状态的轮转
如果只有一个机器下,我使用下面这个代码来执行操作,看起来是没什么问题的
1 2 3 4 5 6 7 8
| wg.Add(1) for _ , plan range plans { go func(){ closePlan() wg.Done() }() } wg.Wait()
|
但是如果我们有多个 replicas 那么,假设有 replicas:5,那么有可能每个副本机器会同时来执行关闭closePlan的操作,这样子带来的结果就是数据库中表数据的不一致性以及其他方面的不一致性,严重灾难!
因此我们需要使用分布式锁来保证一个定时任务在一段时间内只能由一个机器来执行这个定时任务
也就是在多实例情况下实现单实例操作
说人话:上厕所,一个坑位只能上一个人,这个人上的时候其他人都得等着
思考过程
ok ,那么我们来一步步推导分布式锁的书写逻辑
我们如何保证在一段时间内只能由一个机器来执行?
我们可以上锁,这里使用了最简单的redis 实现的分布式锁
我们可以将 某个定时任务的唯一uid(可辨认的) 放到redis上,如果redis上有这个uid存在了,就说明坑位满了
这里的唯一 uid 是用来区分这个定时任务与其他定时任务的,机器副本都能够共同识别认可的
比如在定时任务中,我们可以用 key = corn + planId 来表示这个唯一 uid
注意这里的锁是抽象概念的锁,这里的锁并不是特殊值golang中 mutex.Lock
这种锁,而是指能达到锁的效果的目的手段
再理解一下,机器副本相当于不同的顾客,而redis相当一个权威的购物平台,而plan就相当于一个个商店,商店将自己的信息上交给redis,然后不同的顾客就可以通过这些信息辨别商家
简单流程:
我们需要给一个特定的定时任务创建一个 id
比如说 机器A要跑定时任务的时候,先从 redis 中检查 redis.Get(“id”) 是否存在
如果存在就说明这个定时任务已经上锁了,程序直接return,如果不存在,那么我们就需要在 redis.Set(“id”) 上一个锁,表示自己占用了这个定时任务
然后当这个任务完成的时候,我们会将这个锁释放掉 :redis.del(“id”)
以上就是最简单的分布式锁场景
深入一下,如果机器A在运行的过程中直接panic了,而分布式锁没有删除应该怎么解决?
如果分布式锁没有删除,就会导致这个锁永远地留在了redis中,那么其他的机器也无法跑通这个定时任务,这就糟糕了!
那么处理方法就是:给这个锁加上 EX/PX(expireTime)过期时间(EX是秒,PX是毫秒),当副本直接 panic后,锁到期后会直接删除,这样子下一个锁就可以直接运行定时任务了
一般来说锁的设定时间要稍微大于 这个定时任务的的运行时间
比如说一个运行时间为1s,我们就可以设置为3秒5秒
再深入思考一下,现在我们已经设置了过期时间,机器B检查锁已经没了以后开始运行定时任务并重新上锁,而这个时候机器A跑完了流程,就会把redis中机器B上的分布式锁删掉,这可怎么办?
我们需要解决的就是机器A删除不了机器B上的锁,也就是给锁贴一个 create_user_id 这个标签
主要实现形式是机器B可以 set(”id”,”ValueB”)来告知机器A,这个锁是老子加的,你没有权限删除他,对就是这么张狂
问题其实还是存在着的,比如说在A的锁已经过期了,B开始运行定时任务的这一段时间中,定时任务被两个实例调用,太危险了
- 我们需要尽可能的保证 定时任务操作的原子性,比如说数据库事务的原子性就很nice
- 我们可以从源头上掐灭,不让机器A的分布式锁过期,即引入一个协程来监听机器A的是否完成任务,如果没有完成,则再追加时间;至于如果机器A一直在执行任务,那就是代码写的有问题了
注:看到这我们需要注意,分布式锁并不是指定时任务每个任务只执行一次,而是指在多实例环境下,每段执行时间中,只有一个单实例执行任务。
所以这个任务可以执行多次,至于执行多次后可能产生的后果我们需要在代码中通过流程著名
代码逻辑
分布式锁很简单,总的来说就三步
- 检验是否有锁存在
- 如果没有加锁 ; 如果有 return
- 释放锁
但是redis是单线程的,事实上我们的操作一和操作二是不能分离的,要实现redis中的原子性,我们就需要实现LUA脚本:
- 检验是否有锁-> 上锁 or return
- 释放锁
下面是代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91
| const ( randomLen = 16 tolerance = 500 millisPerSecond = 1000 lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then redis.call("SET", KEYS[1], ARGV[1], "PX", ARGV[2]) return "OK" else return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2]) end` delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end` )
type RedisLock struct { store *Redis seconds uint32 key string id string }
func init() { rand.Seed(time.Now().UnixNano()) }
func NewRedisLock(store *Redis, key string) *RedisLock { return &RedisLock{ store: store, key: key, id: stringx.Randn(randomLen), } }
func (rl *RedisLock) Acquire() (bool, error) { return rl.AcquireCtx(context.Background()) }
func (rl *RedisLock) AcquireCtx(ctx context.Context) (bool, error) { seconds := atomic.LoadUint32(&rl.seconds) resp, err := rl.store.EvalCtx(ctx, lockCommand, []string{rl.key}, []string{ rl.id, strconv.Itoa(int(seconds)*millisPerSecond + tolerance), }) if err == red.Nil { return false, nil } else if err != nil { logx.Errorf("Error on acquiring lock for %s, %s", rl.key, err.Error()) return false, err } else if resp == nil { return false, nil }
reply, ok := resp.(string) if ok && reply == "OK" { return true, nil }
logx.Errorf("Unknown reply when acquiring lock for %s: %v", rl.key, resp) return false, nil }
func (rl *RedisLock) Release() (bool, error) { return rl.ReleaseCtx(context.Background()) }
func (rl *RedisLock) ReleaseCtx(ctx context.Context) (bool, error) { resp, err := rl.store.EvalCtx(ctx, delCommand, []string{rl.key}, []string{rl.id}) if err != nil { return false, err }
reply, ok := resp.(int64) if !ok { return false, nil }
return reply == 1, nil }
func (rl *RedisLock) SetExpire(seconds int) { atomic.StoreUint32(&rl.seconds, uint32(seconds)) }
|
1 2 3 4
| lockCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] else return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2]) end`
|
if redis.call("GET", KEYS[1]) == ARGV[1]
ARGV[1]表示机器的唯一值,这是来验证是否是该机器上的锁
else return redis.call("SET", KEYS[1], ARGV[1], "NX", "PX", ARGV[2])
如果不是则上锁
ARGV[1]
签名机器自身唯一值,NX
表示 if not exist
PX ARGV[2]
表示过期时间,ARGV[2]
是过期时间
1 2 3 4 5
| delCommand = `if redis.call("GET", KEYS[1]) == ARGV[1] then return redis.call("DEL", KEYS[1]) else return 0 end`
|
简单来说就是 如果这是自己上的锁,那么删了(前提是已经执行完毕)
- Release() 释放锁
- AcquireCtx 获取锁
- SetExpire() 设置过期时间
- NewRedisLock() 新建锁
在我们的定时任务中就这么写:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| func PlanCorn() { ctx := context.Background() newCtx, cancel := context.WithCancel(ctx) defer cancel()
type ongoingPlan struct { model.Plan QuestionCount int } var plans []*ongoingPlan err := dao.Plan.DB().WithContext(ctx).Model(&model.Plan{}). Select("plans.*,knowledge_bases.question_count AS question_count"). Joins("LEFT JOIN knowledge_bases ON plans.knowledge_base_id = knowledge_bases.id"). Where("plans.status = ?", model.Ongoing).Find(&plans).Error if err != nil { logx.Error(err) return }
cache := kernel.Kernel.MainCache
ch := make(chan struct{}, 5) wg := sync.WaitGroup{} for _, p := range plans { ch <- struct{}{} wg.Add(1) go func(plan *ongoingPlan) {
defer wg.Done() defer func() { if r := recover(); r != nil { logx.Error(fmt.Sprintf("panic: %v", r)) } <-ch }()
lock := rds.NewRedisLock(cache, rds.Key("PlanCorn",plan.Id)) lock.SetExpire(3) defer lock.ReleaseCtx(newCtx)
tx := dao.Plan.DB().WithContext(newCtx).Begin() defer tx.Rollback() ok := ClosePlanRoom(tx, &plan.Plan) if !ok { tx.Rollback() return }
if err = tx.Commit().Error; err != nil { logx.Error(err) return }
}(p) }
wg.Wait()
logx.Info(time.Now().Format("2006-01-02") + " PlanCorn done success")
}
|
代码很简单,但也要理解一下里面的细节
rds.Key("PlanCorn",plan.Id)
用来表示这是哪个定时任务上的锁
defer lock.ReleaseCtx(newCtx)
细节 defer
总结
感谢老板,以上的所有见解都是老板教的,本人今天半夜突然长脑子了…………..