feat(tracking): add internal events framework to ai gateway
## What does this merge request do and why?
As part of #491 (closed), we are adding internal events to the AI gateway.
This will help us standardise the event structure discussed in https://gitlab.com/gitlab-org/analytics-section/analytics-instrumentation/proposals/-/merge_requests/15.
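For a concrete sense of that structure, here is a rough sketch of the standard context attached to each event. Field names are taken from the validation patch below; the values are placeholders, and the authoritative field list lives in the linked proposal:

```python
from datetime import datetime

from ai_gateway.tracking import StandardContext

# Placeholder values for illustration only; see the proposal linked above
# for the authoritative field list and semantics.
standard_context = StandardContext(
    project_id=123,
    namespace_id=456,
    is_gitlab_team_member=True,
    feature_enabled_by_namespace_ids=[123, 456],
    environment="production",
    source="api",
    plan="premium",
    context_generated_at=datetime.now().isoformat(),
    realm="saas",
    instance_id="instance-id",
    global_user_id="user-id",
    host_name="gitlab.example.com",
)
```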
## How to use this for tracking
```python
internal_events_client.track_event(
    'event_name',
    {
        'label': 'label',
        'value': 2,
        'property': 'property',
        'random-key': 'value'
    },
    standard_context,
    'category'
)
```
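The validation patch below calls the same client with keyword arguments and a typed helper for the additional properties. A minimal sketch using the names from that patch, assuming `internal_events` (the injected client) and `standard_context` already exist:

```python
from ai_gateway.tracking import InternalEventAdditionalProperties

# label/property/value are the structured fields; any extra keyword argument
# (key="value" here) is carried along as an additional property.
additional_properties = InternalEventAdditionalProperties(
    label="completion_event", property="property_value", value=1, key="value"
)

internal_events.track_event(
    event_name="test_event",
    additional_properties=additional_properties,
    context=standard_context,  # the StandardContext sketched above
    category="code_completions",
)
```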
## How to set up and validate locally
- Follow these instructions to run the AI Gateway locally.
- Enable Snowplow Micro in GDK with these instructions.
- Verify the AI Gateway is running by visiting http://localhost:5052/docs.
- Open gitlab-ai-gateway from GDK in your favourite editor.
- Apply the patch below:
```diff
diff --git a/ai_gateway/api/v2/code/completions.py b/ai_gateway/api/v2/code/completions.py
index 2604cda..3391c0d 100644
--- a/ai_gateway/api/v2/code/completions.py
+++ b/ai_gateway/api/v2/code/completions.py
@@ -1,3 +1,4 @@
+from datetime import datetime
from time import time
from typing import Annotated, AsyncIterator, Union
@@ -34,6 +35,7 @@ from ai_gateway.async_dependency_resolver import (
get_code_suggestions_generations_anthropic_factory_provider,
get_code_suggestions_generations_litellm_factory_provider,
get_code_suggestions_generations_vertex_provider,
+ get_internal_events_client,
get_snowplow_instrumentator,
)
from ai_gateway.auth.self_signed_jwt import SELF_SIGNED_TOKEN_ISSUER
@@ -48,15 +50,20 @@ from ai_gateway.code_suggestions.processing.ops import lang_from_filename
from ai_gateway.gitlab_features import GitLabFeatureCategory, GitLabUnitPrimitive
from ai_gateway.instrumentators.base import TelemetryInstrumentator
from ai_gateway.models import KindAnthropicModel, KindModelProvider
-from ai_gateway.tracking import SnowplowEvent, SnowplowEventContext
+from ai_gateway.tracking import (
+ InternalEventAdditionalProperties,
+ SnowplowEvent,
+ SnowplowEventContext,
+ StandardContext,
+)
from ai_gateway.tracking.errors import log_exception
from ai_gateway.tracking.instrumentator import SnowplowInstrumentator
+from ai_gateway.tracking.internal_events import InternalEventsClient
__all__ = [
"router",
]
-
log = structlog.stdlib.get_logger("codesuggestions")
router = APIRouter()
@@ -91,6 +98,7 @@ async def completions(
snowplow_instrumentator: SnowplowInstrumentator = Depends(
get_snowplow_instrumentator
),
+ internal_events: InternalEventsClient = Depends(get_internal_events_client),
):
if not current_user.can(GitLabUnitPrimitive.CODE_SUGGESTIONS):
raise HTTPException(
@@ -99,8 +107,34 @@ async def completions(
)
try:
- snowplow_instrumentator.watch(
- _suggestion_requested_snowplow_event(request, payload)
+ # snowplow_instrumentator.watch(
+ # _suggestion_requested_snowplow_event(request, payload)
+ # )
+
+ standard_context = StandardContext(
+ project_id=123,
+ namespace_id=456,
+ is_gitlab_team_member=True,
+ feature_enabled_by_namespace_ids=[123, 456],
+ environment="production",
+ source="api",
+ plan="premium",
+ context_generated_at=datetime.now().isoformat(),
+ realm=request.headers.get(X_GITLAB_REALM_HEADER, "saas"),
+ instance_id=request.headers.get(X_GITLAB_INSTANCE_ID_HEADER, ""),
+ global_user_id=request.headers.get(X_GITLAB_GLOBAL_USER_ID_HEADER, ""),
+ host_name=request.headers.get(X_GITLAB_HOST_NAME_HEADER, ""),
+ )
+
+ additional_properties = InternalEventAdditionalProperties(
+ label="completion_event", property="property_value", value=1, key="value"
+ )
+
+ internal_events.track_event(
+ event_name="test_event",
+ additional_properties=additional_properties,
+ context=standard_context,
+ category="code_completions",
)
except Exception as e:
log_exception(e)
@@ -114,42 +148,42 @@ async def completions(
stream=payload.stream,
)
- kwargs = {}
- if payload.model_provider == KindModelProvider.ANTHROPIC:
- code_completions = completions_anthropic_factory()
-
- # We support the prompt version 2 only with the Anthropic models
- if payload.prompt_version == 2:
- kwargs.update({"raw_prompt": payload.prompt})
- elif payload.model_provider == KindModelProvider.LITELLM:
- code_completions = completions_litellm_factory(
- model__name=payload.model_name,
- model__endpoint=payload.model_endpoint,
- )
- else:
- code_completions = completions_legacy_factory()
- if payload.choices_count > 0:
- kwargs.update({"candidate_count": payload.choices_count})
-
- if payload.context:
- kwargs.update({"code_context": [ctx.content for ctx in payload.context]})
-
- suggestions = await _execute_code_completion(payload, code_completions, **kwargs)
-
- if isinstance(suggestions[0], AsyncIterator):
- return await _handle_stream(suggestions[0])
-
- return SuggestionsResponse(
- id="id",
- created=int(time()),
- model=SuggestionsResponse.Model(
- engine=suggestions[0].model.engine,
- name=suggestions[0].model.name,
- lang=suggestions[0].lang,
- ),
- experiments=suggestions[0].metadata.experiments,
- choices=_completion_suggestion_choices(suggestions),
- )
+ # kwargs = {}
+ # if payload.model_provider == KindModelProvider.ANTHROPIC:
+ # code_completions = completions_anthropic_factory()
+
+ # # We support the prompt version 2 only with the Anthropic models
+ # if payload.prompt_version == 2:
+ # kwargs.update({"raw_prompt": payload.prompt})
+ # elif payload.model_provider == KindModelProvider.LITELLM:
+ # code_completions = completions_litellm_factory(
+ # model__name=payload.model_name,
+ # model__endpoint=payload.model_endpoint,
+ # )
+ # else:
+ # code_completions = completions_legacy_factory()
+ # if payload.choices_count > 0:
+ # kwargs.update({"candidate_count": payload.choices_count})
+
+ # if payload.context:
+ # kwargs.update({"code_context": [ctx.content for ctx in payload.context]})
+
+ # suggestions = await _execute_code_completion(payload, code_completions, **kwargs)
+
+ # if isinstance(suggestions[0], AsyncIterator):
+ # return await _handle_stream(suggestions[0])
+
+ # return SuggestionsResponse(
+ # id="id",
+ # created=int(time()),
+ # model=SuggestionsResponse.Model(
+ # engine=suggestions[0].model.engine,
+ # name=suggestions[0].model.name,
+ # lang=suggestions[0].lang,
+ # ),
+ # experiments=suggestions[0].metadata.experiments,
+ # choices=_completion_suggestion_choices(suggestions),
+ # )
@router.post("/code/generations")
diff --git a/ai_gateway/tracking/container.py b/ai_gateway/tracking/container.py
index c9c6356..10d7828 100644
--- a/ai_gateway/tracking/container.py
+++ b/ai_gateway/tracking/container.py
@@ -17,7 +17,7 @@ def _init_snowplow_client(
enabled: bool, configuration: SnowplowClientConfiguration
) -> SnowplowClient | SnowplowClientStub:
if not enabled:
- return SnowplowClientStub()
+ return SnowplowClient(configuration)
return SnowplowClient(configuration)
diff --git a/ai_gateway/tracking/snowplow.py b/ai_gateway/tracking/snowplow.py
index 85907eb..fbda57e 100644
--- a/ai_gateway/tracking/snowplow.py
+++ b/ai_gateway/tracking/snowplow.py
@@ -71,9 +71,8 @@ class SnowplowClient(Client):
def __init__(self, configuration: SnowplowClientConfiguration) -> None:
emitter = AsyncEmitter(
- batch_size=configuration.batch_size,
- thread_count=configuration.thread_count,
- endpoint=configuration.endpoint,
+ endpoint="http://localhost:9091",
+ batch_size=1,
)
self.tracker = Tracker(
```
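Note that this patch is for local validation only: it comments out the normal completion flow so the request only exercises the tracking path, forces the real Snowplow client even when tracking is disabled, and points the emitter at Snowplow Micro on http://localhost:9091 with `batch_size=1` so each event is flushed immediately.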
- Copy the cURL command below and run it:
```shell
curl --location --request POST 'http://localhost:5052/v2/code/completions' \
--header 'accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
"current_file": {
"file_name": "app.py",
"language_identifier": "python",
"content_above_cursor": "<|fim_prefix|>def hello_world():<|fim_suffix|><|fim_middle|>",
"content_below_cursor": ""
},
"model_provider": "litellm",
"model_endpoint": "http://127.0.0.1:4000",
"model_name": "codegemma",
"telemetry": [],
"prompt_version": 2,
"prompt": ""
}'
```
- You should be able to see the event in Snowplow Micro (for example, via its /micro/good endpoint).
## Merge request checklist

- [ ] Tests added for new functionality. If not, please raise an issue to follow up.
- [ ] Documentation added/updated, if needed.