Primeros pasos en programación reactiva, IV

En esta serie de post acerca de programación reactiva voy a ir contando los pasos que estoy dando para ir practicando con esta forma de programación y más en concreto su uso en aplicaciones HTTP con Micronaut

En el post anterior (reactivex-3.html) nos centramos en la parte consumidora viendo cómo podemos usar el modelo reactivo en la construcción de un API que consuma endpoints remotos.

En este post lo que vamos a tratar es el caso de una llamada a una función de negocio que puede fallar y cómo realizar reintentos dentro de un contexto asíncrono.

Servicio

Supongamos que tenemos un microservicio que dada una cadena la convierte a mayusculas, ordenas los caracteres por orden alfabético y devuelve la cadena resultante y que queremos llamar de forma asíncrona

@Get("/{word}") Single<String> reverse(final String word) { ... }

Usando Java Stream la "lógica de negocio" podría ser algo así

List<Character> list = word.chars().mapToObj(c -> (char) c)
    .map(Character::toUpperCase)
    .collect(Collectors.toList())
    .stream()
    .sorted()
    .collect(Collectors.toList());

list
    .stream()
    .map(c -> c.toString())
    .collect(Collectors.joining())

Para simular que la llamada a nuestra función de negocio puede fallar, vamos a incluir un random usando el instante de ejecución y si los milisegundos son pares lanzamos una exception, con lo que nuestra función de negocio completa sería algo así:

private String logicaNegocio(String word) {
    if ((System.currentTimeMillis() % 2) == 0) {
        System.out.println("Vamos a simular una exception de negocio");
        throw new RuntimeException("Milis es par");
    }

    List<Character> list = word.chars().mapToObj(c -> (char) c)
            .map(Character::toUpperCase)
            .collect(Collectors.toList())
            .stream()
            .sorted()
            .collect(Collectors.toList());

    return
            list
            .stream()
            .map(c -> c.toString())
            .collect(Collectors.joining());
}

Problema

La primera implementación que haríamos probablemente de nuestro microservicio haría algo parecido a:

@Controller("/")
public class EchoReverseController {

    @Get("/{word}")
    Single<String> reverse(final String word) {
        return Single.create(emitter -> {
            emitter.onSuccess( logicaNegocio(word) )
        }
    }

}

Y si la lógica de negocio falla devolveríamos un error a quien nos ha llamado para que vuelva a reintentarlo o lo que estime oportuno.

Sin embargo, digamos que lo que queremos es que el servicio sea lo más "robusto" posible y pueda contemplar estas excepciones y actuar en consecuencia siendo el caso que queremos ver hacer un reintento sin salir del contexto de la llamada.

Solución

Para resolver esta situación lo primero que se nos ocurrirá (a mí al menos), es hacer el típico bucle for con n reintentos y capturar las excepciones dentro de él. Obviamente esto no es la solución más elegante cuando las excepciones son debidas a problemas con recursos puesto que las estás invocando de forma cuasi instantáneas en cada vuelta del bucle.

Lo que queremos hacer es que mientras el servicio que nos ha llamado está esperando que se resuelva la llamada, nosotros poder invocar a la función con un delay de milisegundos-segundos-minutos-horas …​ y todo de forma asíncrona. Para ello usaremos el método defer de un Observable junto con retryWhen

@Get("/{word}")
Single<String> reverse(final String word) {
    return Single.create(emitter -> {
        Observable
                .defer( () -> Observable.just(logicaNegocio(word)) )
                .retryWhen(
                        f -> f.take(3).delay(new Random().nextInt(8), TimeUnit.SECONDS))
                .subscribe(
                        str -> emitter.onSuccess(str),
                        err -> emitter.onError(err)
                );
    });
}

Utilizando los métodos de reactive lo que estamos haciendo es:

  • devolver a Micronaut un Single para que lo ejecute de forma asíncrona

  • Crear un observable sobre nuestra lógica de negocio

  • Se ejecutará hasta 3 veces (f.take(3)) con un delay de entre 0 y 8 segundos

  • a la primera ejecución que vaya bien emitiremos el resultado

  • si no va bien despues de intentarlo n veces, emitiremos un error

Resumen

Con un poco de esfuerzo podemos extender la programación reactiva no sólo a las capas de "entrada" sino a las propias llamadas de negocio

Follow comments at Telegram group Or subscribe to the Channel Telegram channel

2019 - 2021 | Mixed with Bootstrap | Baked with JBake v2.6.7