Flow like water diverted

I have gained some experience using the Flow constructs in a real scenario and learned a few lessons, so I wanted to make some code changes to my original design.

Integer Subscription

In the subscription we will use an internal ExecutorService to handle the incoming requests.

import publishers.IntegerPublisher;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

public class IntegerSubscription implements Flow.Subscription  {

    private IntegerPublisher integerPublisher;
    private Iterator<Integer> integerStream;
    private final AtomicBoolean isCanceled = new AtomicBoolean();
    private final ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public IntegerSubscription(IntegerPublisher integerPublisher) {
        this.integerPublisher = integerPublisher;
        this.integerStream = Stream.iterate(2, i -> i + 1).iterator();
    }

    @Override
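    public void request(long l) {
        // Requests made after cancel() are ignored; the executor is already shut down.
        if (isCanceled.get()) {
            return;
        }
        // Flow.Subscription requires a strictly positive demand, so zero is rejected as well.
        if (l <= 0) {
            integerPublisher.publish(new IllegalArgumentException("Have to request a positive number of elements"));
            return;
        }
        executorService.submit(this.handleRequests(l));
    }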

    private Runnable handleRequests(long l) {
        return () -> {
            for (long i = 0; i < l; i++) {
                if (isCanceled.get()) {
                    break;
                }
                integerPublisher.publish(integerStream.next());
            }
        };
    }

    @Override
    public void cancel() {
        isCanceled.set(true);
        executorService.shutdown();
        try {
            executorService.awaitTermination(5L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the caller can still see the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
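
The request handling can also be tried out in isolation. What follows is only a minimal sketch under a few assumptions, not the class above: `CountingSubscription` and `CountingSubscriptionDemo` are hypothetical names, the subscription talks to a single subscriber directly instead of going through the publisher, an AtomicInteger replaces the stream iterator so the source is safe to share between threads, and a single-threaded executor keeps the onNext calls sequential. It shows the two rules from the Flow.Subscription documentation that matter here: a non-positive request ends in onError, and nothing is delivered after cancel.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified subscription that serves one subscriber directly.
class CountingSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> subscriber;
    private final AtomicInteger next = new AtomicInteger(2); // thread-safe source of integers
    private final AtomicBoolean cancelled = new AtomicBoolean();
    // A single worker thread means onNext is never called concurrently.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    CountingSubscription(Flow.Subscriber<? super Integer> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void request(long n) {
        if (cancelled.get()) {
            return; // requests after cancel are ignored
        }
        if (n <= 0) {
            cancel();
            subscriber.onError(new IllegalArgumentException("Have to request a positive number of elements"));
            return;
        }
        try {
            executor.submit(() -> {
                for (long i = 0; i < n && !cancelled.get(); i++) {
                    subscriber.onNext(next.getAndIncrement());
                }
            });
        } catch (RejectedExecutionException e) {
            // cancel() won the race and already shut the executor down; nothing to deliver.
        }
    }

    @Override
    public void cancel() {
        if (cancelled.compareAndSet(false, true)) {
            executor.shutdown(); // does not block, so it is safe to call from onNext
        }
    }
}

class CountingSubscriptionDemo {
    // Requests three items, waits for them, then sends an invalid request.
    static List<Object> run() {
        List<Object> signals = new CopyOnWriteArrayList<>();
        CountDownLatch threeItems = new CountDownLatch(3);
        Flow.Subscriber<Integer> recorder = new Flow.Subscriber<>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(Integer item) { signals.add(item); threeItems.countDown(); }
            @Override public void onError(Throwable t) { signals.add(t.getClass().getSimpleName()); }
            @Override public void onComplete() { }
        };
        CountingSubscription subscription = new CountingSubscription(recorder);
        subscription.request(3);
        try {
            threeItems.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        subscription.request(0); // invalid demand: ends in onError and cancels
        return signals;
    }
}
```

Calling `CountingSubscriptionDemo.run()` should give back the integers 2, 3 and 4 followed by the name of the exception class. Unlike the cancel above, this sketch does not wait for termination inside cancel; that is a design choice of the sketch, made so that a subscriber could cancel from within onNext without blocking the worker thread.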

Publisher

In the publisher, take out the executorService parts. One of the major problems was that the publisher itself was running on the executorService somewhere, so it would never shut down. I also never called onComplete, so the executorService would not shut down correctly either.

import CyclicRingBuffer;
import subscriptions.IntegerSubscription;

import java.util.Objects;
import java.util.concurrent.*;

public class IntegerPublisherImpl implements IntegerPublisher  {

    private final IntegerSubscription subscription;
    private final CyclicRingBuffer<Flow.Subscriber<? super Integer>> subscribers;

    public IntegerPublisherImpl() {
        this.subscription = new IntegerSubscription(this);
        subscribers = new CyclicRingBuffer<>();
    }

    @Override
    public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
        subscribers.push(subscriber);
        subscriber.onSubscribe(this.subscription);
    }

    private void shutdown(boolean cancel) {
        if (cancel) {
            // Stops the subscription's executor; the publisher no longer owns one.
            this.subscription.cancel();
        }
        for (Flow.Subscriber<? super Integer> subscriber : subscribers) {
            subscriber.onComplete();
        }
    }

    public void publish(int i) {
        Flow.Subscriber<? super Integer> subscriber = subscribers.poll();
        if (Objects.nonNull(subscriber)) {
            subscriber.onNext(i);
        }
    }

    public void publish(IllegalArgumentException e) {
        Flow.Subscriber<? super Integer> subscriber = subscribers.poll();
        if (Objects.nonNull(subscriber)) {
            subscriber.onError(e);
        }
    }

    public void sendShutdown(boolean cancel) {
        this.shutdown(cancel);
    }
}

I learned that running everything on the ForkJoinPool, with its work-stealing scheduling, causes a lot of context switching and a lot of threads. In my case the CPU flared to 100% on thread switching while not much actual work was going on.
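
For comparison, the JDK's own SubmissionPublisher delivers on ForkJoinPool.commonPool() by default, but it also has a constructor that takes an executor and a buffer capacity. The sketch below is not part of my design; it only illustrates, with a hypothetical `FixedPoolPublisherDemo` class and made-up thread names, how delivery can be pinned to a small fixed pool instead.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

class FixedPoolPublisherDemo {
    // Publishes a few integers and returns the names of the threads that delivered them.
    static Set<String> run() {
        AtomicInteger ids = new AtomicInteger();
        // Two named worker threads; the pool size is an arbitrary choice for this sketch.
        ExecutorService pool = Executors.newFixedThreadPool(2,
                task -> new Thread(task, "publisher-worker-" + ids.incrementAndGet()));
        Set<String> deliveryThreads = ConcurrentHashMap.newKeySet();
        SubmissionPublisher<Integer> publisher =
                new SubmissionPublisher<>(pool, Flow.defaultBufferSize());
        try {
            // consume() subscribes a consumer and completes when onComplete is signalled.
            CompletableFuture<Void> done =
                    publisher.consume(item -> deliveryThreads.add(Thread.currentThread().getName()));
            for (int i = 2; i < 50; i++) {
                publisher.submit(i);
            }
            publisher.close(); // signals onComplete once the buffered items are delivered
            done.join();
        } finally {
            pool.shutdown();
        }
        return deliveryThreads;
    }
}
```

Every name returned by `FixedPoolPublisherDemo.run()` should start with `publisher-worker-`, which confirms that no delivery happened on the common pool.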

Subscriber

The subscriber will ask for Long.MAX_VALUE, so its demand is essentially unbounded. This is also in the documentation, so I should have read that more carefully and thought about it.

import util.StatisticsUtil;
import java.util.UUID;
import java.util.concurrent.Flow;
import java.util.logging.Level;
import java.util.logging.Logger;

import static util.PrimeUtil.demand;
import static util.PrimeUtil.primes;

public class PrimeFinder implements Flow.Subscriber<Integer> {

    private final String name;
    // Per-instance logger: a static field would be overwritten by every new PrimeFinder.
    private final Logger LOGGER;

    public PrimeFinder() {
        this.name = String.format("%s-%s", this.getClass().getName(), UUID.randomUUID());
        LOGGER = Logger.getLogger(this.name);
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        StatisticsUtil.stats.putIfAbsent(this.name, 0);
        subscription.request(Long.MAX_VALUE);
    }

    @Override
    public void onNext(Integer integer) {
        LOGGER.log(Level.FINE, String.format("I'm %s working on %d", name, integer));

        if (primes.parallelStream().noneMatch(p -> integer % p == 0)) {
            if(demand.getAndDecrement() > 0) {
                primes.add(integer);
                LOGGER.log(Level.INFO, String.format("%d found by me: %s", integer, name));
                // Increment the counter in one step instead of a separate get and put.
                StatisticsUtil.stats.merge(this.name, 1, Integer::sum);
            }
        }
    }

    @Override
    public void onError(Throwable throwable) {
        LOGGER.log(Level.SEVERE, throwable.getMessage());
    }

    @Override
    public void onComplete() {
        LOGGER.log(Level.INFO, name + " is done");
    }
}
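
Because every subscriber calls request(Long.MAX_VALUE) in onSubscribe, a subscription that adds up requests has to make sure the total cannot overflow. The following is a small sketch of one way to do that, assuming a hypothetical `Demand` helper that is not part of the code above: additions saturate at Long.MAX_VALUE, and a saturated counter is treated as unbounded and never counts down.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical helper that tracks outstanding demand for one subscription.
class Demand {
    private final AtomicLong pending = new AtomicLong();

    // Adds n to the outstanding demand, capping at Long.MAX_VALUE instead of overflowing.
    long add(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("Have to request a positive number of elements");
        }
        return pending.updateAndGet(current -> {
            long sum = current + n;
            return sum < 0 ? Long.MAX_VALUE : sum; // a negative sum means the addition overflowed
        });
    }

    // Claims one unit of demand; returns false when nothing has been requested.
    boolean tryTake() {
        while (true) {
            long current = pending.get();
            if (current == 0) {
                return false;
            }
            if (current == Long.MAX_VALUE) {
                return true; // effectively unbounded, so it is never decremented
            }
            if (pending.compareAndSet(current, current - 1)) {
                return true;
            }
        }
    }
}
```

With this helper, two subscribers that both ask for Long.MAX_VALUE leave the counter at Long.MAX_VALUE, while a bounded request of 2 allows exactly two calls to `tryTake()` to succeed.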

Main

Not a lot changes in main. The number of totalSubscribers is doubled, and true is passed to sendShutdown to make sure the executorServices in question are shut down and waited for until they are done.

import publishers.IntegerPublisherImpl;
import subscribers.PrimeFinder;
import util.PrimeUtil;
import util.StatisticsUtil;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;


public class Main {
    final static int totalSubscribers = Runtime.getRuntime().availableProcessors() * 2;
    private static final Logger LOGGER = Logger.getAnonymousLogger();

    public static void main(String[] args) throws InterruptedException {
        IntegerPublisherImpl integerPublisher = new IntegerPublisherImpl();
        List<Flow.Subscriber<Integer>> subscribers = new ArrayList<>();

        for (int i = 0; i < totalSubscribers; i++) {
            subscribers.add(new PrimeFinder());
        }
        subscribers.forEach(integerPublisher::subscribe);

        while (PrimeUtil.demand.get() > 0 ) {
            TimeUnit.MILLISECONDS.sleep(100);
        }
        integerPublisher.sendShutdown(true);
        StatisticsUtil.stats.forEach((name, count) -> LOGGER.log(Level.INFO, String.format("%s found %d", name, count)));
        LOGGER.log(Level.INFO, PrimeUtil.primes.toString());
    }
}

This should flow much more nicely and smoothly. It should also distribute the work more evenly among the subscribers. I ran it with 8 subscribers and a demand of 1000 primes, and it finished in close to 1 second, which I find a nice result. It did not put a strain on my CPU either: all other applications happily chugged along.

Final thoughts

I am thinking about extending this into an example with an error queue, so that the publisher can also publish errors and log them. The nice thing is that in this design publishing still happens regardless of errors. In the previous design, a subscriber that hit an error would effectively drop out, because it did not request a new element. It had to wait a whole cycle until it was next in line to process a request, and only then would it request a new one. Now suppose all the errors happen in a row and every subscriber has to handle one. That could happen; several errors in a row are not unfathomable. The chain would then die of starvation, because no new requests would be made.
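
To make the error queue idea a bit more concrete, here is a rough sketch of what I have in mind. It is not built yet, and everything in it is an assumption: `ErrorQueueDispatcher` and `ErrorQueueDemo` are hypothetical names, plain IntConsumer handlers stand in for the subscribers' onNext, and the round-robin cursor is not thread-safe. The point is only that a failing handler puts its exception on a queue while publishing carries on with the next handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.IntConsumer;

// Hypothetical publisher-side helper; the handlers stand in for subscribers.
class ErrorQueueDispatcher {
    private final List<IntConsumer> handlers;
    private final Queue<RuntimeException> errors = new ConcurrentLinkedQueue<>();
    private int cursor; // round-robin position, not thread-safe in this sketch

    ErrorQueueDispatcher(List<IntConsumer> handlers) {
        this.handlers = List.copyOf(handlers);
    }

    // Hands the item to the next handler; a failure is queued instead of stopping the flow.
    void publish(int item) {
        IntConsumer handler = handlers.get(cursor);
        cursor = (cursor + 1) % handlers.size();
        try {
            handler.accept(item);
        } catch (RuntimeException e) {
            errors.add(e);
        }
    }

    // Removes and returns all queued errors so that they can be logged.
    List<RuntimeException> drainErrors() {
        List<RuntimeException> drained = new ArrayList<>();
        for (RuntimeException e; (e = errors.poll()) != null; ) {
            drained.add(e);
        }
        return drained;
    }
}

class ErrorQueueDemo {
    // Publishes 2..7 to three handlers, the middle one of which always fails.
    static String run() {
        List<Integer> handled = new ArrayList<>();
        IntConsumer ok = item -> handled.add(item);
        IntConsumer failing = item -> {
            throw new IllegalStateException("could not handle " + item);
        };
        ErrorQueueDispatcher dispatcher = new ErrorQueueDispatcher(List.of(ok, failing, ok));
        for (int item = 2; item <= 7; item++) {
            dispatcher.publish(item);
        }
        return "handled=" + handled + " errors=" + dispatcher.drainErrors().size();
    }
}
```

In this run the failing handler receives 3 and 6, so `ErrorQueueDemo.run()` returns `handled=[2, 4, 5, 7] errors=2`: the two failures end up on the queue and the other items are still delivered.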

#code #java