多线程编程(2)—线程安全

1. 问题的引出 

 线程安全归根结底可以说是内存安全,在jvm内存模型中,有一块特殊的公共内存空间,称为堆内存,进程内的所有线程都可以访问并修改其中的数据,就会造成潜在的问题。因为堆内存空间在没有保护机制的情况下,你放进去的数据,可能被别的线程篡改。如下代码:

public class ThreadSafe implements Runnable {
    private static int count = 0;
    @Override
    public void run() {
        for (int i = 0; i < 1000; i++) {
            count++;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            es.execute(new ThreadSafe());
        }
        es.shutdown();  //不允许添加线程到线程池,异步关闭连接池
        es.awaitTermination(10L, TimeUnit.SECONDS); //等待连接池的线程任务完成
        System.out.println(count);
    }
}

 本来期望的值是20000,可是最终输出的结果却一点在变化,其?#24213;?#26159;小于等于20000,显然这是由于线程不安全造成的,多个线程并发的去访问全局变量、静态变量、文件、设备、套接字等都可能出?#32456;?#31181;问题。


2. 线程同步的措施

 为了协调和配合线程之间对共享资源的访问,通常有四种方式:

   1. 临界区:访问某一段临界资源的代码片段,与共享资源类似,但有一点不同的是,某一时刻只允许一个线程去访问(对应java中的关键字 synchronized包含的代码)。

?   2. 互斥量:互斥量是一个对象,只有都拥有互斥量的对象才可以访问共享资源。而?#19968;?#26021;量中只有一个,通常互斥量的实现是通过锁来实现的,而且加锁操作和?#22836;?#25805;作只能由同一个线程来完成。此处与临界区的区别是一段代码,通常存在与一个文件中,而互斥量是一个对象,加锁操作和解锁操作可以在不同的文件去编写,只要是同一个线程就好。

  3. 信号量: 信号量可以允许指定数量的线程同一时刻去访问共享资源,当线程数达到了阈值后,将阻止其他线程的访问,最常见的比如生产者和消费者问题。信号量和互斥量的区别则是信号量的发出和?#22836;?#21487;以由不同线程来完成

?   4. 事件?#21644;?#36807;发送通知的?#38382;?#26469;实现线程同步,可以实现不同进程中的线程同步操作


 3.饥饿与死锁

  饥饿:某些线程或进程由于长期得不到资源,而总是处于就绪或者阻塞状态。例如:

  ①. 该进程或线程所拥有的CPU时间片被其他线程抢占而得不到执行(通常是优先级比它高的线程或进程),一直处于就绪状态。

  ②. 由于选用不恰当的调度算法,导致该进程或线程长期无法得到CPU时间片,处于就绪状态。

  ③. 由于唤醒的时间把握?#27426;裕?#21796;醒线程?#20445;?#25152;需的资源处于被锁定状态,导致线程回到阻塞状态。

  死锁:两个或多个线程在执行过程中,由于相互占有对方所需的资源而?#21482;?#19981;相让从而造成这些线程都被阻塞,若无外力的作用下,他们都将无法执行下去。例如

  ①. 进程推进顺序不合适。互相占有彼此需要的资源,同时请求对方占有的资源,形成循环依赖的关系。

  ②. 资源不足。

  ③. 进程运?#22411;?#36827;顺序与速度不同,也可能产生死锁。

  一些避免死锁的措施:

  1. 不要在锁区域内在加把锁,即不要在?#22836;?#38145;之前竞争其他锁。

  2. 减小锁粒度,即减小线程加锁的?#27573;В?#30495;正需要的时候再去加锁。

  3. 顺序访问共享资源。

  4. 设置超时机制,超过指定时间则程序返回错误。

  5. 竞争锁期间,允许程序被中断。


 4.代码层面解决线程安全

 解决线程安全主要考虑三方面:

  1. 可见性:当多个线程并发的读写某个共享资源的时候,每个线程总是可以取到该共享资源的最新数据。

  2. 原?#26377;裕?/strong>某线程对一个或者多个共享资源所做的一连串写操作不会被中?#24076;?#22312;此期间不会有其他线程同时对这些共享资源进行写操作。

  3. 有序性:单个线程内的操作必须是有序的。

