Esempi sugli stream paralleli

This commit is contained in:
Fabio Scotto di Santolo
2019-11-26 19:33:42 +01:00
parent ab75cb8f6b
commit 5aac09213a
4 changed files with 148 additions and 0 deletions

View File

@@ -0,0 +1,44 @@
package org.gym.fp.moderjava;
import java.util.concurrent.RecursiveTask;
public class ForkJoinSumCalculator extends RecursiveTask<Long> {
public static final long THRESHOLD = 10_000;
private final long[] numbers;
private final int start;
private final int end;
public ForkJoinSumCalculator(long[] numbers) {
this(numbers, 0, numbers.length);
}
private ForkJoinSumCalculator(long[] numbers, int start, int end) {
this.numbers = numbers;
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start;
if (length <= THRESHOLD) {
return computeSequentially();
}
ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length / 2);
leftTask.fork();
ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length / 2, end);
Long rightResult = rightTask.compute();
Long leftResult = leftTask.join();
return leftResult + rightResult;
}
private Long computeSequentially() {
long sum = 0;
for (int i = start; i < end; i++) {
sum += numbers[i];
}
return sum;
}
}

View File

@@ -1,8 +1,12 @@
package org.gym.fp.moderjava;
import java.util.*;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static java.lang.System.out;
import static java.util.Arrays.asList;
@@ -10,6 +14,10 @@ import static java.util.Comparator.comparingInt;
import static java.util.stream.Collectors.*;
public class StreamTest {
private static final String SENTENCE =
" Nel mezzo del cammin di nostra vita " +
"mi ritrovia in una selva oscura" +
" ché la dritta via era smarrita ";
public static void main(String[] args) {
doStreamFilterDemo();
@@ -17,6 +25,7 @@ public class StreamTest {
doStreamFindOrMatchingDemo();
doStreamReducingDemo();
doStreamCollectingDemo();
doStreamParallelDemo();
}
private static void doStreamFilterDemo() {
@@ -302,6 +311,31 @@ public class StreamTest {
out.println("----------------------------------------");
}
private static void doStreamParallelDemo() {
out.println("ForkJoin result (1, 100) = " + forkJoinSum(100));
out.println("----------------------------------------");
Stream<Character> stream1 = IntStream.range(0, SENTENCE.length()).mapToObj(SENTENCE::charAt);
out.printf("Found %d words\n", countWords(stream1));
out.println("----------------------------------------");
Spliterator<Character> spliterator = new WordCounterSpliterator(SENTENCE);
Stream<Character> stream2 = StreamSupport.stream(spliterator, true);
out.printf("Found %d words\n", countWords(stream2));
out.println("----------------------------------------");
}
private static int countWords(Stream<Character> stream) {
WordCounter wordCounter = stream.reduce(new WordCounter(0, true),
WordCounter::accumulate,
WordCounter::combine);
return wordCounter.getCounter();
}
private static long forkJoinSum(int n) {
long[] numbers = LongStream.rangeClosed(1, n).toArray();
ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
return new ForkJoinPool().invoke(task);
}
public static Map<Boolean, List<Integer>> partitionPrimes(int n) {
return IntStream.rangeClosed(2, n).boxed()
.collect(partitioningBy(candidate -> isPrime(candidate)));

View File

@@ -0,0 +1,25 @@
package org.gym.fp.moderjava;
public class WordCounter {
private final int counter;
private final boolean lastSpace;
public WordCounter(int counter, boolean lastSpace) {
this.counter = counter;
this.lastSpace = lastSpace;
}
public WordCounter accumulate(Character c) {
if (Character.isWhitespace(c)) return lastSpace ? this : new WordCounter(counter, true);
else return lastSpace ? new WordCounter(counter + 1, false) : this;
}
public WordCounter combine(WordCounter wordCounter) {
return new WordCounter(this.counter + wordCounter.getCounter(), wordCounter.lastSpace);
}
public int getCounter() {
return this.counter;
}
}

View File

@@ -0,0 +1,45 @@
package org.gym.fp.moderjava;
import java.util.Spliterator;
import java.util.function.Consumer;
public class WordCounterSpliterator implements Spliterator<Character> {
private final String string;
private int currentChar = 0;
public WordCounterSpliterator(String string) {
this.string = string;
}
@Override
public boolean tryAdvance(Consumer<? super Character> action) {
action.accept(string.charAt(currentChar++));
return currentChar < string.length();
}
@Override
public Spliterator<Character> trySplit() {
int currentSize = string.length() - currentChar;
if (currentSize < 10) return null;
for (int splitPos = currentSize / 2 + currentChar; splitPos < string.length(); splitPos++) {
if (Character.isWhitespace(string.charAt(splitPos))) {
Spliterator<Character> spliterator = new WordCounterSpliterator(
string.substring(currentChar, splitPos));
currentChar = splitPos;
return spliterator;
}
}
return null;
}
@Override
public long estimateSize() {
return string.length() - currentChar;
}
@Override
public int characteristics() {
return ORDERED + SIZED + SUBSIZED + NONNULL + IMMUTABLE;
}
}