Separate Elastic queue for embeddings
What does this MR do and why?
Creates a new queue for indexing embeddings into Elasticsearch.
There are currently two queues: `initial` and `incremental`. We chose to add a separate queue for embeddings because:
- We need to have control over how many items we process from the queue. The other queues process 1000 items at a time but for embeddings, we need to comply with rate limits.
- We need more control over the indexing process.
- Embeddings can take longer to be processed than normal ES data.
- If an embedding fails to be generated/indexed, we will put the item back on the embedding queue. This is okay because the order of processing does not matter when indexing embeddings.
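The re-enqueue-on-failure behaviour can be sketched in plain Ruby (a minimal illustration with made-up queue items and a simulated failure; this is not GitLab code):

```ruby
# Sketch: a failed embedding is pushed back onto the end of the queue.
# Ordering does not matter when indexing embeddings, so this is safe.
queue = [:issue_1, :issue_2, :issue_3]
indexed = []
failed_once = false

until queue.empty?
  item = queue.shift
  if item == :issue_2 && !failed_once
    failed_once = true # simulate one transient generation/indexing failure
    queue.push(item)   # put the item back on the embedding queue
  else
    indexed << item
  end
end

# All items end up indexed, just in a different order: [:issue_1, :issue_3, :issue_2]
```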
This MR
The following changes are present in this MR:
- Creating a new `ProcessEmbeddingBookkeepingService` which inherits from `ProcessBookkeepingService`. This service is responsible for adding items to and fetching items from the queue.
- Creating an `ElasticIndexEmbeddingBulkCronWorker` which inherits from `BulkCronWorker`. This worker is scheduled to run every minute on a cron and enqueues 16 concurrent workers, each of which calls the service to fetch and process items from the queue. The other `BulkCronWorker`s re-enqueue themselves if there are items left in the queue, but for more control we added a feature flag to toggle re-enqueueing for the new worker: [Feature flag] Rollout of `embedding_cron_worke... (#463762).
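The flag-guarded re-enqueue behaviour could look roughly like the following self-contained sketch (the `Feature` stub, the `perform` signature, and the flag name are illustrative assumptions; the real flag name is in the rollout issue linked above):

```ruby
# Illustrative stand-in for GitLab's Feature module.
module Feature
  FLAGS = { embedding_worker_re_enqueue: false } # hypothetical flag name
  def self.enabled?(name)
    FLAGS.fetch(name, false)
  end
end

# Sketch of the new worker: unlike the other BulkCronWorkers, it only
# re-enqueues itself when the feature flag is enabled.
class ElasticIndexEmbeddingBulkCronWorker
  def perform(queue)
    # ...process up to shard_limit items from each shard here...
    return :done if queue.empty?

    Feature.enabled?(:embedding_worker_re_enqueue) ? :re_enqueued : :done
  end
end
```

With the flag disabled, `perform` returns `:done` even when items remain in the queue, so leftover items simply wait for the next cron run.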
Rate limiting
- `ApplicationRateLimiter` already defines a threshold of `450` over 1 minute for `:vertex_embeddings_api`. We need to share this resource with Duo Chat documentation embeddings. The Vertex API limit is `1500` tokens per minute.
- We define a threshold of 70% of the `ApplicationRateLimiter` rate limit, which equals `315` per minute. This leaves `135` tokens per minute available for Duo documentation if we max out our tokens.
- We will not process any embeddings from the queue if the endpoint is throttled: we return early and wait until the next cron run.
- `shard_limit` is the number of items we process from the queue per shard during an execution. It is set to `(threshold / self::SHARDS.count).to_i`: for 16 shards, that's 19 items per shard.
- In the follow-up MR, we will increment the rate limit whenever we generate an embedding and return early if it is throttled.
To summarize: every minute, 16 concurrent processes will each generate at most 19 embeddings, and we always check that we do not exceed `315` embeddings per minute using the `ApplicationRateLimiter` cache.
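The arithmetic can be checked with a few lines of Ruby (constant names are illustrative; the `shard_limit` expression mirrors the formula quoted above):

```ruby
VERTEX_THRESHOLD = 450                               # ApplicationRateLimiter, per minute
EMBEDDING_THRESHOLD = (VERTEX_THRESHOLD * 0.7).to_i  # 70% => 315
SHARD_COUNT = 16

shard_limit = (EMBEDDING_THRESHOLD / SHARD_COUNT).to_i  # 19 items per shard
headroom = VERTEX_THRESHOLD - EMBEDDING_THRESHOLD       # 135/min left for Duo docs
```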
Follow-up MR
In Embedding Elasticsearch reference (!154297 - merged) we add a new Embedding Reference, which defines how items are stored in the queue and adds items to the queue via callbacks. We also add a cron schedule for the bulk cron worker to run.
MR acceptance checklist
Please evaluate this MR against the MR acceptance checklist. It helps you analyze changes to reduce risks in quality, performance, reliability, security, and maintainability.
How to set up and validate locally
At the moment, the code is not called anywhere and therefore does nothing. The specs cover all cases of its eventual behaviour; the code will start being called in the next MR.
Related to #457724 (closed)