Java 8 Streams Teil 4: Streams, Threads und sequentielle Verarbeitung – eine Gegenüberstellung

Für Sie interessant:

 

Java Erweiterungen

Sie erweitern Ihre bereits vorhandenen Kenntnisse in Java durch die Inhalte dieses Seminars.

Dies ist der vierte und letzte Teil einer Artikel-Serie zur Java 8 Streams.

In Teil 1 dieser Artikel-Serie wurde mit einem Exkurs in Lambdas die Initialisierung und Verwertung der Ergebnisse in das Thema eingeführt. Im zweiten Teil wurden bereits konkrete Basismethoden wie forEach, map und findFirst beleuchtet. Im dritten Teil folgten weitere Basismethoden, unter anderem limit, Skip und Match

Das könnte Sie auch interessieren: Java 8: Die wichtigsten Neuerungen für Entwickler

Das Beispiel der Primzahlenberechnung eignet sich sehr gut, um die Unterschiede zwischen einer sequentiellen, einer threadbasierten und einer streambasierten Implementierung darzustellen. Der einfachste Ansatz ist die sequentielle Berechnung. Für ein definiertes Intervall wird geprüft, ob die aktuelle Zahl durch eine bereits gefundene Primzahl teilbar ist, oder nicht. Die Prüfung erfolgt auf Basis der bereits gefundenen Primzahlen.

public static Set<Long> findPrimesSequentially() {
        Set<Long> primes = new HashSet<>();
        for(int currentCandidate=3; currentCandidate <= LAST_CANDIDATE_TO_CHECK; currentCandidate++){
            boolean currentCandidateIsPrime = true;
            for(Long prime : primes) {
                if(currentCandidate % prime == 0){
                    currentCandidateIsPrime = false;
                    break;
                }
            }
            if(currentCandidateIsPrime){
                primes.add(Long.valueOf(currentCandidate));
            }
        }
        return primes;
    }

}

Die folgende Abbildung  zeigt die sequentielle Implementierung als einfachsten Ansatz. Die Laufzeit ist dementsprechend hoch:

Laufzeit der sequentiellen Implementierung

Durch den einfachen Ansatz ist die Implementierung entsprechend leicht. Es sind keine größeren Schwierigkeiten zu erwarten, nur die Laufzeit ist nicht akzeptabel.

Bei einer parallelen Implementierung kann nun auf die Verwendung von Threads zurückgegriffen werden. Damit wird die Implementierung allerdings komplizierter und es müssen einige Bedingungen beachtet werden. Es wird eine Implementierung eines Threads für die Ermittlung der Primzahlen erstellt:

public class PrimeFinderThread extends Thread {

    public final int nrOfParallelThreads;
    public final long lastCandidateToCheck;

    public static long currentPosition = 2;
    public static Set<Long> primes = new HashSet<>();

    public PrimeFinderThread(int nrOfParallelThreads, long lastCandidateToCheck) {
        this.nrOfParallelThreads = nrOfParallelThreads;
        this.lastCandidateToCheck = lastCandidateToCheck;
    }

    @Override
    public void run() {
        long currentCandidate;
        while((currentCandidate = getNextCandidate()) <= lastCandidateToCheck){
            if(isPrime(currentCandidate)){
                synchronized (primes) {
                    primes.add(currentCandidate);
                }
            }
        }
    }

    private boolean isPrime(long currentCandidate) {
        boolean foundPrime = true;
        synchronized (primes) {
            for (Long prime : primes) {
                if (0 == currentCandidate % prime) {
                    foundPrime = false;
                    break;
                }
            }
        }
        return foundPrime;
    }

    public synchronized long getNextCandidate(){
        long nextCandidate = currentPosition;
        currentPosition++;
        return nextCandidate;
    }
}

Für eine sichere Implementierung werden die Ergebnisse als Klassenvariablen gehalten. So ist eine Aktualisierung durch die verschiedenen Threads möglich. Damit eine parallele Verarbeitung möglich ist, wird die aktuell zu prüfende Zahl ebenfalls als Klassenvariable gehalten und aktualisiert. Der Zugriff auf die Variable muss an dieser Stelle natürlich synchronisiert erfolgen, sodass keine Zahlen doppelt geprüft werden. Das Überprüfen der Zahl erfolgt ebenfalls synchronisiert, als Monitor dienen die ermittelten Primzahlen. Der zweite Teil der Implementierung beinhaltet das Starten der Threads sowie das Zusammenführen.

public static Set<Long> findPrimesUsingThreads() throws InterruptedException {
        final int nrOfThreads=5;
        List<PrimeFinderThread> threads = new ArrayList<PrimeFinderThread>(nrOfThreads){
            {
                for(int i=0; i<nrOfThreads; i++) {
                    add(new PrimeFinderThread(nrOfThreads, LAST_CANDIDATE_TO_CHECK));
                    get(i).start();
                }
            }
        };

        for(PrimeFinderThread primeFinderThread : threads){
            primeFinderThread.join();
        }
        return threads.get(0).primes;
    }

Zu erkennen ist, dass die Implementierung deutlich aufwändiger und komplizierter ist. Der Laufzeitgewinn ist allerdings deutlich bemerkbar:

Laufzeit der Implementierung mit Threads

Mit diesen zwei unterschiedlichen Methoden als Referenz ist zu sehen, dass die Implementierung mit Hilfe der Streams deutlich schlanker ausfällt:

public static long[] findPrimesUsingStreams() {
        LongStream primes = LongStream.range(2, LAST_CANDIDATE_TO_CHECK).parallel().filter(n -> LongStream.rangeClosed(2, (long) Math.sqrt(n))
                                                .noneMatch(divisor -> n % divisor == 0));
        return primes.toArray();
    }

public static long[] findPrimesUsingStreamsSequentially() {
        LongStream primes = LongStream.range(2, LAST_CANDIDATE_TO_CHECK).filter(n -> LongStream.rangeClosed(2, (long) Math.sqrt(n))
                        .noneMatch(divisor -> n % divisor == 0));
        return primes.toArray();
    }

Mit Hilfe eines Streams wird die Wertemenge erzeugt und mit zwei einfachen Lambdas eingegrenzt. Da das Umschalten zwischen einer parallelen Verarbeitung und einer sequentiellen Verarbeitung so leicht fällt, wurden direkt beide Methoden implementiert.

Abschließend ist zu sehen, dass die Laufzeiten sich deutlich von den vorherigen Implementierungen unterscheiden.

Laufzeit für die sequentielle Verarbeitung mit Hilfe von Streams

Laufzeit für die parallele Verarbeitung mit Hilfe von Streams

Fazit

Streams mindern Verschachtelungen im Kontrollfluss und können ein Ersatz für schwergewichtige Implementierungen sein. Gleichzeitig ist die Implementierung von Nebenläufigkeit leichter und in vielen Fällen schneller. Nicht erspart bleibt allerdings die Analyse, ob eine parallele Implementierung sinnvoll ist, oder nicht. Die Integration von Streams in bestehende Projekte ist relativ leicht aufgrund der erweiterten Schnittstellen, zum Beispiel auf den Collections. Die Wartbarkeit kann spürbar verbessert und es können in vielen Fällen einfachere Konstrukte gebildet werden.

Java 8 Streams Artikel-Serie

Bei diesem Blog-Eintrag handelt es sich um den vierten und letzten Teil der Artikel-Serie zu Java 8 Streams.

Den einleitenden Teil 1 des Artikels finden Sie hier .
Teil 2 über die Basismethoden forEach, map, findFirst und weitere finden Sie hier .
Teil 3 über die Basismethoden limit, Skip, Match und weitere finden Sie hier .