1. 주요 개념 정리
가. Synchronous vs Asynchronous
동기와 비동기를 구준하는 기준은 '작업 순서의 보장 여부'
•
Synchronous(동기화)
◦
작업의 순서가 보장됨
◦
프로세스는 요청한 작업의 결과를 받고나서 다음 작업 처리
•
Asynchronous(비동기화)
◦
작업의 순서가 보장되지 않음
◦
프로세스는 요청한 작업의 결과를 받지 않더라도 다음 작업 처리
나. Blocking vs Non-Blocking
블락킹과 논블락킹을 구분하는 기준은 '제어권 양도 시점'
•
Blocking
◦
특정 함수(A)가 다른 함수(B)를 호출할 때, 호출한 함수(B)에게 제어권을 양도
→ 함수 A는 제어권을 상실함으로써 일시적으로 작업을 멈춤
◦
프로세스는 요청한 작업이 끝날때가지 멈춤
•
Non-Blocking
◦
특정 함수(A)가 다른 함수(B)를 호출할 때, 호출한 함수(B)에게 제어권을 양도하지 않음
◦
프로세스는 요청한 작업이 끝나지 않더라도 작업을 멈추지 않음
다. Synchronous vs Asynchronous vs Blocking vs Non-Blocking
•
Synchronous Non-blocking I/O: 프로세스는 작업을 요청하고 해당 작업이 끝나기까지의 기간에 작업의 완료여부를 지속적으로 체크함
→ 다시 말해, 다음 작업을 기다리지만 해당 작업의 완료여부를 계속 체크함으로써 프로세스가 멈춘 건 아님
→ Polling: 지속적으로 상태 체크하는 것
2. 쓰레드 구현과 실행
가. Thread 클래스 상속 vs Runnable 인터페이스 구현
•
Thread 클래스 상속 방법: 상속 방법을 선택하면 다른 클래스를 상속 받을 수 없음
•
Runnable 인터페이스 구현: 인터페이스를 구현하는 방법을 선택하면 다른 클래스를 상속 받을 수 있기 때문에 보다 객체지향적으로 시스템을 설계할 수 있음
→ 보다 일반적인 방법
가. Thread 클래스 상속에 따른 구현
•
인스턴스 생성: Thread의 자손 클래스(ThreadEx1_1)의 인스턴스로 직접 생성 가능
•
현재 쓰레드 호출: 부모 클래스인 Thread의 메소드에 직접 접근 가능하므로 현재 쓰레드의 getName() 메소드도 직접 호출
public class ThreadEx1 {
public static void main(String args[]){
ThreadEx1_1 t1 = new ThreadEx1_1();
t1.start();
}
}
class ThreadEx1_1 extends Thread {
@Override
public void run() {
for(int i=0; i < 5; i++) System.out.println(getName());
}
}
Java
복사
나. Runnable 인터페이스 상속에 따른 구현
•
인스턴스 생성: Runnable 인터페이스를 구현한 클래스(ThreadEx1_2)의 인스턴스를 Thread 클래스의 매개변수로 전달하여 Thread 인스턴스 생성
•
현재 쓰레드 호출: Thread를 직접 상속 받지 않았기 때문에 Thread의 static 메소드(currentThread())를 호출하여 현재 쓰레드에 대한 참조변수를 얻어와야 한다. 참조변수를 얻어오면 현재 쓰레드에 대한 Thread의 메소드를 활용할 수 있음
public class ThreadEx1 {
public static void main(String args[])
Runnable r1 = new ThreadEx1_2();
Thread t2 = new Thread(r1);
t2.start();
}
}
class ThreadEx1_2 implements Runnable {
@Override
public void run() {
for(int i=0; i < 5; i++) System.out.println(Thread.currentThread().getName());
}
}
Java
복사
다. 쓰레드의 실행 start() vs run()
•
start() 메소드를 호출해야 쓰레드가 실행됨
•
start()메소드를 실행하면 해당 쓰레드는 실행대기 상태에서 기다리다가 순서가 되면 실행됨
•
한 번 실행이 종료된 쓰레드는 재실행할 수 없으므로 관련 인스턴스를 다시 생성해야 함
•
start() 메소드를 실행한다는 것은 해당 쓰레드 만의 호출 스택을 생성한다는 것을 의미함
•
반면 run() 메소드를 실행하는 것은 이미 존재하는 하나의 호출 스택 위에 main() 쓰레드 바로 위에 run()이라는 메소드를 추가적으로 올릴 뿐임
3. 비동기 병렬처리 방식 in Java
가. synchronized를 이용한 임계영역 설정
•
쓰레드 동기화가 필요한 이유
→ 특정 쓰레드가 작업 중인 데이터에 다른 쓰레드가 영향을 줄 수 있기 때문
public class ThreadEx21 {
public static void main(String args[]){
Runnable r = new RunnableEx21();
new Thread(r, "firstThread").start();
new Thread(r, "secondThread").start();
}
}
class Account {
private int balance = 1000;
public int getBalance(){
return balance;
}
public void withdraw(int money) {
if(balance >= money){
try {
// 이 순간 다른 쓰레드가 끼어들어서 balance data에 영향을 줌
Thread.sleep(1000);
} catch (InterruptedException e) {}
balance -= money;
}
}
}
class RunnableEx21 implements Runnable {
Account acc = new Account();
@Override
public void run() {
while(acc.getBalance() > 0){
int money = (int)(Math.random() * 3 + 1) * 100;
acc.withdraw(money);
System.out.println("balance" + acc.getBalance() + Thread.currentThread().getName());
System.out.println("balance" + acc.getBalance() + Thread.currentThread().getName());
}
}
}
Java
복사
•
쓰레드 동기화(Synchronization): 한 쓰레드가 작업 중인 영역에 대해 다른 쓰레드가 영향을 주지 않도록 하는 것
•
임계영역(critical section): 공유 데이터를 사용하는 코드 영역
•
락(lock): 쓰레드가 공유 데이터를 사용하기 위해 얻어야 할 권할
•
임계영역 설정: synchronized 키워드 활용
→ 1. 메서드 전체를 임계영역으로 설정
public synchronized void withdraw(int money) {
if(balance >= money){
try {
// 이 순간 다른 쓰레드가 끼어들어서 balance data에 영향을 줌
Thread.sleep(1000);
} catch (InterruptedException e) {}
balance -= money;
}
}
Java
복사
→ 2. 특정 영역을 임계영역으로 설정
public synchronized void withdraw(int money) {
synchronized(this){
if(balance >= money){
try {
// 이 순간 다른 쓰레드가 끼어들어서 balance data에 영향을 줌
Thread.sleep(1000);
} catch (InterruptedException e) {}
balance -= money;
}
}
}
Java
복사
•
특정 영역을 임계영역으로 설정하는 것을 권장
→ 임계영역 설정은 멀티쓰레드 프로그램의 성능에 영향을 주므로 최소화하는 것이 적절함
•
주의할 점) 만약 balance의 접근제어자가 public이라면 외부에서 balance의 값을 갱신할 때, synchronized로 동기화할 수 없음.
→ synchronized는 지정된 영역에 대해 하나의 쓰레드만 수행하는 것을 보장하는 것이므로 외부의 접근을 제어하지 못함
나. wait(), notify()
•
배경: 특정 쓰레드가 락을 가지고 임계영역에 장기간 머물러 있으면 시스템 전체의 지연으로 연결될 수 있으므로 이를 방지하기 위해 탄생
→ 효율적인 동기화 지원
•
특정 쓰레드가 락을 가지고 오래 있을 것 같다면 wait()을 호출하여 대기열(waiting pool)에 기다림
•
대기열에서 기다리다가 notify()가 호출되면 대기열에서 나와서 lock을 받을 수 있음
•
이하 예제 코드에서 두 가지의 예외 발생
→ 1) ConcurrentModificationException: Cook Thread의 run 부분에서 Cook이 임의의 요리를 dishes에 삽입하는 중에 Customer Thread가 dishes에 갱신 작업을 시도했기 때문
→ 2) IndexOfOfBoundsException: dishes의 마지막 요소를 CUST1이 제거하려는 중에 CUST2가 먼저 제거해서 빈 리스트에 접근하기 때문
public class ThreadWaitEx1 {
public static void main(String args[]) throws Exception {
Table table = new Table();
// Cook: 음식을 만들어서 table의 dishes list에 삽입
new Thread(new Cook(table), "COOK1").start();
// CUST1: donut 먹어서 table의 dishes list에서 dounut 요소 삭제
new Thread(new Customer(table, "donut"), "CUST1").start();
// CUST2: burger 먹어서 table의 dishes list에서 burger 요소 삭제
new Thread(new Customer(table, "burger"), "CUST2").start();
Thread.sleep(100);
System.exit(0);
}
}
class Customer implements Runnable {
private Table _table;
private String _food;
public Customer(Table table, String food) {
_table = table;
_food = food;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
String name = Thread.currentThread().getName();
if (eatFood())
System.out.println(name + " ate a " + _food);
else
System.out.println(name + "failed to eat.");
}
}
boolean eatFood() {
return _table.remove(_food);
}
}
class Cook implements Runnable {
private Table _table;
public Cook(Table table) {
_table = table;
}
@Override
public void run() {
while (true) {
// 임의의 요리를 선택해서 dishes에 추가함
int idx = (int) (Math.random() * _table.dishNum());
_table.add(_table.dishNames[idx]);
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
}
}
}
class Table {
String[] dishNames = {"donut", "donut", "burger"};
final int MAX_FOOD = 6;
private ArrayList<String> dishes = new ArrayList<>();
public void add(String dish){
if(dishes.size() >= MAX_FOOD) return;
dishes.add(dish);
System.out.println("Dishes:" + dishes.toString());
}
public boolean remove(String dishNames){
for(int i=0; i<dishes.size(); i++)
if(dishNames.equals(dishes.get(i))){
dishes.remove(i);
return true;
}
return false;
}
public int dishNum() { return dishNames.length; }
}
Java
복사
•
위의 두 예외가 발생하는 이유는 테이블의 dishes(공통데이터)에 여러 쓰레드가 동기화되지 않은 상태로 접근하기 때문임
→ 다음과 같이 synchronized 키워드로 동기화 영역 설정
public synchronized void add(String dish){
if(dishes.size() >= MAX_FOOD) return;
dishes.add(dish);
System.out.println("Dishes:" + dishes.toString());
}
public boolean remove(String dishNames){
synchronized (this) {
for (int i = 0; i < dishes.size(); i++)
if (dishNames.equals(dishes.get(i))) {
dishes.remove(i);
return true;
}
}
return false;
}
Java
복사
•
아래의 예제는 다음과 같이 수정하여 wait과 notify의 필요성을 부각함
1) Thread.sleep 시간을 조금 변경
2) dishes 리스트에 음식이 없으면 음식을 섭취(table.remove)하기 전에 0.5초 기다리는 코드를 추가함
public class ThreadWaitEx2 {
public static void main(String args[]) throws Exception {
Table table = new Table();
// Cook: 음식을 만들어서 table의 dishes list에 삽입
new Thread(new Cook(table), "COOK1").start();
// CUST1: donut 먹어서 table의 dishes list에서 dounut 요소 삭제
new Thread(new Customer(table, "donut"), "CUST1").start();
// CUST2: burger 먹어서 table의 dishes list에서 burger 요소 삭제
new Thread(new Customer(table, "burger"), "CUST2").start();
Thread.sleep(5000);
System.exit(0);
}
}
class Customer implements Runnable {
private Table _table;
private String _food;
public Customer(Table table, String food) {
_table = table;
_food = food;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
String name = Thread.currentThread().getName();
if (eatFood())
System.out.println(name + " ate a " + _food);
else
System.out.println(name + " failed to eat.");
}
}
boolean eatFood() {
return _table.remove(_food);
}
}
class Cook implements Runnable {
private Table _table;
public Cook(Table table) {
_table = table;
}
@Override
public void run() {
while (true) {
// 임의의 요리를 선택해서 dishes에 추가함
int idx = (int) (Math.random() * _table.dishNum());
_table.add(_table.dishNames[idx]);
try {
Thread.sleep(100);
} catch (InterruptedException e) {}
}
}
}
class Table {
String[] dishNames = {"donut", "donut", "burger"};
final int MAX_FOOD = 6;
private ArrayList<String> dishes = new ArrayList<>();
public synchronized void add(String dish){
if(dishes.size() >= MAX_FOOD) return;
dishes.add(dish);
System.out.println("Dishes:" + dishes.toString());
}
// 임계영역 최소화
public boolean remove(String dishNames){
synchronized (this) {
while(dishes.size() == 0){
String name = Thread.currentThread().getName();
System.out.println(name + " is waiting");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (int i = 0; i < dishes.size(); i++)
if (dishNames.equals(dishes.get(i))) {
dishes.remove(i);
return true;
}
}
return false;
}
public int dishNum() { return dishNames.length; }
}
Java
복사
•
위의 예제의 경우 Customer Thread가 dishes 리스트에 음식이 add되기를 무한히 waiting만 하고 있는 상황임
→ 임계영역에 대한 lock을 해당 Customer Thread가 가지고 있기 때문에 Cook Thread가 dishes 리스트에 음식을 추가할 수 없음
public class ThreadWaitEx2 {
public static void main(String args[]) throws Exception {
Table table = new Table();
// Cook: 음식을 만들어서 table의 dishes list에 삽입
new Thread(new Cook(table), "COOK1").start();
// CUST1: donut 먹어서 table의 dishes list에서 dounut 요소 삭제
new Thread(new Customer(table, "donut"), "CUST1").start();
// CUST2: burger 먹어서 table의 dishes list에서 burger 요소 삭제
new Thread(new Customer(table, "burger"), "CUST2").start();
Thread.sleep(5000);
System.exit(0);
}
}
class Customer implements Runnable {
private Table _table;
private String _food;
public Customer(Table table, String food) {
_table = table;
_food = food;
}
@Override
public void run() {
while (true) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
String name = Thread.currentThread().getName();
System.out.println(name + " ate a " + _food);
}
}
}
class Cook implements Runnable {
private Table _table;
public Cook(Table table) {
_table = table;
}
@Override
public void run() {
while (true) {
// 임의의 요리를 선택해서 dishes에 추가함
int idx = (int) (Math.random() * _table.dishNum());
_table.add(_table.dishNames[idx]);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
}
}
}
}
class Table {
String[] dishNames = {"donut", "donut", "burger"};
final int MAX_FOOD = 6;
private ArrayList<String> dishes = new ArrayList<>();
public synchronized void add(String dish) {
if (dishes.size() >= MAX_FOOD) return;
dishes.add(dish);
System.out.println("Dishes:" + dishes.toString());
}
public void remove(String dishNames) {
synchronized (this) {
while (dishes.size() == 0) {
String name = Thread.currentThread().getName();
System.out.println(name + " is waiting");
try {
wait(); // Customer Thread를 대기열에 삽입
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
while (true) {
for (int i = 0; i < dishes.size(); i++) {
if (dishNames.equals(dishes.get(i))) {
dishes.remove(i);
notify(); // 결과적으로 대기열에서 COOK Thread가 나올때까지 반복
return;
}
}
}
}
}
public int dishNum() {
return dishNames.length;
}
}
Java
복사
다. Future API
•
비동기 작업 결과 반환
라. Completable Future API
•
의존성 있는 비동기 작업 처리
Reference
•
비동기 병렬 처리 방식
남궁성, 자바의 정석 3rd Edition
https://pjh3749.tistory.com/280