diff --git a/.gitignore b/.gitignore index 524f096..3f68f95 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # Compiled class file *.class +target/ # Log file *.log @@ -22,3 +23,6 @@ # virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml hs_err_pid* replay_pid* + +# IntelliJ IDEA files +.idea/ diff --git a/README.md b/README.md index ed8c694..b9b7ac3 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,3 @@ # java-concurrency + This repository is a nutshell of concurrency problems or features in Java diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..4709a86 --- /dev/null +++ b/pom.xml @@ -0,0 +1,20 @@ + + 4.0.0 + co.fscotto + java-concurrency + jar + 1.0 + + 23 + 23 + + + + junit + junit + 3.8.1 + test + + + diff --git a/src/main/java/co/fscotto/concurrency/AtomicPrimitive.java b/src/main/java/co/fscotto/concurrency/AtomicPrimitive.java new file mode 100644 index 0000000..5ff8a38 --- /dev/null +++ b/src/main/java/co/fscotto/concurrency/AtomicPrimitive.java @@ -0,0 +1,56 @@ +package co.fscotto.concurrency; + +import java.security.SecureRandom; +import java.util.Random; + +public class AtomicPrimitive { + public static final Random RANDOM = new SecureRandom(); + private static int counter = 0; + + public static void main(String[] args) throws InterruptedException { + final Runnable writerTask = () -> counter = RANDOM.nextInt(1000) + 1; + final Runnable readerTask = () -> System.out.println(Thread.currentThread().getName() + " read counter = " + counter); + + Thread.ofPlatform().start(writerTask); + for (int i = 1; i <= 10; i++) { + Thread.ofPlatform().name("T" + 1).start(readerTask); + } + + final var spinner = new Spinner(5); + Thread.ofPlatform().start(spinner); + for (int i = 1; i <= 10; i++) { + Thread.ofPlatform().name("T" + i).start(spinner::updateCurrent); + Thread.sleep(10L * i * RANDOM.nextLong(1, 100)); + } + + System.out.println("Spinner update done."); + } + + public static class Spinner implements Runnable { + private int currentValue; + + public Spinner(int currentValue) { + this.currentValue = currentValue; + } + + public int getCurrentValue() { + return currentValue; + } + + public void updateCurrent() { + currentValue = RANDOM.nextInt(1000) + 1; + } + + @Override + public void run() { + while (true) { + try { + System.out.println(getCurrentValue()); + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.interrupted(); + } + } + } + } +} diff --git a/src/main/java/co/fscotto/concurrency/Babble.java b/src/main/java/co/fscotto/concurrency/Babble.java new file mode 100644 index 0000000..33b039e --- /dev/null +++ b/src/main/java/co/fscotto/concurrency/Babble.java @@ -0,0 +1,28 @@ +package co.fscotto.concurrency; + +public class Babble extends Thread { + private static boolean doYield; + private static int howOften; + private final String word; + + public Babble(String whatToSay) { + this.word = whatToSay; + } + + @Override + public void run() { + for (int i = 0; i < howOften; i++) { + System.out.println(word); + if (doYield) + Thread.yield(); + } + } + + public static void main(String[] args) { + doYield = Boolean.parseBoolean(args[0]); + howOften = Integer.parseInt(args[1]); + for (int i = 2; i < args.length; i++) { + new Babble(args[i]).start(); + } + } +} diff --git a/src/main/java/co/fscotto/concurrency/Downloader.java b/src/main/java/co/fscotto/concurrency/Downloader.java new file mode 100644 index 0000000..ab4eba9 --- /dev/null +++ b/src/main/java/co/fscotto/concurrency/Downloader.java @@ -0,0 +1,89 @@ +package co.fscotto.concurrency; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.URI; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; + +class Downloader extends Thread { + private final URL url; + private final String fileName; + private final List listeners; + + public Downloader(URL url, String outputFilename) { + this.url = url; + this.fileName = outputFilename; + this.listeners = new ArrayList<>(); + } + + public synchronized void addListener(ProgressListener listener) { + listeners.add(listener); + } + + public synchronized void removeListener(ProgressListener listener) { + listeners.remove(listener); + } + + private synchronized void updateProgress(int n) { + for (ProgressListener listener : listeners) { + listener.onProgress(n); + System.out.println("Next progress listener"); + } + } + + @Override + public void run() { + var n = 0; + var total = 0; + final var buffer = new byte[1024]; + try (InputStream in = url.openStream(); + OutputStream out = new FileOutputStream(fileName)) { + while ((n = in.read(buffer)) != -1) { + out.write(buffer, 0, n); + total += n; + System.out.printf("%s lock Downloader.updateProgress%n", Thread.currentThread().getName()); + updateProgress(total); + System.out.printf("%s unlock Downloader.updateProgress%n", Thread.currentThread().getName()); + } + out.flush(); + } catch (IOException _) { + Thread.interrupted(); + } + } + + public static void main(String[] args) throws IOException { + final var url = URI.create("https://www.google.it").toURL(); + final var downloader1 = new Downloader(url, "file1.txt"); + final var downloader2 = new Downloader(url, "file2.txt"); + final var listener = new MyProgressListener(downloader1); + downloader1.addListener(listener); + downloader2.addListener(listener); + downloader1.start(); + downloader2.start(); + } +} + +interface ProgressListener { + void onProgress(int n); +} + +class MyProgressListener implements ProgressListener { + private final Downloader target; + + public MyProgressListener(Downloader target) { + this.target = target; + } + + @Override + public synchronized void onProgress(int n) { + System.out.printf("Lock %s in ProgressListener.onProgress%n", Thread.currentThread().getName()); + synchronized (target) { + System.out.printf("%s lock downloader field in the progress listener%n", Thread.currentThread().getName()); + } + System.out.printf("Unlock %s in ProgressListener.onProgress%n", Thread.currentThread().getName()); + } +} diff --git a/src/main/java/co/fscotto/concurrency/Friendly.java b/src/main/java/co/fscotto/concurrency/Friendly.java new file mode 100644 index 0000000..ce315c3 --- /dev/null +++ b/src/main/java/co/fscotto/concurrency/Friendly.java @@ -0,0 +1,37 @@ +package co.fscotto.concurrency; + +public class Friendly { + private final String name; + private Friendly partner; + + public Friendly(String name) { + this.name = name; + } + + // Fix deadlock without synchronized on method signature + //public synchronized void hug() { + public void hug() { + System.out.println(Thread.currentThread().getName() + " in " + name + ".hug() trying to invoke " + partner.name + ".hugBack()"); + synchronized (this) { + partner.hugBack(); + } + } + + public synchronized void hugBack() { + System.out.println(Thread.currentThread().getName() + " in " + name + ".hugBack()"); + } + + public void becomeFriend(Friendly partner) { + this.partner = partner; + } + + public static void main(String[] args) { + final Friendly gareth = new Friendly("Gareth"); + final Friendly cory = new Friendly("Cory"); + gareth.becomeFriend(cory); + cory.becomeFriend(gareth); + + new Thread(gareth::hug, "T1").start(); + new Thread(cory::hug, "T2").start(); + } +} diff --git a/src/main/java/co/fscotto/concurrency/HappensBefore.java b/src/main/java/co/fscotto/concurrency/HappensBefore.java new file mode 100644 index 0000000..4dbbc14 --- /dev/null +++ b/src/main/java/co/fscotto/concurrency/HappensBefore.java @@ -0,0 +1,29 @@ +package co.fscotto.concurrency; + +public class HappensBefore { + private static int x; + private static volatile int g; + + public static void main(String[] args) throws InterruptedException { + final Runnable run1 = () -> { + x = 1; + g = 1; + }; + final Runnable run2 = () -> { + var r1 = g; + var r2 = x; + System.out.printf("r1 = %d, r2 = %d%n", r1, r2); + }; + + var t1 = new Thread(run1); + var t2 = new Thread(run2); + + t1.start(); + t2.start(); + + t1.join(); + t2.join(); + + System.out.printf("x = %d, g = %d%n", x, g); + } +} diff --git a/src/main/java/co/fscotto/concurrency/MultiThreading.java b/src/main/java/co/fscotto/concurrency/MultiThreading.java new file mode 100644 index 0000000..13ca54e --- /dev/null +++ b/src/main/java/co/fscotto/concurrency/MultiThreading.java @@ -0,0 +1,114 @@ +package co.fscotto.concurrency; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +public class MultiThreading { + private static final AtomicInteger ATOMIC_COUNTER = new AtomicInteger(0); + private static final Object MONITOR_OBJ = new Object(); + private static int counter = 0; + + public static void main(String[] args) throws Exception { + final Runnable atomicRunnable = () -> { + for (int i = 0; i < 10_000; i++) { + ATOMIC_COUNTER.incrementAndGet(); + } + }; +// final Runnable runnable = () -> { +// for (int i = 0; i < 10_000; i++) { +// counter++; +// } +// }; + final Runnable runnable = () -> { + for (int i = 0; i < 10_000; i++) { + synchronized (MONITOR_OBJ) { + counter++; + } + } + }; + + final var t1 = new Thread(runnable); + final var t2 = new Thread(runnable); + + t1.start(); + t1.join(); + + t2.start(); + t2.join(); + + System.out.printf("The primitive counter is %d%n", counter); + + try (final var pool = new ExecutorShutdownDecorator(Executors.newFixedThreadPool(2))) { + pool.submit(atomicRunnable).get(1, TimeUnit.SECONDS); + pool.submit(atomicRunnable).get(1, TimeUnit.SECONDS); + System.out.printf("The atomic counter is %d%n", ATOMIC_COUNTER.intValue()); + } + + final var server = new PrintServer(); + server.print(new PrintJob("work1")); + server.print(new PrintJob("work2")); + server.print(new PrintJob("work3")); + } + + record ExecutorShutdownDecorator(ExecutorService executor) implements AutoCloseable { + + public Future submit(Runnable runnable) { + return executor.submit(runnable); + } + + @Override + public void close() { + executor.shutdown(); + } + } + + public record PrintJob(String name) { + } + + public static class PrintQueue { + private final Queue queue = new LinkedList<>(); + + public synchronized void add(PrintJob job) { + queue.add(job); + notifyAll(); + } + + public synchronized PrintJob remove() throws InterruptedException { + while (queue.isEmpty()) + wait(); + return queue.remove(); + } + } + + public static class PrintServer { + private final PrintQueue requests = new PrintQueue(); + + public PrintServer() { + Runnable service = () -> { + while (true) { + try { + realPrint(requests.remove()); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + }; + new Thread(service).start(); + } + + public void print(PrintJob job) { + requests.add(job); + } + + private void realPrint(PrintJob job) { + // effettua la stampa + System.out.printf("Stampo il job %s%n", job.name()); + } + } +} diff --git a/src/main/java/co/fscotto/concurrency/Philosopher.java b/src/main/java/co/fscotto/concurrency/Philosopher.java new file mode 100644 index 0000000..51245cc --- /dev/null +++ b/src/main/java/co/fscotto/concurrency/Philosopher.java @@ -0,0 +1,114 @@ +package co.fscotto.concurrency; + +import java.lang.management.ManagementFactory; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Random; + +public class Philosopher extends Thread { + private final String name; + private final Chopstick first; + private final Chopstick second; + private final Random random; + + public Philosopher(String name, Chopstick left, Chopstick right) { + super(name); + this.name = name; + this.first = left; + this.second = right; + this.random = new Random(); + } + + @Override + public void run() { + try { + while (!isInterrupted()) { + Thread.sleep(random.nextInt(1)); // THINK + synchronized (first) { + synchronized (second) { + System.out.println(this.name + " is eating"); + Thread.sleep(random.nextInt(1000)); // EAT + } + } + System.out.println(this.name + " have eaten"); + } + } catch (InterruptedException _) { + if (Thread.interrupted()) { + System.out.println(name + " interrupted"); + } + } + } + + public record Chopstick(int id) { + } + + public static void main(String[] args) { + final var chopsticks = new Chopstick[5]; + for (var i = 1; i <= chopsticks.length; i++) { + chopsticks[i - 1] = new Chopstick(i); + } + + final var philosophers = new Philosopher[5]; + philosophers[0] = new Philosopher("Aristotele", chopsticks[0], chopsticks[1]); + philosophers[1] = new Philosopher("Socrate", chopsticks[1], chopsticks[2]); + philosophers[2] = new Philosopher("Voltaire", chopsticks[2], chopsticks[3]); + philosophers[3] = new Philosopher("Pitagora", chopsticks[3], chopsticks[4]); + philosophers[4] = new Philosopher("Platone", chopsticks[4], chopsticks[0]); + + final var deadlockChecker = new DeadlockChecker(philosophers); + deadlockChecker.watch(); + + for (var philosopher : philosophers) { + philosopher.start(); + } + } + + public static class DeadlockChecker { + private final List threads; + + public DeadlockChecker(Thread... threads) { + this.threads = new ArrayList<>(List.of(threads)); + } + + public synchronized void addThread(Thread runnable) { + threads.add(runnable); + } + + public void watch() { + if (threads == null || threads.isEmpty()) + throw new IllegalStateException("No threads are registered"); + + Thread.ofPlatform().start(() -> { + final var threadMXBean = ManagementFactory.getThreadMXBean(); + while (!Thread.currentThread().isInterrupted()) { + try { + Thread.sleep(Duration.ofSeconds(5)); + final var deadlockedThreadsIds = threadMXBean.findDeadlockedThreads(); + if (deadlockedThreadsIds != null) { + System.out.println("Deadlock found!!!"); + for (var threadId : deadlockedThreadsIds) { + final var threadInfo = threadMXBean.getThreadInfo(threadId); + selectThread(threadInfo.getThreadName()).ifPresent(t -> { + System.out.printf("Stopping thread %s%n", t.getName()); + t.interrupt(); // FIXME There isn't a way to force to stop another thread + }); + } + } + } catch (InterruptedException _) { + Thread.interrupted(); + System.out.println("Deadlock Checker is interrupted"); + } + } + }); + } + + private Optional selectThread(String threadName) { + return threads + .stream() + .filter(t -> t.getName().equals(threadName)) + .findAny(); + } + } +} diff --git a/src/main/java/co/fscotto/concurrency/Puzzle.java b/src/main/java/co/fscotto/concurrency/Puzzle.java new file mode 100644 index 0000000..ccd70bb --- /dev/null +++ b/src/main/java/co/fscotto/concurrency/Puzzle.java @@ -0,0 +1,24 @@ +package co.fscotto.concurrency; + +public class Puzzle { + static boolean answerReady = false; + static int answer = 0; + static Thread t1 = new Thread(() -> { + answer = 42; + answerReady = true; + }); + static Thread t2 = new Thread(() -> { + if (answerReady) { + System.out.println("This meaning of life is: " + answer); + } else { + System.out.println("I don't know the answer"); + } + }); + + public static void main(String[] args) throws InterruptedException { + t1.start(); + t2.start(); + t1.join(); + t2.join(); + } +} diff --git a/src/main/java/co/fscotto/concurrency/ShowJoin.java b/src/main/java/co/fscotto/concurrency/ShowJoin.java new file mode 100644 index 0000000..735711f --- /dev/null +++ b/src/main/java/co/fscotto/concurrency/ShowJoin.java @@ -0,0 +1,39 @@ +package co.fscotto.concurrency; + +import java.util.concurrent.TimeUnit; + +public class ShowJoin { + + public static void main(String[] args) { + try { + CalcThread calcThread = new CalcThread(); + calcThread.start(); + doSomethingElse(); + calcThread.join(); + System.out.println("result is " + calcThread.getResult()); + } catch (InterruptedException e) { + System.out.println("No answer: interrupted"); + } + } + + public static void doSomethingElse() throws InterruptedException { + Thread.sleep(TimeUnit.SECONDS.toMillis(5)); + } +} + +class CalcThread extends Thread { + private double result; + + @Override + public void run() { + result = calculate(); + } + + public double getResult() { + return result; + } + + private double calculate() { + return Math.random(); + } +} diff --git a/src/test/java/co/fscotto/MultiThreadingTest.java b/src/test/java/co/fscotto/MultiThreadingTest.java new file mode 100644 index 0000000..3482bde --- /dev/null +++ b/src/test/java/co/fscotto/MultiThreadingTest.java @@ -0,0 +1,7 @@ +package co.fscotto; + +import junit.framework.TestCase; + +public class MultiThreadingTest extends TestCase { + +}