Streaming job using Kinesis connector fails

Problem

You have a streaming job that writes to a Kinesis sink, and it fails with out-of-memory errors such as:

java.lang.OutOfMemoryError: GC overhead limit exceeded
java.lang.OutOfMemoryError: Java heap space

Symptoms include:

  • Ganglia shows a gradual increase in JVM memory usage.
  • Microbatch analysis shows input and processing rates are consistent, which means there are no issues with the source or processing.
  • A heap dump shows Java HashMap objects occupying a large share of the JVM heap, and that share grows over time.

Cause

The most common reason for this error is that you are closing the Kinesis client, but you are not closing the HTTP client.

For example, a ForeachWriter usually creates a new Kinesis client in its open method, which runs for every partition.

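import org.apache.spark.sql.ForeachWriter
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisClient

class KinesisSink extends ForeachWriter[KinesisSinkInput] {
  private var kinesisClient: KinesisClient = _

  override def open(partitionId: Long, version: Long): Boolean = {
    // The HTTP client is a local value, so close() has no way to reach it.
    val httpClient = ApacheHttpClient
      .builder()
      .build()
    kinesisClient = KinesisClient
      .builder()
      .region(Region.of(region))
      .httpClient(httpClient)
      .build()
    true
  }

  override def process(value: KinesisSinkInput): Unit = {
    // Main process is here.
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Only the Kinesis client is closed here.
    kinesisClient.close()
  }
}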

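This sample code calls kinesisClient.close() but never calls httpClient.close(). When you pass your own HTTP client to the Kinesis client builder, closing the Kinesis client does not close that HTTP client.

As a result, a new HTTP client is created every time a partition is opened. Each one uses resources to open TCP connections, but none of them is ever terminated.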
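
The accumulation can be modeled without the AWS SDK. The following is a minimal sketch that uses only the Scala standard library. FakeHttpClient, FakeServiceClient, OpenHttpClients, and LeakDemo are hypothetical stand-ins invented for this illustration, not SDK classes, and the counter only approximates the resources that build up on the heap.

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-ins for the SDK classes; they only count open instances.
object OpenHttpClients { val count = new AtomicInteger(0) }

class FakeHttpClient extends AutoCloseable {
  OpenHttpClients.count.incrementAndGet()
  override def close(): Unit = OpenHttpClients.count.decrementAndGet()
}

// Models a service client that was handed a caller-supplied HTTP client:
// closing it does not close the HTTP client.
class FakeServiceClient(http: FakeHttpClient) extends AutoCloseable {
  override def close(): Unit = ()
}

object LeakDemo {
  // Simulates repeated open/close cycles and returns how many
  // HTTP clients are still open at the end.
  def run(cycles: Int, closeHttp: Boolean): Int = {
    OpenHttpClients.count.set(0)
    for (_ <- 1 to cycles) {
      val http = new FakeHttpClient
      val service = new FakeServiceClient(http)
      service.close()
      if (closeHttp) http.close()
    }
    OpenHttpClients.count.get()
  }
}
```

In this model, LeakDemo.run(1000, closeHttp = false) leaves 1000 HTTP clients open, while LeakDemo.run(1000, closeHttp = true) leaves none.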

Solution

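Ensure that you close HTTP clients when they are no longer required. Keep a reference to the HTTP client, for example in a class field, so that the close method can close it along with the Kinesis client.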

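// Assign this field in open() instead of using a local value.
private var httpClient: SdkHttpClient = _

override def close(errorOrNull: Throwable): Unit = {
  kinesisClient.close()
  httpClient.close()
}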
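
Put together, a corrected writer might look like the following sketch. It assumes the AWS SDK for Java 2.x and Spark are on the classpath. The KinesisSinkInput case class and its fields, and the region constructor parameter, are assumptions made for illustration and are not part of the original sample.

```scala
import org.apache.spark.sql.ForeachWriter
import software.amazon.awssdk.http.SdkHttpClient
import software.amazon.awssdk.http.apache.ApacheHttpClient
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.kinesis.KinesisClient

// Hypothetical record type; replace with the type your stream produces.
case class KinesisSinkInput(partitionKey: String, data: Array[Byte])

// The region parameter is an assumption; the original sample does not show
// where the region value comes from.
class KinesisSink(region: String) extends ForeachWriter[KinesisSinkInput] {
  // Both clients are fields so that close() can release each of them.
  private var httpClient: SdkHttpClient = _
  private var kinesisClient: KinesisClient = _

  override def open(partitionId: Long, epochId: Long): Boolean = {
    httpClient = ApacheHttpClient.builder().build()
    kinesisClient = KinesisClient
      .builder()
      .region(Region.of(region))
      .httpClient(httpClient)
      .build()
    true
  }

  override def process(value: KinesisSinkInput): Unit = {
    // Main process is here.
  }

  override def close(errorOrNull: Throwable): Unit = {
    // Close the Kinesis client first; the finally block releases the HTTP
    // client even if that first close throws. The null checks are defensive.
    try {
      if (kinesisClient != null) kinesisClient.close()
    } finally {
      if (httpClient != null) httpClient.close()
    }
  }
}
```

The try/finally ordering is a design choice rather than a requirement: it keeps the HTTP client from leaking if closing the Kinesis client fails.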