Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public class IndirectMessageReference implements QueueMessageReference {
private boolean dropped;
/** Has the message been acked? */
private boolean acked;
/** Has the message been acked? */
private boolean delivered;
/** Direct reference to the message */
private final Message message;
private final MessageId messageId;
Expand Down Expand Up @@ -197,16 +199,25 @@ public boolean isExpired() {

@Override
public synchronized int getSize() {
return message.getSize();
return message.getSize();
}

@Override
public boolean isAdvisory() {
return message.isAdvisory();
return message.isAdvisory();
}

@Override
public boolean canProcessAsExpired() {
return message.canProcessAsExpired();
}

@Override
public boolean isDelivered() {
return delivered;
}

public void setDelivered(final boolean delivered) {
this.delivered = delivered;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,13 @@ public boolean canProcessAsExpired() {
return false;
}

@Override
public boolean isDelivered() {
return false;
}

@Override
public void setDelivered(boolean delivered) {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,27 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a
} else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.

boolean inAckRange = false;
for (final MessageReference node : dispatched) {
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null
|| ack.getFirstMessageId().equals(messageId)) {
inAckRange = true;
}
if (inAckRange) {
if (node instanceof QueueMessageReference) {
((QueueMessageReference) node).setDelivered(true);
}

if (ack.getLastMessageId().equals(messageId)) {
destination = (Destination) node.getRegionDestination();
callDispatchMatched = true;
break;
}
}
}

int index = 0;
for (Iterator<MessageReference> iter = dispatched.iterator(); iter.hasNext(); index++) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems this loop overrides what has been done in the previous loop (line 275). Maybe we should merge the logic in one loop ?

final MessageReference node = iter.next();
Expand Down Expand Up @@ -396,7 +417,7 @@ public final void acknowledge(final ConnectionContext context,final MessageAck a
}

protected void processExpiredAck(final ConnectionContext context, final Destination dest,
final MessageReference node) {
final MessageReference node) {
dest.messageExpired(context, this, node);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,8 @@ public interface QueueMessageReference extends MessageReference {
boolean unlock();

LockOwner getLockOwner();

boolean isDelivered();

void setDelivered(boolean delivered);
}
Loading