Add GCP bucket connector for fetching package metadata
## Problem to solve
The external license database will make license data available to instances via a public GCP bucket. In order to import this data, a way to connect to the bucket and retrieve its contents is necessary.
## Proposal
Create a GCP bucket connector which can open a connection to a GCP bucket and stream its data to the caller. `fog/fog-google` is already in use in the codebase, so it's probably the best candidate.
Because there's a large amount of data in the bucket, a way to track the last object read is necessary so as not to re-process already-processed data.
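A minimal sketch of the connect-and-stream flow described above, assuming fog-google's storage API; the project, credential, bucket, and object names are illustrative only:

```ruby
require 'fog/google'

# Illustrative values; real settings would come from instance configuration.
storage = Fog::Storage::Google.new(
  google_project: 'example-project',
  google_json_key_location: '/path/to/key.json'
)

# get_object streams the object body to the block in chunks rather than
# loading the whole object into memory
storage.get_object('example-license-bucket', 'v1/npm/0001/00000001.csv') do |chunk|
  process(chunk) # hypothetical handler for each downloaded chunk
end
```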
## Implementation plan
1. Connector
   - Create `PackageMetadata::Connector` implementing the interface used in the sync service.
   - Add `PackageMetadata::GcpBucketConnector`:
     - initialized with `base_uri`, `version_format`, and `purl_type`
     - implements `PackageMetadata::Connector#fetch_objects_after(sequence_id, chunk_id)`:
       - list `sequence` objects in the bucket (these are lexicographically ordered)
       - skip any objects less than `sequence` and `chunk` until the next "unread" object is found
       - open a stream to the object and pass a limited number of lines at a time to a CSV reader
       - yield CSV rows to the caller
   - Note: if the sequence is not found, we have to start from the first sequence, so depending on the number of objects in the bucket a search faster than O(n) might be needed (see the sketch below).
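One option for the faster-than-O(n) lookup, sketched under the assumption that object names are lexicographically ordered `<sequence>/<chunk>` strings (the naming scheme is illustrative): `Array#bsearch_index` can locate the resume point in O(log n).

```ruby
# Given a sorted list of object names and the name of the last processed
# object, return the index to resume reading from. Falls back to the list
# length (nothing left to read) when every name is <= the target.
def resume_index(sorted_names, last_processed_name)
  sorted_names.bsearch_index { |name| name > last_processed_name } || sorted_names.length
end

resume_index(%w[0001/001 0001/002 0002/001], '0001/002') # => 2
```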
Pseudocode:
```ruby
require 'csv'
require 'fog/google'
require 'uri'

module PackageMetadata
  class GcpBucketConnector < BaseConnector
    def initialize(base_uri:, version_format:, purl_type:)
      @base_uri = base_uri
      @version_format = version_format
      @purl_type = purl_type
    end

    def data_after(seq_id, chunk_id)
      objects_after(seq_id, chunk_id).each do |obj|
        connection.get_object(bucket_name, obj.name) do |buffer|
          CSV.new(buffer).each do |row|
            yield row
          end
        end
      end
    end

    private

    def objects_after(seq_id, chunk_id)
      # .items unwraps the fog-google list response into an array of objects
      all_objects = connection.list_objects(bucket_name).items
      # pos tracks the index of the last processed object; -1 means the
      # (sequence, chunk) pair was not found, so processing starts from
      # the first object
      pos = -1
      all_objects.each_with_index do |obj, idx|
        seq, chunk = split(obj.name)
        pos = idx if seq == seq_id && chunk == chunk_id
      end
      all_objects[(pos + 1)..]
    end

    # object names are assumed to encode "<sequence>/<chunk>"
    def split(object_name)
      object_name.split('/')
    end

    def bucket_name
      # assumed: the bucket name is derivable from the configured base_uri
      URI(@base_uri).host
    end

    def connection
      # credentials/config come from instance settings (see below), not from
      # the connector's metadata attributes
      @connection ||= Fog::Storage::Google.new
    end
  end
end
```
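For context, a hypothetical call site in the sync service might look like the following; the checkpoint variables and ingestion helper are assumptions, not existing code:

```ruby
connector = PackageMetadata::GcpBucketConnector.new(
  base_uri: 'gs://example-license-bucket', # illustrative
  version_format: 'v1',
  purl_type: 'npm'
)

# resume from the last recorded (sequence, chunk) checkpoint and ingest rows
connector.data_after(last_sequence_id, last_chunk_id) do |row|
  ingest(row) # hypothetical ingestion step
end
```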
2. GCP bucket settings
Because this is a public bucket, configuration can live in code or instance settings rather than in the database.
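A sketch of what code-level settings could look like, assuming a frozen constant is sufficient for a public bucket; all names and values below are illustrative:

```ruby
module PackageMetadata
  SYNC_SETTINGS = {
    bucket_name: 'example-license-bucket',
    version_format: 'v1',
    batch_size: 1_000
  }.freeze
end
```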
3. Network connection
Because of the potential size of the objects, the connection to the bucket should use gzip encoding rather than plain text.
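If the objects arrive gzip-compressed, the downloaded body can be decompressed before it reaches the CSV reader; a sketch using Ruby's standard library (the method name is an assumption):

```ruby
require 'zlib'
require 'stringio'

# Unwrap a gzip-encoded buffer so downstream code sees plain CSV text.
def decompressed(gzipped_buffer)
  Zlib::GzipReader.new(StringIO.new(gzipped_buffer)).read
end
```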
4. Data stream
Because of the amount of data being processed, streaming the CSV data in configurable batches is important so as not to create memory pressure on the instance.
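Batching could be as simple as slicing the CSV enumerator, with the batch size coming from configuration; a sketch with assumed names:

```ruby
require 'csv'

DEFAULT_BATCH_SIZE = 1_000 # assumed default; should be configurable

# Yield rows in fixed-size batches so memory usage stays bounded regardless
# of how large the underlying object is.
def each_row_batch(io, batch_size: DEFAULT_BATCH_SIZE, &blk)
  CSV.new(io).each_slice(batch_size, &blk)
end
```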