资讯详情

Redis 如何实现库存扣减操作和防止被超卖?(荣耀典藏版)

b51ecb11818dbb552c559a9627345234.jpeg

前言

一、解决方案

二、分析

2.1.基于数据库单库存

2.2.基于数据库的多库存

2.3、基于redis

三、基于redis具体实现库存扣减的具体实现

3.1.初始化库存回调函数(IStockCallback )

3.2.扣除库存服务(StockService)

3.3、调用


前言

电子商务在项目方面的经验非常普遍。在日常开发中,有许多类似的库存扣除操作,如电子商务系统中的商品库存、抽奖系统中的奖品库存等。无论你是包装还是真实,至少你应该能够解释电子商务中常见的问题,比如库存操作如何防止商品超卖?这已经成为,那么如何解决这样的问题呢?这篇文章带你重新认识Redis的强大!!!

  1. 使用mysql数据库,用一个字段存储库存,每次扣除库存以更新此字段。

  2. 仍然使用数据库,但将库存分层存储在多个记录中。扣除库存时,路由会增加并发量,但大量访问数据库无法避免更新库存。

  3. 将库存放到redis使用redis的incrby特征扣除库存。

二、分析

上述第一种和第二种方法是基于数据扣除库存。

2.1.基于数据库单库存

第一种方法是在这里等待所有的要求,以获得来扣除库存。它可以在并发量低的情况下使用,但一旦并发量大,就会有大量的要求被阻塞,导致要求加班,然后整个系统雪崩;它经常访问数据库,占用大量的数据库资源,因此不适用于并发率高的情况。

2.2.基于数据库的多库存

第二种方法实际上是第一种方法的优化版本,在一定程度上增加了并发量,但仍将大量更新数据库,占用大量数据库资源。

  • 以数据库扣除库存的方式,必须在一句话中执行扣除库存的操作,不能先执行selec在update,这样,并发下就会出现超扣。

updatenumbersetx=x-1wherex>0 
  • MySQL一般来说,高并发处理性能会出现问题,MySQL并发处理性能thread上升和上升,但在一定的并发度之后,会有一个明显的拐点,然后一路下降,最终甚至会超过单一thread性能差。

  • 当库存减少和高并发相遇时,由于操作的库存数量在同一行,会有竞争InnoDB行锁问题导致相互等待甚至死锁,从而大大降低MySQL处理性能最终导致前端页面超时异常。

2.3、基于redis

针对上述问题的问题我们就有了第三种方案,将库存放到缓存,利用redis的incrby特征扣除库存,解决超扣和性能问题。但一旦缓存丢失,需要考虑恢复计划。例如,当抽奖系统扣除奖品库存时,初始库存=库存总数-已发放的奖励数,但如果是异步奖励,需要等到MQ消息消费完成后才能重启redis库存初始化,否则库存不一致。项目实战(点击下载):SpringBoot SpringCloud Mybatis Vue实战电商项目

三、基于redis具体实现库存扣减的具体实现

  • 我们使用redis的lua实现库存扣除库存

  • 因为是分布式环境,需要一个分布式锁来控制,只有一个服务来初始化库存

  • 需要提供一个回调函数,在初始化库存的时候去调用这个函数获取初始化库存

3.1.初始化库存回调函数(IStockCallback )

/** *获取库存回调 *@author龙背骑仕 */ publicinterfaceIStockCallback{  /** *获取库存 *@return */ intgetStock(); }

3.2.扣除库存服务(StockService)