通常原?#26377;?#37117;可以得到保证,问题的病端就出在可见性和原?#26377;浴?/span>

4.1 可见性的问题

  如下实例程序,按通常的理解来说,当主线程等待一秒后,把flag的值修改为true后,另外一个线程应该可以感知到,然后跳过while循环,直接打印出后面的数据,可是最结果却一直卡在了while循环里。

public class Thread4 implements Runnable{
    private static boolean flag = false;
    @Override
    public void run() {
        System.out.println("waiting for data....");     
        while (!flag);
        System.out.println("cpying with data");
    }

    public static void main(String[] args) throws InterruptedException {
        Thread4 thread4 = new Thread4();
        Thread t = new Thread(thread4);
        t.start();
        Thread.sleep(1000);
        flag = true;
    }
}
/* output
 * waiting for data....
 */

  主要的原因是java程序在jvm?#26174;?#34892;的时候,该程序所?#21152;?#30340;内存分为两类主内存和工作内存(逻辑上的内存,实际上是cpu的寄存器和高速缓存,因为,cpu在计算的时候,并不总是从内存读取数据,它的数据读取顺序优先级是:寄存器-高速缓存-内存。CPU和内存之间通过总线进行)。也就是在主线程中启动另一个线程t会开辟出一个新的工作内存,与主线程的工作内存相互独立,且线程之间无法直接通信,只能去主内存读取全局变量,而线程t中做while判断的时候并不会去读取主内存的flag值,致使线程t无法被感知到flag在其他线程被改变,可以做一个测试,现在把run函数改成如下?#38382;劍?/span> 

public void run() {
        System.out.println("waiting for data....");
        /*  Notice
            如果在while循环里加上System.out.println(flag);语句,则不会使用本工作内存的flag数据,
            而是重新去主内存加载数据
         */
        while (!flag){
            System.out.println(flag);     //测试,可以做到线程的可见性
        }
        System.out.println("cpying with data");
 }
/* output
 * waiting for data....
 * false
 * ...
 * false
 * cpying with data
 */

 为了感知其他线程中一些全局变量值的变化,而且避免频繁去测试主内存中的数据变化,保证线程之间的可见性,可以使用volatile关键字去修饰全局变量,如下:

public class Thread4 implements Runnable{
    private volatile static boolean flag = false;
    @Override
	public void run() {
        System.out.println("waiting for data....");
        while (!flag);
        System.out.println("cpying with data");
    }
    //....
    
 }
/* output
 * waiting for data....
 * cpying with data
 */

volatile关键字借助MESI一致性协议,会在工作内存(CPU的寄存器等)与主内存连接的总线上建立一道总线嗅探机制,一旦发现其他线程修改了主内存中的某个全局变量(即图中?#28982;?#33394;线条读取的数据以及写回的数据),就会让其他工作线程中从主内存拷贝出来的副本变量失效(即图中紫色的线条读取的数据),从而会使左边的线程重新去读取数据(即图中红色的线条读取的数据)。如下图:

  虽然解决了原?#26377;?#38382;题,可是volatile关键字不支持原?#26377;?#25805;作,如下程序:

public class Thread5 implements Runnable {
    private static volatile int count = 0;
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            count++;
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            es.execute(new Thread5());
        }
        es.shutdown();  //不允许添加线程,异步关闭连接池
        es.awaitTermination(10L, TimeUnit.SECONDS); //等待连接池的线程任务完成
        System.out.println(count);
    }
}
/* output
 * 175630
 */

4.2 原?#26377;?#38382;题

 针对原?#26377;?#38382;题,我们可以使用熟悉的synchronized关键字,synchronized关键字最主要有以下3种应用方式:

  • 修?#38382;?#20363;方法,作用于当前实例加锁,进入同步代码前要获得当前实例的锁

  • 修饰静态方法,作用于当前类对象加锁,进入同步代码前要获得当前类对象的锁

  • 修饰代码块,指定加锁对象,对给定对象加锁,进入同步代码库前要获得给定对象的锁。

部分示例代码如下:

