Implementing request-reply in Spring Kafka

When you implement an event-driven system with Apache Kafka, you may need to integrate with external systems that expect old-fashioned request-reply semantics: they call you and wait for the answer on the same connection, so you basically need to respond to them synchronously.

Imagine a situation where such an external system initiates a process that checks some information about a person: document ids, photos, name, and so on. For simplicity, a single service will do the checking: person-checker-api. In an event-driven system we can subscribe to events, so when the external system initiates a check, other services can record information about the person from the check event; the same applies to the check result. There is no direct reference between the services.

request-reply-diagram.jpg

Our external system gateway (gateway-api) will fire a check event to the “Person.Check.Initiated” topic, and the check result will land in another topic, “Person.Checked”, which is populated by person-checker-api. Remember that our gateway-api needs to respond in a synchronous manner.
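For reference, the event flowing through the “Person.Checked” topic can be sketched as a simple payload class. This is only a sketch: the field names are inferred from how the code later in this article uses them, not taken from a real schema.

```java
// Hypothetical payload class; field names are inferred from the
// article's listener code and test requests, not from a real schema.
public class PersonChecked {

    private Integer personId;
    private Boolean checkResult;

    public Integer getPersonId() { return personId; }
    public void setPersonId(Integer personId) { this.personId = personId; }
    public Boolean getCheckResult() { return checkResult; }
    public void setCheckResult(Boolean checkResult) { this.checkResult = checkResult; }

    public static void main(String[] args) {
        // mirrors the JSON posted with kafkacat at the end of the article
        PersonChecked event = new PersonChecked();
        event.setPersonId(52);
        event.setCheckResult(true);
        System.out.println("personId=" + event.getPersonId()
                + " checkResult=" + event.getCheckResult());
    }
}
```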

Spring for Apache Kafka provides a ReplyingKafkaTemplate, but if you want to use it with multiple instances, you need to play with manual partition assignment. If you run your receiving service as a single instance, it is definitely an option.
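For completeness, a single-instance setup with ReplyingKafkaTemplate could be wired roughly like this. This is a hedged sketch, not the article's solution: the bean names, the reply topic “Person.Checked.Replies”, and the group id are assumptions, and the `@SendTo`-annotated reply handler on the person-checker-api side is omitted.

```java
@Configuration
public class ReplyingKafkaConfig {

    // Container that consumes replies addressed to this instance.
    // Topic name and group id here are assumptions for the sketch.
    @Bean
    public ConcurrentMessageListenerContainer<String, PersonChecked> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, PersonChecked> factory) {
        ConcurrentMessageListenerContainer<String, PersonChecked> container =
                factory.createContainer("Person.Checked.Replies");
        container.getContainerProperties().setGroupId("gateway-api-replies");
        return container;
    }

    @Bean
    public ReplyingKafkaTemplate<String, PersonCheckInitiated, PersonChecked> replyingKafkaTemplate(
            ProducerFactory<String, PersonCheckInitiated> pf,
            ConcurrentMessageListenerContainer<String, PersonChecked> repliesContainer) {
        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }
}
```

A request then goes through `replyingKafkaTemplate.sendAndReceive(producerRecord)`, which returns a future you can block on. With multiple gateway-api instances, each instance must receive only the replies to its own requests, which is exactly why the manual partition assignment mentioned above comes into play.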

If you have multiple receiving instances (gateway-api), you need some shared store: a database, a distributed data grid, or something else. In this example I will use a database, but I recommend considering a data grid such as Hazelcast. Person check results are stored in a table, so if gateway-api receives a request to check a person with the same id, it can immediately return the previous result.

Info

If you serve a previously saved result directly from the database, you need to take care of stale data.
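The shared store can be as simple as one table plus a Spring Data repository. A minimal sketch, assuming JPA and Lombok (the article's code uses Lombok's `@SneakyThrows` elsewhere), with field names matching the code below:

```java
// Hypothetical entity backing the shared store; the table layout is an
// assumption based on how the polling code below queries it.
@Entity
@Getter
@Setter
public class PersonCheckResult {

    @Id
    @GeneratedValue
    private Long id;

    private Integer personId;
    private Boolean checkResult;
}

// Derived query method backing the polling loop below.
public interface CheckResultRepository extends JpaRepository<PersonCheckResult, Long> {

    Optional<PersonCheckResult> findByPersonId(Integer personId);
}
```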

When gateway-api receives a check request, it sends an event to the “Person.Check.Initiated” topic and starts polling its own table for the check result.

    @PostMapping("/check")
    //you may want to try/catch this
    @SneakyThrows({InterruptedException.class, ExecutionException.class, TimeoutException.class})
    public Boolean checkPerson(@RequestBody Person person) {
        kafkaTemplate.send("Person.Check.Initiated", person.getId(), new PersonCheckInitiated(person));
        return pollForCheckResult(person.getId()).get(50, TimeUnit.SECONDS).getCheckResult();
    }

    private CompletableFuture<PersonCheckResult> pollForCheckResult(Integer personId) {
        CompletableFuture<PersonCheckResult> checkResultCompletableFuture = new CompletableFuture<>();
        final ScheduledFuture<?> checkResultScheduledFuture = executor.scheduleAtFixedRate(() -> {
            log.info("Checking result for person with id: {}", personId);
            Optional<PersonCheckResult> optionalCheckResult = checkResultRepository.findByPersonId(personId);
            optionalCheckResult.ifPresent(checkResultCompletableFuture::complete);
        }, 1, 1, TimeUnit.SECONDS);
        //we don't want to run this future indefinitely
        executor.schedule(() -> {
            log.info("Cancelling check for person with id: {}", personId);
            checkResultScheduledFuture.cancel(true);
        }, 65, TimeUnit.SECONDS);
        //cancel polling when result is received
        checkResultCompletableFuture.whenComplete((personCheckResult, throwable) -> checkResultScheduledFuture.cancel(true));
        return checkResultCompletableFuture;
    }
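To see the pattern in isolation, here is a self-contained, runnable sketch of the same poll-until-present logic, with a ConcurrentHashMap standing in for the database table. The class name, timings, and the map itself are assumptions made for the demo:

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.*;

// Standalone demo of the polling pattern: a ConcurrentHashMap plays the
// role of the shared table, and a delayed put() simulates the Kafka
// listener writing the check result.
public class PollDemo {

    private static final ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(2);
    private static final Map<Integer, Boolean> store = new ConcurrentHashMap<>();

    static CompletableFuture<Boolean> pollForResult(int personId) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        ScheduledFuture<?> polling = executor.scheduleAtFixedRate(
                () -> Optional.ofNullable(store.get(personId))
                              .ifPresent(result::complete),
                50, 50, TimeUnit.MILLISECONDS);
        // stop polling as soon as the future completes
        result.whenComplete((r, t) -> polling.cancel(true));
        return result;
    }

    public static void main(String[] args) throws Exception {
        CompletableFuture<Boolean> check = pollForResult(52);
        // simulate the listener saving the result a moment later
        executor.schedule(() -> store.put(52, true), 200, TimeUnit.MILLISECONDS);
        System.out.println("check result: " + check.get(5, TimeUnit.SECONDS));
        executor.shutdown();
    }
}
```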

Here we use the power of a scheduled future to periodically poll the table for changes. These changes are written by the listener on the “Person.Checked” topic:

    @KafkaListener(topics = "Person.Checked")
    public void personCheckedReceived(PersonChecked personChecked) {
        PersonCheckResult personCheckResult = new PersonCheckResult();
        personCheckResult.setPersonId(personChecked.getPersonId());
        personCheckResult.setCheckResult(personChecked.getCheckResult());
        checkResultRepository.save(personCheckResult);
    }

And that’s all! Just listen to the needed topic, save the information in a shared store, and periodically check for it.

You can check out the full example with an H2 database here: kafka-sync-example

To test it, initiate a check:

curl -X POST http://localhost:8080/check -H 'Content-Type: application/json' -d '{"id": 52, "firstName": "Test", "lastName": "Test"}'

And emulate the check result by posting an event to the topic:

echo '{"personId": 52, "checkResult": true}' |  kafkacat -P -b localhost:9092 -t Person.Checked -p -1