
Separate Elastic queue for embeddings

Madelein van Niekerk requested to merge 457724-embedding-queue into master

What does this MR do and why?

Creates a new queue for indexing embeddings into Elasticsearch.

There are currently two queues: initial and incremental. We chose to add a separate queue for embeddings because:

  1. We need control over how many items we process from the queue. The other queues process 1000 items at a time, but for embeddings we need to comply with rate limits.
  2. We need more control over the indexing process.
  3. Embeddings can take longer to process than regular Elasticsearch data.
  4. If an embedding fails to be generated or indexed, we put the item back on the embedding queue. This is okay because the order of processing does not matter when indexing embeddings (see the sketch after this list).
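
As a rough illustration of point 4, requeueing on failure could look like the sketch below. `generate_and_index!` is a placeholder, and `track!` mirrors the existing `ProcessBookkeepingService` interface; this is not the code in this MR:

```ruby
# Hypothetical sketch of requeue-on-failure (point 4 above).
def process_refs(refs)
  refs.each do |ref|
    generate_and_index!(ref) # placeholder for embedding generation + indexing
  rescue StandardError
    # Ordering does not matter for embeddings, so a failed item can simply
    # be put back on the embedding queue for a later cron run.
    ProcessEmbeddingBookkeepingService.track!(ref)
  end
end
```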

This MR

The following changes are present in this MR:

  • Creating a new ProcessEmbeddingBookkeepingService which inherits from ProcessBookkeepingService. This service is responsible for adding items to and fetching items from the queue.
  • Creating an ElasticIndexEmbeddingBulkCronWorker which inherits from BulkCronWorker. This worker is scheduled to run every minute on a cron and enqueues 16 concurrent workers, which call the service to fetch and process items from the queue. The other BulkCronWorkers re-enqueue themselves if there are items left in the queue, but we added a feature flag to control whether re-enqueueing is enabled for the new worker, so we retain more control: [Feature flag] Rollout of `embedding_cron_worke... (#463762). A simplified sketch of both classes follows this list.
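
The sketch below shows how the two classes fit together. The class bodies are approximations of the real implementation, and the feature-flag name is hypothetical since the link above is truncated:

```ruby
# Simplified sketch; the real class bodies in this MR differ.
class ProcessEmbeddingBookkeepingService < ProcessBookkeepingService
  SHARDS = (0...16).to_a # 16 queue shards, matching the other queues
end

class ElasticIndexEmbeddingBulkCronWorker < BulkCronWorker
  # Scheduled every minute; fans out one job per shard, each of which
  # asks the service to fetch and process a bounded batch of items.
  def perform(shard_number = nil)
    service.execute(shards: [shard_number]) if shard_number
  end

  private

  def service
    ProcessEmbeddingBookkeepingService.new
  end

  def re_enqueue?
    # Unlike the other BulkCronWorkers, re-enqueueing while items remain
    # in the queue is gated behind a feature flag (#463762). The flag
    # name here is hypothetical since the link above is truncated.
    Feature.enabled?(:embedding_cron_worker_requeue)
  end
end
```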

Rate limiting

  • ApplicationRateLimiter already defines a threshold of 450 over 1 minute for :vertex_embeddings_api. We need to share this resource with Duo chat documentation embeddings. The Vertex API limit is 1500 tokens per minute.
  • We define a threshold of 70% of the ApplicationRateLimiter rate limit, which equals 315 per minute. This leaves 135 tokens per minute available for Duo chat documentation embeddings if we max out our share.
  • We will not process any embeddings from the queue if the endpoint is throttled: we will return early and wait until the next cron run.
  • shard_limit is the number of items we process from the queue during an execution. It is set to (threshold / self::SHARDS.count).to_i: for 16 shards, that's 19 items per shard.
  • In the follow-up MR, we will increment the rate limit whenever we generate an embedding and return early if it is throttled.

To summarize: every minute, 16 concurrent processes each generate at most 19 embeddings, always checking via the ApplicationRateLimiter cache that we don't exceed 315 embeddings per minute.
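
In code, the throttle check and the per-shard limit amount to roughly the following. This is a sketch: the constant names are illustrative, `process_queue` is a placeholder, and enforcement of the 70% threshold is simplified; it assumes ApplicationRateLimiter's non-incrementing peek check:

```ruby
# Sketch of the rate-limiting arithmetic described above.
THRESHOLD = (450 * 0.7).to_i # 70% of the shared 450/minute limit => 315

def shard_limit
  (THRESHOLD / SHARDS.count).to_i # 315 / 16 shards => 19 items per shard
end

def execute
  # If :vertex_embeddings_api is already throttled, return early and wait
  # for the next cron run rather than processing a partial batch.
  return if Gitlab::ApplicationRateLimiter.peek(:vertex_embeddings_api, scope: nil)

  process_queue(limit: shard_limit) # process_queue is a placeholder
end
```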

Follow-up MR

In Embedding Elasticsearch reference (!154297 - merged) we add a new Embedding Reference which defines how items are stored in the queue, and we add items to the queue based on model callbacks. That MR also adds a cron schedule for the bulk cron worker.
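
For context, the callback-based tracking in that MR looks roughly like this; the model and reference class names here are hypothetical, not the actual classes from !154297:

```ruby
# Hypothetical illustration of callback-based queueing from !154297.
class Issue < ApplicationRecord
  after_commit :track_embedding, on: [:create, :update]

  private

  def track_embedding
    # The Embedding Reference defines the serialized form in which the
    # record is stored on the embedding queue. EmbeddingReference is a
    # stand-in name for illustration only.
    ProcessEmbeddingBookkeepingService.track!(EmbeddingReference.new(self))
  end
end
```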

MR acceptance checklist

Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.

How to set up and validate locally

At the moment the new code is not called anywhere, so there is nothing to validate locally. The specs cover all of the eventual behaviour; the code will start being called in the next MR.

Related to #457724 (closed)
