Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 45 additions & 10 deletions lib/fluent/plugin/out_forward/socket_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,26 @@ def initialize(timeout, log)
end

def checkout_or(key)
obsolete_sockets = []

@mutex.synchronize do
tsock = pick_socket(key)
tsock, obsolete_sockets = pick_socket(key)

if tsock
tsock.sock
return tsock.sock
else
sock = yield
new_tsock = TimedSocket.new(timeout, key, sock)
@log.debug("connect new socket #{new_tsock}")

@inflight_sockets[sock] = new_tsock
new_tsock.sock
return new_tsock.sock
end
end
ensure
obsolete_sockets.each do |sock|
sock.sock.close rescue nil
end
end

def checkin(sock)
Expand Down Expand Up @@ -117,17 +123,32 @@ def clear
# this method is not thread safe
def pick_socket(key)
if @available_sockets[key].empty?
return nil
return nil, []
end

t = Time.now
if (s = @available_sockets[key].find { |sock| !expired_socket?(sock, time: t) })
@inflight_sockets[s.sock] = @available_sockets[key].delete(s)
s.timeout = timeout
s
else
nil
selected = nil
remaining = []
obsolete_sockets = []

@available_sockets[key].each do |sock|
if expired_socket?(sock, time: t) || unavailable_socket?(sock.sock)
obsolete_sockets << sock
elsif selected.nil?
selected = sock
else
remaining << sock
end
end

@available_sockets[key] = remaining

if selected
@inflight_sockets[selected.sock] = selected
selected.timeout = timeout
end

[selected, obsolete_sockets]
end

def timeout
Expand All @@ -137,6 +158,20 @@ def timeout
def expired_socket?(sock, time: Time.now)
sock.timeout ? sock.timeout < time : false
end

def unavailable_socket?(sock)
return sock.closed? if sock.respond_to?(:closed?) && sock.closed?

io = if sock.respond_to?(:to_io)
sock.to_io
elsif sock.is_a?(IO)
sock
end

io ? !!IO.select([io], nil, nil, 0) : false
rescue IOError, SystemCallError
true
end
end
end
end
33 changes: 33 additions & 0 deletions test/plugin/out_forward/test_socket_cache.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

require 'fluent/plugin/out_forward/socket_cache'
require 'timecop'
require 'socket'

class SocketCacheTest < Test::Unit::TestCase
sub_test_case 'checkout_or' do
Expand Down Expand Up @@ -48,6 +49,38 @@ class SocketCacheTest < Test::Unit::TestCase
c.checkout_or('key') { 'new socket' }
end
end

test 'discards cached socket after remote close' do
c = Fluent::Plugin::ForwardOutput::SocketCache.new(10, $log)
server = TCPServer.open('127.0.0.1', unused_port(protocol: :tcp))
first_sock = TCPSocket.new('127.0.0.1', server.addr[1])
first_peer = server.accept

c.checkout_or('key') { first_sock }
c.checkin(first_sock)

first_peer.close
waiting(5) do
until IO.select([first_sock], nil, nil, 0)
sleep 0.01
end
end

second_peer = nil
second_sock = c.checkout_or('key') do
sock = TCPSocket.new('127.0.0.1', server.addr[1])
second_peer = server.accept
sock
end

assert_not_same(first_sock, second_sock)
assert_true(first_sock.closed?)
ensure
c&.clear
first_peer&.close rescue nil
second_peer&.close rescue nil
server&.close rescue nil
end
end

sub_test_case 'checkin' do
Expand Down