Initial import

This commit is contained in:
Fabio Scotto di Santolo
2024-11-23 14:30:25 +01:00
parent 275f4dd5e5
commit 7952442254
13 changed files with 562 additions and 0 deletions

4
.gitignore vendored
View File

@@ -1,5 +1,6 @@
# Compiled class file
*.class
target/
# Log file
*.log
@@ -22,3 +23,6 @@
# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
hs_err_pid*
replay_pid*
# IntelliJ IDEA files
.idea/

View File

@@ -1,2 +1,3 @@
# java-concurrency
This repository is a nutshell of concurrency problems or features in Java

20
pom.xml Normal file
View File

@@ -0,0 +1,20 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>co.fscotto</groupId>
<artifactId>java-concurrency</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<properties>
<maven.compiler.source>23</maven.compiler.source>
<maven.compiler.target>23</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>

View File

@@ -0,0 +1,56 @@
package co.fscotto.concurrency;
import java.security.SecureRandom;
import java.util.Random;
public class AtomicPrimitive {
public static final Random RANDOM = new SecureRandom();
private static int counter = 0;
public static void main(String[] args) throws InterruptedException {
final Runnable writerTask = () -> counter = RANDOM.nextInt(1000) + 1;
final Runnable readerTask = () -> System.out.println(Thread.currentThread().getName() + " read counter = " + counter);
Thread.ofPlatform().start(writerTask);
for (int i = 1; i <= 10; i++) {
Thread.ofPlatform().name("T" + 1).start(readerTask);
}
final var spinner = new Spinner(5);
Thread.ofPlatform().start(spinner);
for (int i = 1; i <= 10; i++) {
Thread.ofPlatform().name("T" + i).start(spinner::updateCurrent);
Thread.sleep(10L * i * RANDOM.nextLong(1, 100));
}
System.out.println("Spinner update done.");
}
public static class Spinner implements Runnable {
private int currentValue;
public Spinner(int currentValue) {
this.currentValue = currentValue;
}
public int getCurrentValue() {
return currentValue;
}
public void updateCurrent() {
currentValue = RANDOM.nextInt(1000) + 1;
}
@Override
public void run() {
while (true) {
try {
System.out.println(getCurrentValue());
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.interrupted();
}
}
}
}
}

View File

@@ -0,0 +1,28 @@
package co.fscotto.concurrency;
public class Babble extends Thread {
private static boolean doYield;
private static int howOften;
private final String word;
public Babble(String whatToSay) {
this.word = whatToSay;
}
@Override
public void run() {
for (int i = 0; i < howOften; i++) {
System.out.println(word);
if (doYield)
Thread.yield();
}
}
public static void main(String[] args) {
doYield = Boolean.parseBoolean(args[0]);
howOften = Integer.parseInt(args[1]);
for (int i = 2; i < args.length; i++) {
new Babble(args[i]).start();
}
}
}

View File

@@ -0,0 +1,89 @@
package co.fscotto.concurrency;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
class Downloader extends Thread {
private final URL url;
private final String fileName;
private final List<ProgressListener> listeners;
public Downloader(URL url, String outputFilename) {
this.url = url;
this.fileName = outputFilename;
this.listeners = new ArrayList<>();
}
public synchronized void addListener(ProgressListener listener) {
listeners.add(listener);
}
public synchronized void removeListener(ProgressListener listener) {
listeners.remove(listener);
}
private synchronized void updateProgress(int n) {
for (ProgressListener listener : listeners) {
listener.onProgress(n);
System.out.println("Next progress listener");
}
}
@Override
public void run() {
var n = 0;
var total = 0;
final var buffer = new byte[1024];
try (InputStream in = url.openStream();
OutputStream out = new FileOutputStream(fileName)) {
while ((n = in.read(buffer)) != -1) {
out.write(buffer, 0, n);
total += n;
System.out.printf("%s lock Downloader.updateProgress%n", Thread.currentThread().getName());
updateProgress(total);
System.out.printf("%s unlock Downloader.updateProgress%n", Thread.currentThread().getName());
}
out.flush();
} catch (IOException _) {
Thread.interrupted();
}
}
public static void main(String[] args) throws IOException {
final var url = URI.create("https://www.google.it").toURL();
final var downloader1 = new Downloader(url, "file1.txt");
final var downloader2 = new Downloader(url, "file2.txt");
final var listener = new MyProgressListener(downloader1);
downloader1.addListener(listener);
downloader2.addListener(listener);
downloader1.start();
downloader2.start();
}
}
interface ProgressListener {
void onProgress(int n);
}
class MyProgressListener implements ProgressListener {
private final Downloader target;
public MyProgressListener(Downloader target) {
this.target = target;
}
@Override
public synchronized void onProgress(int n) {
System.out.printf("Lock %s in ProgressListener.onProgress%n", Thread.currentThread().getName());
synchronized (target) {
System.out.printf("%s lock downloader field in the progress listener%n", Thread.currentThread().getName());
}
System.out.printf("Unlock %s in ProgressListener.onProgress%n", Thread.currentThread().getName());
}
}

