위상 정렬 (Topological Sort)
위상 정렬(topological sort)은 DAG(Directed Acyclic ZGraph, 방향 비순환 그래프)에서 정점들을 선행 조건(간선 방향)을 지키며 나열하는 정렬 방식으로 ‘의존 관계를 지키는 실행 순서를 만드는 알고리즘’이다. 위상 정렬은 방향을 따라 나열하는 것이 아닌, 방향을 거스르지 않고 나열하는 것이기 때문에 여러가지 결과가 나올 수있으며, 작업 스케줄링, 컴파일 순서, 렌더링 패스 실행 순서 등 다양한 의존성 문제가 있는 곳에서 활용된다.

- 방향 그래프의 차수
- 진입 차수 (In-degree) : 외부에서 오는 간선의 수
- 진출 자수 (Out-degree) : 외부로 향하는 간선의 수
- 차수 (degree) : 진입 차수 + 진출 차수
위상 정렬 알고리즘 (Topological Sort Algorithm)
1. Kahn’s Algorithm (칸 알고리즘, 진입 차수 기반 방법)
칸 알고리즘은 진입 차수(in-degree)가 0인 정점을 큐에 넣고, 큐에서 정점을 하나씩 꺼내 해당 정점에서 나가는 간선을 제거하는 방식이다. 간선을 제거하면 해당 간선과 연결된 정점의 진입 차수은 줄어들게 되고 진입 차수가 0이 되면 큐에 추가된다. 이 과정을 반복하면 의존성을 해치지 않고 정점을 차례대로 처리할 수 있다.
- 사이클 검출 방식 : 큐가 비었는데 아직 처리되지 않은 노드들이 남아있다면 사이클이 존재하는 것이다. 칸 알고리즘은 전체 노드를 탐색해봐야 사이클 여부를 발견할 수 있다.
- 병렬 실행 적합 여부 : 진입차수 0 노드 집합 = 동시에 실행 가능한 Task이므로, 병렬 실행에 적합하다.
Input:
Graph G(V, E) // V는 노드 집합, E는 간선 집합
Output:
List result // 위상 정렬된 노드 순서
Algorithm Kahn_TopologicalSort(G):
1. result ← 빈 리스트
2. indegree[노드] ← 각 노드의 진입 차수 계산
3. Queue ← 빈 큐
4. for 모든 노드 v in V:
if indegree[v] == 0:
Queue.push(v) // 진입 차수가 0인 노드를 큐에 삽입
5. while Queue가 비어있지 않다면:
v ← Queue.pop() // 큐에서 노드를 꺼냄
result.append(v) // 결과 리스트에 추가
for v에서 나가는 모든 간선 (v → u)에 대해:
indegree[u] -= 1 // 연결된 노드의 진입 차수 감소
if indegree[u] == 0:
Queue.push(u) // 새로 0이 된 노드를 큐에 삽입
6. if result에 포함된 노드 수 < V의 전체 노드 수:
print("사이클 존재! 위상 정렬 불가능")
else:
return result
2. DFS 기반 위상 정렬
그래프를 DFS(깊이 우선 탐색)한 결과를 역순으로 꺼내 위상 정렬 결과를 얻는 방법이다. DFS는 모든 자식 노드를 먼저 방문 후 자신을 스택에 넣으므로, 최종적으로 스택을 뒤집으면 위상 정렬의 순서가 된다.
- 사이클 검출 방식 : 정점의 후속 노드를 탐색할 때 자신이 다시 나온다면 사이클이 발생한 것으로 알고리즘 진행 중에 사이클을 발견할 수 있다.
- 병렬 실행 적합 여부 : 부적합 (하려고 하면 할 수 있겠지만 구현이 복잡하고 굳이? 이다)
Input:
Graph G(V, E) // V는 노드 집합, E는 간선 집합
Output:
List result // 위상 정렬된 노드 순서
Algorithm DFS_TopologicalSort(G):
1. visited[노드] ← 0 // 노드 방문 여부를 저장하는 배열
2. result ← 빈 리스트
3. function DFS(node):
// 1) node가 방문중인 노드라면 사이클 발생
if visited[node] == 1:
print("사이클 발견! 위상 정렬 불가")
return false
// 2) node가 이미 방문을 마친 노드라면 후속노드 처리 x
if visited[node] == 2:
return true
// 3) node가 아직 미방문 노드라면 후속 노드 탐색 시작
visited[node] = 1
for 모든 후속 노드 next in adj[node]:
if DFS(next) == false: // 사이클 검출시 종료
return false
visited[node] = 2 // 탐색 완료
result.append(node) // 결과 리스트에 추가
return true
4. for 모든 노드 v in V: // 비연결그래프 처리
if visited[v] == 0:
if DFS(v) == false:
return "사이클 존재"
5. result.reverse() // DFS 결과를 역순으로 뒤집어 위상 정렬 결과 완성
6. return result
Task Graph
Task Graph는 여러개의 작업(Task)들을 노드(Node)로 표현하고, 작업간의 의존 관계를 간선으로 연결한 DAG(방향 비순환 그래프) 형태의 구조이다. Task Graph는 위상 정렬을 통해 의존 관계를 해치지 않고 작업을 순서대로 스케줄링 및 실행할 수 있다. 또한 의존성이 없는 Task들은 동시에 실행할 수 있으므로, 병렬 처리를 통해 전체 실행 속도를 향상시킬 수 있다. 여기서 의존성(dependency)이란 자신보다 먼저 실행되어야 하는 Task의 유무를 의미한다.

