Primeros pasos en programación reactiva, III

En esta serie de post acerca de programación reactiva voy a ir contando los pasos que estoy dando para ir practicando con esta forma de programación y más en concreto su uso en aplicaciones HTTP con Micronaut

En el post anterior (reactivex-2.html) nos centramos en la parte servidora y vimos cómo crear un Single (y devolverlo como retorno de la función) para así delegar en el subsistema la ejecución de nuestro código de forma asíncrona.

En este post nos vamos a centrar en la parte consumidora viendo cómo podemos usar el modelo reactivo en la construcción de un API que consuma endpoints remotos. Para ello vamos a usar los siguientes recursos públicos:

Modelos

Vamos a construir el modelo que representa a cada uno de los endpoints

public class Astronaut {
    public String name;
    public String craft;

    @Override
    public String toString(){
        return name+" "+craft;
    }
}

public class Astros {

    public String message;
    public int number;
    public List<Astronaut>people;

}
public class IssPosition {

    public double latitude;
    public double longitude;

    public String toString(){
        return "lat:"+latitude+" log:"+longitude;
    }
}

public class Iss {

    public IssPosition iss_position;

    public String toString(){
        return "current position "+iss_position;
    }
}

Así como el modelo que va a devolver nuestra API, que no es más que la unión de ambos

public class Nasa {
    public Astros astros;
    public Iss iss;

    public Nasa astros(Astros astros){
        this.astros=astros;
        return this;
    }

    public Nasa iss(Iss iss){
        this.iss=iss;
        return this;
    }
}

ClientS

Por otra parte vamos a definir nuestros interfaces Client Micronaut que nos permiten recuperar dichos modelos

@Client("http://api.open-notify.org/")
public interface AstrosClient {

    @Get("/astros.json")
    Single<Astros>getAstros();

}
@Client("http://api.open-notify.org/")
public interface IssClient {

    @Get("iss-now.json")
    Single<Iss> getIss();

}

Service

Para recuperar ambos modelos (ISS y Astros) y devolverlos como uno sólo (Nasa) vamos a crear un servicio NasaService el cual va a ofrecer 3 formas diferentes de hacerlo:

  • block, primero va a llamar a uno de los endpoints de forma bloqueante y después al otro de la misma forma

  • chain, primero va a llamar a uno de lso endpoints de forma reactiva y cuando se complete la llamada se llamará al segundo endpoint de la misma forma

  • zip, vamos a llamar a ambos endpoints a la vez de forma reactiva y cuando se completen ambos devolveremos el resultado

@Singleton
public class NasaService {

    @Inject
    IssClient issClient;

    @Inject
    AstrosClient astrosClient;

    public Single<Nasa> blockingGet() {
        ...
    }

    public Single<Nasa>chain() {
        ...
    }

    public Single<Nasa>zip(){
        ...
    }

}

Blocking

    public Single<Nasa> blockingGet() {
        return Single.create(emitter -> {
            Nasa nasa = new Nasa();
            nasa.astros = astrosClient.getAstros().blockingGet();
            nasa.iss = issClient.getIss().blockingGet();
            emitter.onSuccess(nasa);
        });
    }

En este modo, el servicio construye un objeto Nasa de respuesta y lo va completando según se van obteniendo las respuestas de los endpoints remotos. Una vez terminados se emite el objeto resultante

Como podemos intuir, este método será el más lento y propenso a errores pues en el mejor de los casos el total del tiempo necesario será la suma de ambas llamadas. Además, al esperar la respuesta de ambas llamadas estamos bloqueando el hilo por lo que otras solicitudes se quedarán encoladas.

Chain

    BiFunction<Nasa, SingleEmitter<Nasa>, Disposable> issFunc = (nasa, emitter)->
            issClient.getIss().subscribe(
                    iss ->
                            emitter.onSuccess(nasa.iss(iss))
                    ,
                    err->
                            emitter.onError(err)
            );

    BiFunction<Nasa, SingleEmitter<Nasa>, Disposable> astrosFunc = (nasa, emitter)->
            astrosClient.getAstros().subscribe(
                    astros ->
                            issFunc.apply(nasa.astros(astros), emitter)
                    ,
                    err->
                            emitter.onError(err)
            );

    public Single<Nasa>chain() {
        return Single.create(
                emitter -> astrosFunc.apply(new Nasa(), emitter)
        );
    }
NOTE

Para ayudar en la legibilidad de este método vamos a usar BiFunction aunque podríamos seguir usando clases anónimas lambdas

Básicamente definimos un método chain que crea un Single (como en caso block) y lo que hace es aplicar una función astrosFunc. Esta función simplemente se subscribe a la llamada asíncrona que recupera los astronautas del espacio y cuando se complete encadena a su vez otra subscripción al endpoint de la ISS. Cuando este segundo endpoint se completa se devuelve el resultado al emitter "original"

La idea es muy parecida al método anterior (aunque el código parece un poco más complicado) en el sentido de que vamos a llamar a dos funciones de forma seguida, por lo que el tiempo total será aproximadamente igual. Sin embargo, al no bloquear el hilo podremos aceptar más solicitudes.

Zip

    public Single<Nasa>zip(){
        return Single.zip(astrosClient.getAstros(), issClient.getIss(),
                (astros, iss) ->
                        new Nasa().astros(astros).iss(iss));
    }

Mediante zip un Single es capaz de poder llamar de forma simultánea a un número de observables y ejecutar una función con el resultado de todos ellos.

INFO

La lista de observables a ejecutar puede ser "fija", como en este caso, o una lista de ellos. En caso de fija podemos especificar hasta 10 (creo) observables y en la función a ejecutar recibiremos los resultados de forma explícita. En el caso de una lista, lo que obtendremos en la función es un array de Object y nos toca a nosotros interpretarlos

Controller

Básicamente el controller es un simple punto de entrada para cada tipo que llama directamente al servicio

@Controller("/nasa")
public class NasaController {

    @Inject
    NasaService service;

    @Get("/blocking")
    Single<Nasa>blockingGet() {
        return service.blockingGet();
    }

    @Get("/chain")
    Single<Nasa>chain() {
        return service.chain();
    }

    @Get("/zip")
    Single<Nasa>zip(){
        return service.zip();
    }

}

JMeter

Con estos 3 endpoints he preparado una prueba de carga usando JMeter y he ejecutado 100 peticiones simultáneas contra cada uno.

Blocking

Obtenemos una tasa de error muy elevada

SummaryReportBlock

Chain

Conseguimos no bloquear el hilo de llamada reduciendo los errores a cero

SummaryReportChain

Zip

No sólo conseguimos reducir los errores a cero sino que la diferencia de tiempo respecto de chain es significativa (casi la mitad)

SummaryReportZip

Resumen

Por el lado del consumidor vemos que utilizar programación reactiva nos ayuda a optimizar los recursos y mejorar los tiempos de respuesta

Follow comments at Telegram group Or subscribe to the Channel Telegram channel

2019 - 2021 | Mixed with Bootstrap | Baked with JBake v2.6.5