Fork–Join Pattern #

Ada sebuah prinsip lama dalam komputasi: pekerjaan yang bisa dipecah menjadi bagian-bagian independen bisa diselesaikan lebih cepat jika bagian-bagian itu dikerjakan secara paralel. Sebuah array sejuta elemen tidak harus dijumlahkan satu per satu dari kiri ke kanan — ia bisa dipecah menjadi empat bagian, setiap bagian dijumlahkan di core yang berbeda, lalu keempat hasil dijumlahkan menjadi satu. Ini adalah inti dari Fork–Join Pattern: fork memecah pekerjaan besar menjadi sub-task yang lebih kecil dan menjalankannya paralel, join menunggu semua sub-task selesai dan menggabungkan hasilnya. Pola ini adalah tulang punggung dari algoritma divide and conquer konkuren — merge sort paralel, image processing per-region, map-reduce, word count di atas dataset besar — semua bermuara pada ide yang sama: pecah, paralel, gabung.

Apa itu Fork–Join Pattern? #

Fork–Join Pattern adalah pola concurrency yang memiliki dua fase yang selalu hadir bersama dan tidak bisa dipisahkan.

flowchart TD
    Main([Task Utama]) -->|Fork| S1[Sub-task 1]
    Main -->|Fork| S2[Sub-task 2]
    Main -->|Fork| S3[Sub-task 3]
    Main -->|Fork| S4[Sub-task 4]
    S1 -->|hasil parsial| Join([Join & Agregasi])
    S2 -->|hasil parsial| Join
    S3 -->|hasil parsial| Join
    S4 -->|hasil parsial| Join
    Join --> Result([Hasil Akhir])
FaseYang Terjadi
ForkTask induk memecah pekerjaan menjadi N sub-task dan meluncurkan semuanya secara paralel
JoinTask induk menunggu semua N sub-task selesai, lalu mengagregasi hasilnya

Yang membedakan Fork–Join dari sekadar “jalankan banyak goroutine” adalah titik sinkronisasi yang eksplisit: ada satu momen yang jelas di mana semua pekerjaan paralel harus selesai sebelum eksekusi bisa berlanjut. Di Go, titik sinkronisasi ini diekspresikan dengan sync.WaitGroup.

Fork–Join cocok untuk masalah yang bersifat embarrassingly parallel — ketika sub-task bisa berjalan sepenuhnya independen tanpa perlu berkomunikasi satu sama lain selama eksekusi. Jika sub-task saling bergantung, kamu butuh pola lain seperti pipeline atau Producer-Consumer.


Implementasi Dasar: Parallel Sum #

Kasus paling mudah untuk memahami Fork–Join adalah menjumlahkan array besar secara paralel. Ini adalah contoh klasik yang langsung menunjukkan dua fase dengan jelas.

package main

import (
	"fmt"
	"sync"
)

// sumChunk menjumlahkan satu bagian dari slice dan mengirim hasilnya ke channel
func sumChunk(nums []int, resultCh chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	sum := 0
	for _, n := range nums {
		sum += n
	}
	resultCh <- sum
}

func parallelSum(numbers []int, numWorkers int) int {
	chunkSize := (len(numbers) + numWorkers - 1) / numWorkers
	resultCh := make(chan int, numWorkers) // buffered: satu slot per sub-task
	var wg sync.WaitGroup

	// FORK: pecah array dan jalankan setiap bagian di goroutine terpisah
	for i := 0; i < len(numbers); i += chunkSize {
		end := i + chunkSize
		if end > len(numbers) {
			end = len(numbers)
		}
		wg.Add(1)
		go sumChunk(numbers[i:end], resultCh, &wg)
	}

	// Tutup channel setelah semua goroutine selesai
	go func() {
		wg.Wait()  // JOIN: tunggu semua sub-task
		close(resultCh)
	}()

	// Agregasi: kumpulkan semua hasil parsial
	total := 0
	for partial := range resultCh {
		total += partial
	}
	return total
}

func main() {
	numbers := make([]int, 1_000_000)
	for i := range numbers {
		numbers[i] = i + 1
	}

	result := parallelSum(numbers, 8) // gunakan 8 sub-task paralel
	fmt.Printf("Total: %d\n", result)
}

Perlu diperhatikan bahwa resultCh dideklarasikan sebagai buffered channel dengan kapasitas sejumlah worker. Ini penting: tanpa buffer, goroutine sub-task akan blocking saat mencoba menulis ke channel, dan jika goroutine penutup (wg.Wait() + close()) belum berjalan, terjadi deadlock.


