Monday, 22 March 2021

Vert.x event bus performance issue (design problem)

I am still getting familiar with Vert.x. Coming from a Spring Boot and Spring WebFlux background, I wanted to try out some basic things that I used to do in the Spring ecosystem.

My idea was to write an API exposed via a controller that delegates the actual work to a service. The only way I could think of to achieve this in the Vert.x world was to use the event bus. Here, the getKeyValues method of KeyValueServiceVerticle is supposed to fetch a list of key-values from a publisher (keyValueRepository.findAllItems().items()) and send them back over the event bus to the API handler that sent the original request. I do get the expected result (the list of key-values), but I am not satisfied with the performance. I put load on equivalent Spring WebFlux and Vert.x code, and the WebFlux implementation always performs better (higher RPS). Related repository: https://github.com/tahniat-ashraf/spring-boot-webflux-vert.x-comparison

Am I blocking somewhere in the code? Is there a better Vert.x way to achieve what I am trying to do?

Related code:

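import io.vertx.core.AbstractVerticle;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.LoggerFormat;
import io.vertx.ext.web.handler.LoggerHandler;

public class KeyValueController extends AbstractVerticle {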

  @Override
  public void start() throws Exception {
    Router router = Router.router(vertx);
    router
      .route()
      .handler(BodyHandler.create());
    router.route()
      .handler(LoggerHandler.create(LoggerFormat.DEFAULT));
    router
      .route(HttpMethod.GET, "/keyValues")
      .handler(this::getKeyValues);

    vertx
      .createHttpServer()
      .requestHandler(router)
      .listen(6678);
  }

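  private void getKeyValues(RoutingContext routingContext) {
    vertx
      .eventBus()
      .request(KeyValueServiceVerticle.GET_LIST_ADDRESS, new JsonObject(), messageAsyncResult -> {
        if (messageAsyncResult.failed()) {
          // No consumer, a timeout, or a failure reply: result() would be null here
          routingContext.fail(messageAsyncResult.cause());
          return;
        }
        routingContext.response()
          .putHeader("content-type", "application/json")
          .end((String) messageAsyncResult.result().body());
      });
  }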
}

and

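import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import io.vertx.core.json.JsonArray;
// Observable comes from RxJava; its package depends on the RxJava version in use.
// KeyValueRepository and DynamoConfiguration are project classes (see the related repository).

public class KeyValueServiceVerticle extends AbstractVerticle {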

  public static final String GET_LIST_ADDRESS = "GET_LIST_KEY_VAL";
  private KeyValueRepository keyValueRepository;
  private DynamoConfiguration dynamoConfiguration;

  @Override
  public void start() throws Exception {
    dynamoConfiguration = new DynamoConfiguration();
    keyValueRepository = new KeyValueRepository("dev-paybill-key-value", dynamoConfiguration.getDynamoDBEnhancedClient());
    var eventBus = vertx.eventBus();
    eventBus
      .consumer(KeyValueServiceVerticle.GET_LIST_ADDRESS, this::getKeyValues);

  }

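  private <T> void getKeyValues(Message<T> tMessage) {
    // Collect every item emitted by the publisher, then reply with the JSON-encoded list
    Observable.fromPublisher(keyValueRepository.findAllItems().items())
      .toList()
      .subscribe(
        tList -> {
          JsonArray jsonArray = new JsonArray(tList);
          tMessage.reply(jsonArray.encodePrettily());
        },
        // Without an error handler the requester would only see a reply timeout
        error -> tMessage.fail(500, error.getMessage()));
  }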
}
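
For readers who want to see the request/reply flow described above without a Vert.x or DynamoDB setup, here is a minimal sketch that models it with plain JDK types. It is not the Vert.x API: MiniBus, registerService, and handleGet are hypothetical names invented for this illustration, and the "200"/"500" strings merely stand in for HTTP responses. It only shows the shape of the design: the controller sends a request to an address, the service replies asynchronously, and a missing consumer turns into an error response instead of a null result.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class RequestReplySketch {

  static final String GET_LIST_ADDRESS = "GET_LIST_KEY_VAL";

  // Hypothetical stand-in for an event bus: maps an address to a handler
  // that produces an asynchronous reply for a request body.
  static final class MiniBus {
    private final Map<String, Function<String, CompletableFuture<String>>> consumers =
      new ConcurrentHashMap<>();

    void consumer(String address, Function<String, CompletableFuture<String>> handler) {
      consumers.put(address, handler);
    }

    CompletableFuture<String> request(String address, String body) {
      Function<String, CompletableFuture<String>> handler = consumers.get(address);
      if (handler == null) {
        // Mirrors the "no handlers for address" failure case
        return CompletableFuture.failedFuture(
          new IllegalStateException("No consumer for " + address));
      }
      return handler.apply(body);
    }
  }

  // "Service" side: registers a consumer that replies with the items as a JSON array string.
  static void registerService(MiniBus bus, List<String> jsonItems) {
    bus.consumer(GET_LIST_ADDRESS, ignoredBody ->
      CompletableFuture.supplyAsync(() -> "[" + String.join(",", jsonItems) + "]"));
  }

  // "Controller" side: sends the request and maps the reply (or failure) to a response line.
  static CompletableFuture<String> handleGet(MiniBus bus) {
    return bus.request(GET_LIST_ADDRESS, "{}")
      .thenApply(body -> "200 " + body)
      .exceptionally(error -> "500 Internal Server Error");
  }

  public static void main(String[] args) {
    MiniBus bus = new MiniBus();
    // join() blocks and is used only to print the demo result
    System.out.println(handleGet(bus).join());
    registerService(bus, List.of("\"a\"", "\"b\""));
    System.out.println(handleGet(bus).join());
  }
}
```

The first call runs before any service is registered, so it takes the failure path; the second call goes through the registered consumer. In the real verticles the same two paths correspond to a failed and a succeeded AsyncResult on the controller side.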