| 요소 | 설명 |
| Graph 구조 | DAG (Directed Acyclic Graph) — 방향성이 있고 사이클이 없는 구조 |
| Node (태스크 노드) | 실행할 함수 객체(std::function) 나 작업 단위(Job) 를 담고 있는 실행 단위 |
| Edge (의존 간선) | 한 Task가 끝난 후 실행되어야 하는 선후 관계(Dependency) |
| Run 순서 결정 | 위상 정렬(Topological Sort)을 통해 의존 순서 보장 실행 |
| 병렬 표현 가능 | 독립된 노드(진입 차수 0)는 동시에 실행 가능 → fork/join 구조 표현 가능 |
실습 : 위상 정렬을 통한 Task 스케줄링 병렬 처리(멀티 스레드)
여러 Task들을 위상 정렬하여 의존성을 해치지 않으며 스케줄링 및 실행할 때, 의존성이 없는 Task들을 동시에 실행해서 실행 속도를 높일 수 있다. 의존 관계(간선)을 추가할 때 사이클이 발생하지 않는지 DFS로 사이클을 검출하여 DAG 형태의 Task Graph를 생성하고, 칸 알고리즘을 활용하여 진입 차수가 0인 Task들을 병렬 실행하면 된다.
사이클 검사 - DFS
위상 정렬을 위해서는 그래프가 DAG구조로 사이클이 없어야지만 스케줄링이 가능하다. 때문에 그래프에 의존성을 추가할 때 그래프에 사이클이 발생하는지 검사를 해야하며, 보통 DFS를 통해 사이클을 검출한다. 칸 위상 정렬시에는 모든 Task를 탐색하고 큐비 빈 시점에 처리하지 않은 Task가 있다는 걸 알 수 있지만, DFS는 탐색중 Task의 후속노드를 따라갔을때 자신이 다시 나온다면 바로 사이클이 있다는 걸 검출할 수 있기 때문에 DFS로 사이클 여부를 검사한다.