public class Thread5 implements Runnable {
    private static int count = 0;
    public synchronized static void add() {
        count++;
    }
    @Override
    public void run() {
        for (int i = 0; i < 1000000; i++) {
//          add()
            synchronized (Thread5.class){
                count++;
            }
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            es.execute(new Thread5());
        }
        es.shutdown();  //不允许添加线程,异步关闭连接池
        es.awaitTermination(10L, TimeUnit.SECONDS); //等待连接池的线程任务完成
        System.out.println(count);
    }
}	
/* output
 * 20000000
 */

然而synchronized是一种悲观锁,具有强烈的独?#24049;?#25490;他特性,它频繁的加锁和?#22836;?#38145;操作会使程序的效?#23454;?#19979;。与悲观锁相对是一种?#27490;?#38145;操作CAS(CompareAndSwap),?#27490;?#38145;就是每次去取数据的时候都?#27490;?#30340;认为数据不会被修?#27169;?#22240;此这个过程不会上锁,但是在更新的时候会判断一下在此期间的数据有没有更新,如果没有更新则去修?#27169;?#21542;则失败。可是上面这种 操作会出现ABA(A-B-A,?#22411;?#34987;改变,但最后又改回原值)的问题,

针对上面的问题,java中可以使用Atomic,它的包名为java.util.concurrent.atomic。这个包里面提供了一组原子变量的操作类(通过?#23548;?#29256;本号的方式去解决ABA问题),这些类可以保证在多线程环境下,当某个线程在执行atomic的方法?#20445;?#19981;会被其他线程打?#24076;?#19968;直等到该方法执?#22411;?#25104;(具体的API文档可以查看参考文献第5点)。

public class ThreadSafe implements Runnable {
    private static AtomicInteger count = new AtomicInteger(0);
    @Override
    public void run() {
        for (int i = 0; i < 10000; i++) {
            count.getAndAdd(1);
        }
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
            es.execute(new ThreadSafe());
        }
        es.shutdown();  //不允许添加线程,异步关闭连接池
        es.awaitTermination(10L, TimeUnit.SECONDS); //等待连接池的线程任务完成
        System.out.println(count);
    }
}
/* output
 * 200000
 */

5. 其他方法解决线程同步

a. 自旋锁

 线程循环反复检查变量是否可用,在这一过程中线程一直保?#31181;?#34892;(RUNNABLE),因此是一种忙等待,不像关键字synchronized一样,一旦发现不能访问,则处于线程处于阻塞状态(BLOCKED)。

public class Thread6 implements Runnable{
    private static  final Lock lock = new ReentrantLock();
    private volatile static int count = 0;
    @Override
    public void run() {
        for (int i = 0; i < 1000000; i++){
            lock.lock();
            count++;
            lock.unlock();
        }
    }
    static void test(ExecutorService es) throws InterruptedException {
        for (int i = 0; i < 20; i++) {
            es.execute(new Thread6());
        }
        es.shutdown();  //不允许添加线程,异步关闭连接池
        es.awaitTermination(10L, TimeUnit.SECONDS); //等待连接池的线程任务完成
        System.out.println(count);
    }
    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = Executors.newFixedThreadPool(20);
        test(es);
    }
}

 如果在使用lock的时候包含了try...catch...语句,要注意的是lock 必须在 finally 块中?#22836;擰?#21542;则,如果受保护的代码将抛出异常,锁就有可能永远得不到?#22836;牛?/span>

 与Lock类同一个包java.util.concurrent.locks下还有一种读写分离的锁ReentrantReadWriteLock类,读写锁维护了一对锁,一个?#20102;?#21644;一个写锁。一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。

  

public class RWTest {
    private static final Map<String, Object> map = new HashMap<String, Object>();
    private static final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private static final Lock readLock = lock.readLock();
    private static final Lock writeLock = lock.writeLock();

    public static final Object get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }

    public static final Object put(String key, Object value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

    public static final void clear() {
        writeLock.lock();
        try {
            map.clear();
        } finally {
            writeLock.unlock();
        }
    }
}

 利用自旋锁Lock也提供了Condition来实现线程间的状态通知的,可以根据实?#26159;?#20917;去唤醒某个线程(与后面的wait不同,是随机的)或者所有线程。可以通过lock.newCondition()来获取得Condition实例,可以根据实际需求创建多个实例。

