// Copyright 2019 The etcd Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. package tracker // Inflights limits the number of MsgApp (represented by the largest index // contained within) sent to followers but not yet acknowledged by them. Callers // use Full() to check whether more messages can be sent, call Add() whenever // they are sending a new append, and release "quota" via FreeLE() whenever an // ack is received. type Inflights struct { // the starting index in the buffer start int // number of inflights in the buffer count int // the size of the buffer size int // buffer contains the index of the last entry // inside one message. buffer []uint64 } // NewInflights sets up an Inflights that allows up to 'size' inflight messages. func NewInflights(size int) *Inflights { return &Inflights{ size: size, } } // Clone returns an *Inflights that is identical to but shares no memory with // the receiver. func (in *Inflights) Clone() *Inflights { ins := *in ins.buffer = append([]uint64(nil), in.buffer...) return &ins } // Add notifies the Inflights that a new message with the given index is being // dispatched. Full() must be called prior to Add() to verify that there is room // for one more message, and consecutive calls to add Add() must provide a // monotonic sequence of indexes. func (in *Inflights) Add(inflight uint64) { if in.Full() { panic("cannot add into a Full inflights") } next := in.start + in.count size := in.size if next >= size { next -= size } if next >= len(in.buffer) { in.grow() } in.buffer[next] = inflight in.count++ } // grow the inflight buffer by doubling up to inflights.size. We grow on demand // instead of preallocating to inflights.size to handle systems which have // thousands of Raft groups per process. func (in *Inflights) grow() { newSize := len(in.buffer) * 2 if newSize == 0 { newSize = 1 } else if newSize > in.size { newSize = in.size } newBuffer := make([]uint64, newSize) copy(newBuffer, in.buffer) in.buffer = newBuffer } // FreeLE frees the inflights smaller or equal to the given `to` flight. func (in *Inflights) FreeLE(to uint64) { if in.count == 0 || to < in.buffer[in.start] { // out of the left side of the window return } idx := in.start var i int for i = 0; i < in.count; i++ { if to < in.buffer[idx] { // found the first large inflight break } // increase index and maybe rotate size := in.size if idx++; idx >= size { idx -= size } } // free i inflights and set new start index in.count -= i in.start = idx if in.count == 0 { // inflights is empty, reset the start index so that we don't grow the // buffer unnecessarily. in.start = 0 } } // FreeFirstOne releases the first inflight. This is a no-op if nothing is // inflight. func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) } // Full returns true if no more messages can be sent at the moment. func (in *Inflights) Full() bool { return in.count == in.size } // Count returns the number of inflight messages. func (in *Inflights) Count() int { return in.count } // reset frees all inflights. func (in *Inflights) reset() { in.count = 0 in.start = 0 }