위 사진에서 Task2~Task6에는 사이클이 존재하는데 의존 관계 간선 (2,6)을 추가할때 DFS검사를 하면 2를 따라갔을때 다시 2가 나오므로 사이클을 바로 검출할 수 있다.
병렬 처리 방식
위상 정렬을 칸 알고리즘 방식으로 하면 큐에 진입 차수가 0인 Task를 추가한다. 이 Task들은 의존성이 없어 동시에 처리가 가능하지만 단일 스레드로 처리한다면 결국 순서대로 하나씩 처리해야한다. 이때 멀티 스레드를 사용해서 큐에 추가된 Task들을 스레드들이 동시에 처리한다면 실행 속도가 빨라질 것이다. 이때 중요한 것이 큐 버퍼와 Task의 차수 동기화 작업이다.
- 큐 버퍼
- 진입차수 0인 Task를 담는 큐
- 여러 스레드가 동시에 큐에서 Task를 꺼내거나 새 Task를 넣을 수 있음
- 따라서 큐 접근 시 임계 구역/락 필요
- Task의 진입 차수
- Task 실행 완료 후, 후속 Task의 진입 차수를 감소시킴
- 여러 선행 Task가 동시에 후속 Task로 간선을 제거하려고 할 수 있음
- 따라서 진입차수 감소 연산과 0 체크는 원자적(atomic) 연산 또는 락 필요
- 처리되어야 할 남은 Task 개수
- 워커 스레드들의 종료 플래그로 사용될 변수로 0이 되면 작업 완료로 인식하고 스레드 종료
- 그래프에 있는 전체 Task개수로 초기화하며, 워커 스레드가 Task를 처리할때마다 하나씩 감소
- 여러 스레드가 접근하여 감소시키기 때문에 원자적 연산 (atomic) 또는 락 필요
코드
#include <windows.h>
#include <process.h>
#include <thread>
#include <iostream>
#include <vector>
#include <functional>
#include <queue>
using namespace std;
/*
[위상 정렬, 병렬 처리를 활용한 Task 스케줄링]
TaskGrap(DAG)
- 의존관계(간선)을 추가할때마다 사이클이 발생하지 않는지 검사하여
DAG 구조의 TaskGrap를 생성합니다.
위상 정렬
- 칸 알고리즘으로 진입차수가 0인 Task를 차례대로 처리하여
의존성을 해치지 않으면서 순서대로 작업을 처리합니다.
병렬 처리
- Critical Section, Manual-reset Event, Interlocked, volatile을 이용하여
스레드 간 동기화를 수행합니다.
- 여러 스레드가 동시에 큐에서 Task를 꺼내 병렬로 처리합니다.
1) Critical Section
- 큐 버퍼에 Task를 추가하거나 제거할 때 단일 스레드만 접근하도록 보호합니다.
2) Event (Manual-reset, 수동 리셋)
- 큐가 비어있을 경우 모든 스레드가 WaitForSingleObject()로 대기하도록 합니다.
- 새로운 Task가 큐에 추가되면 SetEvent()를 호출해 스레드를 깨웁니다.
- 큐가 다시 비게 되면 ResetEvent()를 호출해 스레드들을 다시 대기 상태로 만듭니다.
3) Interlocked
- inDegree 감소나 남은 Task 수(tasksRemaining) 감소 등의 연산을
원자적으로 수행하여 경쟁 상태(Race Condition)를 방지합니다.
4) volatile
- 컴파일러의 최적화로 인한 값 캐싱을 방지하여,
다른 스레드가 갱신한 변수를 항상 메모리에서 읽도록 보장합니다.
*/
/* ------------ Synchronization Tools---------------*/
CRITICAL_SECTION cs; // Critical section
HANDLE taskEvent; // Event(수동)
/* ------------------ Task Graph -------------------*/
struct TaskNode
{
function<void()> task; // 처리할 Task
vector<int> next; // 인접 리스트
volatile LONG inDegree; // 진입 차수(의존 수)
};
struct TaskGraph
{
vector<TaskNode> nodes; // Task Node
queue<int> ready_queue; // 의존이 제거되어 실행이 가능한 Task
volatile LONG tasksRemaining; // 스레드 종료를 위한 처리되어야할 남은 Task 개수
// 노드 추가
int AddTask(function<void()> fn)
{
nodes.push_back(TaskNode{ std::move(fn), {}, 0 });
return (int)nodes.size() - 1;
}
// 간선(의존성) 추가
void AddDependency(int u, int v)
{
nodes[u].next.push_back(v);
nodes[v].inDegree++;
// 사이클 발생 검사
vector<int> visited(nodes.size(), 0);
for (int i = 0; i < (int)nodes.size(); ++i)
{
if (visited[i] == 0)
{
if (HasCycleDFS(i, nodes, visited))
{
std::cout << "사이클이 존재합니다. 작업 그래프를 다시 설정하세요.\n";
return;
}
}
}
}
// Cycle Cheak (DFS)
bool HasCycleDFS(int node, vector<TaskNode>& nodes, vector<int>& visited)
{
visited[node] = 1; // 현재 경로에 있음
for (int next : nodes[node].next)
{
// 1) node가 방문중인 노드라면 사이클 발생
if (visited[next] == 1)
return true;
// 2) node가 아직 미방문 노드라면 후속 노드 탐색 시작
if (visited[next] == 0 && HasCycleDFS(next, nodes, visited))
return true;
}
visited[node] = 2; // 탐색 완료
return false;
}
// 단일 스레드 스케줄링
void SigleThreadScheduling()
{
// 초기 task 추가
for (int i = 0; i < nodes.size(); ++i)
if (nodes[i].inDegree == 0) ready_queue.push(i);
while (!ready_queue.empty())
{
// 큐에서 task 추출, 실행
int nodeIndex = ready_queue.front(); ready_queue.pop();
nodes[nodeIndex].task();
// 후속노드 진입차수 감소
for (int next : nodes[nodeIndex].next)
{
nodes[next].inDegree--;
// 만약 진입차수가 0이라면 큐에 추가
if (nodes[next].inDegree == 0)
ready_queue.push(next);
}
}
}
// 멀티 스레드 스케줄링을 위한 초기화
void MultiThreadSchedulingInit()
{
// 처리해야할 task 개수
tasksRemaining = nodes.size();
// 초기 task 추가
for (int i = 0; i < nodes.size(); ++i)
{
if (nodes[i].inDegree == 0) ready_queue.push(i);
SetEvent(taskEvent);
}
}
};
/* ------------ Thread ---------------*/
// Thread data
struct ThreadData
{
int threadId;
TaskGraph* taskGraph;
};
// Thread가 실행할 스레드 함수
// 큐 버퍼에서 Task를 꺼내와 처리
static unsigned __stdcall TaskProcessing(void* param)
{
ThreadData* threadData = static_cast<ThreadData*>(param);
TaskGraph* taskGraph = threadData->taskGraph;
std::cout << threadData->threadId << "번 스레드 시작\n";
while (true)
{
// [종료] task가 모두 처리되어있다면 다른 스레드를 깨우고 종료
if (InterlockedCompareExchange(&taskGraph->tasksRemaining, 0, 0) == 0)
{
SetEvent(taskEvent);
break;
}
// [대기] 큐 데이터 생산 이벤트가 발생할때까지 대기
WaitForSingleObject(taskEvent, INFINITE);
// [소비]
// 큐가 비어있다면 다른 스레드들을 대기시키고 continue
// 큐에 데이터가 있다면 Task를 추출
EnterCriticalSection(&cs);
if (taskGraph->ready_queue.empty()) {
ResetEvent(taskEvent);
LeaveCriticalSection(&cs);
continue;
}
int nodeIndex = taskGraph->ready_queue.front();
taskGraph->ready_queue.pop();
LeaveCriticalSection(&cs);
// Task 실행
// Interlocked 함수를 통해 원자적 연산으로 남은 task 개수 감소
taskGraph->nodes[nodeIndex].task();
InterlockedDecrement(&taskGraph->tasksRemaining);
// 후속 노드 처리
for (int nextIndex : taskGraph->nodes[nodeIndex].next) {
// Interlocked 함수를 통해 원자적 연산으로 후속 노드의 진입 차수 감소
LONG newInDegree = InterlockedDecrement(&taskGraph->nodes[nextIndex].inDegree);
// [생산]
// 의존성이 없는 task 큐에 추가
if (newInDegree == 0)
{
EnterCriticalSection(&cs);
taskGraph->ready_queue.push(nextIndex);
SetEvent(taskEvent);
LeaveCriticalSection(&cs);
}
}
}
std::cout << threadData->threadId << "번 스레드 종료\n";
return 0;
}
/* -------------------------------------------------------------------------------*/
void SingleRun(TaskGraph* taskGraph)
{
cout << "5개 작업을 하나의 스레드로 위상 정렬하여 처리합니다." << endl;
taskGraph->SigleThreadScheduling();
}
int MultiRun(TaskGraph* taskGraph)
{
cout << "5개 작업을 3개의 스레드로 위상 정렬하여 처리합니다" << endl;
// critical section
InitializeCriticalSection(&cs);
// event(수동)
taskEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if (!taskEvent) {
cout << "이벤트 생성 실패";
return -1;
}
// worker thread create
ThreadData threadDatas[3] = {
{1, taskGraph}, {2, taskGraph}, {3, taskGraph}
};
HANDLE hThreads[3];
for (int i = 0; i < 3; i++)
{
hThreads[i] = reinterpret_cast<HANDLE>(_beginthreadex(
NULL,
0,
TaskProcessing, // 스레드 함수
&threadDatas[i], // 스레드 매개변수
0,
NULL
));
if (hThreads[i] == NULL) {
std::cout << "스레드 " << i << " 생성 실패" << endl;
return -1;
}
}
// 초기 큐 init
taskGraph->MultiThreadSchedulingInit();
// 모든 스레드가 종료될때까지 대기
WaitForMultipleObjects(3, hThreads, TRUE, INFINITE);
std::cout << "모든 Task 처리 완료" << endl;
// 스레드, 동기화 도구 정리
for (int i = 0; i < 3; i++) CloseHandle(hThreads[i]);
DeleteCriticalSection(&cs);
CloseHandle(taskEvent);
}
int main()
{
// task graph
TaskGraph taskGraph;
// add tase(node)
int t1 = taskGraph.AddTask([]() { Sleep(1000); std::cout << "Task1 처리 완료\n"; });
int t2 = taskGraph.AddTask([]() { Sleep(1000); std::cout << "Task2 처리 완료\n"; });
int t3 = taskGraph.AddTask([]() { Sleep(1000); std::cout << "Task3 처리 완료\n"; });
int t4 = taskGraph.AddTask([]() { Sleep(1000); std::cout << "Task4 처리 완료\n"; });
int t5 = taskGraph.AddTask([]() { Sleep(1000); std::cout << "Task5 처리 완료\n"; });
/* add depends(egde)
가능한 위상정렬 순서
t1, t2, t3 → t4 → t5
*/
taskGraph.AddDependency(t1, t4);
taskGraph.AddDependency(t2, t4);
taskGraph.AddDependency(t3, t4);
taskGraph.AddDependency(t4, t5);
// 1. 단일 스레드 작업 스케줄링
//SingleRun(&taskGraph);
// 2. 멀티 스레드 작업 스케줄링
if (MultiRun(&taskGraph) == -1) return -1;;
return 0;
}
멀티스레드를 처음으로 활용한 예제라
동기화 방식을 올바르게 사용한 것인지는 잘 모르겠다..!
혹시 이 글을 읽는 사람 중 문제를 발견하거나
개선 사항이 보인다면 댓글을 남겨주시길...
'CS > 자료구조, 알고리즘' 카테고리의 다른 글
| [알고리즘] 정렬 알고리즘 1. 버블정렬, 선택정렬, 삽입정렬 (1) | 2025.10.31 |
|---|---|
| [알고리즘] A* 최단 경로 알고리즘 (0) | 2025.10.31 |
| [알고리즘] 다익스트라(Dijkstra) 최단 경로 알고리즘 (0) | 2025.10.20 |
| [자료구조] 그래프(Graph) (0) | 2025.10.15 |
| [알고리즘] 슬라이딩 윈도우(Sliding Window) 알고리즘과 단조 큐(Monotonic Queue) (0) | 2025.09.19 |