同步机制的实现

信号量(Semaphore)

信号量包含三个内容:Semaphore S(信号量),Operation P(P操作,表示wait()),Operation V(V操作,表示signal())

信号量是用于指示共享资源的可用数量,P操作可以减少S的计数,V则增加它的计数。

当某个进程想进入共享区的时候,需要先P操作,离开共享区的时候,则需要V操作。并且,PV操作是原子级别的,执行过程是不允许被中断的。

P操作执行过程

1、信号量S自减1
2、如果此时S仍然大于等于0,说明共享资源此时是允许访问的,因而调用者会直接返回,开始操作共享资源
3、如果S小于0,则需要等待别人主动释放资源,这种情况下调用者会加入等待队列,直到后续唤醒
4、当释放了共享资源之后,处于等待队列中的相关对象就会被唤醒,此时该对象就具备了资源的访问权

V操作执行过程

1、信号量S自增1
2、如果S大于等于0,说明没人等待,直接返回
3、S小于0,要唤醒等待队列中的对象,对应P操作第四步

缺点

信号量机制的缺点:进程自备同步操作,P(S)和V(S)操作大量分散在各个进程中,不易管理,易发生死锁。

互斥锁(mutex)

互斥锁其实可以看做为1的信号量,也就是binary semaphore

管程(monitor)

管程是由局部于自己的若干公共变量及其说明和所有访问这些公共变量的过程所组成的软件模块。

管程特点

管程封装了同步操作,对进程隐蔽了同步细节,简化了同步功能的调用界面。用户编写并发程序如同编写顺序(串行)程序。

管程目的

1、把分散在各进程中的临界区集中起来进行管理;
2、防止进程有意或无意的违法同步操作;
3、便于用高级语言来书写程序,也便于程序正确性验证。

管程的属性

共享性:管程可被系统范围内的进程互斥访问,属于共享资源
安全性:管程的局部变量只能由管程的过程访问,不允许进程或其它管程直接访问,管程也不能访问非局部于它的变量。
互斥性:多个进程对管程的访问是互斥的。任一时刻,管程中只能有一个活跃进程。
封装性:管程内的数据结构是私有的,只能在管程内使用,管程内的过程也只能使用管程内的数据结构。进程通过调用管程的过程使用临界资源。管程在Java中已实现。

Java中信号量的相关Api

此处要结合并发编程来看,先预留

同步范例

书中介绍的就是生产者消费者模型,的确,生产者消费者模型就是基于共享空间来进行设计的。

生产者消费者-原生使用sychronized

代码我就抄一下

共享空间-仓库模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import java.util.LinkedList;

public class Storage {

// 仓库容量
private final int MAX_SIZE = 10;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<>();

public void produce() {
synchronized (list) {
while (list.size() + 1 > MAX_SIZE) {
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】仓库已满");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());
list.notifyAll();
}
}

public void consume() {
synchronized (list) {
while (list.size() == 0) {
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】仓库为空");
try {
list.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + list.size());
list.notifyAll();
}
}
}

生产者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Producer implements Runnable{
private Storage storage;

public Producer(){}

public Producer(Storage storage , String name){
this.storage = storage;
}

@Override
public void run(){
while(true){
try{
Thread.sleep(1000);
storage.produce();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}

消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Consumer implements Runnable{
private Storage storage;

public Consumer(){}

public Consumer(Storage storage , String name){
this.storage = storage;
}

@Override
public void run(){
while(true){
try{
Thread.sleep(3000);
storage.consume();
}catch (InterruptedException e){
e.printStackTrace();
}
}
}
}

运行样例(3生产,3消费)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Main {

public static void main(String[] args) {
Storage storage = new Storage();
Thread p1 = new Thread(new Producer(storage));
Thread p2 = new Thread(new Producer(storage));
Thread p3 = new Thread(new Producer(storage));

Thread c1 = new Thread(new Consumer(storage));
Thread c2 = new Thread(new Consumer(storage));
Thread c3 = new Thread(new Consumer(storage));

p1.start();
p2.start();
p3.start();
c1.start();
c2.start();
c3.start();
}
}

运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
【生产者p2】生产一个产品,现库存2
【生产者p3】生产一个产品,现库存3
【生产者p1】生产一个产品,现库存4
【生产者p2】生产一个产品,现库存5
【生产者p3】生产一个产品,现库存6
【生产者p1】生产一个产品,现库存7
【生产者p2】生产一个产品,现库存8
【消费者c1】消费一个产品,现库存7
【生产者p3】生产一个产品,现库存8
【消费者c2】消费一个产品,现库存7
【消费者c3】消费一个产品,现库存6
【生产者p1】生产一个产品,现库存7
【生产者p2】生产一个产品,现库存8
【生产者p3】生产一个产品,现库存9
【生产者p1】生产一个产品,现库存10
【生产者p2】仓库已满
【生产者p3】仓库已满
【生产者p1】仓库已满
【消费者c1】消费一个产品,现库存9
【生产者p1】生产一个产品,现库存10
【生产者p3】仓库已满
。。。。。。以下省略

生产者消费者-使用await/signal

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import java.util.LinkedList;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Storage {

// 仓库最大存储量
private final int MAX_SIZE = 10;
// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
// 锁
private final Lock lock = new ReentrantLock();
// 仓库满的条件变量
private final Condition full = lock.newCondition();
// 仓库空的条件变量
private final Condition empty = lock.newCondition();

public void produce()
{
// 获得锁
lock.lock();
while (list.size() + 1 > MAX_SIZE) {
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】仓库已满");
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());

empty.signalAll();
lock.unlock();
}

public void consume()
{
// 获得锁
lock.lock();
while (list.size() == 0) {
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】仓库为空");
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + list.size());

full.signalAll();
lock.unlock();
}
}

生产者消费者-使用信号量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
import java.util.LinkedList;
import java.util.concurrent.Semaphore;

public class Storage {

// 仓库存储的载体
private LinkedList<Object> list = new LinkedList<Object>();
// 仓库的最大容量
final Semaphore notFull = new Semaphore(10);
// 将线程挂起,等待其他来触发
final Semaphore notEmpty = new Semaphore(0);
// 互斥锁
final Semaphore mutex = new Semaphore(1);

public void produce()
{
try {
notFull.acquire();
mutex.acquire();
list.add(new Object());
System.out.println("【生产者" + Thread.currentThread().getName()
+ "】生产一个产品,现库存" + list.size());
}
catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notEmpty.release();
}
}

public void consume()
{
try {
notEmpty.acquire();
mutex.acquire();
list.remove();
System.out.println("【消费者" + Thread.currentThread().getName()
+ "】消费一个产品,现库存" + list.size());
} catch (Exception e) {
e.printStackTrace();
} finally {
mutex.release();
notFull.release();
}
}
}