Fork–Join Pattern #
Ada sebuah prinsip lama dalam komputasi: pekerjaan yang bisa dipecah menjadi bagian-bagian independen bisa diselesaikan lebih cepat jika bagian-bagian itu dikerjakan secara paralel. Sebuah array sejuta elemen tidak harus dijumlahkan satu per satu dari kiri ke kanan — ia bisa dipecah menjadi empat bagian, setiap bagian dijumlahkan di core yang berbeda, lalu keempat hasil dijumlahkan menjadi satu. Ini adalah inti dari Fork–Join Pattern: fork memecah pekerjaan besar menjadi sub-task yang lebih kecil dan menjalankannya paralel, join menunggu semua sub-task selesai dan menggabungkan hasilnya. Pola ini adalah tulang punggung dari algoritma divide and conquer konkuren — merge sort paralel, image processing per-region, map-reduce, word count di atas dataset besar — semua bermuara pada ide yang sama: pecah, paralel, gabung.
Apa itu Fork–Join Pattern? #
Fork–Join Pattern adalah pola concurrency yang memiliki dua fase yang selalu hadir bersama dan tidak bisa dipisahkan.
flowchart TD
Main([Task Utama]) -->|Fork| S1[Sub-task 1]
Main -->|Fork| S2[Sub-task 2]
Main -->|Fork| S3[Sub-task 3]
Main -->|Fork| S4[Sub-task 4]
S1 -->|hasil parsial| Join([Join & Agregasi])
S2 -->|hasil parsial| Join
S3 -->|hasil parsial| Join
S4 -->|hasil parsial| Join
Join --> Result([Hasil Akhir])| Fase | Yang Terjadi |
|---|---|
| Fork | Task induk memecah pekerjaan menjadi N sub-task dan meluncurkan semuanya secara paralel |
| Join | Task induk menunggu semua N sub-task selesai, lalu mengagregasi hasilnya |
Yang membedakan Fork–Join dari sekadar “jalankan banyak goroutine” adalah titik sinkronisasi yang eksplisit: ada satu momen yang jelas di mana semua pekerjaan paralel harus selesai sebelum eksekusi bisa berlanjut. Di Go, titik sinkronisasi ini diekspresikan dengan sync.WaitGroup.
Fork–Join cocok untuk masalah yang bersifat embarrassingly parallel — ketika sub-task bisa berjalan sepenuhnya independen tanpa perlu berkomunikasi satu sama lain selama eksekusi. Jika sub-task saling bergantung, kamu butuh pola lain seperti pipeline atau Producer-Consumer.
Implementasi Dasar: Parallel Sum #
Kasus paling mudah untuk memahami Fork–Join adalah menjumlahkan array besar secara paralel. Ini adalah contoh klasik yang langsung menunjukkan dua fase dengan jelas.
package main
import (
"fmt"
"sync"
)
// sumChunk menjumlahkan satu bagian dari slice dan mengirim hasilnya ke channel
func sumChunk(nums []int, resultCh chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
sum := 0
for _, n := range nums {
sum += n
}
resultCh <- sum
}
func parallelSum(numbers []int, numWorkers int) int {
chunkSize := (len(numbers) + numWorkers - 1) / numWorkers
resultCh := make(chan int, numWorkers) // buffered: satu slot per sub-task
var wg sync.WaitGroup
// FORK: pecah array dan jalankan setiap bagian di goroutine terpisah
for i := 0; i < len(numbers); i += chunkSize {
end := i + chunkSize
if end > len(numbers) {
end = len(numbers)
}
wg.Add(1)
go sumChunk(numbers[i:end], resultCh, &wg)
}
// Tutup channel setelah semua goroutine selesai
go func() {
wg.Wait() // JOIN: tunggu semua sub-task
close(resultCh)
}()
// Agregasi: kumpulkan semua hasil parsial
total := 0
for partial := range resultCh {
total += partial
}
return total
}
func main() {
numbers := make([]int, 1_000_000)
for i := range numbers {
numbers[i] = i + 1
}
result := parallelSum(numbers, 8) // gunakan 8 sub-task paralel
fmt.Printf("Total: %d\n", result)
}
Perlu diperhatikan bahwa resultCh dideklarasikan sebagai buffered channel dengan kapasitas sejumlah worker. Ini penting: tanpa buffer, goroutine sub-task akan blocking saat mencoba menulis ke channel, dan jika goroutine penutup (wg.Wait() + close()) belum berjalan, terjadi deadlock.
Fork–Join Rekursif #
Kekuatan sesungguhnya dari Fork–Join muncul ketika diterapkan secara rekursif — setiap sub-task bisa melakukan fork lagi ke sub-sub-task yang lebih kecil. Ini adalah struktur yang dipakai oleh merge sort, quick sort paralel, dan tree traversal konkuren.
package main
import (
"fmt"
"sync"
)
const threshold = 500 // di bawah ini, kerjakan secara sekuensial
// parallelMergeSort mengurutkan slice secara rekursif dengan Fork–Join
func parallelMergeSort(data []int, wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
}
n := len(data)
if n <= 1 {
return
}
// Jika ukuran kecil, selesaikan secara sekuensial (hindari overhead goroutine)
if n <= threshold {
sequentialSort(data)
return
}
mid := n / 2
left := make([]int, mid)
right := make([]int, n-mid)
copy(left, data[:mid])
copy(right, data[mid:])
var childWG sync.WaitGroup
childWG.Add(2)
// FORK: dua sub-task berjalan paralel
go parallelMergeSort(left, &childWG)
go parallelMergeSort(right, &childWG)
// JOIN: tunggu kedua sub-task selesai
childWG.Wait()
// Gabungkan hasil
merge(data, left, right)
}
func merge(dst, left, right []int) {
i, j, k := 0, 0, 0
for i < len(left) && j < len(right) {
if left[i] <= right[j] {
dst[k] = left[i]; i++
} else {
dst[k] = right[j]; j++
}
k++
}
for i < len(left) { dst[k] = left[i]; i++; k++ }
for j < len(right) { dst[k] = right[j]; j++; k++ }
}
func sequentialSort(data []int) {
// insertion sort sederhana untuk potongan kecil
for i := 1; i < len(data); i++ {
key := data[i]
j := i - 1
for j >= 0 && data[j] > key {
data[j+1] = data[j]
j--
}
data[j+1] = key
}
}
func main() {
data := []int{9, 3, 7, 1, 5, 8, 2, 6, 4, 0}
parallelMergeSort(data, nil)
fmt.Println("Sorted:", data)
}
Visualisasi rekursinya membentuk pohon biner:
flowchart TD
A["[9,3,7,1,5,8,2,6,4,0]"] -->|Fork| B["[9,3,7,1,5]"]
A -->|Fork| C["[8,2,6,4,0]"]
B -->|Fork| D["[9,3]"]
B -->|Fork| E["[7,1,5]"]
C -->|Fork| F["[8,2]"]
C -->|Fork| G["[6,4,0]"]
D -->|Join| B2["[3,9]"]
E -->|Join| B3["[1,5,7]"]
B2 -->|Merge| BJ["[1,3,5,7,9]"]
B3 -->|Merge| BJ
F -->|Join| C2["[2,8]"]
G -->|Join| C3["[0,4,6]"]
C2 -->|Merge| CJ["[0,2,4,6,8]"]
C3 -->|Merge| CJ
BJ -->|Merge| Result["[0,1,2,3,4,5,6,7,8,9]"]
CJ -->|Merge| ResultError Propagation dari Sub-Task #
Di implementasi dasar di atas, sub-task tidak bisa gagal. Di dunia nyata — misalnya saat memproses file, memanggil API, atau membaca database — sub-task bisa menghasilkan error. Error harus dipropagasi ke task induk dengan benar.
package main
import (
"fmt"
"sync"
)
type PartialResult struct {
Value int
Err error
}
// processChunk memproses satu bagian data dan bisa menghasilkan error
func processChunk(id int, data []int, resultCh chan<- PartialResult, wg *sync.WaitGroup) {
defer wg.Done()
// Simulasi error pada chunk tertentu
if id == 2 {
resultCh <- PartialResult{Err: fmt.Errorf("chunk %d gagal diproses: data korup", id)}
return
}
sum := 0
for _, v := range data {
sum += v
}
resultCh <- PartialResult{Value: sum}
}
func forkJoinWithErrors(data []int, numChunks int) (int, []error) {
chunkSize := (len(data) + numChunks - 1) / numChunks
resultCh := make(chan PartialResult, numChunks)
var wg sync.WaitGroup
// FORK
for i := 0; i < numChunks; i++ {
start := i * chunkSize
if start >= len(data) {
break
}
end := start + chunkSize
if end > len(data) {
end = len(data)
}
wg.Add(1)
go processChunk(i, data[start:end], resultCh, &wg)
}
// JOIN
go func() {
wg.Wait()
close(resultCh)
}()
// Agregasi hasil dan kumpulkan semua error
total := 0
var errs []error
for result := range resultCh {
if result.Err != nil {
errs = append(errs, result.Err)
continue
}
total += result.Value
}
return total, errs
}
func main() {
data := make([]int, 100)
for i := range data {
data[i] = i + 1
}
total, errs := forkJoinWithErrors(data, 5)
if len(errs) > 0 {
fmt.Println("Beberapa chunk gagal:")
for _, e := range errs {
fmt.Println(" -", e)
}
}
fmt.Printf("Total dari chunk yang berhasil: %d\n", total)
}
Jangan abaikan error dari sub-task. Pola Fork–Join yang hanya mengumpulkan “nilai sukses” dan membuang error diam-diam sangat berbahaya — hasil akhir akan salah dan tidak ada yang tahu mengapa. Selalu kumpulkan error dari semua sub-task dan laporkan ke caller, bahkan jika kamu memutuskan untuk tetap menggunakan hasil parsial dari sub-task yang berhasil.
Membatasi Concurrency dengan Semaphore #
Fork–Join tanpa batas bisa memunculkan jutaan goroutine jika data sangat besar. Untuk data processing berskala besar, kita perlu membatasi jumlah sub-task yang berjalan secara bersamaan menggunakan semaphore — sebuah buffered channel yang bertindak sebagai “izin” untuk berjalan.
package main
import (
"fmt"
"sync"
"time"
)
// forkJoinBounded menjalankan task paralel tapi membatasi concurrency
func forkJoinBounded(tasks []func() int, maxConcurrent int) []int {
results := make([]int, len(tasks))
sem := make(chan struct{}, maxConcurrent) // semaphore: maks N goroutine aktif
var wg sync.WaitGroup
var mu sync.Mutex
// FORK dengan batas concurrency
for i, task := range tasks {
wg.Add(1)
go func(idx int, t func() int) {
defer wg.Done()
sem <- struct{}{} // ambil slot — blocking jika penuh
defer func() { <-sem }() // lepas slot saat selesai
result := t()
mu.Lock()
results[idx] = result
mu.Unlock()
}(i, task)
}
// JOIN
wg.Wait()
return results
}
func main() {
tasks := make([]func() int, 20)
for i := range tasks {
id := i
tasks[i] = func() int {
time.Sleep(100 * time.Millisecond)
return id * id // kuadrat dari id
}
}
// Hanya 4 task yang berjalan secara bersamaan, meski ada 20 task
results := forkJoinBounded(tasks, 4)
fmt.Println("Hasil (20 task, maks 4 concurrent):", results[:5], "...")
}
Visualisasi bagaimana semaphore mengontrol concurrency:
flowchart LR
T1[Task 1] --> SEM{Semaphore\nmaks 4}
T2[Task 2] --> SEM
T3[Task 3] --> SEM
T4[Task 4] --> SEM
T5[Task 5] -.->|menunggu slot| SEM
T6[Task 6] -.->|menunggu slot| SEM
SEM -->|slot tersedia| W1[Worker 1]
SEM -->|slot tersedia| W2[Worker 2]
SEM -->|slot tersedia| W3[Worker 3]
SEM -->|slot tersedia| W4[Worker 4]Menentukan Granularity yang Tepat #
Salah satu keputusan terpenting dalam Fork–Join adalah seberapa kecil sub-task harus dipecah. Ini disebut granularity — dan ada dua ekstrem yang harus dihindari.
// ANTI-PATTERN: granularity terlalu halus — satu goroutine per elemen
// overhead goroutine (stack, scheduler) jauh lebih mahal dari komputasinya
func tooFineGrained(data []int) int {
resultCh := make(chan int, len(data))
var wg sync.WaitGroup
for _, v := range data { // 1 juta elemen = 1 juta goroutine ✗
wg.Add(1)
go func(n int) {
defer wg.Done()
resultCh <- n // overhead goroutine >> cost penjumlahan
}(v)
}
go func() { wg.Wait(); close(resultCh) }()
total := 0
for v := range resultCh { total += v }
return total
}
// ANTI-PATTERN: granularity terlalu kasar — hanya satu goroutine
// tidak ada paralelisme sama sekali
func tooCoarseGrained(data []int) int {
resultCh := make(chan int, 1)
go func() { // hanya 1 goroutine — tidak lebih baik dari sekuensial ✗
sum := 0
for _, v := range data { sum += v }
resultCh <- sum
}()
return <-resultCh
}
// BENAR: granularity sesuai — jumlah chunk = jumlah CPU core
func wellGrained(data []int) int {
numWorkers := runtime.NumCPU() // ✓ sesuaikan dengan jumlah core
chunkSize := (len(data) + numWorkers - 1) / numWorkers
resultCh := make(chan int, numWorkers)
var wg sync.WaitGroup
for i := 0; i < len(data); i += chunkSize {
end := i + chunkSize
if end > len(data) { end = len(data) }
wg.Add(1)
go func(chunk []int) {
defer wg.Done()
sum := 0
for _, v := range chunk { sum += v }
resultCh <- sum
}(data[i:end])
}
go func() { wg.Wait(); close(resultCh) }()
total := 0
for partial := range resultCh { total += partial }
return total
}
Panduan praktis untuk menentukan granularity:
| Faktor | Panduan |
|---|---|
| Jumlah goroutine | Mulai dari runtime.NumCPU(), ukur, lalu sesuaikan |
| Ukuran data | Semakin besar data, semakin banyak chunk bisa dibuat |
| Cost per item | Semakin mahal (I/O, komputasi berat), semakin kecil chunk yang optimal |
| Threshold rekursi | Tentukan batas di mana rekursi berhenti dan beralih ke sekuensial |
Fork–Join vs Pola Concurrency Lain #
flowchart TD
Q{Jenis masalah\nconcurrency?} --> Q1{Data bisa\ndipecah independen?}
Q1 -- Ya --> Q2{Butuh hasil\ndigabungkan?}
Q1 -- Tidak --> PC[Producer-Consumer\natau Pipeline]
Q2 -- Ya --> FJ[Fork–Join ✓]
Q2 -- Tidak --> FF[Fire-and-Forget\ngoroutine biasa]
FJ --> Q3{Perlu batasi\nconcurrency?}
Q3 -- Ya --> FJW[Fork–Join\n+ Semaphore]
Q3 -- Tidak --> FJP[Fork–Join\nmurni]| Aspek | Fork–Join | Worker Pool | Pipeline |
|---|---|---|---|
| Struktur | Pohon (pecah & gabung) | Pool flat | Rantai linear |
| Sinkronisasi | Di titik join yang eksplisit | Terus-menerus (channel) | Per stage |
| Ideal untuk | Divide & conquer, agregasi | Task queue, rate limiting | Transformasi data bertahap |
| Sub-task | Independen sepenuhnya | Independen | Bergantung pada stage sebelumnya |
Anti-Pattern yang Harus Dihindari #
// ✗ Fork tanpa join — goroutine leak, hasil tidak dikumpulkan
func noJoin(data []int) {
for _, chunk := range splitData(data) {
go processChunk(chunk) // ✗ tidak ada WaitGroup, tidak ada join
}
// fungsi kembali sebelum sub-task selesai — hasilnya hilang
}
// ✓ Selalu ada join yang eksplisit sebelum menggunakan hasil
func withJoin(data []int) int {
resultCh := make(chan int, len(data))
var wg sync.WaitGroup
for _, chunk := range splitData(data) {
wg.Add(1)
go func(c []int) {
defer wg.Done()
resultCh <- process(c) // ✓ hasil dikirim ke channel
}(chunk)
}
go func() { wg.Wait(); close(resultCh) }() // ✓ join eksplisit
total := 0
for r := range resultCh { total += r }
return total
}
// ✗ Shared mutable state tanpa sinkronisasi — data race
var sharedTotal int
func racyFork(data []int) {
var wg sync.WaitGroup
for _, chunk := range splitData(data) {
wg.Add(1)
go func(c []int) {
defer wg.Done()
for _, v := range c {
sharedTotal += v // ✗ race condition — multiple goroutine write bersamaan
}
}(chunk)
}
wg.Wait()
}
// ✓ Setiap sub-task menghasilkan hasil parsial, digabungkan di akhir
func safeFork(data []int) int {
resultCh := make(chan int, 8)
var wg sync.WaitGroup
for _, chunk := range splitData(data) {
wg.Add(1)
go func(c []int) {
defer wg.Done()
local := 0
for _, v := range c { local += v } // ✓ variabel lokal, tidak ada race
resultCh <- local
}(chunk)
}
go func() { wg.Wait(); close(resultCh) }()
total := 0
for r := range resultCh { total += r } // ✓ gabung di sini, bukan di goroutine
return total
}
// ✗ Tidak ada threshold untuk rekursi — goroutine untuk data ukuran 1
func noThreshold(data []int) int {
if len(data) <= 1 { // ✗ seharusnya ada threshold yang lebih besar
return data[0]
}
// ... fork rekursif
return 0
}
// ✓ Threshold mencegah overhead goroutine untuk data kecil
func withThreshold(data []int) int {
if len(data) <= 1000 { // ✓ kerjakan sekuensial untuk chunk kecil
sum := 0
for _, v := range data { sum += v }
return sum
}
// ... fork rekursif hanya untuk data besar
return 0
}
Checklist Review Fork–Join #
DESAIN TASK:
□ Sub-task benar-benar independen — tidak ada shared mutable state antar sub-task
□ Ukuran chunk dipilih berdasarkan jumlah core dan cost per item
□ Ada threshold untuk menghentikan rekursi dan beralih ke sekuensial
SINKRONISASI:
□ WaitGroup.Add() selalu dipanggil sebelum goroutine diluncurkan
□ WaitGroup.Done() selalu di-defer di awal goroutine
□ Channel hasil selalu buffered (kapasitas ≥ jumlah sub-task)
□ Channel ditutup setelah WaitGroup.Wait() selesai
ERROR HANDLING:
□ Error dari sub-task dikumpulkan, bukan dibuang diam-diam
□ Semua error dilaporkan ke caller setelah join
□ Program memutuskan dengan eksplisit: hentikan semua atau lanjut dengan partial
RESOURCE CONTROL:
□ Jumlah goroutine concurrent dibatasi (semaphore) untuk data berskala besar
□ Tidak ada goroutine yang bisa leak setelah join selesai
□ Memory allocation per sub-task diperhitungkan untuk dataset besar
PENGUJIAN:
□ Diuji dengan go test -race untuk mendeteksi data race
□ Hasil dibandingkan dengan versi sekuensial untuk memverifikasi kebenaran
□ Benchmark dilakukan untuk memastikan ada speedup nyata
Ringkasan #
- Fork–Join memiliki dua fase yang tidak terpisahkan — fork memecah pekerjaan dan meluncurkan sub-task paralel, join menunggu semua selesai dan mengagregasi hasilnya.
sync.WaitGroupadalah mekanisme join di Go —Add()sebelum goroutine diluncurkan,Done()di-defer di awal goroutine,Wait()memblokir sampai semua selesai.- Channel hasil harus buffered dengan kapasitas minimal sejumlah sub-task — channel unbuffered menyebabkan deadlock karena sub-task blocking saat join belum berjalan.
- Sub-task harus benar-benar independen — tidak boleh ada shared mutable state antar sub-task; setiap sub-task menghasilkan nilai parsial yang aman untuk digabungkan di titik join.
- Granularity menentukan efisiensi — terlalu halus (satu goroutine per elemen) mengakibatkan overhead scheduler yang mengalahkan manfaat paralel; mulai dari
runtime.NumCPU()chunk sebagai titik awal.- Threshold wajib ada di rekursi — hentikan fork dan beralih ke komputasi sekuensial ketika ukuran data di bawah ambang batas tertentu; overhead goroutine tidak sebanding untuk data kecil.
- Error dari sub-task harus dikumpulkan, bukan dibuang — gunakan struct result yang membawa nilai dan error sekaligus, agregasi keduanya di titik join.
- Semaphore membatasi concurrency untuk dataset besar — buffered channel sebagai semaphore mencegah ledakan goroutine saat data yang diproses sangat banyak.
- Verifikasi dengan
go test -race— masalah concurrency di Fork–Join sering tidak terdeteksi saat testing biasa dan hanya muncul di production; race detector adalah alat wajib.- Fork–Join adalah fondasi divide and conquer konkuren — merge sort paralel, parallel map-reduce, dan image processing per-region semua menggunakan struktur yang sama.