dh_0e

[특강] 멀티스레드 동기화 문제 (뮤텍스, 세마포어, CAS) 본문

내일배움캠프/특강

[특강] 멀티스레드 동기화 문제 (뮤텍스, 세마포어, CAS)

dh_0e 2024. 7. 26. 21:04

동기화가 고려되지 않은 멀티스레드 코드 예시 (JAVA)

public class CounterTest {
    private static int counter = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[10];

        // 동기화 없이 실행
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    counter++;
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < 10; i++) {
            threads[i].join();
        }

        System.out.println("카운터 값: " + counter);
    }
}
  • 위 코드를 실행시키면 counter 값이 10000이 아니라 그에 못 미치는 99xx 값이 나온다.
  • 이는 데이터 경합 즉, Race condition이 발생한 결관데 동시에 counter 변수의 값을 읽고 그 값을 기반으로 각각 증가를 시키면 두 스레드 모두가 같은 값으로 증가시켜 증가된 횟수가 하나 줄어든 것이다.

 

 

lock으로 동기화가 적용 된 멀티스레드 코드 예시

public class CounterTest {
    private static int counter = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[10];

        // 동기화를 사용하여 실행
        Object lock = new Object(); // lock 객체 생성
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    synchronized (lock) {
                        counter++;
                    }
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < 10; i++) {
            threads[i].join();
        }

        System.out.println("카운터 값: " + counter);
    }
}
  • 다른 스레드가 이 블록을 실행하려고 할 때는 lock 객체가 잠근 상태이기 때문에 접근이 불가능하게 된다.
  • 이러한 lock 기법을 조금 더 세부적으로 들어가면 Mutex, Semaphore를 활용하여 잠금을 할 수 있다.

 

 

뮤텍스(Mutex)

  • 뮤텍스는 기본적으로 잠금과 해제 기능을 제공한다.
  • 한 스레드가 뮤텍스를 특정 크리티컬 섹션에 대해 잠그면 다른 스레드는 해당 뮤텍스가 해제될 때까지 기다려야 함
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class CounterTest {
    private static int counter = 0;
    private static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[10];

        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    lock.lock(); // 뮤텍스 잠금
                    try {
                        counter++;
                    } finally {
                        lock.unlock(); // 뮤텍스 해제 (잠금이 있으면 반드시 해제를 해야 함!)
                    }
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < 10; i++) {
            threads[i].join();
        }

        System.out.println("카운터 값: " + counter);
    }
}

 

 

 

데드락(deadlock)

  • 스레드 2개가 각각 다른 뮤텍스를 잠금하고 다음으로 각자 잠금한 뮤텍스를 잠그고자 기다리게 되는 교착상태
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DeadlockExample {
    private static Lock lock1 = new ReentrantLock();
    private static Lock lock2 = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        Thread t1 = new Thread(new Task1());
        Thread t2 = new Thread(new Task2());

        t1.start();
        t2.start();

        t1.join();
        t2.join();
    }

    static class Task1 implements Runnable {
        @Override
        public void run() {
            try {
                lock1.lock(); // lock1 잠금
                System.out.println("[Task1] lock1 잠금 성공");

                // 잠시 대기하여 다른 스레드가 lock2를 잠글 시간을 줍니다.
                Thread.sleep(50);

                lock2.lock(); // lock2 잠금
                System.out.println("[Task1] lock2 잠금 성공");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock1.unlock();
                lock2.unlock();
            }
        }
    }

    static class Task2 implements Runnable {
        @Override
        public void run() {
            try {
                lock2.lock(); // lock2 잠금
                System.out.println("[Task2] lock2 잠금 성공");

                // 잠시 대기하여 다른 스레드가 lock1을 잠글 시간을 줍니다.
                Thread.sleep(50);

                lock1.lock(); // lock1 잠금
                System.out.println("[Task2] lock1 잠금 성공");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                lock2.unlock();
                lock1.unlock();
            }
        }
    }
}

 

  1. Task1 스레드가 lock1을 잠그고, Task2 스레드가 lock2를 잠근다.
  2. Task1 스레드가 lock2를 잠그기위해 lock2가 잠금이 풀릴 때까지 기다린다.
  3. 이때 Task2 스레드도 lock1을 잠그기 위해 기다리면서 서로 lock1과 lock2를 잠그고 기다리면서 발생

정리: 두 스레드가 서로가 필요로 하는 자원을 잠근 상태에서 양보를 해주지 않고 무한 대기 상태에 빠지는 상황

 

 

세마포어(Semaphore)

  • 뮤텍스와 유사하지만 여러 스레드가 동시에 자원을 접근할 수 있도록 허용하는 차이가 있음

  • 두 가지 주요 작업을 제공
    • P(Wait) 연산: 세마포어 값을 감소시키고, 세마포어 값이 0보다 작으면 스레드가 대기 상태에 들어감
    • V(Signal) 연산: 세마포어 값을 증가시키고, 대기 중인 스레드를 깨움
  • 세마포어는 내부적으로 카운터를 유지하여 현재 접근 가능한 자원의 수를 나타냄

 

 

다음은 목적은 다르지만 유사한 Java의 WeakReference 코드 예시

import java.util.concurrent.Semaphore;

