Betrayed by the Java standard library

So I was working with Spring WebFlux to fix an issue where the application kept processing the same items over and over instead of moving on. Along the way I ran into a nice, deep problem that I fixed with a simple trick.

The problem

The application itself was quite simple; it did the following:

@Transactional
public Flux<EventsCases> getCasesWithEvents() {
    return r2dbcEntityTemplate
            .select(EventsCases.class)
            .matching(
                    Query.query(
                            Criteria.where("unix_timestamp").isNotNull()
                    )
                            .sort(Sort.by(Sort.Order.asc("unix_timestamp")))
                            .limit(20)
                            .offset(0)
            )
            .all()
            .doOnError(throwable -> LOGGER.error("Get cases with events failed", throwable));
}

It got the first 20 items ordered by timestamp. Nothing fancy. The problem, however: if those items caused an error when they were sent to the third-party API, they stayed in the database, so the same 20 items were selected and sent over and over and over again.

So I added a simple retry check to the processing flow.

if (e.getStatusCode().equals(HttpStatus.INTERNAL_SERVER_ERROR) ||
        e.getStatusCode().equals(HttpStatus.NOT_FOUND) ||
        e.getStatusCode().equals(HttpStatus.FORBIDDEN) ||
        e.getStatusCode().equals(HttpStatus.BAD_REQUEST)) {
    String id = oracleBuilderDto.getBuilder().getCaseDetails().getId();
    // count the failed attempts for this case
    retriesMap.putIfAbsent(id, 0);
    retriesMap.computeIfPresent(id, (k, v) -> v + 1);
    if (retriesMap.get(id) > 5) {
        // give up: record the case as unprocessed and stop retrying it
        statisticsDto.getAcquire().tally(e.getStatusCode());
        unprocessedCasesService.addUnprocessedCase(id,
                        e.getRawStatusCode(),
                        statisticsDto.getAcquire().getUnixTimestamp(),
                        e.getResponseBodyAsString()
                )
                .subscribeOn(Daemon.SCHEDULER)
                .subscribe();
        retriesMap.remove(id);
    }
} else {
    statisticsDto.getAcquire().tally(e.getStatusCode());
}

This basically retries a case five times and then adds it to the unprocessed cases table. In the original query I added an extra selection filter stating that the id should not be in any of the unprocessed cases entities, roughly as sketched below.
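
How the unprocessed ids are collected is left out here, so treat the unprocessedIds parameter as a hypothetical placeholder; the adjusted query could look something like this:

public Flux<EventsCases> getCasesWithEvents(List<String> unprocessedIds) {
    return r2dbcEntityTemplate
            .select(EventsCases.class)
            .matching(
                    Query.query(
                            Criteria.where("unix_timestamp").isNotNull()
                                    // skip cases that already ended up in the unprocessed cases table
                                    // (assumes unprocessedIds is non-empty)
                                    .and("id").notIn(unprocessedIds)
                    )
                            .sort(Sort.by(Sort.Order.asc("unix_timestamp")))
                            .limit(20)
                            .offset(0)
            )
            .all();
}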

This suddenly gave a memory leak warning in my output.

The culprit

I found the culprit through sheer determination. First off, retriesMap is a simple ConcurrentHashMap<String, Integer>, so I was under the, apparently false, impression that synchronization would be taken care of for me.

This issue told me otherwise. Each individual call on a ConcurrentHashMap is thread-safe, but the sequence of separate get/put calls is not atomic as a whole, so non-synchronised get/put requests are a potential problem with this construct.
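
Just to illustrate where the atomicity boundary lies, the three separate calls could in principle be collapsed into a single atomic operation on the ConcurrentHashMap (a sketch, not the fix I went with):

// one atomic read-modify-write instead of putIfAbsent + computeIfPresent + get
int attempts = retriesMap.merge(id, 1, Integer::sum);
if (attempts > 5) {
    // same handling as before: record the case as unprocessed and stop counting it
    retriesMap.remove(id);
}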

The fix

The fix I went with was to wrap a plain map: private final Map<String, Integer> syncMap = Collections.synchronizedMap(new HashMap<>());. This keeps the map just as nice to use, and the wrapper routes every call through an internal mutex, which keeps things okay.

if (e.getStatusCode().equals(HttpStatus.INTERNAL_SERVER_ERROR) ||
        e.getStatusCode().equals(HttpStatus.NOT_FOUND) ||
        e.getStatusCode().equals(HttpStatus.FORBIDDEN) ||
        e.getStatusCode().equals(HttpStatus.BAD_REQUEST)) {
    String id = oracleBuilderDto.getBuilder().getCaseDetails().getId();
    syncMap.putIfAbsent(id, 0);
    syncMap.computeIfPresent(id, (k, v) -> v + 1);
    if (syncMap.get(id) > 5) {
        statisticsDto.getAcquire().tally(e.getStatusCode());
        unprocessedCasesService.addUnprocessedCase(id,
                        e.getRawStatusCode(),
                        statisticsDto.getAcquire().getUnixTimestamp(),
                        e.getResponseBodyAsString()
                )
                .subscribeOn(Daemon.SCHEDULER)
                .subscribe();
        syncMap.remove(id);
    }
} else {
    statisticsDto.getAcquire().tally(e.getStatusCode());
}

I changed to a plain HashMap because the synchronized wrapper takes care of making sure each individual update happens atomically.
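
And if the whole check-then-remove sequence ever needs to happen as one atomic unit, the wrapper's mutex is the map object itself, so it can be locked explicitly; a sketch using the same fields as above:

// hold the synchronized wrapper's mutex across the whole compound read-modify-write
synchronized (syncMap) {
    syncMap.putIfAbsent(id, 0);
    syncMap.computeIfPresent(id, (k, v) -> v + 1);
    if (syncMap.get(id) > 5) {
        syncMap.remove(id);
    }
}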

Then the memory leak warning went away.