利用consul在spring boot中实现分布式锁场景代码示例解析

作者:袖梨 2022-06-29

本篇文章小编给大家分享一下利用consul在spring boot中实现分布式锁场景代码示例解析,文章代码介绍的很详细,小编觉得挺不错的,现在分享给大家供大家参考,有需要的小伙伴们可以来看看。

因为在项目实际过程中所采用的是微服务架构,考虑到承载量基本每个相同业务的服务都是多节点部署,所以针对某些资源的访问就不得不用到用到分布式锁了。

这里列举一个最简单的场景,假如有一个智能售货机,由于机器本身的原因不能同一台机器不能同时出两个商品,这就要求在在出货流程前针对同一台机器在同一时刻出现并发

创建订单时只能有一笔订单创建成功,但是订单服务是多节点部署的,所以就不得不用到分布式锁了。

以上只是一种简单的业务场景,在各种大型互联网实际应用中,需要分布式锁的业务场景会更多,综合比较了业界基于各种中间件来实现的分布式锁方案,然后结合实际业务最终

决定采用consul来实现,因为我们的项目中采用了consul做注册中心,并且consul天生可以保证一致性(这点类似zk),当然zk也能实现分布式锁,但是这里不对这点做过多讨论。

redis虽然也能实现分布式锁,但是可能因为场景比较复杂,如果redis采用cluster部署的话,如果某一主节点出现故障的话,有一定几率会出现脑裂现象,这样就可能会让竞争者在

并发时同时获得到锁,这样可能会破坏掉后面的业务,当然出现这种情况的概率很低,但是也不能完全排除,因为redis的根本不能保证强一致性导致的。

好了,这里说的最简单的分布式锁的意思是,多个竞争者同一时间并发去获得锁时,获取失败的就直接返回了,获取成功的继续后续的流程,然后在合适的时间释放锁,并且为锁

加了超时时间,防止获得到锁的进程或线程在未来得及释放锁时自己挂掉了,导致资源处于一直被锁定的状态无法得到释放。主要的实现逻辑就是这样,如果有人想实现获得锁失

败的竞争者一直继续尝试获得,可以基于该示例进行修改,加上自旋逻辑就OK。

以下是锁实现代码:

package com.lyb.consullock;

import com.ecwid.consul.v1.ConsulClient;
import com.ecwid.consul.v1.agent.model.NewCheck;
import com.ecwid.consul.v1.kv.model.PutParams;
import com.ecwid.consul.v1.session.model.NewSession;
import com.ecwid.consul.v1.session.model.Session;
import lombok.Data;


import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;


public class DistributedLock{
    private ConsulClient consulClient;

    /**
     * 构造函数
     * @param consulHost 注册consul的client或服务端的Ip或主机名,或域名
     * @param consulPort 端口号
     */
    public DistributedLock(String consulHost,int consulPort){
        consulClient = new ConsulClient(consulHost,consulPort);
    }

    /**
     * 获得锁的方法
     * @param lockName 竞争的资源名
     * @param ttlSeconds 锁的超时时间,超过该时间自动释放
     * @return
     */
    public LockContext getLock(String lockName,int ttlSeconds){
        LockContext lockContext = new LockContext();
        if(ttlSeconds<10 || ttlSeconds > 86400) ttlSeconds = 60;
        String sessionId = createSession(lockName,ttlSeconds);
        boolean success = lock(lockName,sessionId);
        if(success == false){
            consulClient.sessionDestroy(sessionId,null);
            lockContext.setGetLock(false);

            return lockContext;
        }

        lockContext.setSession(sessionId);
        lockContext.setGetLock(true);

        return lockContext;
    }

    /**
     * 释放锁
     * @param sessionID
     */
    public void releaseLock(String sessionID){
        consulClient.sessionDestroy(sessionID,null);
    }

