Elasticsearch data migration with Kafka

Dana Aonofriesei
Trustpilot Technology
6 min read · Nov 14, 2022


Addressing Elasticsearch tech debt

We have 3 AWS-managed Elasticsearch clusters that run a very old version of Elasticsearch (v5.1). On multiple occasions, the blue-green deployments that AWS uses for domain updates and unplanned upgrades have been slow or have failed altogether.

The clusters are used in the event-driven setup described below. An indexer process consumes SQS events and writes the data to the ES cluster. An API is used to query the ES cluster (see the below diagram). Note that the indexer also reads from the ES cluster to perform data updates, while the API is read-only.

To resolve the above issue, we needed to upgrade the ES clusters to a more recent version and perform a data migration.

Solution

An obvious solution would be to create new Elasticsearch clusters, migrate the data from the old clusters, and update the indexer process and the API to account for any breaking changes between the ES versions. However, we implemented a slightly more complex (and seemingly more costly) solution, as outlined in the diagram below.

The indexer process routes its writes to an AWS-managed OpenSearch cluster through the Kafka Rest Proxy, which writes messages to Kafka topics in an AWS MSK cluster. OpenSearch sink connectors consume the messages from the Kafka topics and write them to the corresponding index in the cluster. There is a 1–1 mapping between topics and indices.
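To make the write path concrete, here is a minimal sketch of how an indexer could hand one document to the proxy. It assumes the proxy speaks the Confluent REST Proxy v2 JSON format (`POST /topics/{topic}` with a `records` array); the proxy address, topic name, and document are hypothetical, and the request is only built, not sent.

```python
import json
import urllib.request

# Assumption: the proxy accepts the Confluent REST Proxy v2 JSON embedded format.
CONTENT_TYPE = "application/vnd.kafka.json.v2+json"


def build_produce_request(proxy_url, topic, doc_id, document):
    """Build (but do not send) a produce request carrying one document.

    The document id is used as the record key so that all changes to the
    same document land in the same partition, in order.
    """
    payload = {"records": [{"key": doc_id, "value": document}]}
    return urllib.request.Request(
        url=f"{proxy_url.rstrip('/')}/topics/{topic}",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": CONTENT_TYPE},
        method="POST",
    )


# Hypothetical proxy address, topic, and document, for illustration only.
request = build_produce_request(
    "http://rest-proxy.internal:8082", "reviews", "doc-1", {"stars": 5}
)
```

Sending it would be a matter of passing `request` to `urllib.request.urlopen`; authentication and retries are left out of the sketch.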

There are a few reasons for implementing this more complex solution:

  • Having ground truth data in Kafka allows us to support future migrations and replications relatively easily; we would only need to register a new connector and provide a new destination. It decouples the data from Elasticsearch and allows skipping multiple ES versions while migrating.
  • Controlling the OpenSearch cluster’s indexing rate by smoothing the traffic and using the Kafka cluster as an asynchronous buffer.
  • Flexibility of setting up multiple OpenSearch clusters with different settings or index layouts.
  • Having a complete history of a document, along with the timeline of its changes, in the Kafka topic.
  • Moving closer towards the AWS-managed Kafka offering (MSK) instead of self-managing our own cluster. The above migration has been used as a feasibility exercise for migrating our existing self-managed Kafka clusters to AWS MSK.
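The first and fourth points can be illustrated with a small sketch: replaying a topic's records in order rebuilds both the current state of every document (what a new destination would receive) and the timeline of its changes. This is a simplification under stated assumptions: records are modelled as `(key, partial_document)` pairs, and upserts are treated as shallow merges, which may differ from how the sink connector and OpenSearch merge nested fields.

```python
def replay(records):
    """Fold an ordered stream of (key, partial_doc) records into document state.

    Returns the latest state per key and, per key, the list of
    (position, snapshot) pairs showing how the document evolved.
    """
    state = {}
    history = {}
    for position, (key, partial_doc) in enumerate(records):
        # Shallow upsert: create the document if missing, then overwrite fields.
        state.setdefault(key, {}).update(partial_doc)
        history.setdefault(key, []).append((position, dict(state[key])))
    return state, history


# Hypothetical records for two documents.
records = [
    ("doc-1", {"stars": 3, "title": "ok"}),
    ("doc-2", {"stars": 5}),
    ("doc-1", {"stars": 4}),
]
state, history = replay(records)
```

Here `state["doc-1"]` ends up with the updated `stars` value and the original `title`, while `history["doc-1"]` keeps both intermediate snapshots.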

The main disadvantage of the above architecture is increased infrastructure cost.

The migration process

Once all of the infrastructure components were in place, we followed the steps below to complete the migration:

  1. Stop the indexer process to avoid race conditions and, subsequently, data inconsistency in the OpenSearch indices. We had a plan for a zero-downtime migration, but it was more involved, and our end users agreed to tolerate the most recent data being temporarily unavailable. They could still work in read-only mode.
  2. Run multiple instances of a backfill process to copy data from the old ES 5.1 clusters to the new OS 1.0 cluster via the Kafka Rest Proxy setup above. Before running the backfill, we scaled up OpenSearch and optimized index settings for better write performance. To minimise the downtime, the parallel backfill processes were split by index and by date-based grouping within an index. We had 6 parallel backfill jobs reading from Elasticsearch and writing to OpenSearch, and the backfill took 2.5 hours to transfer ~400 million documents.
  3. Once the backfill has completed, restart the indexer so that it syncs the messages that queued up in SQS.
  4. Once the SQS queue has reached manageable levels, downscale the OS cluster to the desired state and restore the index settings.
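The date-based grouping in step 2 can be sketched as splitting an index's date range into contiguous slices, one per backfill job. The even split and the example dates are assumptions for illustration; the actual grouping also took indices into account. The last lines work out the throughput implied by the figures above: roughly 44,000 documents per second overall, or about 7,400 per job.

```python
from datetime import date, timedelta


def split_date_range(start, end, parts):
    """Split [start, end) into `parts` contiguous, non-overlapping slices."""
    total_days = (end - start).days
    if parts < 1 or total_days < parts:
        raise ValueError("need at least one day per slice")
    bounds = [
        start + timedelta(days=total_days * i // parts) for i in range(parts + 1)
    ]
    return list(zip(bounds[:-1], bounds[1:]))


# Hypothetical date range for one index, split across 6 backfill jobs.
slices = split_date_range(date(2021, 1, 1), date(2022, 1, 1), 6)

# Throughput implied by the numbers in the text.
docs, hours, jobs = 400_000_000, 2.5, 6
total_rate = docs / (hours * 3600)  # documents per second, all jobs
per_job_rate = total_rate / jobs    # documents per second, one job
```

Each job would then query only documents whose date falls in its own half-open slice, so no document is copied twice.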

Other tools and technologies used

All of the infrastructure and data reside in AWS. CloudFormation was used to provision most of the infrastructure: OpenSearch, the MSK cluster, and Fargate-based stacks for the backfill process, Kafka Rest Proxy and Kafka UI. MSK Connect components were set up using the CLI because CloudFormation did not support them at the time.

For access control between components we used IAM wherever it was supported. Unfortunately, sink connectors did not have IAM support for OpenSearch, so we had to fall back to basic authentication, which was the simplest option given our time and resource constraints.

Since OpenSearch and the Elasticsearch Sink Connector are incompatible, we used an open-source OpenSearch Sink Connector as a plugin. The documents are stored as JSON in Kafka topics. The connector was set up with write.method:upsert (see configuration properties) to allow both writes and updates of OpenSearch documents. We set up the sink connectors with a tasks.max value of 6 since our Kafka topics were set up with 6 partitions; one task per partition allows for parallelism without the overhead of task-partition assignment.
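As a rough sketch, the connector settings described above could be assembled like this. Only `write.method` and `tasks.max` come from our setup as described; `connector.class` is a placeholder because it depends on the plugin used, `connection.url` is assumed to carry over from the Elasticsearch sink connector that the OpenSearch connector adapts, and the converter settings are an assumption about how JSON documents without schemas would be read.

```python
import json

NUM_PARTITIONS = 6  # from the text: each topic has 6 partitions


def sink_connector_config(topic, opensearch_url):
    """Return a Kafka Connect configuration (string to string) for one topic."""
    return {
        # Placeholder: the class name depends on the installed plugin.
        "connector.class": "<OpenSearch sink connector class>",
        "topics": topic,
        # One task per partition.
        "tasks.max": str(NUM_PARTITIONS),
        # Create the document if it is missing, update it otherwise.
        "write.method": "upsert",
        # Assumed key name, inherited from the Elasticsearch sink connector.
        "connection.url": opensearch_url,
        # Assumption: string keys and schemaless JSON values.
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
    }


# Hypothetical topic and endpoint, for illustration only.
config = sink_connector_config("reviews", "https://search.example.internal")
config_json = json.dumps(config, indent=2)
```

Basic authentication credentials are deliberately left out of the sketch; they would be added as further connector properties.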

CloudWatch was used for observability and monitoring. AWS provides a lot of metrics out of the box. Max time lag and Sum offset Lag were very useful for checking whether the connectors were keeping up with the load.

There were a lot of breaking changes between ES 5.1 and ES 7.10 (~ OpenSearch 1.0), and we had to make appropriate changes in both the indexer and consumer API code.

Challenges

There were a few hurdles along the way. For example, kafka.t3.small MSK broker types are not really usable for development and prototyping when connectors and Kafka Rest Proxy are set up with IAM. We experienced timeouts and connection errors despite configuring the reconnection back-offs to large values, so we had to switch to kafka.m5.large.

It was a bit of a surprise to find out that OpenSearch is not compatible with the Elasticsearch Sink Connector. As mentioned above, we had to use an open-source adaptation of the connector. Initially we also configured OpenSearch with IAM, but in the end we had to fall back to basic authentication since IAM is not supported in OS/ES Sink Connectors.

Although the plugins property of a connector can be an array, at the time of our development (January 2022) it did not support more than one value.

At the time, MSK also did not support deletion of plugins and worker configurations, so we ended up with a lot of obsolete artefacts. Plugin deletion is now supported. There is also no easy way to manage topics in the cluster, so we used a combination of Kafka Rest Proxy and Kafka UI. It would be great to have support for IAM in the Kafka CLI tools to perform common topic management tasks.

CloudWatch logs were not descriptive enough to resolve IAM permission-related issues, so we often had to resort to a locally deployed connector (set up to communicate with the remote MSK cluster) to get more insight into an issue.

Results

As a result of the migration to the newer version of Elasticsearch (i.e. OpenSearch 1.0), we addressed a major piece of technical debt in our infrastructure. Since the upgrade, we have not experienced any outages or deadlocked cluster upgrades. We have been able to scale the cluster up and down multiple times to address various business needs.

As an added benefit, we resolved multiple previously unknown memory leak issues in the indexer codebase. Right after the upgrade, we started observing an increased number of 500 errors in OpenSearch. The error logs were not immediately clear (ServerError: Type: search_phase_execution_exception Reason: "all shards failed"), but stack traces led us to the code that was using the scroll API. We tried to increase search.max_open_scroll_context to resolve the issue, but AWS does not allow changing this setting directly. You can request the change from AWS support, with the caveat that the default value of 500 will be restored upon any upgrade. Upon closer inspection of the code, we noticed that scroll contexts were never closed. Once the fix was in place, the indexer process became leaner and supported more horizontal scaling. This has improved the overall throughput of our pipeline as well.
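The shape of the fix is simple: whatever happens while paging through results, release the scroll context at the end. The sketch below is not our indexer code; it assumes a client exposing `search`, `scroll` and `clear_scroll` with the keyword arguments used by the Python OpenSearch and Elasticsearch clients, and the helper name and defaults are hypothetical.

```python
def scroll_all(client, index, query, page_size=1000, keep_alive="2m"):
    """Yield every hit for `query`, always releasing the scroll context."""
    response = client.search(
        index=index, body=query, scroll=keep_alive, size=page_size
    )
    scroll_id = response.get("_scroll_id")
    try:
        while response["hits"]["hits"]:
            yield from response["hits"]["hits"]
            response = client.scroll(scroll_id=scroll_id, scroll=keep_alive)
            # The server may hand back a new id; keep the latest one.
            scroll_id = response.get("_scroll_id", scroll_id)
    finally:
        # Runs on normal exhaustion, on errors, and when the generator is
        # closed early, so the context does not linger until it times out.
        if scroll_id:
            client.clear_scroll(scroll_id=scroll_id)
```

A caller that stops iterating early should close the generator (for example with `contextlib.closing`) so that the `finally` block runs promptly instead of waiting for garbage collection.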

Shout out to Assulan and David for the success of this initiative!
