Search
🚤

비동기 기본 개념(feat. Java)

1. 주요 개념 정리

가. Synchronous vs Asynchronous

동기와 비동기를 구준하는 기준은 '작업 순서의 보장 여부'
Synchronous(동기화)
작업의 순서가 보장됨
프로세스는 요청한 작업의 결과를 받고나서 다음 작업 처리
Asynchronous(비동기화)
작업의 순서가 보장되지 않음
프로세스는 요청한 작업의 결과를 받지 않더라도 다음 작업 처리

나. Blocking vs Non-Blocking

블락킹과 논블락킹을 구분하는 기준은 '제어권 양도 시점'
Blocking
특정 함수(A)가 다른 함수(B)를 호출할 때, 호출한 함수(B)에게 제어권을 양도
→ 함수 A는 제어권을 상실함으로써 일시적으로 작업을 멈춤
프로세스는 요청한 작업이 끝날때가지 멈춤
Non-Blocking
특정 함수(A)가 다른 함수(B)를 호출할 때, 호출한 함수(B)에게 제어권을 양도하지 않음
프로세스는 요청한 작업이 끝나지 않더라도 작업을 멈추지 않음

다. Synchronous vs Asynchronous vs Blocking vs Non-Blocking

Synchronous Non-blocking I/O: 프로세스는 작업을 요청하고 해당 작업이 끝나기까지의 기간에 작업의 완료여부를 지속적으로 체크함
→ 다시 말해, 다음 작업을 기다리지만 해당 작업의 완료여부를 계속 체크함으로써 프로세스가 멈춘 건 아님
→ Polling: 지속적으로 상태 체크하는 것

2. 쓰레드 구현과 실행

가. Thread 클래스 상속 vs Runnable 인터페이스 구현

Thread 클래스 상속 방법: 상속 방법을 선택하면 다른 클래스를 상속 받을 수 없음
Runnable 인터페이스 구현: 인터페이스를 구현하는 방법을 선택하면 다른 클래스를 상속 받을 수 있기 때문에 보다 객체지향적으로 시스템을 설계할 수 있음
→ 보다 일반적인 방법

가. Thread 클래스 상속에 따른 구현

인스턴스 생성: Thread의 자손 클래스(ThreadEx1_1)의 인스턴스로 직접 생성 가능
현재 쓰레드 호출: 부모 클래스인 Thread의 메소드에 직접 접근 가능하므로 현재 쓰레드의 getName() 메소드도 직접 호출
public class ThreadEx1 { public static void main(String args[]){ ThreadEx1_1 t1 = new ThreadEx1_1(); t1.start(); } } class ThreadEx1_1 extends Thread { @Override public void run() { for(int i=0; i < 5; i++) System.out.println(getName()); } }
Java
복사

나. Runnable 인터페이스 상속에 따른 구현

인스턴스 생성: Runnable 인터페이스를 구현한 클래스(ThreadEx1_2)의 인스턴스를 Thread 클래스의 매개변수로 전달하여 Thread 인스턴스 생성
현재 쓰레드 호출: Thread를 직접 상속 받지 않았기 때문에 Thread의 static 메소드(currentThread())를 호출하여 현재 쓰레드에 대한 참조변수를 얻어와야 한다. 참조변수를 얻어오면 현재 쓰레드에 대한 Thread의 메소드를 활용할 수 있음
public class ThreadEx1 { public static void main(String args[]) Runnable r1 = new ThreadEx1_2(); Thread t2 = new Thread(r1); t2.start(); } } class ThreadEx1_2 implements Runnable { @Override public void run() { for(int i=0; i < 5; i++) System.out.println(Thread.currentThread().getName()); } }
Java
복사

다. 쓰레드의 실행 start() vs run()