/** *扣库存 * *@author龙背骑仕 */ @Service publicclassStockService{ Loggerlogger=LoggerFactory.getLogger(StockService.class);  /** *不限库存 */ publicstaticfinallongUNINITIALIZED_STOCK=-3L;  /** *Redis客户端 */ @Autowired privateRedisTemplate<String,Object>redisTemplate;  /** *执行扣库存脚本 */ publicstaticfinalStringSTOCK_LUA;  static{ /** * *@desc扣减库存Lua脚本 *库存(stock)-1:表示库存不限 *库存(stock)0:表示没有库存 *库存(stock)大于0:表示剩余库存 * *@params库存key *@return *-三、库存未初始化 *-2:库存不足 *-1:不限库存 *大于等于0:剩余库存(扣除后剩余库存) *redis缓存的存(value)是-1表示不限库存,直接返回1
         */
        StringBuilder sb = new StringBuilder();
        sb.append("if (redis.call('exists', KEYS[1]) == 1) then");
        sb.append("    local stock = tonumber(redis.call('get', KEYS[1]));");
        sb.append("    local num = tonumber(ARGV[1]);");
        sb.append("    if (stock == -1) then");
        sb.append("        return -1;");
        sb.append("    end;");
        sb.append("    if (stock >= num) then");
        sb.append("        return redis.call('incrby', KEYS[1], 0 - num);");
        sb.append("    end;");
        sb.append("    return -2;");
        sb.append("end;");
        sb.append("return -3;");
        STOCK_LUA = sb.toString();
    }

    /**
     * @param key           库存key
     * @param expire        库存有效时间,单位秒
     * @param num           扣减数量
     * @param stockCallback 初始化库存回调函数
     * @return -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存
     */
    public long stock(String key, long expire, int num, IStockCallback stockCallback) {
        long stock = stock(key, num);
        // 初始化库存
        if (stock == UNINITIALIZED_STOCK) {
            RedisLock redisLock = new RedisLock(redisTemplate, key);
            try {
                // 获取锁
                if (redisLock.tryLock()) {
                    // 双重验证,避免并发时重复回源到数据库
                    stock = stock(key, num);
                    if (stock == UNINITIALIZED_STOCK) {
                        // 获取初始化库存
                        final int initStock = stockCallback.getStock();
                        // 将库存设置到redis
                        redisTemplate.opsForValue().set(key, initStock, expire, TimeUnit.SECONDS);
                        // 调一次扣库存的操作
                        stock = stock(key, num);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            } finally {
                redisLock.unlock();
            }

        }
        return stock;
    }

    /**
     * 加库存(还原库存)
     *
     * @param key    库存key
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, int num) {

        return addStock(key, null, num);
    }

    /**
     * 加库存
     *
     * @param key    库存key
     * @param expire 过期时间(秒)
     * @param num    库存数量
     * @return
     */
    public long addStock(String key, Long expire, int num) {
        boolean hasKey = redisTemplate.hasKey(key);
        // 判断key是否存在,存在就直接更新
        if (hasKey) {
            return redisTemplate.opsForValue().increment(key, num);
        }

        Assert.notNull(expire,"初始化库存失败,库存过期时间不能为null");
        RedisLock redisLock = new RedisLock(redisTemplate, key);
        try {
            if (redisLock.tryLock()) {
                // 获取到锁后再次判断一下是否有key
                hasKey = redisTemplate.hasKey(key);
                if (!hasKey) {
                    // 初始化库存
                    redisTemplate.opsForValue().set(key, num, expire, TimeUnit.SECONDS);
                }
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        } finally {
            redisLock.unlock();
        }

        return num;
    }

    /**
     * 获取库存
     *
     * @param key 库存key
     * @return -1:不限库存; 大于等于0:剩余库存
     */
    public int getStock(String key) {
        Integer stock = (Integer) redisTemplate.opsForValue().get(key);
        return stock == null ? -1 : stock;
    }

    /**
     * 扣库存
     *
     * @param key 库存key
     * @param num 扣减库存数量
     * @return 扣减之后剩余的库存【-3:库存未初始化; -2:库存不足; -1:不限库存; 大于等于0:扣减库存之后的剩余库存】
     */
    private Long stock(String key, int num) {
        // 脚本里的KEYS参数
        List<String> keys = new ArrayList<>();
        keys.add(key);
        // 脚本里的ARGV参数
        List<String> args = new ArrayList<>();
        args.add(Integer.toString(num));

        long result = redisTemplate.execute(new RedisCallback<Long>() {
            @Override
            public Long doInRedis(RedisConnection connection) throws DataAccessException {
                Object nativeConnection = connection.getNativeConnection();
                // 集群模式和单机模式虽然执行脚本的方法一样,但是没有共同的接口,所以只能分开执行
                // 集群模式
                if (nativeConnection instanceof JedisCluster) {
                    return (Long) ((JedisCluster) nativeConnection).eval(STOCK_LUA, keys, args);
                }

                // 单机模式
                else if (nativeConnection instanceof Jedis) {
                    return (Long) ((Jedis) nativeConnection).eval(STOCK_LUA, keys, args);
                }
                return UNINITIALIZED_STOCK;
            }
        });
        return result;
    }

}

3.3、调用

/**
 * @author 龍揹仩哋騎仕
 */
@RestController
public class StockController {

    @Autowired
    private StockService stockService;

    @RequestMapping(value = "stock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object stock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;
        long stock = stockService.stock(redisKey, 60 * 60, 2, () -> initStock(commodityId));
        return stock >= 0;
    }

    /**
     * 获取初始的库存
     *
     * @return
     */
    private int initStock(long commodityId) {
        // TODO 这里做一些初始化库存的操作
        return 1000;
    }

    @RequestMapping(value = "getStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object getStock() {
        // 商品ID
        long commodityId = 1;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;

        return stockService.getStock(redisKey);
    }

    @RequestMapping(value = "addStock", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
    public Object addStock() {
        // 商品ID
        long commodityId = 2;
        // 库存ID
        String redisKey = "redis_key:stock:" + commodityId;

        return stockService.addStock(redisKey, 2);
    }
}

只有当你开始,你才会到达你的理想和目的地,只有当你努力, 你才会获得辉煌的成功,只有当你播种,你才会有所收获。只有追求, 才能尝到成功的味道,坚持在昨天叫立足,坚持在今天叫进取,坚持在明天叫成功。欢迎所有小伙伴们点赞+收藏!!!

 

 

标签: 3l光点长距离高精度稳定传感器

锐单商城拥有海量元器件数据手册IC替代型号,打造 电子元器件IC百科大全!

锐单商城 - 一站式电子元器件采购平台