Flow like water

This post will have a lot of code, and Java at that, so you have been warned. I happened on a little gem in the Java standard library that lets you build a nice asynchronous model of execution with some guidance. The one thing missing from the Java standard library documentation, in contrast to Python's, is examples of how to use it, apart from the one on the main documentation page. So this is my attempt at implementing these interfaces and hooking them up together based on the wording in the documentation.

The gem I found is the java.util.concurrent.Flow class, which consists of the Publisher, Subscriber and Subscription interfaces, with an optional Processor.
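To get a feel for how these pieces talk to each other before building them by hand, here is a minimal sketch. It leans on SubmissionPublisher, the ready-made Publisher that ships in java.util.concurrent and that the rest of this post does not use. FlowHello and Collector are names made up for this illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical names; only Flow and SubmissionPublisher come from the JDK.
final class FlowHello {

    // A Subscriber that asks for one item at a time and remembers what it saw.
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> seen = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);          // nothing flows until we ask
        }

        @Override
        public void onNext(Integer item) {
            seen.add(item);
            subscription.request(1);          // ask for the next one
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1, 2, 3 and returns what the subscriber received.
    static List<Integer> run() {
        Collector collector = new Collector();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(collector);
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i);
            }
        } // close() signals onComplete once the submitted items are delivered
        try {
            collector.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return collector.seen;
    }
}
```

The thing to notice is that the subscriber drives the pace: nothing arrives until it calls request.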

The end goal

We will make a machine that finds prime numbers for us using the Sieve of Eratosthenes. The method is simple: once you have found a prime number, no multiple of that prime can be prime.
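For reference, this is roughly what the classic, single-threaded sieve looks like. It is only a sketch to make the idea concrete, not the code this post builds; SieveSketch and primesUpTo are names I made up for it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: the textbook sieve over a fixed range.
final class SieveSketch {

    static List<Integer> primesUpTo(int limit) {
        // composite[n] becomes true once n is known to be a multiple of a smaller prime
        boolean[] composite = new boolean[Math.max(limit + 1, 2)];
        List<Integer> primes = new ArrayList<>();
        for (int n = 2; n <= limit; n++) {
            if (composite[n]) {
                continue;
            }
            primes.add(n);
            // cross out every multiple of the new prime, starting at n * n
            for (long m = (long) n * n; m <= limit; m += n) {
                composite[(int) m] = true;
            }
        }
        return primes;
    }
}
```

Calling SieveSketch.primesUpTo(30) gives 2, 3, 5, 7, 11, 13, 17, 19, 23 and 29.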

This will utilize the above components and I will explain the concepts as I also show the code.

Overview

The root directory structure and files will be as follows:

processors
publishers
subscribers
subscriptions
util
CyclicRingBuffer.java
Main.java

The CyclicRingBuffer stands out, of course; we will get back to that one later.

We start with the util classes, as they are small and easy to explain.

util

In util there are three files:

ConcurrentUtil.java

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentUtil {

    public static ExecutorService executorService = Executors.newWorkStealingPool();
}

This static field just gives us an ExecutorService so we can schedule work to be done in the background. You put tasks on a queue of sorts and they get executed for you on the pool's threads.
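As a small illustration of what "putting a task on the queue" looks like, here is a sketch that submits one task and waits for its result. ExecutorSketch and sumInBackground are made-up names. One detail worth knowing: the work-stealing pool is backed by a ForkJoinPool, whose worker threads are daemon threads, so the JVM does not wait for them on its own.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper showing submit() and Future.get() on a work-stealing pool.
final class ExecutorSketch {

    static int sumInBackground(int a, int b) {
        ExecutorService pool = Executors.newWorkStealingPool();
        try {
            // submit() returns immediately; the lambda runs on a pool thread
            Future<Integer> result = pool.submit(() -> a + b);
            // get() blocks the caller until the task has finished
            return result.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```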

PrimeUtil.java

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

public class PrimeUtil {

    public static List<Integer> primes = new CopyOnWriteArrayList<>();
    public static AtomicInteger demand = new AtomicInteger(Integer.parseInt(System.getenv().getOrDefault("PRIMEFINDER_DEMAND", "100")));

    static {
        primes.add(2);
    }
}

So this class exists to hold the primes found so far in a thread-safe CopyOnWriteArrayList, so that simultaneous reads and writes are not a problem. Then we have an AtomicInteger that is initialised from the PRIMEFINDER_DEMAND environment variable, or a default of 100. This demand is our end goal: how many primes to find.

Then we need to prime, pun intended (always own your puns), the list by putting in 2 as that will make our lives way easier down the line.

StatisticsUtil.java

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class StatisticsUtil {

    public static Map<String, Integer> stats = new ConcurrentHashMap<>();
}

So this class is there to keep a simple ConcurrentHashMap filled with the name of each subscriber and how many primes it found.
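One thing to keep in mind with a map like this: a get followed by a put is two separate operations, even on a ConcurrentHashMap, so two threads can overwrite each other's update. A hedged sketch of a counter that does the read and the write in one step with merge (StatsSketch and its methods are names invented for this example):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

// Hypothetical stand-in for StatisticsUtil, used only to show merge().
final class StatsSketch {

    static final Map<String, Integer> stats = new ConcurrentHashMap<>();

    // Adds one to the count for this name, starting from 1 if it is absent.
    static void recordFind(String name) {
        stats.merge(name, 1, Integer::sum);
    }

    // Hammers the counter from several threads and returns the final count.
    static int countAfterParallelFinds(String name, int finds) {
        IntStream.range(0, finds).parallel().forEach(i -> recordFind(name));
        return stats.get(name);
    }
}
```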

That is all for util. Let's dive into the more fun parts.

Subscription

First we will start with the subscription. Inside the subscriptions package there is one file named IntegerSubscription.java. A subscription is a class that produces output when one of the methods you have to implement, request(long l), is called, and that can be cancelled by calling cancel(). Cancelling means no more messages should flow on to the subscribers.

If l is less than or equal to 0 in the request method, an error needs to be signalled: the Flow documentation asks for an onError with an IllegalArgumentException.

So the subscription has a method called request, and it gets called by a subscriber.

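import publishers.IntegerPublisher;

import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.stream.Stream;

import static util.ConcurrentUtil.executorService;

public class IntegerSubscription implements Flow.Subscription  {

    private final IntegerPublisher integerPublisher;
    private final Iterator<Integer> integerStream;
    // volatile so that a cancel() on one thread is seen by request() on another
    private volatile boolean isCanceled = false;

    public IntegerSubscription(IntegerPublisher integerPublisher) {
        this.integerPublisher = integerPublisher;
        this.integerStream = Stream.iterate(2, i -> i + 1).iterator();
    }

    @Override
    public void request(long l) {
        // Non-positive requests must be answered with an error, and nothing else.
        if (l <= 0) {
            integerPublisher.publish(new IllegalArgumentException("Have to request a positive number of elements"));
            return;
        }
        for (long i = 0; i < l; i++) {
            if (isCanceled) {
                break;
            }
            executorService.submit(() -> integerPublisher.publish(nextInteger()));
        }
    }

    // The iterator is not thread-safe and is used from several pool threads,
    // so values are handed out one at a time.
    private synchronized int nextInteger() {
        return integerStream.next();
    }

    @Override
    public void cancel() {
        isCanceled = true;
    }
}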

There are a few parts. First, in the constructor we see a Publisher; more on that later. Then we see a way to produce an infinite stream of Integers. Then in request we see the actual work: giving the publisher the next integer in the stream to publish. This is handed to the ExecutorService we saw earlier, which puts it on the backlog of things to execute.
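Stripped of the executor and the publisher, the request and cancel contract can be sketched in a few lines. This is a simplified, synchronous illustration that talks to its subscriber directly, not the design used in this post; CountingSubscription and Recorder are made-up names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical subscription that emits 2, 3, 4, ... straight to one subscriber.
final class CountingSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private int next = 2;
    private volatile boolean cancelled = false;

    CountingSubscription(Flow.Subscriber<? super Integer> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public synchronized void request(long n) {
        if (cancelled) {
            return;                           // a cancelled subscription stays silent
        }
        if (n <= 0) {
            cancelled = true;                 // after an error nothing else is delivered
            subscriber.onError(new IllegalArgumentException("request must be positive, got " + n));
            return;
        }
        for (long i = 0; i < n && !cancelled; i++) {
            subscriber.onNext(next++);
        }
    }

    @Override
    public void cancel() {
        cancelled = true;
    }
}

// Hypothetical subscriber that only records what it receives.
final class Recorder implements Flow.Subscriber<Integer> {
    final List<Integer> items = new ArrayList<>();
    Throwable error;

    @Override public void onSubscribe(Flow.Subscription subscription) { }
    @Override public void onNext(Integer item) { items.add(item); }
    @Override public void onError(Throwable throwable) { error = throwable; }
    @Override public void onComplete() { }
}
```

With a Recorder attached, request(3) delivers 2, 3 and 4, request(0) ends in onError, and anything requested after that is ignored.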

Publisher

So the publisher is the part that actually glues the subscriptions and the subscribers. There is a little more logic in here, but not a whole lot more. Inside the publishers package there are two files:

import java.util.concurrent.Flow;

public interface IntegerPublisher extends Flow.Publisher<Integer> {

    void publish(int i);

    void publish(IllegalArgumentException e);
}

This first file is just an interface with two methods that are not needed by the Flow framework but exist for my convenience and to make it easier to follow everything.

import CyclicRingBuffer;
import subscriptions.IntegerSubscription;

import java.util.Objects;
import java.util.concurrent.*;
import static util.ConcurrentUtil.executorService;

public class IntegerPublisherImpl implements IntegerPublisher  {

    private final IntegerSubscription subscription;
    private final CyclicRingBuffer<Flow.Subscriber<? super Integer>> subscribers;

    public IntegerPublisherImpl() {
        this.subscription = new IntegerSubscription(this);
        subscribers = new CyclicRingBuffer<>();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscribers.push(subscriber);
        executorService.submit(() -> subscriber.onSubscribe(this.subscription));
    }

    private void shutdown(boolean cancel) {
        if (cancel) {
            this.subscription.cancel();
        } else {
            for (Flow.Subscriber<? super Integer> subscriber : subscribers) {
                subscriber.onComplete();
            }
        }

        executorService.shutdown();

    }

    public void publish(int i) {
        executorService.submit(() -> Objects.requireNonNull(subscribers.poll()).onNext(i));
    }

    public void publish(IllegalArgumentException e) {
        executorService.submit(() -> Objects.requireNonNull(subscribers.poll()).onError(e));
    }

    public void sendShutdown(boolean cancel) {
        this.shutdown(cancel);
    }
}

So this class binds Subscriptions and Subscribers together. A subscription produces an output and calls the publish method to hand it to one of the subscribers of this subscription. The CyclicRingBuffer used here will be explained later. Furthermore we have the actual subscribe method, which takes a Subscriber of a certain bounded type, in my case Integer. We give the subscriber in question the subscription to subscribe to; then the circle is complete and things can get moving.

In the publish method the subscriber's onNext gets called. That holds the actual processing logic of what we want to do with our Integers. So far we only saw who produces them (Subscription) and who syndicates them (Publisher); now we see who processes them.

Subscriber

The subscriber requests data from the subscription in question. The onSubscribe method kicks off the requesting of data from the subscription. Then onNext holds the main logic and, at the end, calls request on the subscription again if you want to keep things in motion. There is also onComplete for when all is said and done, and onError, which should be called any time an error occurs in the chain and needs to be propagated to the subscriber.

import util.StatisticsUtil;
import java.util.UUID;
import java.util.concurrent.Flow;
import java.util.logging.Level;
import java.util.logging.Logger;

import static util.PrimeUtil.demand;
import static util.PrimeUtil.primes;
import static util.ConcurrentUtil.executorService;

public class PrimeFinder implements Flow.Subscriber<Integer> {

    private final String name;
    // Per-instance logger: a static field assigned in the constructor
    // would be overwritten by every new PrimeFinder.
    private final Logger logger;
    // volatile because onSubscribe and onNext may run on different pool threads
    private volatile Flow.Subscription subscription;

    public PrimeFinder() {
        this.name = String.format("%s-%s", this.getClass().getName(), UUID.randomUUID());
        this.logger = Logger.getLogger(this.name);
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        // Set everything up before requesting: onNext can fire on another
        // thread as soon as request() has been called.
        this.subscription = subscription;
        StatisticsUtil.stats.putIfAbsent(this.name, 0);
        subscription.request(1);
    }

    @Override
    public void onNext(Integer integer) {
        logger.log(Level.FINE, String.format("I'm %s working on %d", name, integer));

        if (primes.parallelStream().noneMatch(p -> integer % p == 0)) {
            if(demand.getAndDecrement() > 0) {
                primes.add(integer);
                logger.log(Level.INFO, String.format("%d found by me: %s", integer, name));
                // merge() updates the count atomically, unlike a get followed by a put
                StatisticsUtil.stats.merge(this.name, 1, Integer::sum);
            }
        }
        if(demand.get() > 0) {
            executorService.submit(() -> subscription.request(1));
        }
    }

    @Override
    public void onError(Throwable throwable) {
        logger.log(Level.SEVERE, throwable.getMessage());
    }

    @Override
    public void onComplete() {
        logger.log(Level.INFO, name + " is done");
    }
}

So the onError and onComplete methods are just logging methods. The onSubscribe kickstarts the whole operation. It first stores the subscription on this instance so we can request more from it later on, then registers the instance in StatisticsUtil so the score can be kept on who got how many primes, and only then requests the first element. The order matters: as soon as request is called, onNext can fire on another thread, and it needs the subscription to be set.

Then onNext gets an integer, from the subscription via the publisher, and first checks it against the sieve. This line does all the work:

primes.parallelStream().noneMatch(p -> integer % p == 0)

The explanation: turn the primes list into a parallel stream, then check that none of its elements match the expression that follows. The expression checks whether the number being worked on modulo the prime gives 0. If it does, the integer can be divided exactly by that prime and therefore is not a prime. If the integer cannot be divided by any of the primes found so far, and every smaller prime has already been found, then we have a new prime!!
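Pulled out of the subscriber, the check is small enough to play with on its own. The sketch below uses a plain sequential stream and made-up names (TrialDivisionSketch, notDivisibleByAny). It also shows the caveat: the answer is only trustworthy when the list already holds every prime below the candidate.

```java
import java.util.List;

// Hypothetical helper wrapping the noneMatch check from onNext.
final class TrialDivisionSketch {

    // True when no known prime divides the candidate exactly.
    static boolean notDivisibleByAny(List<Integer> knownPrimes, int candidate) {
        return knownPrimes.stream().noneMatch(p -> candidate % p == 0);
    }
}
```

With the list 2, 3, 5 the number 7 passes and 9 is rejected. With only 2 in the list, 9 passes as well, because 3 has not been found yet.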

Then it gets and decrements the demand to see if we still have to find more primes. If so, the integer is added to the list of found primes and to the stats.

Then demand is checked again, as some other subscriber could have found the last prime in the meantime, and more is requested from the subscription if more primes need to be found.
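The reason getAndDecrement is used instead of a separate get and set is that it lets exactly one thread win each remaining slot, no matter how many try at once. A small sketch of that idea, with invented names (DemandSketch, successfulClaims):

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

// Hypothetical demo: many threads try to claim a limited number of slots.
final class DemandSketch {

    static int successfulClaims(int demand, int attempts) {
        AtomicInteger remaining = new AtomicInteger(demand);
        AtomicInteger claimed = new AtomicInteger();
        IntStream.range(0, attempts).parallel().forEach(i -> {
            // the old value is returned, so only the first `demand` callers see a value > 0
            if (remaining.getAndDecrement() > 0) {
                claimed.incrementAndGet();
            }
        });
        return claimed.get();
    }
}
```

Note that the counter keeps dropping below zero on later attempts, which is harmless here because every check is a comparison against 0.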

CyclicRingBuffer

The CyclicRingBuffer is a data structure that lets you cycle through a list endlessly, so it makes perfect sense here, as I want to round robin the subscribers forever when publishing new data to them. It is implemented by extending a well-known Java class and overriding a few key methods.

import java.util.concurrent.ConcurrentLinkedDeque;

public class CyclicRingBuffer<E> extends ConcurrentLinkedDeque<E> {

    // poll() takes from the front, so it shares the logic of pollFirst().
    @Override
    public E poll() {
        return pollFirst();
    }

    // Taking from the front puts the element back at the rear.
    @Override
    public E pollFirst() {
        E e = super.pollFirst();

        assert e != null;

        this.offerLast(e);
        return e;
    }

    // Taking from the rear puts the element back at the front.
    @Override
    public E pollLast() {
        E e = super.pollLast();

        assert e != null;

        this.offerFirst(e);
        return e;
    }

}

A Deque is a Double Ended Queue: a queue where you can add and take at both the front and the back. These three methods just implement the cycling logic: if you take from the front, the element is added to the back again, and if you take from the rear, it is added to the front.
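One caveat: taking an element and putting it back are two separate calls, so for a brief moment the element is in neither place and another thread can slip in between. A possible alternative, sketched here with a plain lock around an ArrayDeque, makes the rotation a single step. RoundRobin is a made-up class, not a drop-in replacement for the one above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical alternative: rotation guarded by the object's own lock.
final class RoundRobin<E> {
    private final Deque<E> items = new ArrayDeque<>();

    synchronized void add(E item) {
        items.addLast(item);
    }

    // Returns the head and moves it to the tail in one atomic step.
    synchronized E next() {
        E head = items.pollFirst();
        if (head == null) {
            throw new IllegalStateException("no elements to rotate");
        }
        items.addLast(head);
        return head;
    }
}
```

After adding a, b and c, four calls to next() return a, b, c and then a again.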

Main

The main file puts everything together by hooking up all the necessary parts. Just remember the flow: a Subscriber requests data from a Subscription, which asks the Publisher to publish it. In this implementation each integer goes to one subscriber, picked round robin from everyone subscribed to that Subscription.

import publishers.IntegerPublisherImpl;
import subscribers.PrimeFinder;
import util.PrimeUtil;
import util.StatisticsUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;


public class Main {
    final static int totalSubscribers = Runtime.getRuntime().availableProcessors();
    private static final Logger LOGGER = Logger.getAnonymousLogger();

    public static void main(String[] args) throws InterruptedException {
        IntegerPublisherImpl integerPublisher = new IntegerPublisherImpl();
        List<Flow.Subscriber<Integer>> subscribers = new ArrayList<>();

        for (int i = 0; i < totalSubscribers; i++) {
            subscribers.add(new PrimeFinder());
        }
        subscribers.forEach(integerPublisher::subscribe);

        while (PrimeUtil.demand.get() > 0 ) {
            TimeUnit.MILLISECONDS.sleep(100);
        }
        integerPublisher.sendShutdown(false);
        StatisticsUtil.stats.forEach((name, count) -> LOGGER.log(Level.INFO, String.format("%s found %d", name, count)));
        LOGGER.log(Level.INFO, PrimeUtil.primes.toString());
    }
}

This code gets the number of available processors and creates that many subscribers. Each of those subscribers gets the same subscription to the Integer stream. The main thread then sleeps in a loop until all primes are found, after which the subscribers are shut down nicely and the statistics of who found what are printed.
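The sleep loop works, but it wakes up every 100 ms just to look at a counter. A possible alternative, assuming you know up front how many units of work to wait for, is a CountDownLatch that the workers count down and the main thread blocks on. This is a sketch with made-up names (LatchSketch, runAndWait), not a change to the Main above.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: wait for background work without polling.
final class LatchSketch {

    static int runAndWait(int tasks) {
        ExecutorService pool = Executors.newWorkStealingPool();
        CountDownLatch remaining = new CountDownLatch(tasks);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.submit(() -> {
                finished.incrementAndGet();   // stand-in for "found a prime"
                remaining.countDown();
            });
        }
        try {
            remaining.await();                // blocks until the count reaches zero
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return finished.get();
    }
}
```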

Why not both

So in order to chain things you can create Processors. They are subscribers and publishers at the same time, but potentially of different types. For example, a Subscriber of Strings that is also a Publisher of Integers: you put Strings in, and Integers come out for the next Subscriber or Processor.
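A minimal sketch of such a Processor, turning words into their lengths. It follows the pattern of extending SubmissionPublisher, the JDK's stock publisher, rather than the hand-rolled publisher from this post; LengthProcessor and ProcessorDemo are names invented for the example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical processor: subscribes to Strings, publishes their lengths.
final class LengthProcessor extends SubmissionPublisher<Integer>
        implements Flow.Processor<String, Integer> {
    private Flow.Subscription upstream;

    @Override public void onSubscribe(Flow.Subscription subscription) {
        upstream = subscription;
        subscription.request(1);
    }
    @Override public void onNext(String word) {
        submit(word.length());                // pass the transformed item downstream
        upstream.request(1);
    }
    @Override public void onError(Throwable throwable) { closeExceptionally(throwable); }
    @Override public void onComplete() { close(); }
}

final class ProcessorDemo {

    // Sends the words through the processor and collects what comes out.
    static List<Integer> lengthsOf(List<String> words) {
        List<Integer> out = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        LengthProcessor processor = new LengthProcessor();
        processor.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            @Override public void onSubscribe(Flow.Subscription s) { subscription = s; s.request(1); }
            @Override public void onNext(Integer length) { out.add(length); subscription.request(1); }
            @Override public void onError(Throwable throwable) { done.countDown(); }
            @Override public void onComplete() { done.countDown(); }
        });
        try (SubmissionPublisher<String> source = new SubmissionPublisher<>()) {
            source.subscribe(processor);
            words.forEach(source::submit);
        } // closing the source completes the processor, which completes the collector
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return out;
    }
}
```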

The main benefit is that you can transform things, but also have one Processor start off many other chains with the same data. For example, for one word you can count its length, count its vowels, determine what kind of word it is, detect its language and so on. As such a processor is just a hub to kick off many other flows, it maybe makes more sense to think of our prime finder example and extend it by operating on the primes found: seeing if we found sexy primes (pairs that differ by 6) or cousin primes (pairs that differ by 4), for example.
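The pair check itself is simple once the primes are in a list: sexy primes are pairs that differ by 6 and cousin primes are pairs that differ by 4. A sketch of what a downstream step could compute, with made-up names (PrimePairs, pairsWithGap):

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: finds pairs (p, p + gap) where both numbers are in the list.
final class PrimePairs {

    static List<List<Integer>> pairsWithGap(List<Integer> primes, int gap) {
        Set<Integer> known = new HashSet<>(primes);
        List<List<Integer>> pairs = new ArrayList<>();
        for (int p : primes) {
            if (known.contains(p + gap)) {
                pairs.add(List.of(p, p + gap));
            }
        }
        return pairs;
    }
}
```

For the primes below 30, a gap of 4 yields (3, 7), (7, 11), (13, 17) and (19, 23), while a gap of 6 yields six pairs starting with (5, 11).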

#code #java