Verified Commit 45aca96f authored by roebert's avatar roebert 👻
Browse files

Merge branch 'master' into netty-codec-c-socket

parents ea8f2211 40577d81
Pipeline #99118 passed with stages
in 8 minutes and 7 seconds
......@@ -46,7 +46,6 @@ import org.drasyl.node.event.NodeUnrecoverableErrorEvent;
import org.drasyl.node.event.NodeUpEvent;
import org.drasyl.node.event.PeerDirectEvent;
import org.drasyl.node.event.PeerRelayEvent;
import org.drasyl.node.handler.plugin.PluginManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
......@@ -88,8 +87,6 @@ class PerfClientNodeTest {
@Mock(answer = RETURNS_DEEP_STUBS)
private ServerBootstrap bootstrap;
@Mock(answer = RETURNS_DEEP_STUBS)
private PluginManager pluginManager;
@Mock(answer = RETURNS_DEEP_STUBS)
private ChannelFuture channelFuture;
@Mock(answer = RETURNS_DEEP_STUBS)
private ChannelGroup channels;
......
......@@ -34,7 +34,6 @@ import org.drasyl.node.event.MessageEvent;
import org.drasyl.node.event.NodeNormalTerminationEvent;
import org.drasyl.node.event.NodeOnlineEvent;
import org.drasyl.node.event.NodeUnrecoverableErrorEvent;
import org.drasyl.node.handler.plugin.PluginManager;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
......@@ -70,8 +69,6 @@ class SendingWormholeNodeTest {
@Mock(answer = RETURNS_DEEP_STUBS)
private ServerBootstrap bootstrap;
@Mock(answer = RETURNS_DEEP_STUBS)
private PluginManager pluginManager;
@Mock(answer = RETURNS_DEEP_STUBS)
private ChannelFuture channelFuture;
@Mock(answer = RETURNS_DEEP_STUBS)
private ChannelGroup channels;
......
/*
* Copyright (c) 2020-2021 Heiko Bornholdt and Kevin Röbert
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
* OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.drasyl.channel;
import io.netty.util.ReferenceCounted;
import java.net.SocketAddress;
import java.util.Objects;
import static java.util.Objects.requireNonNull;
/**
* A message that wraps another message with an address.
*
* @param <M> the type of the wrapped message
* @param <A> the type of the address
*/
public class AddressedMessage<M, A extends SocketAddress> implements ReferenceCounted {
private final M message;
private final A address;
/**
* @throws NullPointerException if {@code address} is {@code null}
*/
public AddressedMessage(final M message, final A address) {
this.message = message;
this.address = requireNonNull(address);
}
/**
* Returns the message wrapped by this addressed message.
*/
public M message() {
return message;
}
/**
* Returns the address of this message.
*/
public A address() {
return address;
}
@Override
public String toString() {
return "AddressedMessage{" +
"message=" + message +
", address=" + address +
'}';
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final AddressedMessage<?, ?> that = (AddressedMessage<?, ?>) o;
return Objects.equals(message, that.message) && Objects.equals(address, that.address);
}
@Override
public int hashCode() {
return Objects.hash(message, address);
}
@Override
public int refCnt() {
if (message instanceof ReferenceCounted) {
return ((ReferenceCounted) message).refCnt();
}
else {
return 0;
}
}
@Override
public ReferenceCounted retain() {
if (message instanceof ReferenceCounted) {
((ReferenceCounted) message).retain();
}
return this;
}
@Override
public ReferenceCounted retain(final int increment) {
if (message instanceof ReferenceCounted) {
((ReferenceCounted) message).retain(increment);
}
return this;
}
@Override
public ReferenceCounted touch() {
if (message instanceof ReferenceCounted) {
((ReferenceCounted) message).touch();
}
return this;
}
@Override
public ReferenceCounted touch(final Object hint) {
if (message instanceof ReferenceCounted) {
((ReferenceCounted) message).touch(hint);
}
return this;
}
@Override
public boolean release() {
if (message instanceof ReferenceCounted) {
return ((ReferenceCounted) message).release();
}
else {
return false;
}
}
@Override
public boolean release(final int decrement) {
if (message instanceof ReferenceCounted) {
return ((ReferenceCounted) message).release(decrement);
}
else {
return false;
}
}
/**
* Returns a new {@link AddressedMessage} which contains the message {@code message} with the
* address {@link #address()}.
*/
public <N> AddressedMessage<N, A> replace(N message) {
return new AddressedMessage<>(message, address());
}
/**
* Returns a new {@link AddressedMessage} with contains the message {@link #message()} with the
* address {@code address}.
*/
public <B extends SocketAddress> AddressedMessage<M, B> route(B address) {
return new AddressedMessage<>(message(), address);
}
}
......@@ -161,7 +161,7 @@ public class DrasylChannel extends AbstractChannel {
}
ReferenceCountUtil.retain(msg);
parent().write(new AddressedMessage<>(msg, remoteAddress));
parent().write(new OverlayAddressedMessage<>(msg, remoteAddress, localAddress));
in.remove();
}
parent().flush();
......
......@@ -160,9 +160,9 @@ public class DrasylServerChannel extends AbstractServerChannel {
}
else {
try {
final AddressedMessage<?, IdentityPublicKey> childMsg = (AddressedMessage<?, IdentityPublicKey>) msg;
final Object o = childMsg.message();
final IdentityPublicKey peer = childMsg.address();
final OverlayAddressedMessage<?> childMsg = (OverlayAddressedMessage<?>) msg;
final Object o = childMsg.content();
final IdentityPublicKey peer = (IdentityPublicKey) childMsg.sender();
// create/get channel
final DrasylServerChannel serverChannel = (DrasylServerChannel) ctx.channel();
......
package org.drasyl.channel;
import io.netty.channel.DefaultAddressedEnvelope;
import java.net.InetSocketAddress;
import java.util.Objects;
/**
* A message that wraps another message with an {@link InetSocketAddress}.
*
* @param <M> the type of the wrapped message
*/
public class InetAddressedMessage<M> extends DefaultAddressedEnvelope<M, InetSocketAddress> {
/**
* @throws NullPointerException if {@code message} or {@code recipient} is {@code null}
*/
public InetAddressedMessage(final M message, final InetSocketAddress recipient) {
super(message, recipient);
}
/**
* @throws NullPointerException if {@code message} or {@code recipient} and {@code sender} are
* {@code null}
*/
public InetAddressedMessage(final M message,
final InetSocketAddress recipient,
final InetSocketAddress sender) {
super(message, recipient, sender);
}
@Override
public int hashCode() {
return Objects.hash(sender(), recipient(), content());
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final InetAddressedMessage<?> that = (InetAddressedMessage<?>) o;
return Objects.equals(sender(), that.sender()) &&
Objects.equals(recipient(), that.recipient()) &&
Objects.equals(content(), that.content());
}
@Override
public InetAddressedMessage<M> retain() {
super.retain();
return this;
}
@Override
public InetAddressedMessage<M> retain(final int increment) {
super.retain(increment);
return this;
}
@Override
public InetAddressedMessage<M> touch() {
super.touch();
return this;
}
@Override
public InetAddressedMessage<M> touch(final Object hint) {
super.touch(hint);
return this;
}
/**
* Returns a copy of this message with {@code newRecipient} as the new {@link #recipient()}.
*/
public InetAddressedMessage<M> route(final InetSocketAddress newRecipient) {
return new InetAddressedMessage<>(content(), newRecipient, sender());
}
/**
* Returns a copy of this message with {@code newContent} as the new {@link #content()}.
*/
public <N> InetAddressedMessage<N> replace(final N newContent) {
return new InetAddressedMessage<>(newContent, recipient(), sender());
}
}
package org.drasyl.channel;
import io.netty.channel.DefaultAddressedEnvelope;
import org.drasyl.identity.DrasylAddress;
import java.net.InetSocketAddress;
import java.util.Objects;
/**
* A message that wraps another message with a {@link DrasylAddress}.
*
* @param <M> the type of the wrapped message
*/
public class OverlayAddressedMessage<M> extends DefaultAddressedEnvelope<M, DrasylAddress> {
/**
* @throws NullPointerException if {@code message} or {@code recipient} is {@code null}
*/
public OverlayAddressedMessage(final M message, final DrasylAddress recipient) {
super(message, recipient);
}
/**
* @throws NullPointerException if {@code message} or {@code recipient} and {@code sender} are
* {@code null}
*/
public OverlayAddressedMessage(final M message,
final DrasylAddress recipient,
final DrasylAddress sender) {
super(message, recipient, sender);
}
@Override
public int hashCode() {
return Objects.hash(sender(), recipient(), content());
}
@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final OverlayAddressedMessage<?> that = (OverlayAddressedMessage<?>) o;
return Objects.equals(sender(), that.sender()) &&
Objects.equals(recipient(), that.recipient()) &&
Objects.equals(content(), that.content());
}
@Override
public OverlayAddressedMessage<M> retain() {
super.retain();
return this;
}
@Override
public OverlayAddressedMessage<M> retain(final int increment) {
super.retain(increment);
return this;
}
@Override
public OverlayAddressedMessage<M> touch() {
super.touch();
return this;
}
@Override
public OverlayAddressedMessage<M> touch(final Object hint) {
super.touch(hint);
return this;
}
/**
* Returns a copy of this message with {@code newRecipient} as the new {@link #recipient()}.
*/
public InetAddressedMessage<M> resolve(final InetSocketAddress address) {
return new InetAddressedMessage<>(content(), address);
}
/**
* Returns a copy of this message with {@code newRecipient} as the new {@link #recipient()}.
*/
public OverlayAddressedMessage<M> route(final DrasylAddress newRecipient) {
return new OverlayAddressedMessage<>(content(), newRecipient, sender());
}
/**
* Returns a copy of this message with {@code newContent} as the new {@link #content()}.
*/
public <N> OverlayAddressedMessage<N> replace(final N newContent) {
return new OverlayAddressedMessage<>(newContent, recipient(), sender());
}
}
......@@ -24,13 +24,12 @@ package org.drasyl.handler.discovery;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import org.drasyl.channel.AddressedMessage;
import org.drasyl.channel.OverlayAddressedMessage;
import org.drasyl.identity.DrasylAddress;
import org.drasyl.util.Pair;
import org.drasyl.util.logging.Logger;
import org.drasyl.util.logging.LoggerFactory;
import java.net.SocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
......@@ -56,8 +55,8 @@ public class IntraVmDiscovery extends ChannelDuplexHandler {
public void write(final ChannelHandlerContext ctx,
final Object msg,
final ChannelPromise promise) {
if (msg instanceof AddressedMessage) {
final SocketAddress recipient = ((AddressedMessage<?, ?>) msg).address();
if (msg instanceof OverlayAddressedMessage) {
final DrasylAddress recipient = ((OverlayAddressedMessage<?>) msg).recipient();
final ChannelHandlerContext discoveree = discoveries.get(Pair.of(myNetworkId, recipient));
......@@ -66,8 +65,8 @@ public class IntraVmDiscovery extends ChannelDuplexHandler {
ctx.write(msg, promise);
}
else {
LOG.debug("Send message `{}` via Intra VM Discovery.", ((AddressedMessage<?, ?>) msg)::message);
discoveree.fireChannelRead(((AddressedMessage<?, ?>) msg).route(ctx.channel().localAddress()));
LOG.debug("Send message `{}` via Intra VM Discovery.", ((OverlayAddressedMessage<?>) msg)::content);
discoveree.fireChannelRead(msg);
promise.setSuccess();
}
}
......
......@@ -21,13 +21,13 @@
*/
package org.drasyl.handler.logging;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.ScheduledFuture;
import org.drasyl.channel.AddressedMessage;
import java.io.PrintStream;
import java.net.SocketAddress;
......@@ -131,9 +131,9 @@ public class MessagesThroughputHandler extends ChannelDuplexHandler {
public void write(final ChannelHandlerContext ctx,
final Object msg,
final ChannelPromise promise) {
if (msg instanceof AddressedMessage) {
if (msg instanceof AddressedEnvelope) {
outboundMessages.increment();
if (consumeOutbound.test(((AddressedMessage<?, ?>) msg).address(), ((AddressedMessage<?, ?>) msg).message())) {
if (consumeOutbound.test(((AddressedEnvelope<?, ?>) msg).recipient(), ((AddressedEnvelope<?, ?>) msg).content())) {
promise.setSuccess();
}
else {
......@@ -147,9 +147,9 @@ public class MessagesThroughputHandler extends ChannelDuplexHandler {
@Override
public void channelRead(final ChannelHandlerContext ctx, final Object msg) {
if (msg instanceof AddressedMessage) {
if (msg instanceof AddressedEnvelope) {
inboundMessages.increment();
if (!consumeInbound.test(((AddressedMessage<?, ?>) msg).address(), ((AddressedMessage<?, ?>) msg).message())) {
if (!consumeInbound.test(((AddressedEnvelope<?, ?>) msg).sender(), ((AddressedEnvelope<?, ?>) msg).content())) {
ctx.fireChannelRead(msg);
}
}
......
......@@ -19,12 +19,13 @@
* OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
* OR OTHER DEALINGS IN THE SOFTWARE.
*/
package org.drasyl.handler.codec;
package org.drasyl.handler.remote;
import io.netty.buffer.ByteBuf;
import io.netty.channel.AddressedEnvelope;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import org.drasyl.channel.AddressedMessage;
import org.drasyl.channel.OverlayAddressedMessage;
import org.drasyl.handler.remote.protocol.ApplicationMessage;
import org.drasyl.identity.IdentityPublicKey;
import org.drasyl.identity.ProofOfWork;
......@@ -36,7 +37,7 @@ import static java.util.Objects.requireNonNull;
/**
* This codec converts {@link ApplicationMessage}s to the embedded payload and vice versa.
*/
public class ApplicationMessageToPayloadCodec extends MessageToMessageCodec<AddressedMessage<ApplicationMessage, ?>, AddressedMessage<ByteBuf, IdentityPublicKey>> {
public class ApplicationMessageToPayloadCodec extends MessageToMessageCodec<AddressedEnvelope<ApplicationMessage, ?>, OverlayAddressedMessage<ByteBuf>> {
private final int networkId;
private final IdentityPublicKey myPublicKey;
private final ProofOfWork myProofOfWork;
......@@ -51,26 +52,26 @@ public class ApplicationMessageToPayloadCodec extends MessageToMessageCodec<Addr
@Override
public boolean acceptOutboundMessage(final Object msg) {
return msg instanceof AddressedMessage && ((AddressedMessage<?, ?>) msg).message() instanceof ByteBuf && ((AddressedMessage<?, ?>) msg).address() instanceof IdentityPublicKey;
return msg instanceof OverlayAddressedMessage && ((OverlayAddressedMessage<?>) msg).content() instanceof ByteBuf;
}
@Override
protected void encode(final ChannelHandlerContext ctx,
final AddressedMessage<ByteBuf, IdentityPublicKey> msg,
final OverlayAddressedMessage<ByteBuf> msg,
final List<Object> out) throws Exception {
final ApplicationMessage wrappedMsg = ApplicationMessage.of(networkId, msg.address(), myPublicKey, myProofOfWork, msg.message().retain());
final ApplicationMessage wrappedMsg = ApplicationMessage.of(networkId, (IdentityPublicKey) msg.recipient(), myPublicKey, myProofOfWork, msg.content().retain());
out.add(msg.replace(wrappedMsg));
}
@Override
public boolean acceptInboundMessage(final Object msg) {
return msg instanceof AddressedMessage && ((AddressedMessage<?, ?>) msg).message() instanceof ApplicationMessage;
return msg instanceof AddressedEnvelope && ((AddressedEnvelope<?, ?>) msg).content() instanceof ApplicationMessage;
}
@Override
protected void decode(final ChannelHandlerContext ctx,
final AddressedMessage<ApplicationMessage, ?> msg,
final AddressedEnvelope<ApplicationMessage, ?> msg,
final List<Object> out) {
out.add(new AddressedMessage<>(msg.message().getPayload().retain(), msg.message().getSender()));
out.add(new OverlayAddressedMessage<>(msg.content().getPayload().retain(), msg.content().getRecipient(), msg.content().getSender()));
}
}
......@@ -26,7 +26,7 @@ import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageCodec;
import io.netty.util.ReferenceCountUtil;
import org.drasyl.channel.AddressedMessage;
import org.drasyl.channel.InetAddressedMessage;
import org.drasyl.handler.remote.protocol.InvalidMessageFormatException;
import org.drasyl.handler.remote.protocol.MagicNumberMissmatchException;
import org.drasyl.handler.remote.protocol.PartialReadMessage;
......@@ -41,35 +41,35 @@ import java.util.List;
*/
@SuppressWarnings("java:S110")
@Sharable
public class ByteToRemoteMessageCodec extends MessageToMessageCodec<AddressedMessage<ByteBuf, ?>, AddressedMessage<RemoteMessage, ?>> {
public class ByteToRemoteMessageCodec extends MessageToMessageCodec<InetAddressedMessage<ByteBuf>, InetAddressedMessage<RemoteMessage>> {
private static final Logger LOG = LoggerFactory.getLogger(ByteToRemoteMessageCodec.class);
@Override
public boolean acceptOutboundMessage(final Object msg) {
return msg instanceof AddressedMessage && ((AddressedMessage<?, ?>) msg).message() instanceof RemoteMessage;
return msg instanceof InetAddressedMessage && ((InetAddressedMessage<?>) msg).content() instanceof RemoteMessage;
}
@Override
protected void encode(final ChannelHandlerContext ctx,
final AddressedMessage<RemoteMessage, ?> msg,