start() 메소드를 호출해야 쓰레드가 실행됨
start()메소드를 실행하면 해당 쓰레드는 실행대기 상태에서 기다리다가 순서가 되면 실행됨
한 번 실행이 종료된 쓰레드는 재실행할 수 없으므로 관련 인스턴스를 다시 생성해야 함
start() 메소드를 실행한다는 것은 해당 쓰레드 만의 호출 스택을 생성한다는 것을 의미함
반면 run() 메소드를 실행하는 것은 이미 존재하는 하나의 호출 스택 위에 main() 쓰레드 바로 위에 run()이라는 메소드를 추가적으로 올릴 뿐임

3. 비동기 병렬처리 방식 in Java

가. synchronized를 이용한 임계영역 설정

쓰레드 동기화가 필요한 이유
→ 특정 쓰레드가 작업 중인 데이터에 다른 쓰레드가 영향을 줄 수 있기 때문
public class ThreadEx21 { public static void main(String args[]){ Runnable r = new RunnableEx21(); new Thread(r, "firstThread").start(); new Thread(r, "secondThread").start(); } } class Account { private int balance = 1000; public int getBalance(){ return balance; } public void withdraw(int money) { if(balance >= money){ try { // 이 순간 다른 쓰레드가 끼어들어서 balance data에 영향을 줌 Thread.sleep(1000); } catch (InterruptedException e) {} balance -= money; } } } class RunnableEx21 implements Runnable { Account acc = new Account(); @Override public void run() { while(acc.getBalance() > 0){ int money = (int)(Math.random() * 3 + 1) * 100; acc.withdraw(money); System.out.println("balance" + acc.getBalance() + Thread.currentThread().getName()); System.out.println("balance" + acc.getBalance() + Thread.currentThread().getName()); } } }
Java
복사
쓰레드 동기화(Synchronization): 한 쓰레드가 작업 중인 영역에 대해 다른 쓰레드가 영향을 주지 않도록 하는 것
임계영역(critical section): 공유 데이터를 사용하는 코드 영역
락(lock): 쓰레드가 공유 데이터를 사용하기 위해 얻어야 할 권할
임계영역 설정: synchronized 키워드 활용
→ 1. 메서드 전체를 임계영역으로 설정
public synchronized void withdraw(int money) { if(balance >= money){ try { // 이 순간 다른 쓰레드가 끼어들어서 balance data에 영향을 줌 Thread.sleep(1000); } catch (InterruptedException e) {} balance -= money; } }
Java
복사
→ 2. 특정 영역을 임계영역으로 설정
public synchronized void withdraw(int money) { synchronized(this){ if(balance >= money){ try { // 이 순간 다른 쓰레드가 끼어들어서 balance data에 영향을 줌 Thread.sleep(1000); } catch (InterruptedException e) {} balance -= money; } } }
Java
복사
특정 영역을 임계영역으로 설정하는 것을 권장
→ 임계영역 설정은 멀티쓰레드 프로그램의 성능에 영향을 주므로 최소화하는 것이 적절함
주의할 점) 만약 balance의 접근제어자가 public이라면 외부에서 balance의 값을 갱신할 때, synchronized로 동기화할 수 없음.
→ synchronized는 지정된 영역에 대해 하나의 쓰레드만 수행하는 것을 보장하는 것이므로 외부의 접근을 제어하지 못함

나. wait(), notify()

배경: 특정 쓰레드가 락을 가지고 임계영역에 장기간 머물러 있으면 시스템 전체의 지연으로 연결될 수 있으므로 이를 방지하기 위해 탄생
→ 효율적인 동기화 지원
특정 쓰레드가 락을 가지고 오래 있을 것 같다면 wait()을 호출하여 대기열(waiting pool)에 기다림
대기열에서 기다리다가 notify()가 호출되면 대기열에서 나와서 lock을 받을 수 있음
이하 예제 코드에서 두 가지의 예외 발생
→ 1) ConcurrentModificationException: Cook Thread의 run 부분에서 Cook이 임의의 요리를 dishes에 삽입하는 중에 Customer Thread가 dishes에 갱신 작업을 시도했기 때문
→ 2) IndexOfOfBoundsException: dishes의 마지막 요소를 CUST1이 제거하려는 중에 CUST2가 먼저 제거해서 빈 리스트에 접근하기 때문
public class ThreadWaitEx1 { public static void main(String args[]) throws Exception { Table table = new Table(); // Cook: 음식을 만들어서 table의 dishes list에 삽입 new Thread(new Cook(table), "COOK1").start(); // CUST1: donut 먹어서 table의 dishes list에서 dounut 요소 삭제 new Thread(new Customer(table, "donut"), "CUST1").start(); // CUST2: burger 먹어서 table의 dishes list에서 burger 요소 삭제 new Thread(new Customer(table, "burger"), "CUST2").start(); Thread.sleep(100); System.exit(0); } } class Customer implements Runnable { private Table _table; private String _food; public Customer(Table table, String food) { _table = table; _food = food; } @Override public void run() { while (true) { try { Thread.sleep(10); } catch (InterruptedException e) { } String name = Thread.currentThread().getName(); if (eatFood()) System.out.println(name + " ate a " + _food); else System.out.println(name + "failed to eat."); } } boolean eatFood() { return _table.remove(_food); } } class Cook implements Runnable { private Table _table; public Cook(Table table) { _table = table; } @Override public void run() { while (true) { // 임의의 요리를 선택해서 dishes에 추가함 int idx = (int) (Math.random() * _table.dishNum()); _table.add(_table.dishNames[idx]); try { Thread.sleep(10); } catch (InterruptedException e) {} } } } class Table { String[] dishNames = {"donut", "donut", "burger"}; final int MAX_FOOD = 6; private ArrayList<String> dishes = new ArrayList<>(); public void add(String dish){ if(dishes.size() >= MAX_FOOD) return; dishes.add(dish); System.out.println("Dishes:" + dishes.toString()); } public boolean remove(String dishNames){ for(int i=0; i<dishes.size(); i++) if(dishNames.equals(dishes.get(i))){ dishes.remove(i); return true; } return false; } public int dishNum() { return dishNames.length; } }
Java
복사
위의 두 예외가 발생하는 이유는 테이블의 dishes(공통데이터)에 여러 쓰레드가 동기화되지 않은 상태로 접근하기 때문임
→ 다음과 같이 synchronized 키워드로 동기화 영역 설정
public synchronized void add(String dish){ if(dishes.size() >= MAX_FOOD) return; dishes.add(dish); System.out.println("Dishes:" + dishes.toString()); } public boolean remove(String dishNames){ synchronized (this) { for (int i = 0; i < dishes.size(); i++) if (dishNames.equals(dishes.get(i))) { dishes.remove(i); return true; } } return false; }
Java
복사
아래의 예제는 다음과 같이 수정하여 wait과 notify의 필요성을 부각함 1) Thread.sleep 시간을 조금 변경 2) dishes 리스트에 음식이 없으면 음식을 섭취(table.remove)하기 전에 0.5초 기다리는 코드를 추가함
public class ThreadWaitEx2 { public static void main(String args[]) throws Exception { Table table = new Table(); // Cook: 음식을 만들어서 table의 dishes list에 삽입 new Thread(new Cook(table), "COOK1").start(); // CUST1: donut 먹어서 table의 dishes list에서 dounut 요소 삭제 new Thread(new Customer(table, "donut"), "CUST1").start(); // CUST2: burger 먹어서 table의 dishes list에서 burger 요소 삭제 new Thread(new Customer(table, "burger"), "CUST2").start(); Thread.sleep(5000); System.exit(0); } } class Customer implements Runnable { private Table _table; private String _food; public Customer(Table table, String food) { _table = table; _food = food; } @Override public void run() { while (true) { try { Thread.sleep(10); } catch (InterruptedException e) { } String name = Thread.currentThread().getName(); if (eatFood()) System.out.println(name + " ate a " + _food); else System.out.println(name + " failed to eat."); } } boolean eatFood() { return _table.remove(_food); } } class Cook implements Runnable { private Table _table; public Cook(Table table) { _table = table; } @Override public void run() { while (true) { // 임의의 요리를 선택해서 dishes에 추가함 int idx = (int) (Math.random() * _table.dishNum()); _table.add(_table.dishNames[idx]); try { Thread.sleep(100); } catch (InterruptedException e) {} } } } class Table { String[] dishNames = {"donut", "donut", "burger"}; final int MAX_FOOD = 6; private ArrayList<String> dishes = new ArrayList<>(); public synchronized void add(String dish){ if(dishes.size() >= MAX_FOOD) return; dishes.add(dish); System.out.println("Dishes:" + dishes.toString()); } // 임계영역 최소화 public boolean remove(String dishNames){ synchronized (this) { while(dishes.size() == 0){ String name = Thread.currentThread().getName(); System.out.println(name + " is waiting"); try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } for (int i = 0; i < dishes.size(); i++) if (dishNames.equals(dishes.get(i))) { dishes.remove(i); return true; } } return false; } public int dishNum() { return dishNames.length; } }
Java
복사
위의 예제의 경우 Customer Thread가 dishes 리스트에 음식이 add되기를 무한히 waiting만 하고 있는 상황임
→ 임계영역에 대한 lock을 해당 Customer Thread가 가지고 있기 때문에 Cook Thread가 dishes 리스트에 음식을 추가할 수 없음
public class ThreadWaitEx2 { public static void main(String args[]) throws Exception { Table table = new Table(); // Cook: 음식을 만들어서 table의 dishes list에 삽입 new Thread(new Cook(table), "COOK1").start(); // CUST1: donut 먹어서 table의 dishes list에서 dounut 요소 삭제 new Thread(new Customer(table, "donut"), "CUST1").start(); // CUST2: burger 먹어서 table의 dishes list에서 burger 요소 삭제 new Thread(new Customer(table, "burger"), "CUST2").start(); Thread.sleep(5000); System.exit(0); } } class Customer implements Runnable { private Table _table; private String _food; public Customer(Table table, String food) { _table = table; _food = food; } @Override public void run() { while (true) { try { Thread.sleep(10); } catch (InterruptedException e) { } String name = Thread.currentThread().getName(); System.out.println(name + " ate a " + _food); } } } class Cook implements Runnable { private Table _table; public Cook(Table table) { _table = table; } @Override public void run() { while (true) { // 임의의 요리를 선택해서 dishes에 추가함 int idx = (int) (Math.random() * _table.dishNum()); _table.add(_table.dishNames[idx]); try { Thread.sleep(100); } catch (InterruptedException e) { } } } } class Table { String[] dishNames = {"donut", "donut", "burger"}; final int MAX_FOOD = 6; private ArrayList<String> dishes = new ArrayList<>(); public synchronized void add(String dish) { if (dishes.size() >= MAX_FOOD) return; dishes.add(dish); System.out.println("Dishes:" + dishes.toString()); } public void remove(String dishNames) { synchronized (this) { while (dishes.size() == 0) { String name = Thread.currentThread().getName(); System.out.println(name + " is waiting"); try { wait(); // Customer Thread를 대기열에 삽입 Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } } while (true) { for (int i = 0; i < dishes.size(); i++) { if (dishNames.equals(dishes.get(i))) { dishes.remove(i); notify(); // 결과적으로 대기열에서 COOK Thread가 나올때까지 반복 return; } } } } } public int dishNum() { return dishNames.length; } }
Java
복사

다. Future API

비동기 작업 결과 반환

라. Completable Future API

의존성 있는 비동기 작업 처리

Reference

비동기 병렬 처리 방식 남궁성, 자바의 정석 3rd Edition https://pjh3749.tistory.com/280