package lz4

import (
	"io"

	"github.com/pierrec/lz4/v4/internal/lz4block"
	"github.com/pierrec/lz4/v4/internal/lz4errors"
	"github.com/pierrec/lz4/v4/internal/lz4stream"
)

var writerStates = []aState{
	noState:     newState,
	newState:    writeState,
	writeState:  closedState,
	closedState: newState,
	errorState:  newState,
}

// NewWriter returns a new LZ4 frame encoder.
func NewWriter(w io.Writer) *Writer {
	zw := &Writer{frame: lz4stream.NewFrame()}
	zw.state.init(writerStates)
	_ = zw.Apply(DefaultBlockSizeOption, DefaultChecksumOption, DefaultConcurrency, defaultOnBlockDone)
	zw.Reset(w)
	return zw
}

// Writer allows writing an LZ4 stream.
type Writer struct {
	state   _State
	src     io.Writer                 // destination writer
	level   lz4block.CompressionLevel // how hard to try
	num     int                       // concurrency level
	frame   *lz4stream.Frame          // frame being built
	data    []byte                    // pending data
	idx     int                       // size of pending data
	handler func(int)
	legacy  bool
}

func (*Writer) private() {}

func (w *Writer) Apply(options ...Option) (err error) {
	defer w.state.check(&err)
	switch w.state.state {
	case newState:
	case errorState:
		return w.state.err
	default:
		return lz4errors.ErrOptionClosedOrError
	}
	for _, o := range options {
		if err = o(w); err != nil {
			return
		}
	}
	w.Reset(w.src)
	return
}

func (w *Writer) isNotConcurrent() bool {
	return w.num == 1
}

// init sets up the Writer when in newState. It does not change the Writer state.
func (w *Writer) init() error {
	w.frame.InitW(w.src, w.num, w.legacy)
	if true || !w.isNotConcurrent() {
		size := w.frame.Descriptor.Flags.BlockSizeIndex()
		w.data = size.Get()
	}
	w.idx = 0
	return w.frame.Descriptor.Write(w.frame, w.src)
}

func (w *Writer) Write(buf []byte) (n int, err error) {
	defer w.state.check(&err)
	switch w.state.state {
	case writeState:
	case closedState, errorState:
		return 0, w.state.err
	case newState:
		if err = w.init(); w.state.next(err) {
			return
		}
	default:
		return 0, w.state.fail()
	}

	zn := len(w.data)
	for len(buf) > 0 {
		if w.isNotConcurrent() && w.idx == 0 && len(buf) >= zn {
			// Avoid a copy as there is enough data for a block.
			if err = w.write(buf[:zn], false); err != nil {
				return
			}
			n += zn
			buf = buf[zn:]
			continue
		}
		// Accumulate the data to be compressed.
		m := copy(w.data[w.idx:], buf)
		n += m
		w.idx += m
		buf = buf[m:]

		if w.idx < len(w.data) {
			// Buffer not filled.
			return
		}

		// Buffer full.
		if err = w.write(w.data, true); err != nil {
			return
		}
		if !w.isNotConcurrent() {
			size := w.frame.Descriptor.Flags.BlockSizeIndex()
			w.data = size.Get()
		}
		w.idx = 0
	}
	return
}

func (w *Writer) write(data []byte, safe bool) error {
	if w.isNotConcurrent() {
		block := w.frame.Blocks.Block
		err := block.Compress(w.frame, data, w.level).Write(w.frame, w.src)
		w.handler(len(block.Data))
		return err
	}
	c := make(chan *lz4stream.FrameDataBlock)
	w.frame.Blocks.Blocks <- c
	go func(c chan *lz4stream.FrameDataBlock, data []byte, safe bool) {
		b := lz4stream.NewFrameDataBlock(w.frame)
		c <- b.Compress(w.frame, data, w.level)
		<-c
		w.handler(len(b.Data))
		b.Close(w.frame)
		if safe {
			// safe to put it back as the last usage of it was FrameDataBlock.Write() called before c is closed
			lz4block.Put(data)
		}
	}(c, data, safe)

	return nil
}

// Close closes the Writer, flushing any unwritten data to the underlying io.Writer,
// but does not close the underlying io.Writer.
func (w *Writer) Close() (err error) {
	switch w.state.state {
	case writeState:
	case errorState:
		return w.state.err
	default:
		return nil
	}
	defer w.state.nextd(&err)
	if w.idx > 0 {
		// Flush pending data, disable w.data freeing as it is done later on.
		if err = w.write(w.data[:w.idx], false); err != nil {
			return err
		}
		w.idx = 0
	}
	err = w.frame.CloseW(w.src, w.num)
	// It is now safe to free the buffer.
	if w.data != nil {
		lz4block.Put(w.data)
		w.data = nil
	}
	return
}

// Reset clears the state of the Writer w such that it is equivalent to its
// initial state from NewWriter, but instead writing to writer.
// Reset keeps the previous options unless overwritten by the supplied ones.
// No access to writer is performed.
//
// w.Close must be called before Reset or pending data may be dropped.
func (w *Writer) Reset(writer io.Writer) {
	w.frame.Reset(w.num)
	w.state.reset()
	w.src = writer
}

// ReadFrom efficiently reads from r and compressed into the Writer destination.
func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
	switch w.state.state {
	case closedState, errorState:
		return 0, w.state.err
	case newState:
		if err = w.init(); w.state.next(err) {
			return
		}
	default:
		return 0, w.state.fail()
	}
	defer w.state.check(&err)

	size := w.frame.Descriptor.Flags.BlockSizeIndex()
	var done bool
	var rn int
	data := size.Get()
	if w.isNotConcurrent() {
		// Keep the same buffer for the whole process.
		defer lz4block.Put(data)
	}
	for !done {
		rn, err = io.ReadFull(r, data)
		switch err {
		case nil:
		case io.EOF, io.ErrUnexpectedEOF: // read may be partial
			done = true
		default:
			return
		}
		n += int64(rn)
		err = w.write(data[:rn], true)
		if err != nil {
			return
		}
		w.handler(rn)
		if !done && !w.isNotConcurrent() {
			// The buffer will be returned automatically by go routines (safe=true)
			// so get a new one fo the next round.
			data = size.Get()
		}
	}
	err = w.Close()
	return
}