컴퓨터과학

cpu바운드 프로그램 멀티쓰레딩에 최적의 스레드 개수 결정 방법 및 멀티쓰레딩 개발에 주의할점

디찌s 2024. 5. 22. 14:01
728x90
반응형

 

 

 

기본 원칙

  • 최적의 스레드 수 = CPU 코어 수: 일반적으로, CPU 바운드 작업에서는 CPU 코어 수만큼의 스레드를 사용하는 것이 가장 효율적입니다. 이는 각 스레드가 동시에 실행될 수 있어 최대의 병렬성을 달성할 수 있기 때문입니다.
  • 과도한 스레드 사용 피하기: 스레드 수가 CPU 코어 수를 초과하면, 스레드 간의 문맥 전환 오버헤드가 발생하여 성능 저하를 초래할 수 있습니다.

쓰레드 갯수 선택 이유

  1. 병렬 처리 최적화: 각 스레드가 하나의 CPU 코어에서 실행될 수 있어 최대한의 병렬 처리가 가능해집니다.
  2. 컨텍스트 스위칭 감소: 불필요한 스레드 간의 컨텍스트 스위칭을 최소화하여 CPU 오버헤드를 줄입니다.

예제 코드

아래 예제는 CPU 바운드 작업을 수행하는 파이썬 프로그램입니다. 이 프로그램은 다중 스레드를 사용하여 계산 집약적인 작업을 수행합니다.

 

import threading
import time
import multiprocessing

# CPU 바운드 작업 예제: 큰 수의 팩토리얼 계산
def cpu_bound_task(n):
    result = 1
    for i in range(2, n + 1):
        result *= i
    return result

def worker():
    print(f"Thread {threading.current_thread().name} starting computation.")
    cpu_bound_task(50000)  # 예제: 큰 수의 팩토리얼 계산
    print(f"Thread {threading.current_thread().name} finished computation.")

if __name__ == "__main__":
    num_cores = multiprocessing.cpu_count()  # 시스템의 CPU 코어 수 확인
    print(f"Number of CPU cores available: {num_cores}")

    # 최적의 스레드 수 설정: CPU 코어 수와 동일하게 설정
    threads = []
    for i in range(num_cores):
        thread = threading.Thread(target=worker, name=f"Thread-{i+1}")
        threads.append(thread)
        thread.start()

    # 모든 스레드가 완료될 때까지 기다림
    for thread in threads:
        thread.join()

    print("All threads have finished execution.")

 

 

CPU 바운드 프로그램에서는 CPU 코어 수만큼의 스레드를 사용하는 것이 최적의 성능을 발휘합니다. 이는 각 스레드가 동시에 실행될 수 있는 환경을 제공하여 최대의 병렬성을 달성하고, 불필요한 컨텍스트 스위칭을 피할 수 있기 때문입니다. 위 예제 코드는 이러한 원칙을 기반으로 작성되었습니다.

 

 

멀티쓰레딩 프로그램 개발에 따른 주의점 정리

 

동기화 이슈 (Synchronization Issues)

레이스 컨디션 (Race Condition)

  • 문제점: 두 개 이상의 스레드가 동시에 동일한 자원에 접근하고 수정할 때 발생할 수 있습니다. 결과는 스레드 실행 순서에 따라 달라질 수 있습니다.
  • 해결책: 락, 뮤텍스, 세마포어와 같은 동기화 메커니즘을 사용하여 자원에 대한 동시 접근을 제어합니다.

예제

 

에러 상황:

 

여러 스레드가 동시에 counter를 증가시키려 하면, 각 스레드의 작업이 중첩되어 예상치 못한 최종 결과가 나올 수 있습니다. synchronized 블록 또는 ReentrantLock을 사용하지 않으면 이러한 문제가 발생할 수 있습니다.

 

보완 사항:

 

락을 사용하여 counter 접근을 동기화합니다.

 

에러가 발생할수 있는 코드:

public class RaceConditionExample {
    private static int counter = 0;

    public static void incrementCounter() {
        for (int i = 0; i < 1000000; i++) {
            counter++;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(RaceConditionExample::incrementCounter);
            threads[i].start();
        }

        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("Final counter value: " + counter);
    }
}

 