Fork–Join Rekursif #

Kekuatan sesungguhnya dari Fork–Join muncul ketika diterapkan secara rekursif — setiap sub-task bisa melakukan fork lagi ke sub-sub-task yang lebih kecil. Ini adalah struktur yang dipakai oleh merge sort, quick sort paralel, dan tree traversal konkuren.

package main

import (
	"fmt"
	"sync"
)

const threshold = 500 // di bawah ini, kerjakan secara sekuensial

// parallelMergeSort mengurutkan slice secara rekursif dengan Fork–Join
func parallelMergeSort(data []int, wg *sync.WaitGroup) {
	if wg != nil {
		defer wg.Done()
	}

	n := len(data)
	if n <= 1 {
		return
	}

	// Jika ukuran kecil, selesaikan secara sekuensial (hindari overhead goroutine)
	if n <= threshold {
		sequentialSort(data)
		return
	}

	mid := n / 2
	left := make([]int, mid)
	right := make([]int, n-mid)
	copy(left, data[:mid])
	copy(right, data[mid:])

	var childWG sync.WaitGroup
	childWG.Add(2)

	// FORK: dua sub-task berjalan paralel
	go parallelMergeSort(left, &childWG)
	go parallelMergeSort(right, &childWG)

	// JOIN: tunggu kedua sub-task selesai
	childWG.Wait()

	// Gabungkan hasil
	merge(data, left, right)
}

func merge(dst, left, right []int) {
	i, j, k := 0, 0, 0
	for i < len(left) && j < len(right) {
		if left[i] <= right[j] {
			dst[k] = left[i]; i++
		} else {
			dst[k] = right[j]; j++
		}
		k++
	}
	for i < len(left) { dst[k] = left[i]; i++; k++ }
	for j < len(right) { dst[k] = right[j]; j++; k++ }
}

func sequentialSort(data []int) {
	// insertion sort sederhana untuk potongan kecil
	for i := 1; i < len(data); i++ {
		key := data[i]
		j := i - 1
		for j >= 0 && data[j] > key {
			data[j+1] = data[j]
			j--
		}
		data[j+1] = key
	}
}

func main() {
	data := []int{9, 3, 7, 1, 5, 8, 2, 6, 4, 0}
	parallelMergeSort(data, nil)
	fmt.Println("Sorted:", data)
}

Visualisasi rekursinya membentuk pohon biner:

flowchart TD
    A["[9,3,7,1,5,8,2,6,4,0]"] -->|Fork| B["[9,3,7,1,5]"]
    A -->|Fork| C["[8,2,6,4,0]"]
    B -->|Fork| D["[9,3]"]
    B -->|Fork| E["[7,1,5]"]
    C -->|Fork| F["[8,2]"]
    C -->|Fork| G["[6,4,0]"]
    D -->|Join| B2["[3,9]"]
    E -->|Join| B3["[1,5,7]"]
    B2 -->|Merge| BJ["[1,3,5,7,9]"]
    B3 -->|Merge| BJ
    F -->|Join| C2["[2,8]"]
    G -->|Join| C3["[0,4,6]"]
    C2 -->|Merge| CJ["[0,2,4,6,8]"]
    C3 -->|Merge| CJ
    BJ -->|Merge| Result["[0,1,2,3,4,5,6,7,8,9]"]
    CJ -->|Merge| Result

Error Propagation dari Sub-Task #

Di implementasi dasar di atas, sub-task tidak bisa gagal. Di dunia nyata — misalnya saat memproses file, memanggil API, atau membaca database — sub-task bisa menghasilkan error. Error harus dipropagasi ke task induk dengan benar.

package main

import (
	"fmt"
	"sync"
)

type PartialResult struct {
	Value int
	Err   error
}

// processChunk memproses satu bagian data dan bisa menghasilkan error
func processChunk(id int, data []int, resultCh chan<- PartialResult, wg *sync.WaitGroup) {
	defer wg.Done()

	// Simulasi error pada chunk tertentu
	if id == 2 {
		resultCh <- PartialResult{Err: fmt.Errorf("chunk %d gagal diproses: data korup", id)}
		return
	}

	sum := 0
	for _, v := range data {
		sum += v
	}
	resultCh <- PartialResult{Value: sum}
}