public class CounterTest {
    private static int counter = 0;
    // 세마포어 초기화, 허용된 동시 접근 수는 1
    private static Semaphore semaphore = new Semaphore(1);

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[10];

        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    try {
                        semaphore.acquire(); // 세마포어 P 연산 (잠금)
                        counter++;
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        semaphore.release(); // 세마포어 V 연산 (해제)
                    }
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < 10; i++) {
            threads[i].join();
        }

        System.out.println("카운터 값: " + counter);
    }
}
  • 객체에 새로운 참조가 생길 때마다 카운터를 증가시키고, 참조가 사라질 때마다 카운터를 감소시켜서 카운터가 0이 되는 순간 Garbage Collect 대상이 되게끔 해줌
  • 이는 세마포어가 1개이기 때문에 뮤텍스와 동일하게 사용됨

 

세마포어로 클라이언트 3개를 연결 처리를 관리하는 예제 코드

import java.util.concurrent.Semaphore;

public class NetworkServer {
    private static final int MAX_CONNECTIONS = 3;
    // 최대 3개의 클라이언트 연결을 허용합니다!
    private static Semaphore semaphore = new Semaphore(MAX_CONNECTIONS);

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 10; i++) {
            new Thread(new Client()).start();
        }
    }

    static class Client implements Runnable {
        @Override
        public void run() {
            try {
                System.out.println(Thread.currentThread().getName() + " 접속 시도!");
                // 4번째 스레드부터는 세마포어 카운터가 0이므로
                // semaphore.acquire()에서 대기 상태에 들어갑니다.
                semaphore.acquire(); 
                System.out.println(Thread.currentThread().getName() + " 접속 완료!");
                // 로직 실행
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println(Thread.currentThread().getName() + " 접속 종료 시도!");
                // 자원이 해제되어 세마포어 카운터가 1 이상이 되면
                // 대기 중인 스레드 중 하나가 깨어나서 자원을 사용할 수 있어요!
                semaphore.release();
                System.out.println(Thread.currentThread().getName() + " 접속 종료!");
            }
        }
    }
}
  • 하지만, 세마포어 역시 잘못 사용하면 데드락을 발생시킬 수 있음
  • 데드락을 발생시키지 않기 위한 원칙
    • 호출은 항상 쌍으로 한다.
      • lock이 있으면 unlock을 호출
      • acquire가 있으면 release를 호출
    • 자원 획득 순서 통일
      • A 클라이언트는 a > b > c 순서, B 클라이언트는 c > b > a 순서대로 획득하려고 하면 데드락이 발생하기 쉬움

 

 

lock-free 알고리즘

  • 자원을 잠그지 않고 동기화하는 방법
  • 다른 스레드에 의해 중단되지 않고, 최소한 하나의 스레드가 계속해서 진행할 수 있는 알고리즘
  • 근데 이제 락을 안 곁들인
  • 락을 쓰지 않기 때문에 데드락과 같은 문제를 방지할 수 있음
  • lock-free 알고리즘은 CPU가 원자적인 연산을 지원해야됨 (현대의 CPU들은 거의 가능)

 

원자적 연산의 예시

import java.util.concurrent.atomic.AtomicInteger;

public class LockFreeCounter {
    private AtomicInteger counter = new AtomicInteger(0);

    public void increment() {
        int oldValue, newValue;
        do {
            oldValue = counter.get();
            newValue = oldValue + 1;
        } while (!counter.compareAndSet(oldValue, newValue));
    }

    public int getCounter() {
        return counter.get();
    }

    public static void main(String[] args) throws InterruptedException {
        LockFreeCounter lockFreeCounter = new LockFreeCounter();
        Thread[] threads = new Thread[10];

        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) {
                    lockFreeCounter.increment();
                }
            });
            threads[i].start();
        }

        for (int i = 0; i < 10; i++) {
            threads[i].join();
        }

        System.out.println("카운터 값: " + lockFreeCounter.getCounter());
    }
}
  • Java에서 AtomicInteger 클래스는 원자적 연산을 제공
  • 이를 통해 락을 쓰지 않고 동시성 문제를 해결
  • while의 조건문인 !counter.compareAndSet(oldValue, newValue)가 원자적 연산의 종류중 하나인 CAS(Compare-And-Swap)의 개념을 구현한 메소드를 사용하여 원자적으로 counter값을 증가시킴

 

CAS 연산의 원리

function compareAndSwap(m, a, b) {
    if(m === a){
    	m = b;
	return true;
    } else {
	return false;
    }
}

 


CAS 연산이 발생할 수 있는 문제

  1. 스레드 1이 처음에 0x01을 읽고 스택에서 값을 POP 하기 위해 준비
  2. 그 사이에, 스레드 2가 0x01과 0x02를 POP하여 스택을 변경
  3. 스레드 3이 0x01을 재사용하여 새로운 값을 스택에 PUSH
  4. 이 사실을 모르는 스레드 1은 CAS를 수행하여, 0x01이 여전히 유효한 값이라고 잘못 판단하고 바꾸고 싶은 값인 0x02로 바꿈
  5. 이제, 스택의 TOP 원소는 0x02지만 스레드 2가 POP하여 메모리 해제가 된 공간이라 오류 발생 (C++로 치면 댕글링 포인터)