debian-mirror-gitlab/lib/gitlab/ci/trace/chunked_io.rb

261 lines
6.5 KiB
Ruby
Raw Normal View History

2018-12-13 13:39:08 +05:30
# frozen_string_literal: true
2018-10-15 14:42:47 +05:30
##
# This class is compatible with the IO class (https://ruby-doc.org/core-2.3.1/IO.html)
# source: https://gitlab.com/snippets/1685610
module Gitlab
module Ci
class Trace
class ChunkedIO
# Byte size of a single trace chunk, taken from the chunk model.
CHUNK_SIZE = ::Ci::BuildTraceChunk::CHUNK_SIZE

# Raised when the chunk backing the current position is missing or yields no data.
FailedToGetChunkError = Class.new(StandardError)

# build                    - the object whose trace chunks are read and written
# tell / size              - current byte position and total byte size of the trace
# chunk_data / chunk_range - data and range of the chunk most recently loaded for reading
attr_reader :build, :tell, :size, :chunk_data, :chunk_range

# IO exposes the current position under both names.
alias_method :pos, :tell
# Sets up a stream over the trace chunks of +build+, positioned at byte 0.
#
# The total size is computed once from the stored chunks. When a block is
# given, the new instance is yielded to it.
def initialize(build, &block)
  @build = build
  @tell = 0
  @chunks_cache = []
  @size = calculate_size

  return unless block_given?

  yield self
end
# Present for IO compatibility; there is nothing to close, so this does nothing.
def close
  nil
end
# Present for IO compatibility; does nothing and returns nil.
def binmode
  nil
end
# Always reports binary mode.
def binmode?
  true
end
# Moves the current position.
#
# pos   - byte offset, interpreted relative to +where+
# where - IO::SEEK_SET (absolute, default), IO::SEEK_CUR (relative to the
#         current position) or IO::SEEK_END (relative to the end)
#
# Returns the new position.
# Raises ArgumentError when the target is negative, beyond +size+, or when
# +where+ is not one of the three constants above.
def seek(pos, where = IO::SEEK_SET)
  target =
    case where
    when IO::SEEK_SET then pos
    when IO::SEEK_CUR then tell + pos
    when IO::SEEK_END then size + pos
    else -1 # unknown whence: force the range check below to fail
    end

  if target < 0 || target > size
    raise ArgumentError, 'new position is outside of file'
  end

  @tell = target
end
# True when the current position has reached the end of the trace.
def eof?
  size == tell
end
# Yields each remaining line, starting from the current position, until the
# end of the trace is reached.
#
# Without a block an Enumerator over the same lines is returned, as
# IO#each_line does; previously this case raised LocalJumpError.
#
# Returns nil when called with a block.
def each_line
  return enum_for(:each_line) unless block_given?

  until eof?
    line = readline
    break if line.nil?

    yield(line)
  end
end
2018-12-13 13:39:08 +05:30
# Reads up to +length+ bytes from the current position and advances past them.
#
# length - number of bytes to read; defaults to everything up to the end
# outbuf - optional String whose contents are replaced with the bytes read
#
# Returns the bytes read as a String (empty when nothing could be read).
# Raises FailedToGetChunkError when a chunk before EOF yields no data.
def read(length = nil, outbuf = nil)
  out = []
  length ||= size - tell

  until length <= 0 || eof?
    data = chunk_slice_from_offset
    raise FailedToGetChunkError if data.to_s.empty?

    # Never read past the end of the current chunk in one iteration.
    chunk_bytes = [CHUNK_SIZE - chunk_offset, length].min
    chunk_data_slice = data.byteslice(0, chunk_bytes)

    out << chunk_data_slice
    @tell += chunk_data_slice.bytesize
    length -= chunk_data_slice.bytesize
  end

  out = out.join

  # If outbuf is passed, we put the output into the buffer. This supports IO.copy_stream functionality
  outbuf.replace(out) if outbuf

  out
end
# Reads from the current position up to and including the next "\n", or to
# the end of the trace when no newline remains, and advances past the bytes
# returned.
#
# Returns the line as a String (empty when already at EOF).
# Raises FailedToGetChunkError when a chunk before EOF yields no data.
def readline
  out = []

  until eof?
    data = chunk_slice_from_offset
    raise FailedToGetChunkError if data.to_s.empty?

    new_line = data.index("\n")

    # Take everything up to and including the newline when this slice has
    # one; otherwise the whole slice belongs to the line and we keep going.
    raw_data = new_line ? data[0..new_line] : data
    out << raw_data
    @tell += raw_data.bytesize

    break if new_line
  end

  out.join
end
# Writes +data+ at the current position, splitting it at chunk boundaries,
# and advances the position past the written bytes.
#
# Returns the number of bytes written. The chunk cache is reset afterwards,
# whether or not the write succeeded.
def write(data)
  start_pos = tell

  while tell < start_pos + data.bytesize
    # get slice from current offset till the end where it falls into chunk
    chunk_bytes = CHUNK_SIZE - chunk_offset
    data_slice = data.byteslice(tell - start_pos, chunk_bytes)

    # append data to chunk, overwriting from that point
    ensure_chunk.append(data_slice, chunk_offset)

    # move offsets within buffer
    @tell += data_slice.bytesize
    @size = [size, tell].max
  end

  tell - start_pos
ensure
  invalidate_chunk_cache
