Verified Commit b7660684 authored by roebert's avatar roebert 👻
Browse files

Zwischenstand

parent 49dbde08
Pipeline #106246 passed with stages
in 6 minutes and 52 seconds
......@@ -83,7 +83,6 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
private UnsignedInteger nextInboundSequenceNo;
private UnsignedInteger base;
private UnsignedInteger nextSeqNum;
private UnsignedInteger firstNonRetransmittedSeqNum;
private Duration rto;
private Duration retryTimeout;
private boolean firstOutbound;
......@@ -94,6 +93,7 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
private boolean ackRequired;
private long srtt;
private long sdev;
private long timeoutAt;
/**
* Creates a new GoBackNArqHandler.
......@@ -130,7 +130,6 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
this.firstOutbound = true;
this.windowShouldAffectWritability = windowShouldAffectWritability;
this.ackClock = ackClock;
this.firstNonRetransmittedSeqNum = UnsignedInteger.MIN_VALUE;
this.srtt = retryTimeout.toMillis();
updateRTO(retryTimeout.toMillis());
}
......@@ -202,7 +201,8 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
ctx.fireChannelActive();
// try to synchronize sequence numbers
// ctx.writeAndFlush(new GoBackNArqRst());
ackTask(ctx);
this.ackTask = ctx.executor().scheduleAtFixedRate(() -> ackTask(ctx), 0, ackClock.toMillis(), MILLISECONDS);
this.retryTask = ctx.executor().scheduleAtFixedRate(() -> resend(ctx), Duration.ofSeconds(1).toMillis(), Duration.ofMillis(10).toMillis(), MILLISECONDS);
}
@Override
......@@ -210,7 +210,7 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
window.removeAndFailAll(new ClosedChannelException());
overflow.removeAndFailAll(new ClosedChannelException());
ctx.fireChannelInactive();
stopTimer();
this.retryTask.cancel(true);
stopAckTask();
}
......@@ -258,7 +258,6 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
final GoBackNArqAck ack = (GoBackNArqAck) msg;
LOG.trace("[{}] Got {}", ctx.channel().id()::asShortText, () -> ack);
resetExponentialBackoff(ack.sequenceNo());
if (ack.sequenceNo().safeIncrement().equals(nextSeqNum)) {
stopTimer();
}
......@@ -320,13 +319,16 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
*/
private void succeedWrites(final ChannelHandlerContext ctx, final long cumAck) {
window.increaseCapacity(cumAck);
LOG.debug("Increased window to `{}`", window::size);
long rtt = 0;
for (long i = 0; i < cumAck; i++) {
final Window.Frame f = window.remove();
f.getPromise().trySuccess();
final long rtt = System.currentTimeMillis() - f.getSendTime();
updateRTO(rtt);
rtt += System.currentTimeMillis() - f.getSendTime();
}
updateRTO(rtt / cumAck);
writeData(ctx);
ctx.flush();
......@@ -339,6 +341,11 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
*/
private void writeData(final ChannelHandlerContext ctx) {
final int freeSpace = Math.min(window.getFreeSpace(), overflow.size());
if(freeSpace > 0) {
resetExponentialBackoff();
}
for (int i = 0; i < freeSpace; i++) {
final Object o = overflow.current();
if (o == null) {
......@@ -407,20 +414,17 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
* @param ctx the handler context
*/
private void resetTimer(final ChannelHandlerContext ctx) {
LOG.trace("[{}] Reset timer", ctx.channel().id()::asShortText);
stopTimer();
retryTask = ctx.executor().schedule(() -> resend(ctx), retryTimeout.toMillis(), MILLISECONDS);
timeoutAt = System.currentTimeMillis() + retryTimeout.toMillis();
LOG.trace("[{}] Reset timer", ctx.channel().id()::asShortText);
}
/**
* Stops the retry timer task.
*/
private void stopTimer() {
if (retryTask != null) {
LOG.trace("Reset timer");
retryTask.cancel(true);
retryTask = null;
}
timeoutAt = 0;
LOG.trace("Stop timer");
}
/**
......@@ -430,10 +434,14 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
*/
private void resend(final ChannelHandlerContext ctx) {
UnsignedInteger seqNo = UnsignedInteger.of(base.getValue());
final long currentTime = System.currentTimeMillis();
// resend all messages from window
if (window.size() != 0) {
LOG.info("[{}] ACKs got timeout. Resend complete window of size {}", ctx.channel().id()::asShortText, window::size);
if (window.size() != 0 && timeoutAt != 0 && timeoutAt < currentTime) {
window.decreaseCapacity();
LOG.debug("Decreased window to `{}`", window::size);
LOG.info("[{}] ACKs got timeout on rto `{}`. Resend complete window of size {}", ctx.channel().id()::asShortText, rto::toMillis, window::size);
for (final Window.Frame frame : window.getQueue()) {
if (frame.getPromise().isDone()) {
window.remove();
......@@ -446,7 +454,6 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
ctx.flush();
window.decreaseCapacity();
doExponentialBackoff();
resetTimer(ctx);
}
......@@ -456,7 +463,6 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
* Increase the retry timeout exponentially to do some backoff.
*/
private void doExponentialBackoff() {
firstNonRetransmittedSeqNum = UnsignedInteger.of(window.size()).safeIncrement();
final Duration newTimeout = retryTimeout.multipliedBy(2);
if (newTimeout.toMillis() < MAX_EXPONENTIAL_BACKOFF.toMillis()) {
retryTimeout = newTimeout;
......@@ -468,13 +474,10 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
/**
* Resets the retry timeout on the first not retransmitted packed.
*
* @param currentSeqNum the current sequence number
*/
private void resetExponentialBackoff(final UnsignedInteger currentSeqNum) {
if (currentSeqNum.getValue() >= firstNonRetransmittedSeqNum.getValue()) {
retryTimeout = rto;
}
private void resetExponentialBackoff() {
retryTimeout = rto;
LOG.debug("Reset retry timeout to `{}`", rto::toMillis);
}
/**
......@@ -484,9 +487,9 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
*/
private void updateRTO(final long rtt) {
updateSrtt(rtt);
rto = Duration.ofMillis(Math.round(srtt + RTO_COEFFICIENT * sdev));
rto = Duration.ofMillis(Math.round(srtt + (RTO_COEFFICIENT * sdev)));
LOG.trace("Updated default retransmission timeout to `{}` by getting rtt `{}`", () -> rto, () -> rtt);
LOG.trace("Updated default retransmission timeout to `{}` by getting rtt `{}` and updated srtt to `{}` and sdev to `{}`", rto::toMillis, () -> rtt, () -> srtt, () -> sdev);
}
/**
......@@ -495,8 +498,8 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
* @param rtt the current measured roundtrip time
*/
private void updateSrtt(final long rtt) {
srtt = Math.round((1 - SRTT_COEFFICIENT) * srtt + SRTT_COEFFICIENT * rtt);
sdev = Math.round((1 - SDEV_COEFFICIENT) * sdev + SDEV_COEFFICIENT * Math.abs(rtt - srtt)); // smoothed roundtrip standard deviation estimate
srtt = Math.round(((1 - SRTT_COEFFICIENT) * srtt) + (SRTT_COEFFICIENT * rtt));
sdev = Math.round(((1 - SDEV_COEFFICIENT) * sdev) + (SDEV_COEFFICIENT * Math.abs(rtt - srtt))); // smoothed roundtrip standard deviation estimate
}
/**
......@@ -508,8 +511,6 @@ public class GoBackNArqHandler extends ChannelDuplexHandler {
// reply with ACK of current inbound index
ctx.writeAndFlush(new GoBackNArqAck(nextInboundSequenceNo.safeDecrement()));
}
ackTask = ctx.executor().schedule(() -> ackTask(ctx), ackClock.toMillis(), MILLISECONDS);
}
/**
......
......@@ -108,11 +108,16 @@ class PendingQueueWindow implements Window {
@Override
public void increaseCapacity(final long amount) {
capacity += amount;
if (amount + capacity > 2000) {
capacity = 2000;
}
else {
capacity += amount;
}
}
@Override
public void decreaseCapacity() {
capacity = Math.max(1, capacity / 2);
capacity = Math.max(20, capacity / 2);
}
}
......@@ -101,11 +101,16 @@ public class SimpleWindow implements Window {
@Override
public void increaseCapacity(final long amount) {
capacity += amount;
if (amount + capacity > 2000) {
capacity = 2000;
}
else {
capacity += amount;
}
}
@Override
public void decreaseCapacity() {
capacity = Math.max(1, capacity / 2);
capacity = Math.max(20, capacity / 2);
}
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment