Duda con rest y rabbit en Java

Kaledros

Buenas.

Tengo una aplicación en Spring Boot que expone un endpoint. El tema es que ese endpoint genera un mensaje y lo envía a una cola de Rabbit, hace lo que tiene que hacer y lo recupero con un listener. Lo que no sé es cómo devolver desde el endpoint lo que recupera el listener.

El código es una cosa así:

@RestController
@RequestMapping("auth")
public class MyClass {

  @PostMapping(value = "/login", produces = MediaType.APPLICATION_JSON_VALUE)
  public Dto login(@RequestBody Dto dto, @RequestHeader HttpHeaders headers) {

  //Cosas

  rabbitTemplate.processAndSend(EXCHANGE, TOPIC, message);

  //Aquí debería ir el código que recupera el mensaje de la cola y lo devuelve hacia afuera.

  }

  @Bean
  MessageListener listenerAdapter(ApplicationContext context) {
    return new MessageListener() {
      @Override
      public void onMessage(org.springframework.amqp.core.Message message) {
        //Recupero el mensaje y lo convierto a Dto.
      }
    };
  }

}

¿Alguien sabe cómo recuperar la respuesta del mensaje que has enviado dentro del propio PostMapping y devolverla como response del endpoint?

Grasiah.

-Yepeto-

No creo que tal como esta eso lo vayas a poder hacer ahora mismo.

El endpoint de rest que tienes publicado lo deberías de convertir a uno asíncrono que quede esperando a la tarea que hagas en la parte de recepción del mensaje y que cuando esté último procese la tarea notifique al que esta esperando.

Mírate las librerías de asincronía de spring a ver si ves algo.

Otro consejo, envuelve las respuestas de tus endpoints en la clase Response. Así tienes mejor control de errores y de comunicación con quien consuma ese endpoint (mensajes, status, respuestas por defecto..)

pineda

conceptualmente el método login tendrá que esperar a que el listenerAdapter reciba el mensaje, y lo publique en algún lugar como una caché de primer nivel, y solo cuando exista el elemento (o pase el timeout) responder

Luego como lo implementes... posiblemente hacerte un pubsub simple

JuAn4k4

Algo estás haciendo mal conceptualmente. ¿Qué envías commands por un bus o algo?

Request/Response pattern en el bus deberías implementar, pero vaya que no se debería usar mucho.

1 respuesta
Kaledros
#4JuAn4k4:

Algo estás haciendo mal conceptualmente

Estoy segurísimo. El tema es que estoy perdido y ni siquiera sé lo que estoy haciendo mal.

JackSparow

Tu problema es mas conceptual?
Si quieres consumir la cola bajo demanda, no uses un listener, hazlo con un RabbitTemplate (o a más bajo nivel en el Channel si así lo requieres por temas de ack manual).

MTX_Anubis

No tiene rabbit una opción de hacer rpc calls? Realmente envía el mensaje y crea una cola temporal para recibir la respuesta y espera a que reciba esa respuesta. El consumer, en el mensaje tiene algo así como replyTo que la cola a la que tiene que devolver la respuesta.

Vamos hace casi 10 años tenía esto que te digo, ahora con Spring pues ni pajorela.

De todas formas lo que quieres es la última opción que haría yo y porque me obligara otro sistema al que no tuviera acceso xD

Kaledros

Vale, ya está claro.

Como decís, lo suyo es crear un listener dentro del endpoint que se quede escuchando la cola. Eso es lo que no tenía nada claro, pero se hace así:

@RestController
@RequestMapping("auth")
public class MyClass {

  @PostMapping(value = "/login", produces = MediaType.APPLICATION_JSON_VALUE)
  public Dto login(@RequestBody Dto dto, @RequestHeader HttpHeaders headers) {

  //Cosas

  rabbitTemplate.convertAndSend(EXCHANGE, TOPIC, message);

  org.springframework.amqp.core.Message responseMessage =  template.receive(QUEUE);

  return responseMessage.map(Dto.class, responseMessage);

  }

Ese responseMessage contiene un byte[] que se puede mapear al objeto que quieras y devolverlo en el return.

Obviamente luego hay que comprobar que es la respuesta que esperas (con el correlationId y tal), pero como mínimo ya recibo la respuesta y la devuelvo.

JuAn4k4

Para un request response, se crea una queue ad-hoc para el request en cuestión y se espera a que alguien haga el replyTo a esa cola. (En el request suele ir en un header el replyTo). Se suele evitar por el tema de crear quejes adhoc. Algunos fw lo hacen como con los connection pools y van rehusando colas para estas cosas.

D

Yo tengo la duda de si con jetty/tomcat eso funciona. No es blocante? No se deberia usar netty (spring webflux)?

Creo que he picado todo lo que habeis dicho y si puedo elegir me quedo con un endpoint que devuelva un id con metadatos y que sea responsabilidad del cliente el polling.

Con spring he usado webflux y sse para recibir datos push del server, te mantiene una conexión http abierta donde solo recibe del server. nse no lo uso mucho xq el navegador tiene un limite de SSE abiertos, para estos casos ahora uso stomp.

1 respuesta
Kaledros

Sí, lo normal para estos casos es usar webflux y con un poco de suerte es lo que acabaremos haciendo, pero esto es poco más que una POC aplicada al proyecto actual para pasar de rest a colas. En una versión anterior se usaba webflux y es lo que al final deberíamos volver a usar.

MTX_Anubis

#10 Si claro que es bloquea. Estoy seguro de que el clietne de Rabbit tiene la opción para que sea sencillo de hacer. De hecho

https://www.rabbitmq.com/tutorials/tutorial-six-java.html

FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

el fibonacciRpc.call es bloqueante y basicamente hace eso. Publica el mensaje, y espera a una respuesta en una queue que crea cuando se ha creado el cliente.

1 respuesta
D

#12 Si.

Me refería a que el thread del server http se bloquea por cada petición. Si hago 10k peticiones de login bloqueare el server.

1 respuesta
MTX_Anubis

#13 ah pero bueno, eso te pasa con cualquier server que gestione las peticiones en el hilo del server o que se lie a crear hilos/req. No hay mucha diferencia entre hacer eso de las colas y atacar a una BBDD salvo que aguantarás algunas req/sec más.

1 respuesta
D

#14 Claro y de ahi la pregunta. Netty vs jetty/tomcat.

edit: obviamente para la db yo uso conectores async.

Usuarios habituales

  • desu
  • MTX_Anubis
  • Kaledros
  • JuAn4k4
  • JackSparow
  • pineda
  • -Yepeto-