func forkJoinWithErrors(data []int, numChunks int) (int, []error) {
	chunkSize := (len(data) + numChunks - 1) / numChunks
	resultCh := make(chan PartialResult, numChunks)
	var wg sync.WaitGroup

	// FORK
	for i := 0; i < numChunks; i++ {
		start := i * chunkSize
		if start >= len(data) {
			break
		}
		end := start + chunkSize
		if end > len(data) {
			end = len(data)
		}
		wg.Add(1)
		go processChunk(i, data[start:end], resultCh, &wg)
	}

	// JOIN
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	// Agregasi hasil dan kumpulkan semua error
	total := 0
	var errs []error
	for result := range resultCh {
		if result.Err != nil {
			errs = append(errs, result.Err)
			continue
		}
		total += result.Value
	}

	return total, errs
}

func main() {
	data := make([]int, 100)
	for i := range data {
		data[i] = i + 1
	}

	total, errs := forkJoinWithErrors(data, 5)
	if len(errs) > 0 {
		fmt.Println("Beberapa chunk gagal:")
		for _, e := range errs {
			fmt.Println(" -", e)
		}
	}
	fmt.Printf("Total dari chunk yang berhasil: %d\n", total)
}
Jangan abaikan error dari sub-task. Pola Fork–Join yang hanya mengumpulkan “nilai sukses” dan membuang error diam-diam sangat berbahaya — hasil akhir akan salah dan tidak ada yang tahu mengapa. Selalu kumpulkan error dari semua sub-task dan laporkan ke caller, bahkan jika kamu memutuskan untuk tetap menggunakan hasil parsial dari sub-task yang berhasil.

Membatasi Concurrency dengan Semaphore #

Fork–Join tanpa batas bisa memunculkan jutaan goroutine jika data sangat besar. Untuk data processing berskala besar, kita perlu membatasi jumlah sub-task yang berjalan secara bersamaan menggunakan semaphore — sebuah buffered channel yang bertindak sebagai “izin” untuk berjalan.

package main

import (
	"fmt"
	"sync"
	"time"
)

// forkJoinBounded menjalankan task paralel tapi membatasi concurrency
func forkJoinBounded(tasks []func() int, maxConcurrent int) []int {
	results := make([]int, len(tasks))
	sem := make(chan struct{}, maxConcurrent) // semaphore: maks N goroutine aktif
	var wg sync.WaitGroup
	var mu sync.Mutex

	// FORK dengan batas concurrency
	for i, task := range tasks {
		wg.Add(1)
		go func(idx int, t func() int) {
			defer wg.Done()
			sem <- struct{}{}        // ambil slot — blocking jika penuh
			defer func() { <-sem }() // lepas slot saat selesai

			result := t()
			mu.Lock()
			results[idx] = result
			mu.Unlock()
		}(i, task)
	}

	// JOIN
	wg.Wait()
	return results
}

func main() {
	tasks := make([]func() int, 20)
	for i := range tasks {
		id := i
		tasks[i] = func() int {
			time.Sleep(100 * time.Millisecond)
			return id * id // kuadrat dari id
		}
	}

	// Hanya 4 task yang berjalan secara bersamaan, meski ada 20 task
	results := forkJoinBounded(tasks, 4)
	fmt.Println("Hasil (20 task, maks 4 concurrent):", results[:5], "...")
}

Visualisasi bagaimana semaphore mengontrol concurrency:

flowchart LR
    T1[Task 1] --> SEM{Semaphore\nmaks 4}
    T2[Task 2] --> SEM
    T3[Task 3] --> SEM
    T4[Task 4] --> SEM
    T5[Task 5] -.->|menunggu slot| SEM
    T6[Task 6] -.->|menunggu slot| SEM
    SEM -->|slot tersedia| W1[Worker 1]
    SEM -->|slot tersedia| W2[Worker 2]
    SEM -->|slot tersedia| W3[Worker 3]
    SEM -->|slot tersedia| W4[Worker 4]

Menentukan Granularity yang Tepat #

Salah satu keputusan terpenting dalam Fork–Join adalah seberapa kecil sub-task harus dipecah. Ini disebut granularity — dan ada dua ekstrem yang harus dihindari.

// ANTI-PATTERN: granularity terlalu halus — satu goroutine per elemen
// overhead goroutine (stack, scheduler) jauh lebih mahal dari komputasinya
func tooFineGrained(data []int) int {
	resultCh := make(chan int, len(data))
	var wg sync.WaitGroup
	for _, v := range data { // 1 juta elemen = 1 juta goroutine ✗
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			resultCh <- n // overhead goroutine >> cost penjumlahan
		}(v)
	}
	go func() { wg.Wait(); close(resultCh) }()
	total := 0
	for v := range resultCh { total += v }
	return total
}