    private String createSession(String lockName,int ttlSeconds){
        NewCheck check = new NewCheck();
        check.setId("check "+lockName);
        check.setName(check.getId());
        check.setTtl(ttlSeconds+"s"); //该值和session ttl共同决定决定锁定时长
        check.setTimeout("10s");
        consulClient.agentCheckRegister(check);
        consulClient.agentCheckPass(check.getId());

        NewSession session = new NewSession();
        session.setBehavior(Session.Behavior.RELEASE);
        session.setName("session "+lockName);
        session.setLockDelay(1);
        session.setTtl(ttlSeconds + "s"); //和check ttl共同决定锁时长
        List checks = new ArrayList<>();
        checks.add(check.getId());
        session.setChecks(checks);
        String sessionId = consulClient.sessionCreate(session,null).getValue();

        return sessionId;
    }

    private boolean lock(String lockName,String sessionId){
        PutParams putParams = new PutParams();
        putParams.setAcquireSession(sessionId);

        boolean isSuccess = consulClient.setKVValue(lockName,"lock:"+ LocalDateTime.now(),putParams).getValue();

        return isSuccess;
    }

    /**
     * 竞争锁时返回的对象
     */
    @Data
    public class LockContext{
        /**
         * 获得锁成功返回该值,比便后面用该值来释放锁
         */
        private String session;
        /**
         * 是否获得到锁
         */
        private boolean isGetLock;
    }
}

pom文件



    4.0.0
    
        org.springframework.boot
        spring-boot-starter-parent
        2.1.6.RELEASE
         
    
    com.lyb
    consul-lock
    0.0.1-SNAPSHOT
    consul-lock
    Demo project for Spring Boot

    
        1.8
        Greenwich.SR2
    

    
        
            org.springframework.boot
            spring-boot-starter-web
        
        
            org.springframework.cloud
            spring-cloud-starter-consul-discovery
        
        
            org.springframework.boot
            spring-boot-starter-actuator
        
        
            org.projectlombok
            lombok
            1.18.8
            true
        

        
            org.springframework.boot
            spring-boot-starter-test
            test
        
    

    
        
            
                org.springframework.cloud
                spring-cloud-dependencies
                ${spring-cloud.version}
                pom
                import
            
        
    

    
        
            
                org.springframework.boot
                spring-boot-maven-plugin
            
        
    

测试代码:

package com.lyb.consullock;

import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest
public class ConsulLockApplicationTests {
    @Autowired
    private ServiceConfig serviceConfig;
    @Test
    public void lockSameResourer() {
        //针对相同资源在同一时刻只有一个线程会获得锁
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int a=0;a<20;a++){
            threadPool.submit(
                    () -> {
                        for (int i = 0;i < 100; i++) {
                            DistributedLock lock = new DistributedLock(
                                    serviceConfig.getConsulRegisterHost(),
                                    serviceConfig.getConsulRegisterPort());

                            DistributedLock.LockContext lockContext = lock.getLock("test lock", 10);
                            if (lockContext.isGetLock()) {
                                System.out.println(Thread.currentThread().getName() + "获得了锁");
                                try {
                                    TimeUnit.SECONDS.sleep(1);
                                    lock.releaseLock(lockContext.getSession());
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }else {
                                //System.out.println(Thread.currentThread().getName() + "没有获得锁");
                            }
                        }
                    });
        }

        try {
            TimeUnit.MINUTES.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Test
    public void lockDiffResource(){
        //针对不通的资源所有线程都应该能获得锁
        ExecutorService threadPool = Executors.newFixedThreadPool(10);
        for (int a=0;a<20;a++){
            threadPool.submit(
                    () -> {
                        for (int i = 0;i < 100; i++) {
                            DistributedLock lock = new DistributedLock(
                                    serviceConfig.getConsulRegisterHost(),
                                    serviceConfig.getConsulRegisterPort());

                            DistributedLock.LockContext lockContext = lock.getLock("test lock"+Thread.currentThread().getName(), 10);
                            if (lockContext.isGetLock()) {
                                System.out.println(Thread.currentThread().getName() + "获得了锁");
                                try {
                                    TimeUnit.SECONDS.sleep(1);
                                    lock.releaseLock(lockContext.getSession());
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                            }else {
                                //System.out.println(Thread.currentThread().getName() + "没有获得锁");
                                Assert.assertTrue(lockContext.isGetLock());
                            }
                        }
                    });
        }

        try {
            TimeUnit.MINUTES.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

相关文章

精彩推荐