에러를 보완한 코드

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class RaceConditionExample {
    private static int counter = 0;
    private static Lock lock = new ReentrantLock();

    public static void incrementCounter() {
        for (int i = 0; i < 1000000; i++) {
            lock.lock();
            try {
                counter++;
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(RaceConditionExample::incrementCounter);
            threads[i].start();
        }

        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("Final counter value: " + counter);
    }
}

 

데드락 (Deadlock)

  • 문제점: 두 개 이상의 스레드가 서로가 소유한 락을 기다릴 때 발생하여 영원히 대기 상태에 빠집니다.
  • 해결책: 락 획득 순서를 고정하거나, 타임아웃을 설정하여 데드락을 피합니다.

 

에러 상황:

 

task1과 task2가 서로가 소유한 락을 기다리면서 데드락이 발생할 수 있습니다. 예를 들어, task1이 lock1을 획득하고 lock2를 기다리는 동안, task2가 lock2를 획득하고 lock1을 기다리면 두 스레드가 영원히 대기 상태에 빠집니다.

 

보완 사항:

 

락을 획득하는 순서를 동일하게 하거나 타임아웃을 설정하여 데드락을 피합니다.

 

에러가 발생할수있는 코드

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class DeadlockExample {
    private static Lock lock1 = new ReentrantLock();
    private static Lock lock2 = new ReentrantLock();

    public static void task1() {
        lock1.lock();
        try {
            Thread.sleep(50); // Simulate some work
            lock2.lock();
            try {
                System.out.println("Task 1 is running");
            } finally {
                lock2.unlock();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock1.unlock();
        }
    }

    public static void task2() {
        lock2.lock();
        try {
            Thread.sleep(50); // Simulate some work
            lock1.lock();
            try {
                System.out.println("Task 2 is running");
            } finally {
                lock1.unlock();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock2.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(DeadlockExample::task1);
        Thread thread2 = new Thread(DeadlockExample::task2);

        thread1.start();
        thread2.start();

        thread1.join();
        thread2.join();
    }
}

에러를 보완한 코드

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.TimeUnit;

public class DeadlockExample {
    private static Lock lock1 = new ReentrantLock();
    private static Lock lock2 = new ReentrantLock();

    public static void task1() {
        try {
            if (lock1.tryLock(50, TimeUnit.MILLISECONDS)) {
                try {
                    Thread.sleep(50); // Simulate some work
                    if (lock2.tryLock(50, TimeUnit.MILLISECONDS)) {
                        try {
                            System.out.println("Task 1 is running");
                        } finally {
                            lock2.unlock();
                        }
                    }
                } finally {
                    lock1.unlock();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void task2() {
        try {
            if (lock2.tryLock(50, TimeUnit.MILLISECONDS)) {
                try {
                    Thread.sleep(50); // Simulate some work
                    if (lock1.tryLock(50, TimeUnit.MILLISECONDS)) {
                        try {
                            System.out.println("Task 2 is running");
                        } finally {
                            lock1.unlock();
                        }
                    }
                } finally {
                    lock2.unlock();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(DeadlockExample::task1);
        Thread thread2 = new Thread(DeadlockExample::task2);

        thread1.start();
        thread2.start();

        thread1.join();
        thread2.join();
    }
}

 

리소스 관리 (Resource Management)

메모리 누수 (Memory Leak)

  • 문제점: 스레드가 종료되지 않고 계속해서 메모리를 점유하는 경우 발생할 수 있습니다.
  • 해결책: 스레드가 종료될 수 있도록 하고, 종료된 스레드를 적절히 정리합니다.

에러 상황:

 

스레드가 종료되지 않거나 적절히 정리되지 않으면 메모리 누수가 발생할 수 있습니다.

 

보완 사항:

 

스레드가 정상적으로 종료되도록 하고, 종료된 스레드를 적절히 정리합니다.

 

public class MemoryLeakExample {

    public static void worker() {
        // Simulate work
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(MemoryLeakExample::worker);
            threads[i].start();
        }

        // If we forget to join the threads, they may still be running,
        // causing potential memory leaks.
    }
}

 

에러를 보완한 코드

public class MemoryLeakExample {

    public static void worker() {
        // Simulate work
    }

    public static void main(String[] args) throws InterruptedException {
        Thread[] threads = new Thread[10];
        for (int i = 0; i < 10; i++) {
            threads[i] = new Thread(MemoryLeakExample::worker);
            threads[i].start();
        }

        for (Thread thread : threads) {
            thread.join();
        }

        System.out.println("All threads have finished.");
    }
}

 

성능 이슈 (Performance Issues)

과도한 컨텍스트 스위칭 (Excessive Context Switching)

  • 문제점: 스레드 수가 너무 많으면 컨텍스트 스위칭 오버헤드가 발생하여 성능이 저하됩니다.
  • 해결책: CPU 코어 수에 맞는 적절한 스레드 수를 유지합니다.

스레드 풀 (Thread Pool) 사용

  • 문제점: 매번 새로운 스레드를 생성하고 종료하는 것은 비용이 많이 듭니다.
  • 해결책: 스레드 풀을 사용하여 스레드를 재사용합니다.

에러 상황:

 

스레드를 직접 관리하면 스레드 생성과 종료에 많은 오버헤드가 발생할 수 있습니다.

 

보완 사항:

 

스레드 풀을 사용하여 스레드를 재사용합니다.

 

에러가 발생할수 있는 코드

public class ThreadPoolExample {

    public static void worker() {
        System.out.println(Thread.currentThread().getName() + " is running");
    }

    public static void main(String[] args) {
        for (int i = 0; i < 20; i++) {
            new Thread(ThreadPoolExample::worker).start();
        }
    }
}

 

 

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ThreadPoolExample {

    public static void worker() {
        System.out.println(Thread.currentThread().getName() + " is running");
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 20; i++) {
            executor.submit(ThreadPoolExample::worker);
        }

        executor.shutdown();
    }
}
728x90
반응형