public class Thread9 {
    public static ReentrantLock lock=new ReentrantLock();
    public static Condition condition =lock.newCondition();
    public static void main(String[] args) {
        new Thread(){
            @Override
            public void run() {
                lock.lock();//请求锁
                try{
                    System.out.println(Thread.currentThread().getName()+"==》进入等待");
                    condition.await();//设置当前线程进入等待
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    lock.unlock();//?#22836;?#38145;
                }
                System.out.println(Thread.currentThread().getName()+"==》继续执行");
            }
        }.start();
        new Thread(){
            @Override
            public void run() {
                lock.lock();//请求锁
                try{
                    System.out.println(Thread.currentThread().getName()+"=》进入");
                    Thread.sleep(2000);//休息2秒
                    condition.signal();//随机唤醒等待队列中的一个线程
                    System.out.println(Thread.currentThread().getName()+"休息结束");
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }finally{
                    lock.unlock();//?#22836;?#38145;
                }
            }
        }.start();
    }
}
/*output
 *Thread-0==》进入等待
 *Thread-1=》进入
 *Thread-1休息结束
 *Thread-0==》继续执行
 */

b. wait.notify.notifyAll

 在关键字synchronized的线程同步机制,调用线程的sleep,yield方法?#20445;?#32447;程并不会让出对象锁,但是调用wait却不同,线程自动?#22836;?#20854;占有的对象锁,同时不会去申请对象锁,当线程被唤醒的时候,它才再次去申请竞争对象的锁(该关键字通常只与synchronized结合使用)。notify()唤醒在等待该对象同步锁的线程(只唤醒一个,如果有多个在等待),注意的是在调用此方法的时候,并不能确切的唤醒某一个等待状态的线程,而是由JVM确定唤醒哪个线程,而且不是按优先级。而notifyAll()则是唤醒所有等待的线程。

public class Thread8 implements Runnable {

    private int num;
    private Object lock;

    public Thread8(int num, Object lock) {
        this.num = num;
        this.lock = lock;
    }

    public void run() {
        try {
            while (true) {
                synchronized (lock) {
                    lock.notifyAll();
                    lock.wait();
                    System.out.println(num);
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        final Object lock = new Object();
        Thread thread1 = new Thread(new Thread8(1, lock));
        Thread thread2 = new Thread(new Thread8(2, lock));
        thread1.start();
        thread2.start();
    }
}
/* output
 * 交替输出1,2,1,2,1,2......
 */

6.并发编程—CountDownLatch、CyclicBarrier、Semaphore和fork/join框架

1. CountDownLatch

  CountDownLatch实现的是一个倒序计数器,可以通过调用它的countDown实现计数器减一和await方法来阻塞当前线程:

public class Thread10 {
    public static void main(String[] args) throws InterruptedException {
        int count = 20;
        final CountDownLatch cdl = new CountDownLatch(count);
        ExecutorService es = Executors.newCachedThreadPool();

        for (int i = 0; i < count; i++) {
            es.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(cdl.getCount());
                    }finally {
                        cdl.countDown();
                    }
                }
            });
        }
        cdl.await();
        es.shutdown();
        System.out.println("主线程现在才结束: count = "+cdl.getCount());
    }
}	

2.CyclicBarrier

  即回环栅栏,是一种可重用的线程阻塞器,它将率先到达栅栏的这些线程阻塞(调用await()方法),直到指定数量的线程都到达该处,这些线程将会被全部?#22836;擰?/p>

public class Thread11 implements Runnable{
    private int num;
    private static CyclicBarrier cb = new CyclicBarrier(6); //指定栅栏的等待线程数
    public Thread11(int num){
        this.num = num;
    }
    @Override
    public void run() {
        try {
            Thread.sleep(1000*num);     //等待指定数量时间后到达栅栏处
            System.out.println(Thread.currentThread().getName() +" is coming..");
            cb.await(10L, TimeUnit.SECONDS);
            System.out.println("continue....");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        for (int i = 0; i < 8; i++) {
            es.execute(new Thread11(i));
        }
        es.shutdown();
    }
}
/*
 *pool-1-thread-1 is coming..
 *pool-1-thread-2 is coming..
 *pool-1-thread-3 is coming..
 *continue....
 *continue....
 *continue....
 *pool-1-thread-4 is coming..
 *超时异常错误(指定时间内线程数量仍然到达)
 */

3.Semaphore信号量

