diff --git a/lib/fluent/plugin/out_forward/socket_cache.rb b/lib/fluent/plugin/out_forward/socket_cache.rb
index 925eef0fad..fbee8af943 100644
--- a/lib/fluent/plugin/out_forward/socket_cache.rb
+++ b/lib/fluent/plugin/out_forward/socket_cache.rb
@@ -31,20 +31,27 @@ def initialize(timeout, log)
       end
 
       def checkout_or(key)
+        obsolete_sockets = []
+
         @mutex.synchronize do
-          tsock = pick_socket(key)
+          tsock, obsolete_sockets = pick_socket(key)
 
           if tsock
-            tsock.sock
+            return tsock.sock
           else
             sock = yield
             new_tsock = TimedSocket.new(timeout, key, sock)
             @log.debug("connect new socket #{new_tsock}")
 
             @inflight_sockets[sock] = new_tsock
-            new_tsock.sock
+            return new_tsock.sock
           end
         end
+      ensure
+        # Runs after the mutex is released, so closing never happens under the lock.
+        obsolete_sockets.each do |sock|
+          sock.sock.close rescue nil
+        end
       end
 
       def checkin(sock)
@@ -117,17 +124,32 @@ def clear
       # this method is not thread safe
       def pick_socket(key)
         if @available_sockets[key].empty?
-          return nil
+          return nil, []
         end
 
         t = Time.now
-        if (s = @available_sockets[key].find { |sock| !expired_socket?(sock, time: t) })
-          @inflight_sockets[s.sock] = @available_sockets[key].delete(s)
-          s.timeout = timeout
-          s
-        else
-          nil
+        selected = nil
+        remaining = []
+        obsolete_sockets = []
+
+        @available_sockets[key].each do |sock|
+          if expired_socket?(sock, time: t) || unavailable_socket?(sock.sock)
+            obsolete_sockets << sock
+          elsif selected.nil?
+            selected = sock
+          else
+            remaining << sock
+          end
+        end
+
+        @available_sockets[key] = remaining
+
+        if selected
+          @inflight_sockets[selected.sock] = selected
+          selected.timeout = timeout
         end
+
+        [selected, obsolete_sockets]
       end
 
       def timeout
@@ -137,6 +159,26 @@ def timeout
       def expired_socket?(sock, time: Time.now)
         sock.timeout ? sock.timeout < time : false
       end
+
+      # Returns true when a cached socket must not be reused: it is already
+      # closed, a zero-timeout IO.select reports its IO as readable, or the
+      # check itself raises IOError/SystemCallError. Objects that neither
+      # respond to #to_io nor are an IO are reported as available (false).
+      # NOTE(review): unread data from the peer also makes a socket readable,
+      # so such a socket is discarded too -- confirm that is intended.
+      def unavailable_socket?(sock)
+        return true if sock.respond_to?(:closed?) && sock.closed?
+
+        io = if sock.respond_to?(:to_io)
+               sock.to_io
+             elsif sock.is_a?(IO)
+               sock
+             end
+
+        io ? !!IO.select([io], nil, nil, 0) : false
+      rescue IOError, SystemCallError
+        true
+      end
     end
   end
 end
diff --git a/test/plugin/out_forward/test_socket_cache.rb b/test/plugin/out_forward/test_socket_cache.rb
index 1d584d6b4e..eacd4e3caa 100644
--- a/test/plugin/out_forward/test_socket_cache.rb
+++ b/test/plugin/out_forward/test_socket_cache.rb
@@ -2,6 +2,7 @@
 
 require 'fluent/plugin/out_forward/socket_cache'
 require 'timecop'
+require 'socket'
 
 class SocketCacheTest < Test::Unit::TestCase
   sub_test_case 'checkout_or' do
@@ -48,6 +49,38 @@ class SocketCacheTest < Test::Unit::TestCase
         c.checkout_or('key') { 'new socket' }
       end
     end
+
+    test 'discards cached socket after remote close' do
+      c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
+      server = TCPServer.open('127.0.0.1', unused_port(protocol: :tcp))
+      first_sock = TCPSocket.new('127.0.0.1', server.addr[1])
+      first_peer = server.accept
+
+      c.checkout_or('key') { first_sock }
+      c.checkin(first_sock)
+
+      first_peer.close
+      waiting(5) do
+        until IO.select([first_sock], nil, nil, 0)
+          sleep 0.01
+        end
+      end
+
+      second_peer = nil
+      second_sock = c.checkout_or('key') do
+        sock = TCPSocket.new('127.0.0.1', server.addr[1])
+        second_peer = server.accept
+        sock
+      end
+
+      assert_not_same(first_sock, second_sock)
+      assert_true(first_sock.closed?)
+    ensure
+      c&.clear
+      first_peer&.close rescue nil
+      second_peer&.close rescue nil
+      server&.close rescue nil
+    end
   end
 
   sub_test_case 'checkin' do