From 5aac09213ad3b257ce5421096e21d5a82216079f Mon Sep 17 00:00:00 2001 From: Fabio Scotto di Santolo Date: Tue, 26 Nov 2019 19:33:42 +0100 Subject: [PATCH] Esempi sugli stream paralleli --- .../fp/moderjava/ForkJoinSumCalculator.java | 44 ++++++++++++++++++ .../java/org/gym/fp/moderjava/StreamTest.java | 34 ++++++++++++++ .../org/gym/fp/moderjava/WordCounter.java | 25 +++++++++++ .../fp/moderjava/WordCounterSpliterator.java | 45 +++++++++++++++++++ 4 files changed, 148 insertions(+) create mode 100644 modern-java/src/main/java/org/gym/fp/moderjava/ForkJoinSumCalculator.java create mode 100644 modern-java/src/main/java/org/gym/fp/moderjava/WordCounter.java create mode 100644 modern-java/src/main/java/org/gym/fp/moderjava/WordCounterSpliterator.java diff --git a/modern-java/src/main/java/org/gym/fp/moderjava/ForkJoinSumCalculator.java b/modern-java/src/main/java/org/gym/fp/moderjava/ForkJoinSumCalculator.java new file mode 100644 index 0000000..ae57bfd --- /dev/null +++ b/modern-java/src/main/java/org/gym/fp/moderjava/ForkJoinSumCalculator.java @@ -0,0 +1,44 @@ +package org.gym.fp.moderjava; + +import java.util.concurrent.RecursiveTask; + +public class ForkJoinSumCalculator extends RecursiveTask { + public static final long THRESHOLD = 10_000; + + private final long[] numbers; + private final int start; + private final int end; + + public ForkJoinSumCalculator(long[] numbers) { + this(numbers, 0, numbers.length); + } + + private ForkJoinSumCalculator(long[] numbers, int start, int end) { + this.numbers = numbers; + this.start = start; + this.end = end; + } + + @Override + protected Long compute() { + int length = end - start; + if (length <= THRESHOLD) { + return computeSequentially(); + } + ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2); + leftTask.fork(); + ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end); + Long rightResult = rightTask.compute(); + Long leftResult = leftTask.join(); + return leftResult + rightResult; + } + + private Long computeSequentially() { + long sum = 0; + for (int i = start; i < end; i++) { + sum += numbers[i]; + } + return sum; + } + +} diff --git a/modern-java/src/main/java/org/gym/fp/moderjava/StreamTest.java b/modern-java/src/main/java/org/gym/fp/moderjava/StreamTest.java index aec6962..42860c1 100644 --- a/modern-java/src/main/java/org/gym/fp/moderjava/StreamTest.java +++ b/modern-java/src/main/java/org/gym/fp/moderjava/StreamTest.java @@ -1,8 +1,12 @@ package org.gym.fp.moderjava; import java.util.*; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.ForkJoinTask; import java.util.stream.IntStream; +import java.util.stream.LongStream; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import static java.lang.System.out; import static java.util.Arrays.asList; @@ -10,6 +14,10 @@ import static java.util.Comparator.comparingInt; import static java.util.stream.Collectors.*; public class StreamTest { + private static final String SENTENCE = + " Nel mezzo del cammin di nostra vita " + + "mi ritrovia in una selva oscura" + + " ché la dritta via era smarrita "; public static void main(String[] args) { doStreamFilterDemo(); @@ -17,6 +25,7 @@ public class StreamTest { doStreamFindOrMatchingDemo(); doStreamReducingDemo(); doStreamCollectingDemo(); + doStreamParallelDemo(); } private static void doStreamFilterDemo() { @@ -302,6 +311,31 @@ public class StreamTest { out.println("----------------------------------------"); } + private static void doStreamParallelDemo() { + out.println("ForkJoin result (1, 100) = " + forkJoinSum(100)); + out.println("----------------------------------------"); + Stream stream1 = IntStream.range(0, SENTENCE.length()).mapToObj(SENTENCE::charAt); + out.printf("Found %d words\n", countWords(stream1)); + out.println("----------------------------------------"); + Spliterator spliterator = new WordCounterSpliterator(SENTENCE); + Stream stream2 = StreamSupport.stream(spliterator, true); + out.printf("Found %d words\n", countWords(stream2)); + out.println("----------------------------------------"); + } + + private static int countWords(Stream stream) { + WordCounter wordCounter = stream.reduce(new WordCounter(0, true), + WordCounter::accumulate, + WordCounter::combine); + return wordCounter.getCounter(); + } + + private static long forkJoinSum(int n) { + long[] numbers = LongStream.rangeClosed(1, n).toArray(); + ForkJoinTask task = new ForkJoinSumCalculator(numbers); + return new ForkJoinPool().invoke(task); + } + public static Map> partitionPrimes(int n) { return IntStream.rangeClosed(2, n).boxed() .collect(partitioningBy(candidate -> isPrime(candidate))); diff --git a/modern-java/src/main/java/org/gym/fp/moderjava/WordCounter.java b/modern-java/src/main/java/org/gym/fp/moderjava/WordCounter.java new file mode 100644 index 0000000..38aa1b3 --- /dev/null +++ b/modern-java/src/main/java/org/gym/fp/moderjava/WordCounter.java @@ -0,0 +1,25 @@ +package org.gym.fp.moderjava; + +public class WordCounter { + private final int counter; + private final boolean lastSpace; + + public WordCounter(int counter, boolean lastSpace) { + this.counter = counter; + this.lastSpace = lastSpace; + } + + public WordCounter accumulate(Character c) { + if (Character.isWhitespace(c)) return lastSpace ? this : new WordCounter(counter, true); + else return lastSpace ? new WordCounter(counter + 1, false) : this; + } + + public WordCounter combine(WordCounter wordCounter) { + return new WordCounter(this.counter + wordCounter.getCounter(), wordCounter.lastSpace); + } + + public int getCounter() { + return this.counter; + } + +} diff --git a/modern-java/src/main/java/org/gym/fp/moderjava/WordCounterSpliterator.java b/modern-java/src/main/java/org/gym/fp/moderjava/WordCounterSpliterator.java new file mode 100644 index 0000000..b122de1 --- /dev/null +++ b/modern-java/src/main/java/org/gym/fp/moderjava/WordCounterSpliterator.java @@ -0,0 +1,45 @@ +package org.gym.fp.moderjava; + +import java.util.Spliterator; +import java.util.function.Consumer; + +public class WordCounterSpliterator implements Spliterator { + private final String string; + private int currentChar = 0; + + public WordCounterSpliterator(String string) { + this.string = string; + } + + @Override + public boolean tryAdvance(Consumer action) { + action.accept(string.charAt(currentChar++)); + return currentChar < string.length(); + } + + @Override + public Spliterator trySplit() { + int currentSize = string.length() - currentChar; + if (currentSize < 10) return null; + for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) { + if (Character.isWhitespace(string.charAt(splitPos))) { + Spliterator spliterator = new WordCounterSpliterator( + string.substring(currentChar, splitPos)); + currentChar = splitPos; + return spliterator; + } + } + return null; + } + + @Override + public long estimateSize() { + return string.length() - currentChar; + } + + @Override + public int characteristics() { + return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE; + } + +}