View File

@@ -0,0 +1,37 @@
package co.fscotto.concurrency;
public class Friendly {
private final String name;
private Friendly partner;
public Friendly(String name) {
this.name = name;
}
// Fix deadlock without synchronized on method signature
//public synchronized void hug() {
public void hug() {
System.out.println(Thread.currentThread().getName() + " in " + name + ".hug() trying to invoke " + partner.name + ".hugBack()");
synchronized (this) {
partner.hugBack();
}
}
public synchronized void hugBack() {
System.out.println(Thread.currentThread().getName() + " in " + name + ".hugBack()");
}
public void becomeFriend(Friendly partner) {
this.partner = partner;
}
public static void main(String[] args) {
final Friendly gareth = new Friendly("Gareth");
final Friendly cory = new Friendly("Cory");
gareth.becomeFriend(cory);
cory.becomeFriend(gareth);
new Thread(gareth::hug, "T1").start();
new Thread(cory::hug, "T2").start();
}
}

View File

@@ -0,0 +1,29 @@
package co.fscotto.concurrency;
public class HappensBefore {
private static int x;
private static volatile int g;
public static void main(String[] args) throws InterruptedException {
final Runnable run1 = () -> {
x = 1;
g = 1;
};
final Runnable run2 = () -> {
var r1 = g;
var r2 = x;
System.out.printf("r1 = %d, r2 = %d%n", r1, r2);
};
var t1 = new Thread(run1);
var t2 = new Thread(run2);
t1.start();
t2.start();
t1.join();
t2.join();
System.out.printf("x = %d, g = %d%n", x, g);
}
}

View File