end
2018-12-05 23:21:45 +05:30
# rubocop: disable CodeReuse/ActiveRecord
2018-10-15 14:42:47 +05:30
# Cuts the trace down to +offset+ bytes and moves the position there.
#
# Chunks after the one containing +offset+ are destroyed and that chunk is
# truncated at the matching in-chunk offset. Nothing is changed when +offset+
# already equals the current size. The chunk cache is reset in every case.
#
# Raises ArgumentError when +offset+ is negative or larger than the size.
def truncate(offset)
  if offset > size || offset < 0
    raise ArgumentError, 'Outside of file'
  end

  # Truncating to the current size changes nothing, so skip the work.
  unless offset == size
    @tell = @size = offset

    # remove all next chunks
    later_chunks = trace_chunks.where('chunk_index > ?', chunk_index)
    later_chunks.fast_destroy_all

    # truncate current chunk
    current_chunk.truncate(chunk_offset)
  end
ensure
  invalidate_chunk_cache
end
2018-12-05 23:21:45 +05:30
# rubocop: enable CodeReuse/ActiveRecord
2018-10-15 14:42:47 +05:30
# Present for IO compatibility; does nothing and returns nil.
def flush
  nil
end
# Always true for this stream.
def present?
  true
end
# Destroys every trace chunk of the build and resets position and size to 0.
#
# A warning is logged first when the build has no archived trace. The chunk
# cache is reset whether or not the destroy succeeded.
def destroy!
  # TODO: Remove this logging once we confirmed new live trace architecture is functional.
  # See https://gitlab.com/gitlab-com/gl-infra/infrastructure/issues/4667.
  unless build.has_archived_trace?
    Sidekiq.logger.warn(message: 'The job does not have archived trace but going to be destroyed.',
                        job_id: build.id)
  end

  trace_chunks.fast_destroy_all
  @tell = @size = 0
ensure
  invalidate_chunk_cache
end
private
##
# The methods below are not part of the IO class interface
#
# Whether the current position falls inside the range of the chunk that was
# last loaded for reading. Returns nil when no chunk has been loaded yet.
def in_range?
  return if @chunk_range.nil?

  @chunk_range.include?(tell)
end
# Returns the bytes of the chunk containing the current position, from the
# current in-chunk offset onwards.
#
# The chunk's data and range are kept in @chunk_data / @chunk_range and only
# reloaded when the position leaves the cached range.
#
# Raises FailedToGetChunkError when no chunk exists for the current position.
def chunk_slice_from_offset
  unless in_range?
    chunk = current_chunk
    raise FailedToGetChunkError unless chunk

    @chunk_data = chunk.data
    @chunk_range = chunk.range
  end

  @chunk_data.byteslice(chunk_offset, CHUNK_SIZE)
end
# Byte offset of the current position inside its chunk.
def chunk_offset
  tell.modulo(CHUNK_SIZE)
end
# Index of the chunk that contains the current position.
def chunk_index
  position = tell
  position / CHUNK_SIZE
end
# Absolute byte offset at which the current chunk begins.
def chunk_start
  index = chunk_index
  index * CHUNK_SIZE
end
# Absolute byte offset at which the current chunk ends: a full chunk past
# chunk_start, capped at the total size.
def chunk_end
  full_chunk_end = chunk_start + CHUNK_SIZE
  size < full_chunk_end ? size : full_chunk_end
end
# Drops every memoised chunk by swapping in a new, empty cache array.
def invalidate_chunk_cache
  fresh_cache = []
  @chunks_cache = fresh_cache
end
2018-12-05 23:21:45 +05:30
# rubocop: disable CodeReuse/ActiveRecord
2018-10-15 14:42:47 +05:30
# Returns the stored chunk for the current chunk index, or nil when there is
# none. A found chunk is memoised in @chunks_cache; a miss is looked up again
# on the next call.
def current_chunk
  index = chunk_index
  cached = @chunks_cache[index]
  return cached if cached

  @chunks_cache[index] = trace_chunks.find_by(chunk_index: index)
end
2018-12-05 23:21:45 +05:30
# rubocop: enable CodeReuse/ActiveRecord
2018-10-15 14:42:47 +05:30
2021-03-11 19:13:27 +05:30
# Provides a chunk for the current chunk index and stores it in the cache.
#
# When consistent reads are enabled for the build the chunk is obtained with
# safe_find_or_create_by; otherwise a new chunk object is instantiated with
# .new.
def next_chunk
  @chunks_cache[chunk_index] =
    if ::Ci::BuildTraceChunk.consistent_reads_enabled?(build)
      ::Ci::BuildTraceChunk
        .safe_find_or_create_by(build: build, chunk_index: chunk_index)
    else
      ::Ci::BuildTraceChunk
        .new(build: build, chunk_index: chunk_index)
    end
end
# Returns a chunk to write into at the current position: the existing one if
# present, otherwise one from next_chunk, with a final lookup as a fallback
# should next_chunk return nothing.
def ensure_chunk
  current_chunk || next_chunk || current_chunk
end
2018-12-05 23:21:45 +05:30
# rubocop: disable CodeReuse/ActiveRecord
2018-10-15 14:42:47 +05:30
# Scope of all trace chunks that belong to the build.
def trace_chunks
  owner = build
  ::Ci::BuildTraceChunk.where(build: owner)
end
2018-12-05 23:21:45 +05:30
# rubocop: enable CodeReuse/ActiveRecord
2018-10-15 14:42:47 +05:30
2018-12-05 23:21:45 +05:30
# rubocop: disable CodeReuse/ActiveRecord
2018-10-15 14:42:47 +05:30
# Total trace size: the end offset of the chunk with the highest index, or 0
# when the build has no chunks.
def calculate_size
  last_chunk = trace_chunks.order(chunk_index: :desc).first
  last_chunk.try(&:end_offset).to_i
end
2018-12-05 23:21:45 +05:30
# rubocop: enable CodeReuse/ActiveRecord
2018-10-15 14:42:47 +05:30
end
end
end
end