VSD - Implement the value stream dashboard count worker
Add a new worker that is scheduled every 5 minutes, where we limit the maximum runtime to 3 minutes using the `Analytics::CycleAnalytics::RuntimeLimiter` class.

The worker invokes a service that does the actual record counting. The service will return a cursor and an array of counts, which will be bulk-inserted into the `Analytics::ValueStreamDashboard::Count` table.
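To illustrate the time-budget idea, here is a minimal stand-alone sketch of a runtime limiter. This is not the real `Analytics::CycleAnalytics::RuntimeLimiter`; only the `over_time?` predicate (used later in the pseudocode) is assumed to match its interface:

```ruby
# Minimal runtime-limiter sketch (hypothetical): stop work once a fixed
# time budget has been spent. Uses a monotonic clock so wall-clock
# adjustments don't affect the measurement.
class SimpleRuntimeLimiter
  def initialize(max_runtime)
    @max_runtime = max_runtime
    @started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end

  def elapsed
    Process.clock_gettime(Process::CLOCK_MONOTONIC) - @started_at
  end

  def over_time?
    elapsed >= @max_runtime
  end
end

limiter = SimpleRuntimeLimiter.new(0.01) # 10ms budget, for demonstration
sleep(0.02)
puts limiter.over_time? # => true
```

The worker would construct one limiter at start (with a 3-minute budget) and poll `over_time?` between batches.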
As the first step, the worker fetches a cursor from Redis or the DB. The cursor helps continue the processing where the previous worker stopped.
Cursor logic
When the cursor is nil
This case occurs when no aggregation has been executed yet (new installation) or when the processing for the current month has finished.
Find the first aggregation namespace record and check the last runtime:

```ruby
aggregation = Analytics::ValueStreamDashboard::Aggregation.first
return if aggregation.nil? # the feature is not used at all
return if aggregation.last_run_at > 1.month.ago # we store the counts monthly

# initial cursor
cursor = { top_level_namespace_id: aggregation.namespace_id, namespace_id: -1, metric: 1, last_count: 0, last_value: -1 }
```
When the cursor is not nil
```ruby
cursor = fetch_cursor # to be implemented, maybe we can use Redis?
```
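If Redis is chosen, the cursor could be stored as a small JSON payload. A sketch of hypothetical (de)serialization helpers (the key names follow the cursor shape above; the Redis read/write itself is left out):

```ruby
require 'json'

# Hypothetical cursor (de)serialization helpers. The resulting JSON string
# could be stored under a well-known Redis key between worker runs.
CURSOR_KEYS = %i[top_level_namespace_id namespace_id metric last_count last_value].freeze

def serialize_cursor(cursor)
  cursor.slice(*CURSOR_KEYS).to_json
end

def deserialize_cursor(json)
  return if json.nil? # no cursor stored yet

  JSON.parse(json, symbolize_names: true)
end

payload = serialize_cursor(top_level_namespace_id: 9, namespace_id: -1, metric: 1, last_count: 0, last_value: -1)
deserialize_cursor(payload)
# => { top_level_namespace_id: 9, namespace_id: -1, metric: 1, last_count: 0, last_value: -1 }
```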
Iteration logic (pseudo code)
- Iterate over `Analytics::ValueStreamDashboard::Aggregation`.
- Iterate over the subgroups or project namespaces.
- Iterate over the defined metrics (enum).
- Invoke the batch counter.
- If time is up: store the cursor and return.
- If a count is finished: add it to the `counts` array.
- If the loop is done: reset the cursor (nil).
```ruby
counts = []

aggregation_scope = Analytics::ValueStreamDashboard::Aggregation.where('namespace_id >= ?', cursor[:top_level_namespace_id])
aggregation_scope.each_batch do |aggregations|
  aggregations.each do |aggregation|
    # TODO: check the license, disable the aggregation if the license is missing.
    metrics = Analytics::ValueStreamDashboard::Count.metrics.gte(cursor[:metric]) # list of metric values from the enum
    metrics.each do |metric|
      # or group namespaces, depending on the metric
      Namespace.where('traversal_ids[1] = ?', aggregation.namespace_id).where(type: 'Project').where('id > ?', cursor[:namespace_id]).each_batch do |namespaces|
        namespaces.each do |namespace|
          last_count, last_value = build_scope_for_metric(metric, namespace).each_batch_count(last_count: cursor[:last_count], last_value: cursor[:last_value]) do
            runtime_limiter.over_time?
          end

          if runtime_limiter.over_time?
            store_cursor(top_level_namespace_id: aggregation.namespace_id, namespace_id: namespace.id, last_count: last_count, last_value: last_value, metric: metric)
            return
          else
            counts << { namespace_id: namespace.id, metric: metric, count: last_count }
          end
        end
      end
    end
  end
end

return [cursor, counts]
```
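The resume-from-cursor behavior above can be simulated without a database. This toy sketch (fake in-memory data, a simple per-run "budget" standing in for the runtime limiter) shows how one run stops mid-iteration and the next run continues from the stored cursor:

```ruby
# Toy simulation of cursor-based resumable counting. ITEMS maps a fake
# namespace_id to a precomputed record count; `budget` stands in for the
# runtime limiter (number of namespaces we may process per run).
ITEMS = { 1 => 3, 2 => 5, 3 => 2 }.freeze

def count_batch(cursor, budget)
  counts = []
  last_done = cursor[:namespace_id]

  ITEMS.each do |namespace_id, count|
    next if namespace_id <= cursor[:namespace_id] # already counted in a previous run

    # "Time is up": store the cursor (pointing at the last finished id) and stop.
    return [{ namespace_id: last_done }, counts] if budget.zero?

    budget -= 1
    counts << { namespace_id: namespace_id, count: count }
    last_done = namespace_id
  end

  [nil, counts] # loop is done: reset the cursor
end

cursor, first = count_batch({ namespace_id: 0 }, 2) # first run counts namespaces 1 and 2
cursor, second = count_batch(cursor, 2)             # second run resumes at namespace 3
# first  => [{ namespace_id: 1, count: 3 }, { namespace_id: 2, count: 5 }]
# second => [{ namespace_id: 3, count: 2 }], cursor reset to nil
```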
`build_scope_for_metric` method

This method is responsible for building the scope for the counter queries. Depending on the passed-in metric, it creates the appropriate ActiveRecord scope.

Example: when the metric is the issue count. `Issue` is associated with `Project` at the moment, which means that the outer query should iterate over the `Namespaces::ProjectNamespace` rows for the given top-level `Group`.
```ruby
Namespace.where('traversal_ids[1] = ?', namespace_id).where(type: 'Project').where('id > ?', cursor[:namespace_id]).each_batch do |namespaces|
  Project.where(project_namespace_id: namespaces.select(:id)).each do |project|
    # build_scope_for_metric would do this:
    last_count, last_value = Issue.where(project_id: project.id).each_batch_count(column: :iid) # rest of the arguments
  end
end
```
We could come up with some sort of metric -> scope mapping:
```ruby
{
  issues: ->(parent) { Issue.where(project_id: parent.id) },
  groups: ->(parent) { Group.where(parent_id: parent.id) }
}
```
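A stand-alone sketch of how such a mapping could be dispatched (hypothetical names; the lambdas here return strings standing in for the real ActiveRecord scopes, and unknown metrics raise early instead of failing deep in the counter):

```ruby
# Hypothetical metric -> scope dispatch. Parent is a stand-in for a
# Namespace/Project record; the lambdas return strings instead of real
# ActiveRecord scopes so the sketch runs without Rails.
Parent = Struct.new(:id)

METRIC_SCOPES = {
  issues: ->(parent) { "Issue.where(project_id: #{parent.id})" },
  groups: ->(parent) { "Group.where(parent_id: #{parent.id})" }
}.freeze

def build_scope_for_metric(metric, parent)
  builder = METRIC_SCOPES.fetch(metric) { raise ArgumentError, "unknown metric: #{metric}" }
  builder.call(parent)
end

puts build_scope_for_metric(:issues, Parent.new(42)) # => Issue.where(project_id: 42)
```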
As the first step, implement the counting worker for `issues` only and do the rest as a follow-up.