电商并发超卖现象

线程

这应该是电商系统里感觉最难的一块功能了

解决方法

  • 扣减库存不在程序中进行,而是通过数据库
  • 向数据库传递库存增量,扣减1个库存,增量为 -1
  • 在数据库 update 语句计算库存,通过 update 行锁解决并发(该方法会导致库存为负数)

模拟并发场景代码

public class My { public static void main(String[] args) throws InterruptedException { CountDownLatch cdl = new CountDownLatch(5); CyclicBarrier cyclicBarrier = new CyclicBarrier(5); ExecutorService es = Executors.newFixedThreadPool(5); for (int i = 0; i < 5; i++) { es.execute(() -> { try { cyclicBarrier.await(); // TODO: 其他业务逻辑代码,添加到这里 System.out.println("执行"); } catch (Exception e) { e.printStackTrace(); } finally { cdl.countDown(); } }); } cdl.await(); es.shutdown(); } }

1. 基于 Synchronized 最原始的锁

解决方式①

不要把查询结果缓存到方法内,每次更新前都使用数据库行级锁查询最新数量

解决方式②(通过给方法加锁)

  1. 给方法添加 synchronized 关键字
  2. 通过自己手动不依赖注解实现事务,在异常时候手动回滚事务
public synchronized Integer createOrder() throws Exception{ // 打开事务 TransactionStatus transaction = platformTransactionManager.getTransaction(transactionDefinition); // 其他业务代码 // 有异常进行事务回滚 platformTransactionManager.rollback(transaction); // 没有异常进行事务提交 platformTransactionManager.commit(transaction); }

解决方式③

通过使用 synchronized 同步块解决

public synchronized Integer createOrder() throws Exception{ // 这里的 this 指的是 @Service 生成的实例,因为 @Service 模式是单例模式,只会产生一个实例。 synchronized(this) { // 你的业务代码及手动事务 } }

2. 基于 ReentrantLock(可重入锁) 并发包中的锁

分析ReentrantLock的实现原理

class OrderService { // 必须要引入并发包里面的锁 "java.util.concurrent.locks" private Lock lock = new ReentrantLock(); public Integer createOrder() throws Exception { // 在当行表示已加锁,在所有的线程中只有一个线程能获得锁继续执行后续的操作,其他线程进入等待 lock.lock(); // 业务代码... // 在事务提交以后需要进行释放锁 lock.unlock(); } }

基于数据库悲观锁的分布式锁

可以跨线程、跨进程、跨jvm实现锁

实现原理:
1)、多个进程、多个线程访问共同组件数据库
2)、通过 select…for update 访问同一条数据
3)、for update 锁定数据,其他线程只能等待

在 Navicat 中需要先设置不自动提交事务: SET @@autocommit=0;

优点: 简单方便、易于理解、易于操作
缺点: 并发量大时,对数据库压力较大
建议: 作为锁的数据库与业务数据库分开

基于 Redis 的 Setnx 实现分布式锁

获取锁的原理
主要是利用了 redis 单线程及 NX 的原子性操作,在多个线程并发时,只有一个线程可以设置成功。设置成功即获得锁,可以执行后续的业务处理。如果出现了异常或过了锁的有效期,锁自动释放。

释放锁原理
释放锁采用 Redis 的 delete 命令,释放锁时要校验之前设置的随机数,相同才能释放,释放锁需要用到 LUA 校验,主要是 delete 命令没有校验功能。

if redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del", KEYS[1]) else return 0 end

原理图解
redislock.png

实现原理:
获取锁的 Redis 命令

SET resource_name my_random_value NX PX 3000

  • resource_name: 表示资源名称(可根据不同的业务区分不同的锁)
  • my_random_value: 随机值,每个线程的随机值都需要不同,主要是用于释放锁时的效验
  • NX: key不存在时设置成功,key存在则设置不成功
  • PX: 自动失效时间,出现异常情况,锁可以过期失效

Zookeeper 分布式锁原理

原理

  • 利用 Zookeeper 的瞬时有序节点的特点
  • 多线程并发创建瞬时节点时,得到有序的序列
  • 序号最小的线程获得锁
  • 其他的线程监听自己序号前一个序号
  • 前一个线程执行完成,删除自己序号的节点
  • 下个序号的线程得到通知,继续执行

扩展知识