@@ -0,0 +1,114 @@
package co.fscotto.concurrency;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class MultiThreading {
private static final AtomicInteger ATOMIC_COUNTER = new AtomicInteger(0);
private static final Object MONITOR_OBJ = new Object();
private static int counter = 0;
public static void main(String[] args) throws Exception {
final Runnable atomicRunnable = () -> {
for (int i = 0; i < 10_000; i++) {
ATOMIC_COUNTER.incrementAndGet();
}
};
// final Runnable runnable = () -> {
// for (int i = 0; i < 10_000; i++) {
// counter++;
// }
// };
final Runnable runnable = () -> {
for (int i = 0; i < 10_000; i++) {
synchronized (MONITOR_OBJ) {
counter++;
}
}
};
final var t1 = new Thread(runnable);
final var t2 = new Thread(runnable);
t1.start();
t1.join();
t2.start();
t2.join();
System.out.printf("The primitive counter is %d%n", counter);
try (final var pool = new ExecutorShutdownDecorator(Executors.newFixedThreadPool(2))) {
pool.submit(atomicRunnable).get(1, TimeUnit.SECONDS);
pool.submit(atomicRunnable).get(1, TimeUnit.SECONDS);
System.out.printf("The atomic counter is %d%n", ATOMIC_COUNTER.intValue());
}
final var server = new PrintServer();
server.print(new PrintJob("work1"));
server.print(new PrintJob("work2"));
server.print(new PrintJob("work3"));
}
record ExecutorShutdownDecorator(ExecutorService executor) implements AutoCloseable {
public Future<?> submit(Runnable runnable) {
return executor.submit(runnable);
}
@Override
public void close() {
executor.shutdown();
}
}
public record PrintJob(String name) {
}
public static class PrintQueue {
private final Queue<PrintJob> queue = new LinkedList<>();
public synchronized void add(PrintJob job) {
queue.add(job);
notifyAll();
}
public synchronized PrintJob remove() throws InterruptedException {
while (queue.isEmpty())
wait();
return queue.remove();
}
}
public static class PrintServer {
private final PrintQueue requests = new PrintQueue();
public PrintServer() {
Runnable service = () -> {
while (true) {
try {
realPrint(requests.remove());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
};
new Thread(service).start();
}
public void print(PrintJob job) {
requests.add(job);
}
private void realPrint(PrintJob job) {
// effettua la stampa
System.out.printf("Stampo il job %s%n", job.name());
}
}
}

View File

@@ -0,0 +1,114 @@
package co.fscotto.concurrency;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Random;
public class Philosopher extends Thread {
private final String name;
private final Chopstick first;
private final Chopstick second;
private final Random random;
public Philosopher(String name, Chopstick left, Chopstick right) {
super(name);
this.name = name;
this.first = left;
this.second = right;
this.random = new Random();
}
@Override
public void run() {
try {
while (!isInterrupted()) {
Thread.sleep(random.nextInt(1)); // THINK
synchronized (first) {
synchronized (second) {
System.out.println(this.name + " is eating");
Thread.sleep(random.nextInt(1000)); // EAT
}
}
System.out.println(this.name + " have eaten");
}
} catch (InterruptedException _) {
if (Thread.interrupted()) {
System.out.println(name + " interrupted");
}
}
}
public record Chopstick(int id) {
}
public static void main(String[] args) {
final var chopsticks = new Chopstick[5];
for (var i = 1; i <= chopsticks.length; i++) {
chopsticks[i - 1] = new Chopstick(i);
}
final var philosophers = new Philosopher[5];
philosophers[0] = new Philosopher("Aristotele", chopsticks[0], chopsticks[1]);
philosophers[1] = new Philosopher("Socrate", chopsticks[1], chopsticks[2]);
philosophers[2] = new Philosopher("Voltaire", chopsticks[2], chopsticks[3]);
philosophers[3] = new Philosopher("Pitagora", chopsticks[3], chopsticks[4]);
philosophers[4] = new Philosopher("Platone", chopsticks[4], chopsticks[0]);
final var deadlockChecker = new DeadlockChecker(philosophers);
deadlockChecker.watch();
for (var philosopher : philosophers) {
philosopher.start();
}
}
public static class DeadlockChecker {
private final List<Thread> threads;
public DeadlockChecker(Thread... threads) {
this.threads = new ArrayList<>(List.of(threads));
}
public synchronized void addThread(Thread runnable) {
threads.add(runnable);
}
public void watch() {
if (threads == null || threads.isEmpty())
throw new IllegalStateException("No threads are registered");
Thread.ofPlatform().start(() -> {
final var threadMXBean = ManagementFactory.getThreadMXBean();
while (!Thread.currentThread().isInterrupted()) {
try {
Thread.sleep(Duration.ofSeconds(5));
final var deadlockedThreadsIds = threadMXBean.findDeadlockedThreads();
if (deadlockedThreadsIds != null) {
System.out.println("Deadlock found!!!");
for (var threadId : deadlockedThreadsIds) {
final var threadInfo = threadMXBean.getThreadInfo(threadId);
selectThread(threadInfo.getThreadName()).ifPresent(t -> {
System.out.printf("Stopping thread %s%n", t.getName());
t.interrupt(); // FIXME There isn't a way to force to stop another thread
});
}
}
} catch (InterruptedException _) {
Thread.interrupted();
System.out.println("Deadlock Checker is interrupted");
}
}
});
}
private Optional<Thread> selectThread(String threadName) {
return threads
.stream()
.filter(t -> t.getName().equals(threadName))
.findAny();
}
}
}

View File

@@ -0,0 +1,24 @@
package co.fscotto.concurrency;
public class Puzzle {
static boolean answerReady = false;
static int answer = 0;
static Thread t1 = new Thread(() -> {
answer = 42;
answerReady = true;
});
static Thread t2 = new Thread(() -> {
if (answerReady) {
System.out.println("This meaning of life is: " + answer);
} else {
System.out.println("I don't know the answer");
}
});
public static void main(String[] args) throws InterruptedException {
t1.start();
t2.start();
t1.join();
t2.join();
}
}

View File

@@ -0,0 +1,39 @@
package co.fscotto.concurrency;
import java.util.concurrent.TimeUnit;
public class ShowJoin {
public static void main(String[] args) {
try {
CalcThread calcThread = new CalcThread();
calcThread.start();
doSomethingElse();
calcThread.join();
System.out.println("result is " + calcThread.getResult());
} catch (InterruptedException e) {
System.out.println("No answer: interrupted");
}
}
public static void doSomethingElse() throws InterruptedException {
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
}
}
class CalcThread extends Thread {
private double result;
@Override
public void run() {
result = calculate();
}
public double getResult() {
return result;
}
private double calculate() {
return Math.random();
}
}

View File

@@ -0,0 +1,7 @@
package co.fscotto;
import junit.framework.TestCase;
public class MultiThreadingTest extends TestCase {
}