  信号量用于保护对一个或多个共享资源的访问,其内部维护一个计数器,用来只是当前可以访问共享资源的数量。可以通过tryAcquire去尝试获取许可,还可以通过availablePermits()方法得到可用的许可数目,而acquire/release则是获取/?#22836;?#35768;可。

public class Thread12 implements Runnable {
    private static SecureRandom random= new SecureRandom();
    private static Semaphore semaphore = new Semaphore(3, true);

    @Override
    public void run() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + " got permission...");
            Thread.sleep(random.nextInt(10000));
            semaphore.release();
            System.out.println(Thread.currentThread().getName() + " released permission...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        for (int i = 0; i < 6; i++) {
            es.execute(new Thread12());
        }
        es.shutdown();
    }
}

4.fork/join框架

 Fork/Join框架提供了的一个用于并行执行任务的框架,充分利用了CPU资源,把大任务?#25351;?#25104;若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。(?#36824;㎎ava7使用)

Fork/Join使用两个类:

  • ForkJoinTask:我?#19988;?#20351;用ForkJoin框架,必须首?#21364;?#24314;一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,Fork/Join框架提供了以下两个子类:

    • RecursiveAction:用于没有返回结果的任务。

    • RecursiveTask :用于有返回结果的任务。

  • ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务?#25351;?#20986;的子任务会添加到当前工作线程所维护的双端队列中。

ForkJoinPool与其他类型的ExecutorService的不同之处主要在于使用工作窃取,每个线程都有自己的双端任务队列,线程在一般情况下会从队?#22411;?#21435;获取任务,当某个线程任务队列的为空的时候,它会尝?#28304;?#20854;他线程的任务队列的尾部去“窃取”任务来执?#23567;?/span>

public class Thread13 extends RecursiveTask<Integer> {
    private int start;
    private int end;

    public Thread13(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int m = 1000;   //每个线程计算的?#27573;?#22823;小
        int s = start, n = end;  //每个线程计算的起始地址
        int r = 0;  //算和的变量
        List<ForkJoinTask<Integer>> it = new ArrayList<ForkJoinTask<Integer>>();
        while (s <= n) {
            if (n - s < m) {
                for (int i = s; i <= n; i++) {
                    r += i;
                }
            } else {
                n = Math.min(s + m - 1, n);        //得到一个新的start
                it.add(new Thread13(s, n).fork());  //得到每一个?#27573;如(0,999)]加入一个线程
            }
            s = n + 1;
            n = end;
        }

        for (ForkJoinTask<Integer> t : it) {
            r += t.join();
        }
        return r;
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool fjp = new ForkJoinPool();
        int s = 1, n = 10001;
        Future<Integer> rs = fjp.submit(new Thread13(s, n));
        System.out.println(rs.get());
    }
}
/* output
 * 50015001
 */

  


 

参考文献

  1. 庞永华. Java多线程与Socket:实战微服务框架[M].电子工业出版社.2019.3

  2. Executors类中创建线程池的几种方法的分析

  3. 知乎——如果你这样回答“什么是线程安全”,面试官都会对你?#25991;?#30456;看

  4. 知乎——Java线程内存模型,线程、工作内存、主内存

  5. Java进阶——Java中的Atomic原子特性

  6. 深入理解Java并发之synchronized实现原理

  7. Java的wait(), notify()和notifyAll()使用小结

  8. java多线程-07-Lock和Condition

  9. Java并发编程:CountDownLatch、CyclicBarrier和Semaphor

posted @ 2019-10-23 19:46 晓乎 阅读(...) 评论(...) 编辑 收藏
访问:AmazingCounters.com
三剑客和女王APP
广西11选5 一定牛 吉林省新11选5 3d走势图分析 微商现在如何赚钱2015 甘肃11选5开奖走势图 辛运28开奖参考 绣珍阁赚钱吗 百赢棋牌 三分彩时时彩中奖规则 富豪团队qq群能赚钱 湖北十一选五走势图表 如何在手机上打文章赚钱吗 三分彩的算法 有一个会赚钱的女朋友 福利七乐彩走势图 三点一线炒股法