Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions spec/amqproxy/server_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,62 @@ describe AMQProxy::Server do
end
end
end

it "does not send duplicate channel.close frames when client crashes after sending close" do
with_server do |server, proxy_url|
Fiber.yield

# Open multiple channels and close them rapidly, then crash
# This increases the probability of triggering the race condition
num_channels = 10

conn = AMQP::Client.new(proxy_url).connect
channels = (1..num_channels).map { conn.channel }
sleep 0.1.seconds
server.upstream_connections.should eq 1

# Send Channel::Close for ALL channels rapidly without waiting for CloseOk
channels.each do |channel|
conn.write AMQ::Protocol::Frame::Channel::Close.new(channel.id, 200_u16, "Normal close", 0_u16, 0_u16)
end

# Simulate crash: close socket abruptly WITHOUT waiting for any CloseOk
conn.@io.close

# Give the proxy time to process the disconnect
sleep 0.3.seconds

# The upstream connection should still be open (not 0)
# If duplicate Channel::Close was sent, upstream would have closed the connection
# with error: "expected 'channel.open'"
server.upstream_connections.should eq 1
end
end

it "does not send duplicate channel.close when upstream initiates close and client crashes" do
with_server do |server, proxy_url|
Fiber.yield

conn = AMQP::Client.new(proxy_url).connect
ch = conn.channel
sleep 0.1.seconds
server.upstream_connections.should eq 1

# Trigger a channel error by consuming from non-existent queue
# This will cause the upstream to send Channel::Close
expect_raises(AMQP::Client::Channel::ClosedException) do
ch.basic_consume("non_existent_queue_#{rand}") { }
end

# Simulate client crash: close socket WITHOUT proper connection close
conn.@io.close

# Give the proxy time to process the disconnect
sleep 0.2.seconds

# The upstream connection should still be open (not 0)
# The proxy already sent CloseOk to upstream when it received Channel::Close
server.upstream_connections.should eq 1
end
end
end
13 changes: 10 additions & 3 deletions src/amqproxy/client.cr
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ module AMQProxy
upstream_channel = channel_pool.get(DownstreamChannel.new(self, frame.channel))
@channel_map[frame.channel] = upstream_channel
write AMQ::Protocol::Frame::Channel::OpenOk.new(frame.channel)
when AMQ::Protocol::Frame::Channel::Close
if upstream_channel = @channel_map.delete(frame.channel)
# Channel was open, forward close to upstream
upstream_channel.write(frame)
else
# Channel doesn't exist
close_connection(504_u16, "CHANNEL_ERROR - Channel #{frame.channel} not open", frame)
end
when AMQ::Protocol::Frame::Channel::CloseOk
# Server closed channel, CloseOk reply to server is already sent
@channel_map.delete(frame.channel)
Expand Down Expand Up @@ -148,9 +156,8 @@ module AMQProxy
@socket.flush unless expect_more_frames?(frame)
end
case frame
when AMQ::Protocol::Frame::Channel::Close
@channel_map[frame.channel] = nil
when AMQ::Protocol::Frame::Channel::CloseOk
when AMQ::Protocol::Frame::Channel::Close,
AMQ::Protocol::Frame::Channel::CloseOk
@channel_map.delete(frame.channel)
when AMQ::Protocol::Frame::Connection::CloseOk
@socket.close rescue nil
Expand Down