Concurrency & Multithreading

From threads and locks to virtual threads: Java's evolution in concurrent programming

Why Concurrency Matters

At Twilio scale, you're handling millions of concurrent API calls, SMS messages, and voice streams. Understanding Java's concurrency model is essential for building systems that are correct under load, responsive, and able to use every core.

Python vs Java: The GIL Difference

# Python has the Global Interpreter Lock (GIL)
# Only ONE thread executes Python bytecode at a time

import threading

def cpu_intensive(_=0):  # Unused parameter so Pool.map below can call it
    total = 0
    for i in range(10_000_000):
        total += i
    return total

# These run SEQUENTIALLY due to GIL (for CPU work)
t1 = threading.Thread(target=cpu_intensive)
t2 = threading.Thread(target=cpu_intensive)
t1.start(); t2.start()
t1.join(); t2.join()

# For CPU parallelism, Python uses multiprocessing (separate processes)
from multiprocessing import Pool
with Pool(4) as p:
    results = p.map(cpu_intensive, range(4))

# asyncio for I/O concurrency (cooperative, single-threaded)
import asyncio
async def fetch_data():
    await asyncio.sleep(1)  # Non-blocking

// Java has TRUE parallel execution
// Multiple threads run SIMULTANEOUSLY on multiple CPU cores

public long cpuIntensive() {
    long total = 0;
    for (int i = 0; i < 10_000_000; i++) {
        total += i;
    }
    return total;  // Return the result so the JIT can't eliminate the loop
}

// These run TRULY IN PARALLEL on different cores
Thread t1 = new Thread(this::cpuIntensive);
Thread t2 = new Thread(this::cpuIntensive);
t1.start(); t2.start();
t1.join(); t2.join();

// Or use parallel streams
IntStream.range(0, 4)
    .parallel()
    .forEach(i -> cpuIntensive());

// This is why Java is preferred for high-performance backends
// True parallelism without process overhead

Key Difference

Python's GIL means threads are useful for I/O but not CPU parallelism (use multiprocessing instead). Java threads provide true parallelism for both. This is a fundamental architectural consideration.
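To make the difference concrete, here is a minimal timing sketch (class and method names are illustrative): two threads each run the same CPU-bound loop, and on a multi-core machine the wall-clock time stays close to the single-thread time because the loops execute on separate cores. Actual numbers vary by hardware, so no specific speedup is promised.

```java
public class ParallelSumDemo {
    // Sum 0..n-1; the result is returned so the JIT can't drop the loop
    static long sum(long n) {
        long total = 0;
        for (long i = 0; i < n; i++) {
            total += i;
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        long n = 50_000_000L;
        long[] results = new long[2];

        long start = System.nanoTime();
        Thread t1 = new Thread(() -> results[0] = sum(n));
        Thread t2 = new Thread(() -> results[1] = sum(n));
        t1.start(); t2.start();
        t1.join(); t2.join();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("combined = " + (results[0] + results[1]));
        System.out.println("elapsed  = " + elapsedMs + " ms");
    }
}
```

In CPython the equivalent two-thread version takes roughly the sum of both loop times; here the two loops overlap.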

Thread Basics

// Method 1: Extend Thread (not recommended)
public class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("Running in: " + Thread.currentThread().getName());
    }
}
new MyThread().start();

// Method 2: Implement Runnable (better - separates task from execution)
Runnable task = () -> {
    System.out.println("Running in: " + Thread.currentThread().getName());
};
new Thread(task).start();

// Method 3: Use Callable for tasks that return values
Callable<Integer> callable = () -> {
    Thread.sleep(1000);
    return 42;
};

// Method 4: ExecutorService (recommended for production)
ExecutorService executor = Executors.newFixedThreadPool(4);
Future<Integer> future = executor.submit(callable);
Integer result = future.get();  // Blocks until complete
executor.shutdown();

// Method 5: Virtual Threads (Java 21+) - preferred for I/O
Thread.startVirtualThread(() -> {
    System.out.println("Virtual thread!");
});

// Thread lifecycle states

/*
NEW        → Thread created but not started
RUNNABLE   → Thread executing or ready to execute
BLOCKED    → Waiting for monitor lock (synchronized)
WAITING    → Waiting indefinitely (wait(), join())
TIMED_WAITING → Waiting with timeout (sleep(), wait(timeout))
TERMINATED → Thread finished execution
*/

Thread t = new Thread(() -> {
    try {
        Thread.sleep(1000);  // TIMED_WAITING
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
});

System.out.println(t.getState());  // NEW
t.start();
System.out.println(t.getState());  // RUNNABLE (or TIMED_WAITING)
t.join();
System.out.println(t.getState());  // TERMINATED

// Interrupt a thread (cooperative cancellation)
Thread worker = new Thread(() -> {
    while (!Thread.currentThread().isInterrupted()) {
        // Do work
    }
    System.out.println("Interrupted, cleaning up...");
});
worker.start();
worker.interrupt();  // Request cancellation

Synchronization: Protecting Shared State

The Core Problem

When multiple threads access shared mutable state, you get race conditions. The result depends on unpredictable timing. This is the hardest bug to find because it may only happen under specific load conditions in production.

// BROKEN: Race condition
public class BrokenCounter {
    private int count = 0;

    public void increment() {
        count++;  // NOT ATOMIC! Read-modify-write
    }

    public int getCount() {
        return count;
    }
}

// What happens with 2 threads:
// Thread 1: reads count (0)
// Thread 2: reads count (0)  ← same value!
// Thread 1: writes count (1)
// Thread 2: writes count (1) ← lost update!
// Expected: 2, Actual: 1

BrokenCounter counter = new BrokenCounter();
ExecutorService executor = Executors.newFixedThreadPool(10);

for (int i = 0; i < 1000; i++) {
    executor.submit(counter::increment);
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);

System.out.println(counter.getCount());  // Often less than 1000!

// FIX 1: synchronized keyword
public class SynchronizedCounter {
    private int count = 0;

    // Only one thread can execute this at a time
    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

// Or synchronize on a specific object
public class Counter {
    private final Object lock = new Object();
    private int count = 0;

    public void increment() {
        synchronized (lock) {
            count++;
        }
    }

    public int getCount() {
        synchronized (lock) {
            return count;
        }
    }
}

// synchronized provides:
// 1. Mutual exclusion (only one thread in critical section)
// 2. Visibility (changes visible to other threads)
// 3. Happens-before ordering (memory model guarantees)

// FIX 2: Explicit Lock (more flexible than synchronized)
import java.util.concurrent.locks.*;

public class LockCounter {
    private final Lock lock = new ReentrantLock();
    private int count = 0;

    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();  // ALWAYS unlock in finally!
        }
    }

    // Try to acquire lock without blocking
    public boolean tryIncrement() {
        if (lock.tryLock()) {
            try {
                count++;
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false;  // Couldn't get lock
    }

    // Try with timeout
    public boolean tryIncrementWithTimeout() throws InterruptedException {
        if (lock.tryLock(100, TimeUnit.MILLISECONDS)) {
            try {
                count++;
                return true;
            } finally {
                lock.unlock();
            }
        }
        return false;
    }
}

// ReadWriteLock: multiple readers OR single writer
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();

public int read() {
    rwLock.readLock().lock();  // Multiple readers allowed
    try {
        return count;
    } finally {
        rwLock.readLock().unlock();
    }
}

public void write(int value) {
    rwLock.writeLock().lock();  // Exclusive access
    try {
        count = value;
    } finally {
        rwLock.writeLock().unlock();
    }
}

Atomic Classes: Lock-Free Thread Safety

// Atomic classes use CPU-level compare-and-swap (CAS)
// No locking needed - faster under contention

import java.util.concurrent.atomic.*;

// AtomicInteger - thread-safe integer
AtomicInteger counter = new AtomicInteger(0);
counter.incrementAndGet();      // Atomic ++counter
counter.getAndIncrement();      // Atomic counter++
counter.addAndGet(5);           // Atomic counter += 5
counter.compareAndSet(5, 10);   // If value is 5, set to 10
counter.updateAndGet(x -> x * 2);  // Atomic transformation

// AtomicReference - thread-safe object reference
AtomicReference<User> currentUser = new AtomicReference<>();
currentUser.set(new User("Alice"));
currentUser.compareAndSet(oldUser, newUser);

// AtomicBoolean - thread-safe flag
AtomicBoolean running = new AtomicBoolean(true);
while (running.get()) {
    // Do work
}
running.set(false);  // Another thread can stop the loop

// LongAdder - better than AtomicLong for high contention
LongAdder adder = new LongAdder();
adder.increment();
adder.add(10);
long total = adder.sum();  // Sum across cells (not an atomic snapshot)

// Use LongAdder when many threads update frequently
// Use AtomicLong when you need exact value often
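Tying this back to the BrokenCounter above: the same 1000-increment workload, switched to an AtomicInteger, reliably reaches 1000. A self-contained sketch:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AtomicCounterDemo {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger(0);
        ExecutorService executor = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 1000; i++) {
            executor.submit(counter::incrementAndGet);  // CAS, no lock
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);

        System.out.println(counter.get());  // 1000, every run
    }
}
```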

Concurrent Collections

// ConcurrentHashMap: thread-safe, high-performance map
// Java 7 used segment-level locking; Java 8+ uses CAS plus per-bin locking

ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();

// Basic operations are atomic
map.put("key", 1);
map.get("key");
map.remove("key");

// Atomic compound operations (these are KEY!)
map.putIfAbsent("key", 1);           // Only put if not present
map.computeIfAbsent("key", k -> 1);  // Compute only if absent
map.computeIfPresent("key", (k, v) -> v + 1);  // Update if present
map.compute("key", (k, v) -> v == null ? 1 : v + 1);  // Upsert
map.merge("key", 1, Integer::sum);   // Merge with existing

// Common pattern: counting occurrences
ConcurrentMap<String, LongAdder> wordCounts = new ConcurrentHashMap<>();

void countWord(String word) {
    wordCounts.computeIfAbsent(word, k -> new LongAdder()).increment();
}

// DON'T do this (not atomic!):
if (!map.containsKey("key")) {  // Check
    map.put("key", 1);          // Then act - RACE CONDITION!
}

// DO this instead:
map.putIfAbsent("key", 1);  // Atomic check-then-act

// CopyOnWriteArrayList: thread-safe list
// Great for: many reads, few writes (e.g., listener lists)
List<EventListener> listeners = new CopyOnWriteArrayList<>();
listeners.add(listener);     // Creates new array copy
for (EventListener l : listeners) {  // Safe iteration
    l.onEvent(event);
}

// BlockingQueue: producer-consumer pattern
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(100);

// Producer
queue.put(task);      // Blocks if full
queue.offer(task);    // Returns false if full
queue.offer(task, 1, TimeUnit.SECONDS);  // Timeout

// Consumer
Task task = queue.take();   // Blocks if empty
Task task = queue.poll();   // Returns null if empty
Task task = queue.poll(1, TimeUnit.SECONDS);  // Timeout

// ConcurrentLinkedQueue: non-blocking queue
Queue<Task> queue = new ConcurrentLinkedQueue<>();
queue.offer(task);   // Never blocks
queue.poll();        // Returns null if empty

// ConcurrentSkipListMap: sorted concurrent map (like TreeMap)
ConcurrentNavigableMap<String, Integer> sortedMap = new ConcurrentSkipListMap<>();

ExecutorService: Thread Pool Management

// DON'T create raw threads in production
// DO use ExecutorService for thread pooling

// Fixed thread pool: bounded, reusable threads
ExecutorService executor = Executors.newFixedThreadPool(10);

// Cached thread pool: grows as needed, reuses idle threads
ExecutorService cached = Executors.newCachedThreadPool();

// Single thread: sequential execution, survives task failure
ExecutorService single = Executors.newSingleThreadExecutor();

// Scheduled: delayed/periodic execution
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
scheduler.schedule(task, 5, TimeUnit.SECONDS);  // Run after delay
scheduler.scheduleAtFixedRate(task, 0, 1, TimeUnit.SECONDS);  // Periodic

// Custom ThreadPoolExecutor for production
ThreadPoolExecutor custom = new ThreadPoolExecutor(
    10,                      // Core pool size
    50,                      // Maximum pool size
    60, TimeUnit.SECONDS,    // Keep-alive time for idle threads
    new LinkedBlockingQueue<>(1000),  // Work queue
    new ThreadPoolExecutor.CallerRunsPolicy()  // Rejection policy
);

// Submit tasks
executor.execute(runnable);           // Fire and forget
Future<Result> future = executor.submit(callable);  // Get result

// Get result (blocking)
Result result = future.get();                     // Wait forever
Result result = future.get(5, TimeUnit.SECONDS); // Timeout

// ALWAYS shut down executors
executor.shutdown();                   // No new tasks, finish existing
executor.awaitTermination(60, TimeUnit.SECONDS);
executor.shutdownNow();               // Interrupt running tasks
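Those three calls are usually combined into one helper. This is a sketch of the standard two-phase shutdown idiom (adapted from the ExecutorService javadoc); the 60-second timeouts are placeholders to tune for your workload.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public class PoolShutdown {
    // Graceful first, forceful second: the standard two-phase shutdown
    static void shutdownAndAwait(ExecutorService pool) {
        pool.shutdown();  // Phase 1: stop accepting new tasks
        try {
            if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                pool.shutdownNow();  // Phase 2: interrupt stragglers
                if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
                    System.err.println("Pool did not terminate");
                }
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();  // Preserve interrupt status
        }
    }
}
```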

// CompletableFuture: composable async operations (Java 8+)
// Like JavaScript Promises

// Create async computation
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    return fetchFromDatabase();  // Runs in ForkJoinPool
});

// Chain transformations (non-blocking)
CompletableFuture<Integer> result = future
    .thenApply(s -> s.toUpperCase())        // Transform
    .thenApply(s -> s.length())             // Transform again
    .exceptionally(ex -> {                  // Handle errors
        log.error("Failed", ex);
        return 0;
    });

// Combine multiple futures
CompletableFuture<User> userFuture = fetchUser(id);
CompletableFuture<List<Order>> ordersFuture = fetchOrders(id);

CompletableFuture<UserWithOrders> combined = userFuture
    .thenCombine(ordersFuture, (user, orders) -> {
        return new UserWithOrders(user, orders);
    });

// Wait for all
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);

// Wait for any (first to complete)
CompletableFuture<Object> any = CompletableFuture.anyOf(f1, f2, f3);

// Async with custom executor
ExecutorService executor = Executors.newFixedThreadPool(10);
CompletableFuture.supplyAsync(() -> fetchData(), executor)
    .thenApplyAsync(data -> process(data), executor)
    .thenAcceptAsync(result -> save(result), executor);

Virtual Threads (Project Loom, Java 21+)

// GAME CHANGER: Virtual threads are cheap (millions possible)
// Perfect for I/O-bound workloads like Twilio API calls

// Platform threads (traditional): 1MB stack each, OS-managed
// Virtual threads: ~1KB, JVM-managed, multiplexed onto platform threads

// Create virtual thread
Thread.startVirtualThread(() -> {
    // This is a virtual thread!
    String result = httpClient.send(request,
        HttpResponse.BodyHandlers.ofString()).body();  // Blocking I/O is OK!
});

// Virtual thread per task executor
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    // Submit millions of tasks - each gets its own virtual thread
    for (int i = 0; i < 1_000_000; i++) {
        executor.submit(() -> {
            String response = callExternalApi();  // Blocking I/O
            processResponse(response);
        });
    }
}  // Auto-closes, waits for completion

// Thread.Builder for configuration
Thread vt = Thread.ofVirtual()
    .name("my-virtual-thread")
    .start(() -> doWork());

// Key insight: Write SIMPLE BLOCKING CODE
// No callbacks, no reactive streams, no async/await
// JVM handles the scheduling

public User getUser(String id) {
    // These blocking calls are FINE with virtual threads
    User user = userRepository.findById(id);      // DB call
    List<Order> orders = orderService.getOrders(id);  // API call
    user.setOrders(orders);
    return user;
}

// Spring Boot 3.2+: Just enable it!
// spring.threads.virtual.enabled=true

// Structured Concurrency (Preview in Java 21)
// Treats concurrent tasks as a unit

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
    // Fork concurrent tasks
    StructuredTaskScope.Subtask<User> userTask = scope.fork(() -> fetchUser(id));
    StructuredTaskScope.Subtask<List<Order>> ordersTask = scope.fork(() -> fetchOrders(id));

    // Wait for all tasks (or first failure)
    scope.join();
    scope.throwIfFailed();

    // Get results
    User user = userTask.get();
    List<Order> orders = ordersTask.get();

    return new UserWithOrders(user, orders);
}
// If ANY task fails, all others are cancelled
// No dangling threads, no resource leaks

// ShutdownOnSuccess: return first successful result
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
    // Try multiple sources, take first success
    scope.fork(() -> fetchFromCache(key));
    scope.fork(() -> fetchFromDb(key));
    scope.fork(() -> fetchFromBackup(key));

    scope.join();
    return scope.result();  // First successful result
}

// Benefits:
// - Thread lifetime tied to scope (no leaks)
// - Automatic cancellation on failure
// - Clear parent-child relationship
// - Better debugging (stack traces show relationships)

Why Virtual Threads Matter for Twilio

Traditional approach: Complex async/reactive code (WebFlux, RxJava) to avoid blocking thread pools.
Virtual threads approach: Write simple blocking code, get async performance.

At Twilio scale (millions of concurrent API calls), this is transformative. One virtual thread per request, simple synchronous code, no callback hell.
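As a rough illustration (requires Java 21+; timings are machine-dependent), the sketch below launches 10,000 virtual threads that each block for 100 ms. Sequentially that would be about 1,000 seconds of sleeping; here the wall-clock time is a small fraction of that, because a blocked virtual thread releases its carrier thread instead of pinning an OS thread.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadDemo {
    public static void main(String[] args) {
        AtomicInteger completed = new AtomicInteger();
        long start = System.nanoTime();

        try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < 10_000; i++) {
                executor.submit(() -> {
                    try {
                        Thread.sleep(100);  // Simulated blocking I/O
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    completed.incrementAndGet();
                });
            }
        }  // close() waits for all submitted tasks

        long ms = (System.nanoTime() - start) / 1_000_000;
        System.out.println(completed.get() + " tasks in " + ms + " ms");
    }
}
```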

Common Concurrency Patterns

Pattern 1: Producer-Consumer

// Classic pattern using BlockingQueue
BlockingQueue<Message> queue = new LinkedBlockingQueue<>(1000);

// Producer thread(s)
Runnable producer = () -> {
    while (running) {
        Message msg = createMessage();
        queue.put(msg);  // Blocks if queue full
    }
};

// Consumer thread(s)
Runnable consumer = () -> {
    while (running) {
        Message msg = queue.take();  // Blocks if queue empty
        process(msg);
    }
};

// Start producers and consumers
ExecutorService executor = Executors.newFixedThreadPool(6);
executor.submit(producer);  // 1 producer
for (int i = 0; i < 5; i++) {
    executor.submit(consumer);  // 5 consumers
}

Pattern 2: Scatter-Gather

// Call multiple services in parallel, aggregate results
public AggregatedData fetchAll(String userId) {
    CompletableFuture<User> userFuture =
        CompletableFuture.supplyAsync(() -> userService.get(userId));

    CompletableFuture<List<Order>> ordersFuture =
        CompletableFuture.supplyAsync(() -> orderService.getByUser(userId));

    CompletableFuture<List<Review>> reviewsFuture =
        CompletableFuture.supplyAsync(() -> reviewService.getByUser(userId));

    // Wait for all and combine
    return CompletableFuture.allOf(userFuture, ordersFuture, reviewsFuture)
        .thenApply(v -> new AggregatedData(
            userFuture.join(),
            ordersFuture.join(),
            reviewsFuture.join()
        ))
        .join();  // Block for final result
}

// With timeout and fallback
public AggregatedData fetchAllWithTimeout(String userId) {
    return CompletableFuture.allOf(...)
        .orTimeout(5, TimeUnit.SECONDS)
        .exceptionally(ex -> defaultData())
        .join();
}

Deadlocks: How to Avoid Them

Deadlock Conditions (all 4 required)

  1. Mutual exclusion - Resources can't be shared
  2. Hold and wait - Hold one resource while waiting for another
  3. No preemption - Can't forcibly take a resource
  4. Circular wait - A waits for B, B waits for A

// DEADLOCK: Thread 1 holds A, waits for B
//           Thread 2 holds B, waits for A

final Object lockA = new Object();
final Object lockB = new Object();

// Thread 1
synchronized (lockA) {
    Thread.sleep(100);  // Simulate work
    synchronized (lockB) {  // BLOCKED - Thread 2 has lockB
        // Never reached
    }
}

// Thread 2
synchronized (lockB) {
    Thread.sleep(100);  // Simulate work
    synchronized (lockA) {  // BLOCKED - Thread 1 has lockA
        // Never reached
    }
}

// Both threads waiting forever = DEADLOCK

// PREVENTION 1: Lock ordering (always acquire in same order)
synchronized (lockA) {  // Always A first
    synchronized (lockB) {  // Then B
        // Work
    }
}

// PREVENTION 2: Try-lock with timeout (needs ReentrantLock, not Object monitors)
ReentrantLock lockA = new ReentrantLock();
ReentrantLock lockB = new ReentrantLock();

if (lockA.tryLock(100, TimeUnit.MILLISECONDS)) {
    try {
        if (lockB.tryLock(100, TimeUnit.MILLISECONDS)) {
            try {
                // Work
            } finally {
                lockB.unlock();
            }
        } else {
            // Couldn't get lockB, release lockA and retry
        }
    } finally {
        lockA.unlock();
    }
}

// PREVENTION 3: Single lock (coarser granularity)
synchronized (singleLock) {
    // Access both resources
}

// PREVENTION 4: Lock-free data structures
ConcurrentMap<String, Integer> map = new ConcurrentHashMap<>();
AtomicReference<String> ref = new AtomicReference<>();

// PREVENTION 5: Immutable objects (no locks needed)
record Point(int x, int y) {}  // Thread-safe by design

Best Practices Summary

Do: Use ExecutorService, not raw Threads.
Don't: Create unbounded thread pools.

Do: Prefer immutable objects.
Don't: Share mutable state without synchronization.

Do: Use concurrent collections.
Don't: Use Collections.synchronizedXxx() wrappers.

Do: Use atomic classes for counters.
Don't: Use synchronized for simple increments.

Do: Always release locks in finally.
Don't: Hold locks during I/O operations.

Do: Use virtual threads for I/O (Java 21+).
Don't: Use reactive for everything.

Do: Document thread-safety guarantees.
Don't: Assume code is thread-safe.