With Client REQUEST_ENTITY_PROCESSING set to CHUNKED I lose documents

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

With Client REQUEST_ENTITY_PROCESSING set to CHUNKED I lose documents

nddipiazza

Dear Group:

I created a Jersey stackoverflow quesiton and I was going to send it here to hopefully generate some traffic.


I pasted it below. Can someone take a look? Thanks!


I have a REST web service that runs on Jetty. I want to write a Java client that chunks along a huge batch of documents to that rest service using the same web connection.

I was able to establish an Iterator based streaming approach here:

Sending a stream of documents to a Jersey @POST endpoint

This does not work unless you set clientConfig.property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.CHUNKED); because the Content-length is unknown.

While somewhat working, the chunked transfer seems to lose a few documents. For example:

num_docs 500000
numFound 499249

Maybe it's sending chunks like:

{some:doc}, {some:doc}, {some:doc}, {some:doc}, {some:doc}, {some:doc}, {some:do

So I'm losing some each time at the ends?

How do I make it not do that? Any ideas what else might be happening?

    ClientConfig clientConfig = new ClientConfig();
    clientConfig.property(ClientProperties.CONNECT_TIMEOUT, (int)TimeUnit.SECONDS.toMillis(60));
    clientConfig.property(ClientProperties.REQUEST_ENTITY_PROCESSING, RequestEntityProcessing.CHUNKED);
    clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 100);
    clientConfig.property(ApacheClientProperties.CONNECTION_MANAGER, HttpClientFactory.createConnectionManager(name,
      metricRegistry, configuration));
    ApacheConnectorProvider connector = new ApacheConnectorProvider();
    clientConfig.connectorProvider(connector);
    clientConfig.register(new ClientRequestFilter() {
    @Override
    public void filter(ClientRequestContext requestContext) throws IOException {
      List<Object> orig = requestContext.getHeaders().remove(HttpHeaders.CONTENT_LENGTH);
      if (orig != null && !orig.isEmpty()) {
        requestContext.getHeaders().addAll("Length", orig);
      }
    }
    });
    clientConfig.register(new ClientRequestFilter() {
    @Override
    public void filter(ClientRequestContext requestContext) throws IOException {
      if (requestContext.getMediaType() != null &&
          requestContext.getMediaType().getType() != null &&
          requestContext.getMediaType().getType().equalsIgnoreCase("multipart")) {
        final MediaType boundaryMediaType = Boundary.addBoundary(requestContext.getMediaType());
        if (boundaryMediaType != requestContext.getMediaType()) {
          requestContext.getHeaders().putSingle(HttpHeaders.CONTENT_TYPE, boundaryMediaType.toString());
        }
        if (!requestContext.getHeaders().containsKey("MIME-Version")) {
          requestContext.getHeaders().putSingle("MIME-Version", "1.0");
        }
      }
    }
    });