// ANTI-PATTERN: granularity terlalu kasar — hanya satu goroutine
// tidak ada paralelisme sama sekali
func tooCoarseGrained(data []int) int {
	resultCh := make(chan int, 1)
	go func() { // hanya 1 goroutine — tidak lebih baik dari sekuensial ✗
		sum := 0
		for _, v := range data { sum += v }
		resultCh <- sum
	}()
	return <-resultCh
}

// BENAR: granularity sesuai — jumlah chunk = jumlah CPU core
func wellGrained(data []int) int {
	numWorkers := runtime.NumCPU() // ✓ sesuaikan dengan jumlah core
	chunkSize := (len(data) + numWorkers - 1) / numWorkers
	resultCh := make(chan int, numWorkers)
	var wg sync.WaitGroup

	for i := 0; i < len(data); i += chunkSize {
		end := i + chunkSize
		if end > len(data) { end = len(data) }
		wg.Add(1)
		go func(chunk []int) {
			defer wg.Done()
			sum := 0
			for _, v := range chunk { sum += v }
			resultCh <- sum
		}(data[i:end])
	}

	go func() { wg.Wait(); close(resultCh) }()
	total := 0
	for partial := range resultCh { total += partial }
	return total
}

Panduan praktis untuk menentukan granularity:

FaktorPanduan
Jumlah goroutineMulai dari runtime.NumCPU(), ukur, lalu sesuaikan
Ukuran dataSemakin besar data, semakin banyak chunk bisa dibuat
Cost per itemSemakin mahal (I/O, komputasi berat), semakin kecil chunk yang optimal
Threshold rekursiTentukan batas di mana rekursi berhenti dan beralih ke sekuensial

Fork–Join vs Pola Concurrency Lain #

flowchart TD
    Q{Jenis masalah\nconcurrency?} --> Q1{Data bisa\ndipecah independen?}
    Q1 -- Ya --> Q2{Butuh hasil\ndigabungkan?}
    Q1 -- Tidak --> PC[Producer-Consumer\natau Pipeline]
    Q2 -- Ya --> FJ[Fork–Join ✓]
    Q2 -- Tidak --> FF[Fire-and-Forget\ngoroutine biasa]
    FJ --> Q3{Perlu batasi\nconcurrency?}
    Q3 -- Ya --> FJW[Fork–Join\n+ Semaphore]
    Q3 -- Tidak --> FJP[Fork–Join\nmurni]
AspekFork–JoinWorker PoolPipeline
StrukturPohon (pecah & gabung)Pool flatRantai linear
SinkronisasiDi titik join yang eksplisitTerus-menerus (channel)Per stage
Ideal untukDivide & conquer, agregasiTask queue, rate limitingTransformasi data bertahap
Sub-taskIndependen sepenuhnyaIndependenBergantung pada stage sebelumnya

Anti-Pattern yang Harus Dihindari #

// ✗ Fork tanpa join — goroutine leak, hasil tidak dikumpulkan
func noJoin(data []int) {
	for _, chunk := range splitData(data) {
		go processChunk(chunk) // ✗ tidak ada WaitGroup, tidak ada join
	}
	// fungsi kembali sebelum sub-task selesai — hasilnya hilang
}

// ✓ Selalu ada join yang eksplisit sebelum menggunakan hasil
func withJoin(data []int) int {
	resultCh := make(chan int, len(data))
	var wg sync.WaitGroup
	for _, chunk := range splitData(data) {
		wg.Add(1)
		go func(c []int) {
			defer wg.Done()
			resultCh <- process(c) // ✓ hasil dikirim ke channel
		}(chunk)
	}
	go func() { wg.Wait(); close(resultCh) }() // ✓ join eksplisit
	total := 0
	for r := range resultCh { total += r }
	return total
}

// ✗ Shared mutable state tanpa sinkronisasi — data race
var sharedTotal int
func racyFork(data []int) {
	var wg sync.WaitGroup
	for _, chunk := range splitData(data) {
		wg.Add(1)
		go func(c []int) {
			defer wg.Done()
			for _, v := range c {
				sharedTotal += v // ✗ race condition — multiple goroutine write bersamaan
			}
		}(chunk)
	}
	wg.Wait()
}

