Blog 20/10/2023

Streaming multi-line CSV files in Apache Camel


Recently I had to set up an Apache Camel route for processing CSV files. Because the files to be processed could potentially be very large, we decided that the best approach was to process the CSV files by streaming them line by line. Apache Camel provides out-of-the-box support for reading files, splitting them on line feeds and transforming their contents. This worked fine, until we ran into multi-line CSV records…

Whitehorses
Mike Heeren / Integration expert

“Default” setup

As mentioned, we used Apache Camel to set up a route for processing CSV files. We created a simple route with the from-endpoint describing where Apache Camel could poll for the CSV files. Next, we added a split operation to split the content on line feeds (regardless of which line feed characters are used) and process each line separately in a streaming manner:

from("direct:multi-line-csv-route")
    .split(body().tokenize("\r\n|\r|\n")).streaming()
        .to("direct:process-separated-csv-record");

Almost every article about processing CSV files in Apache Camel proposes a solution similar to the one above. This also works fine, until you run into a situation where the CSV records themselves can contain line feeds as well.

Multi-line CSVs

The first question that arose was whether multi-line CSV records are valid according to the CSV standard. Luckily, that was an easy question to answer. RFC 4180, which describes the common format for Comma-Separated Values (CSV) files, is very clear about this:

Fields containing line breaks (CRLF), double quotes, and commas should be enclosed in double-quotes. For example:

"aaa","b CRLF
bb","ccc" CRLF
zzz,yyy,xxx

Testing this scenario

Using the example from the RFC, it's pretty simple to create a unit test for this scenario. In this test we use the exact RFC example as input for the route. Afterwards, we check whether the “direct:process-separated-csv-record” endpoint received the 2 records:

@Test
void testMultiLineCsv() throws Exception {
    // Given
    AdviceWith.adviceWith(context, "multiLineCsvRoute", adviseWith ->
            adviseWith.weaveByToUri("direct:process-separated-csv-record").replace().to(mockEndpoint)
    );
    context.start();

    // When
    producerTemplate.sendBody("direct:multi-line-csv-route", """
            "aaa","b
            bb","ccc"
            zzz,yyy,xxx
            """);

    // Then
    mockEndpoint.expectedMessageCount(2);
    mockEndpoint.assertIsSatisfied();
    assertThat(mockEndpoint.getExchanges())
            .map(exchange -> exchange.getIn().getBody(String.class))
            .containsAll(List.of(
                    "\"aaa\",\"b\nbb\",\"ccc\"",
                    "zzz,yyy,xxx"
            ));
}

When running the test, we can immediately see it failing:

java.lang.AssertionError: mock://endpoint Received message count. Expected: <2> but was: <3>

This is caused by the splitting behavior now supplying the following records:

  1. "aaa","b: This is an invalid record, because it has an unclosed quote and is missing the third column.
  2. bb","ccc": This is an invalid record, because it's missing the first column (and part of the second column value).
  3. zzz,yyy,xxx: This is the only correct record in the example.

Improving the splitting behavior

The solution we came up with was to keep splitting the CSV file per line. However, before actually processing a record, we check whether the line contains a “full CSV record”. If it doesn't (as is the case for the first line in the test), we write it to a buffer first and keep appending the next lines until we have a full record. Once we have a complete record, we can proceed to the processing step.

Let’s start with adjusting the Camel route:

from("direct:multi-line-csv-route")
    /* 1 */ .process(FullCsvRecordBuffer::init)
    .split(body().tokenize("\r\n|\r|\n")).streaming()
        /* 2 */.choice()
            .when(hasFullCsvRecordInBuffer)
                /* 3 */ .process(FullCsvRecordBuffer::setBody)
                .to("direct:process-separated-csv-record")
                /* 4 */ .process(FullCsvRecordBuffer::reset)
            .endChoice()
        .end()
    .end()
    /* 5 */ .process(FullCsvRecordBuffer::validateEmpty);

Here we can see a couple of steps that have been added:

  1. Before the split statement we initialize a buffer object and assign it to the Exchange. It's important to perform this step before the split statement, otherwise the buffer object will not be “shared” across all split exchanges (a sketch of this buffer helper follows after this list).
  2. A choice/when combination has been added within the split. This performs the actual check whether we have a full CSV record in the buffer. We will come to this implementation later. If we don't have a full record yet, we simply proceed with the next line immediately.
  3. When we do have a full record in the buffer, we have to replace the Exchange body with the entire buffer instead of just the last part.
  4. After the record has been processed by the application, we reset the buffer. Otherwise the next record would still contain this (already processed) record as well.
  5. Finally, after the split operation we perform one last step to make sure that no (partial) record remains in the buffer.
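
The route above refers to a FullCsvRecordBuffer helper that holds the partial record between split iterations. Its implementation is not shown in this post (the full version can be found in the Bitbucket repository linked at the end), but a minimal sketch could look like the class below. Storing the buffer as an exchange property and the exact method bodies are assumptions of this sketch:

import java.io.ByteArrayOutputStream;

import org.apache.camel.Exchange;

public class FullCsvRecordBuffer {

    // Hypothetical property name used to share the buffer object across all split exchanges
    private static final String PROPERTY = FullCsvRecordBuffer.class.getName();

    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

    /* 1 */ public static void init(final Exchange exchange) {
        exchange.setProperty(PROPERTY, new FullCsvRecordBuffer());
    }

    public static FullCsvRecordBuffer get(final Exchange exchange) {
        return exchange.getProperty(PROPERTY, FullCsvRecordBuffer.class);
    }

    /* 3 */ public static void setBody(final Exchange exchange) {
        // Replace the body (only the last line) with the entire buffered record
        exchange.getIn().setBody(get(exchange).read());
    }

    /* 4 */ public static void reset(final Exchange exchange) {
        get(exchange).buffer.reset();
    }

    /* 5 */ public static void validateEmpty(final Exchange exchange) {
        if (get(exchange).buffer.size() > 0) {
            // The real project would likely use its own exception type here
            throw new IllegalStateException("Partial CSV record left in buffer");
        }
    }

    public void append(final byte[] bytes) {
        buffer.writeBytes(bytes);
    }

    public byte[] read() {
        return buffer.toByteArray();
    }
}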

Checking for full CSV records

Now that we have changed the Camel route to use the buffer object, it's time to have a look at the logic that checks whether the buffer contains a full record. We use a Predicate implementation so it can easily be incorporated into the choice statement from step 2 of the Camel route.

Here, we fetch the buffer object that we created in step 1 of the Camel route. The current body is appended to the buffer. If the buffer does not contain a full record (yet), we also append a newline character. We have to do this manually, because the actual line feed characters are “lost” due to the split statement in the route.

public class HasFullCsvRecordInBuffer implements Predicate {

    private static final byte[] NEW_LINE = new byte[]{'\n'};

    // ...

    @Override
    public boolean matches(final Exchange exchange) {
        final var buffer = FullCsvRecordBuffer.get(exchange);
        buffer.append(exchange.getIn().getBody(byte[].class));
        final var fullRecordInBuffer = hasFullRecordInBuffer(exchange, buffer);
        if (!fullRecordInBuffer) {
            buffer.append(NEW_LINE);
        }
        return fullRecordInBuffer;
    }

    // ...

}

For the actual check of the CSV records, we simply use the Apache Camel CSV data format implementation. When creating an instance of the data format, don't forget to register it as a service in the Camel context, otherwise the (un)marshalling operations will fail:

private final CsvDataFormat fullRecordDataFormat;

public HasFullCsvRecordInBuffer(final CamelContext context) {
    try {
        fullRecordDataFormat = new CsvDataFormat();
        context.addService(fullRecordDataFormat);
    } catch (final Exception ex) {
        throw new CsvProcessingException(ex);
    }
}

Now that we have a way to (attempt to) unmarshal CSV records, we can finally have a look at how the actual check can be performed:

private boolean hasFullRecordInBuffer(final Exchange exchange, final FullCsvRecordBuffer buffer) {
    try (final var inputStream = new ByteArrayInputStream(buffer.read())) {
        fullRecordDataFormat.unmarshal(exchange, inputStream);
    } catch (final UncheckedIOException ex) {
        if (ex.getMessage().contains("EOF reached before encapsulated token finished")) {
            return false;
        }
        throw new CsvProcessingException(ex);
    } catch (final Exception ex) {
        throw new CsvProcessingException(ex);
    }
    return true;
}

Here we simply attempt to unmarshal the entire buffer using the CSV data format. Since the only place where “additional” line feeds should occur is within a column, we don't even necessarily have to check the number of columns. As long as the unmarshalling operation doesn't throw an exception, we know the buffer contains a valid CSV record.

If an UncheckedIOException was thrown, we check the exception message for a specific error message. This is the message thrown by the org.apache.commons.csv.Lexer implementation. Unfortunately, this is not the nicest way to act on exceptions, but currently there is no other way to determine the reason for the thrown exception. The main risk is that the error message changes in a future release of the commons-csv project. However, since we cover this scenario in the unit tests, we would immediately be notified if that were the case. Therefore, we accepted that risk.
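
The post does not show those tests, but a predicate-level sketch (assuming the same test class setup as the earlier route test, and the FullCsvRecordBuffer sketch shown above) could look something like this:

@Test
void testPartialRecordIsBuffered() throws Exception {
    // Given: the predicate registers its CSV data format as a service in the (started) context
    context.start();
    final var predicate = new HasFullCsvRecordInBuffer(context);
    final var exchange = ExchangeBuilder.anExchange(context)
            .withBody("\"aaa\",\"b".getBytes(StandardCharsets.UTF_8))
            .build();
    FullCsvRecordBuffer.init(exchange);

    // When/Then: the quoted value is not closed yet, so the line should only be buffered
    assertThat(predicate.matches(exchange)).isFalse();

    // Appending the remainder of the record completes it, so the predicate should now match
    exchange.getIn().setBody("bb\",\"ccc\"".getBytes(StandardCharsets.UTF_8));
    assertThat(predicate.matches(exchange)).isTrue();
}

If the commons-csv error message ever changes, the first matches call would throw a CsvProcessingException instead of returning false, so a test like this would fail and notify us immediately.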

Counting columns in the buffer

As mentioned above, it's not strictly necessary to count the number of columns in the buffer. However, if desired, this could be used to throw an exception when the buffer for some reason contains too many columns, for example (a sketch of such a guard follows after the code below).

When you want to implement this, keep in mind that you have to subtract the number of joined columns from the total column count. Otherwise, if you simply counted all columns in all lines, the initial example would be counted as below.

  1. Collection 1, List 1: "aaa"
  2. Collection 1, List 2: "b
  3. Collection 2, List 1: bb"
  4. Collection 2, List 2: "ccc"

This incorrectly results in 4 columns instead of 3 (because 2 and 3 are a single column, separated over multiple lines/Collections). So keep this in mind when counting the columns:

final var lines = (Collection<List<String>>) fullRecordDataFormat.unmarshal(exchange, inputStream);

private static int countColumns(final Collection<List<String>> lines) {
    final var totalColumnsInLines = lines.stream()
            .mapToInt(List::size)
            .sum();
    final var joinedColumns = lines.size() - 1;
    return totalColumnsInLines - joinedColumns;
}
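
Such a guard could then be invoked right after a successful unmarshal in hasFullRecordInBuffer. The expected column count below is a hypothetical constant (3, matching the RFC example) and is not part of the code above:

// Hypothetical guard: 3 columns, as in the RFC example
private static final int EXPECTED_COLUMN_COUNT = 3;

private static void validateColumnCount(final Collection<List<String>> lines) {
    final var columns = countColumns(lines);
    if (columns != EXPECTED_COLUMN_COUNT) {
        // The real project would likely use its own exception type (such as CsvProcessingException)
        throw new IllegalStateException(
                "Expected " + EXPECTED_COLUMN_COUNT + " columns but found " + columns);
    }
}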

Conclusion

When processing multi-line CSV files in a streaming manner, you have to implement some additional logic to buffer incomplete records before actually processing them. This buffering can easily be implemented using some of the components that come with Apache Camel by default.

As always, a full code example can be found on Bitbucket:
https://bitbucket.org/whitehorsesbv/splitting-multi-line-csv
