# # Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. # require 'logger' require 'thread' module Thrift # this class expects to always use a FramedTransport for reading messages class NonblockingServer < BaseServer def initialize(processor, server_transport, transport_factory=nil, protocol_factory=nil, num=20, logger=nil) super(processor, server_transport, transport_factory, protocol_factory) @num_threads = num if logger.nil? @logger = Logger.new(STDERR) @logger.level = Logger::WARN else @logger = logger end @shutdown_semaphore = Mutex.new @transport_semaphore = Mutex.new end def serve @logger.info "Starting #{self}" @server_transport.listen @io_manager = start_io_manager begin loop do break if @server_transport.closed? begin rd, = select([@server_transport], nil, nil, 0.1) rescue Errno::EBADF => e # In Ruby 1.9, calling @server_transport.close in shutdown paths causes the select() to raise an # Errno::EBADF. If this happens, ignore it and retry the loop. break end next if rd.nil? socket = @server_transport.accept @logger.debug "Accepted socket: #{socket.inspect}" @io_manager.add_connection socket end rescue IOError => e end # we must be shutting down @logger.info "#{self} is shutting down, goodbye" ensure @transport_semaphore.synchronize do @server_transport.close end @io_manager.ensure_closed unless @io_manager.nil? end def shutdown(timeout = 0, block = true) @shutdown_semaphore.synchronize do return if @is_shutdown @is_shutdown = true end # nonblocking is intended for calling from within a Handler # but we can't change the order of operations here, so lets thread shutdown_proc = lambda do @io_manager.shutdown(timeout) @transport_semaphore.synchronize do @server_transport.close # this will break the accept loop end end if block shutdown_proc.call else Thread.new &shutdown_proc end end private def start_io_manager iom = IOManager.new(@processor, @server_transport, @transport_factory, @protocol_factory, @num_threads, @logger) iom.spawn iom end class IOManager # :nodoc: DEFAULT_BUFFER = 2**20 def initialize(processor, server_transport, transport_factory, protocol_factory, num, logger) @processor = processor @server_transport = server_transport @transport_factory = transport_factory @protocol_factory = protocol_factory @num_threads = num @logger = logger @connections = [] @buffers = Hash.new { |h,k| h[k] = '' } @signal_queue = Queue.new @signal_pipes = IO.pipe @signal_pipes[1].sync = true @worker_queue = Queue.new @shutdown_queue = Queue.new end def add_connection(socket) signal [:connection, socket] end def spawn @iom_thread = Thread.new do @logger.debug "Starting #{self}" run end end def shutdown(timeout = 0) @logger.debug "#{self} is shutting down workers" @worker_queue.clear @num_threads.times { @worker_queue.push [:shutdown] } signal [:shutdown, timeout] @shutdown_queue.pop @signal_pipes[0].close @signal_pipes[1].close @logger.debug "#{self} is shutting down, goodbye" end def ensure_closed kill_worker_threads if @worker_threads @iom_thread.kill end private def run spin_worker_threads loop do rd, = select([@signal_pipes[0], *@connections]) if rd.delete @signal_pipes[0] break if read_signals == :shutdown end rd.each do |fd| begin if fd.handle.eof? remove_connection fd else read_connection fd end rescue Errno::ECONNRESET remove_connection fd end end end join_worker_threads(@shutdown_timeout) ensure @shutdown_queue.push :shutdown end def read_connection(fd) @buffers[fd] << fd.read(DEFAULT_BUFFER) while(frame = slice_frame!(@buffers[fd])) @logger.debug "#{self} is processing a frame" @worker_queue.push [:frame, fd, frame] end end def spin_worker_threads @logger.debug "#{self} is spinning up worker threads" @worker_threads = [] @num_threads.times do @worker_threads << spin_thread end end def spin_thread Worker.new(@processor, @transport_factory, @protocol_factory, @logger, @worker_queue).spawn end def signal(msg) @signal_queue << msg @signal_pipes[1].write " " end def read_signals # clear the signal pipe # note that since read_nonblock is broken in jruby, # we can only read up to a set number of signals at once sigstr = @signal_pipes[0].readpartial(1024) # now read the signals begin sigstr.length.times do signal, obj = @signal_queue.pop(true) case signal when :connection @connections << obj when :shutdown @shutdown_timeout = obj return :shutdown end end rescue ThreadError # out of signals # note that in a perfect world this would never happen, since we're # only reading the number of signals pushed on the pipe, but given the lack # of locks, in theory we could clear the pipe/queue while a new signal is being # placed on the pipe, at which point our next read_signals would hit this error end end def remove_connection(fd) # don't explicitly close it, a thread may still be writing to it @connections.delete fd @buffers.delete fd end def join_worker_threads(shutdown_timeout) start = Time.now @worker_threads.each do |t| if shutdown_timeout > 0 timeout = (start + shutdown_timeout) - Time.now break if timeout <= 0 t.join(timeout) else t.join end end kill_worker_threads end def kill_worker_threads @worker_threads.each do |t| t.kill if t.status end @worker_threads.clear end def slice_frame!(buf) if buf.length >= 4 size = buf.unpack('N').first if buf.length >= size + 4 buf.slice!(0, size + 4) else nil end else nil end end class Worker # :nodoc: def initialize(processor, transport_factory, protocol_factory, logger, queue) @processor = processor @transport_factory = transport_factory @protocol_factory = protocol_factory @logger = logger @queue = queue end def spawn Thread.new do @logger.debug "#{self} is spawning" run end end private def run loop do cmd, *args = @queue.pop case cmd when :shutdown @logger.debug "#{self} is shutting down, goodbye" break when :frame fd, frame = args begin otrans = @transport_factory.get_transport(fd) oprot = @protocol_factory.get_protocol(otrans) membuf = MemoryBufferTransport.new(frame) itrans = @transport_factory.get_transport(membuf) iprot = @protocol_factory.get_protocol(itrans) @processor.process(iprot, oprot) rescue => e @logger.error "#{Thread.current.inspect} raised error: #{e.inspect}\n#{e.backtrace.join("\n")}" end end end end end end end end