// ✓ Setiap sub-task menghasilkan hasil parsial, digabungkan di akhir
func safeFork(data []int) int {
	resultCh := make(chan int, 8)
	var wg sync.WaitGroup
	for _, chunk := range splitData(data) {
		wg.Add(1)
		go func(c []int) {
			defer wg.Done()
			local := 0
			for _, v := range c { local += v } // ✓ variabel lokal, tidak ada race
			resultCh <- local
		}(chunk)
	}
	go func() { wg.Wait(); close(resultCh) }()
	total := 0
	for r := range resultCh { total += r } // ✓ gabung di sini, bukan di goroutine
	return total
}

// ✗ Tidak ada threshold untuk rekursi — goroutine untuk data ukuran 1
func noThreshold(data []int) int {
	if len(data) <= 1 { // ✗ seharusnya ada threshold yang lebih besar
		return data[0]
	}
	// ... fork rekursif
	return 0
}

// ✓ Threshold mencegah overhead goroutine untuk data kecil
func withThreshold(data []int) int {
	if len(data) <= 1000 { // ✓ kerjakan sekuensial untuk chunk kecil
		sum := 0
		for _, v := range data { sum += v }
		return sum
	}
	// ... fork rekursif hanya untuk data besar
	return 0
}

Checklist Review Fork–Join #

DESAIN TASK:
  □ Sub-task benar-benar independen — tidak ada shared mutable state antar sub-task
  □ Ukuran chunk dipilih berdasarkan jumlah core dan cost per item
  □ Ada threshold untuk menghentikan rekursi dan beralih ke sekuensial

SINKRONISASI:
  □ WaitGroup.Add() selalu dipanggil sebelum goroutine diluncurkan
  □ WaitGroup.Done() selalu di-defer di awal goroutine
  □ Channel hasil selalu buffered (kapasitas ≥ jumlah sub-task)
  □ Channel ditutup setelah WaitGroup.Wait() selesai

ERROR HANDLING:
  □ Error dari sub-task dikumpulkan, bukan dibuang diam-diam
  □ Semua error dilaporkan ke caller setelah join
  □ Program memutuskan dengan eksplisit: hentikan semua atau lanjut dengan partial

RESOURCE CONTROL:
  □ Jumlah goroutine concurrent dibatasi (semaphore) untuk data berskala besar
  □ Tidak ada goroutine yang bisa leak setelah join selesai
  □ Memory allocation per sub-task diperhitungkan untuk dataset besar

PENGUJIAN:
  □ Diuji dengan go test -race untuk mendeteksi data race
  □ Hasil dibandingkan dengan versi sekuensial untuk memverifikasi kebenaran
  □ Benchmark dilakukan untuk memastikan ada speedup nyata

Ringkasan #

  • Fork–Join memiliki dua fase yang tidak terpisahkan — fork memecah pekerjaan dan meluncurkan sub-task paralel, join menunggu semua selesai dan mengagregasi hasilnya.
  • sync.WaitGroup adalah mekanisme join di GoAdd() sebelum goroutine diluncurkan, Done() di-defer di awal goroutine, Wait() memblokir sampai semua selesai.
  • Channel hasil harus buffered dengan kapasitas minimal sejumlah sub-task — channel unbuffered menyebabkan deadlock karena sub-task blocking saat join belum berjalan.
  • Sub-task harus benar-benar independen — tidak boleh ada shared mutable state antar sub-task; setiap sub-task menghasilkan nilai parsial yang aman untuk digabungkan di titik join.
  • Granularity menentukan efisiensi — terlalu halus (satu goroutine per elemen) mengakibatkan overhead scheduler yang mengalahkan manfaat paralel; mulai dari runtime.NumCPU() chunk sebagai titik awal.
  • Threshold wajib ada di rekursi — hentikan fork dan beralih ke komputasi sekuensial ketika ukuran data di bawah ambang batas tertentu; overhead goroutine tidak sebanding untuk data kecil.
  • Error dari sub-task harus dikumpulkan, bukan dibuang — gunakan struct result yang membawa nilai dan error sekaligus, agregasi keduanya di titik join.
  • Semaphore membatasi concurrency untuk dataset besar — buffered channel sebagai semaphore mencegah ledakan goroutine saat data yang diproses sangat banyak.
  • Verifikasi dengan go test -race — masalah concurrency di Fork–Join sering tidak terdeteksi saat testing biasa dan hanya muncul di production; race detector adalah alat wajib.
  • Fork–Join adalah fondasi divide and conquer konkuren — merge sort paralel, parallel map-reduce, dan image processing per-region semua menggunakan struktur yang sama.

← Sebelumnya: Async Callback   Berikutnya: Read–Write Lock →

About | Author | Content Scope | Editorial Policy | Privacy Policy | Disclaimer | Contact