diff --git a/CHANGELOG.md b/CHANGELOG.md index 70d781ae4762ce408e59da4fe89730064058f6cf..9dd8028827a782b1ded6fac9e5c549ebbf116ce0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,8 @@ ProofOfWork = -2144920491 - CLI: `generate-pow` sub-command added. - CLI: `tun` sub-command added. - CLI: `generate-completion` sub-command added. +- Broadcast-based LAN Discovery added. Can be used programatically. More + information: https://git.informatik.uni-hamburg.de/sane-public/drasyl/-/merge_requests/680 - ### Changed diff --git a/drasyl-core/src/main/java/org/drasyl/handler/remote/LocalNetworkDiscovery.java b/drasyl-core/src/main/java/org/drasyl/handler/remote/LocalNetworkDiscovery.java index a0804e845631b1e96f22f14391188e02108c1847..6687ef86711670ee4e54520a8f979fee4c7d5377 100644 --- a/drasyl-core/src/main/java/org/drasyl/handler/remote/LocalNetworkDiscovery.java +++ b/drasyl-core/src/main/java/org/drasyl/handler/remote/LocalNetworkDiscovery.java @@ -51,16 +51,16 @@ import static org.drasyl.util.Preconditions.requirePositive; import static org.drasyl.util.RandomUtil.randomLong; /** - * This handler, along with the {@link UdpMulticastServer}, is used to discover other nodes on the - * local network via IP multicast. + * This handler, along with the {@link UdpMulticastServer} or {@link UdpBroadcastServer}, is used to + * discover other nodes on the local network via IP multicast or broadcast. *

- * For this purpose, the {@link UdpMulticastServer} joins a multicast group and forwards received - * {@link HelloMessage}s to this handler, which thus becomes aware of other nodes in the local - * network. In case no {@link HelloMessage} has been received for a longer period of time, the other - * node is considered stale. + * For this purpose, the above-mentioned server forwards received multicast/broadcast {@link + * HelloMessage}s to this handler, which thus becomes aware of other nodes in the local network. In + * case no {@link HelloMessage} has been received for a longer period of time, the other node is + * considered stale. *

- * In addition, this handler periodically sends a {@link HelloMessage} messages to a multicast group - * so that other nodes become aware of this node. + * In addition, this handler periodically sends a {@link HelloMessage} messages to a given + * multicast/broadcast address so that other nodes become aware of this node. * * @see UdpMulticastServer * @see UdpBroadcastServer diff --git a/drasyl-core/src/main/java/org/drasyl/handler/remote/UdpBroadcastServer.java b/drasyl-core/src/main/java/org/drasyl/handler/remote/UdpBroadcastServer.java new file mode 100644 index 0000000000000000000000000000000000000000..161a87dd87123dadb7cc6deae132c5ea138a1f2d --- /dev/null +++ b/drasyl-core/src/main/java/org/drasyl/handler/remote/UdpBroadcastServer.java @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2020-2022 Heiko Bornholdt and Kevin Röbert + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE + * OR OTHER DEALINGS IN THE SOFTWARE. + */ +package org.drasyl.handler.remote; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; +import io.netty.channel.socket.nio.NioDatagramChannel; +import io.netty.util.NetUtil; +import io.netty.util.internal.SystemPropertyUtil; +import org.drasyl.channel.InetAddressedMessage; +import org.drasyl.util.logging.Logger; +import org.drasyl.util.logging.LoggerFactory; + +import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Supplier; + +import static java.util.Objects.requireNonNull; + +/** + * Starts an UDP broadcast server and together with the {@link LocalNetworkDiscovery} is responsible + * for discovering other nodes in the local network. + * + * @see LocalNetworkDiscovery + */ +@Sharable +public class UdpBroadcastServer extends ChannelInboundHandlerAdapter { + private static final String BROADCAST_ADDRESS_PROPERTY = "org.drasyl.remote.broadcast.address"; + private static final String BROADCAST_BIND_HOST_PROPERTY = "org.drasyl.remote.broadcast.bind-host"; + private static final Logger LOG = LoggerFactory.getLogger(UdpBroadcastServer.class); + public static final InetSocketAddress BROADCAST_ADDRESS; + private static final String BROADCAST_BIND_HOST; + private final Set nodes; + private final Supplier bootstrapSupplier; + private DatagramChannel channel; + + static { + try { + final String stringValue = SystemPropertyUtil.get(BROADCAST_ADDRESS_PROPERTY, "255.255.255.255:22527"); + final URI uriValue = new URI("my://" + stringValue); + BROADCAST_ADDRESS = new InetSocketAddress(uriValue.getHost(), uriValue.getPort()); + } + catch (final URISyntaxException | IllegalArgumentException e) { + throw new RuntimeException("Invalid broadcast address given:", e); + } + + BROADCAST_BIND_HOST = SystemPropertyUtil.get(BROADCAST_BIND_HOST_PROPERTY, "0.0.0.0"); + } + + @SuppressWarnings("java:S2384") + UdpBroadcastServer(final Set nodes, + final Supplier bootstrapSupplier, + final DatagramChannel channel) { + this.nodes = requireNonNull(nodes); + this.bootstrapSupplier = requireNonNull(bootstrapSupplier); + this.channel = channel; + } + + public UdpBroadcastServer() { + this( + new HashSet<>(), + Bootstrap::new, + null + ); + } + + @SuppressWarnings("java:S1905") + @Override + public void channelActive(final ChannelHandlerContext ctx) { + if (NetUtil.isIpV6AddressesPreferred()) { + LOG.debug("Do not start Broadcast Server as we're in an IPv6 preferred environment."); + ctx.fireChannelActive(); + return; + } + + nodes.add(ctx); + + if (channel == null) { + LOG.debug("Start Broadcast Server..."); + bootstrapSupplier.get() + .group((EventLoopGroup) ctx.executor().parent()) + .channel(NioDatagramChannel.class) + .handler(new UdpBroadcastServerHandler()) + .bind(BROADCAST_BIND_HOST, BROADCAST_ADDRESS.getPort()) + .addListener(new UdpBroadcastServerFutureListener(ctx)); + } + else { + ctx.fireChannelActive(); + } + } + + @SuppressWarnings("java:S1602") + @Override + public void channelInactive(final ChannelHandlerContext ctx) { + ctx.fireChannelInactive(); + + nodes.remove(ctx); + + if (channel != null && nodes.isEmpty()) { + final InetSocketAddress socketAddress = channel.localAddress(); + LOG.debug("Stop Server listening at udp:/{}...", socketAddress); + // shutdown server + channel.close().addListener(future1 -> { + channel = null; + LOG.debug("Server stopped."); + }); + } + } + + private class UdpBroadcastServerHandler extends SimpleChannelInboundHandler { + public UdpBroadcastServerHandler() { + super(false); + } + + @Override + protected void channelRead0(final ChannelHandlerContext channelCtx, + final DatagramPacket packet) { + final InetSocketAddress sender = packet.sender(); + final ByteBuf content = packet.content().asReadOnly(); + nodes.forEach(nodeCtx -> { + LOG.trace("Datagram received {} and passed to {}", () -> packet, nodeCtx.channel()::localAddress); + final ByteBuf byteBuf = content.retainedDuplicate(); + nodeCtx.executor().execute(() -> { + nodeCtx.fireChannelRead(new InetAddressedMessage<>(byteBuf, null, sender)); + nodeCtx.fireChannelReadComplete(); + }); + }); + content.release(); + } + } + + private class UdpBroadcastServerFutureListener implements ChannelFutureListener { + private final ChannelHandlerContext ctx; + + public UdpBroadcastServerFutureListener(final ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + @Override + public void operationComplete(final ChannelFuture future) { + if (future.isSuccess()) { + // server successfully started + final DatagramChannel myChannel = (DatagramChannel) future.channel(); + LOG.info("Server started and listening at udp:/{}", myChannel.localAddress()); + UdpBroadcastServer.this.channel = myChannel; + ctx.fireChannelActive(); + } + else { + // server start failed + //noinspection unchecked + LOG.info("Unable to bind server to address udp://{}:{}. This can be caused by another drasyl node running in a different JVM or another application is bind to that port.", () -> BROADCAST_BIND_HOST, BROADCAST_ADDRESS::getPort, future.cause()::getMessage); + ctx.fireChannelActive(); + } + } + } +} diff --git a/drasyl-core/src/test/java/org/drasyl/handler/remote/UdpBroadcastServerTest.java b/drasyl-core/src/test/java/org/drasyl/handler/remote/UdpBroadcastServerTest.java new file mode 100644 index 0000000000000000000000000000000000000000..8a72899677238c949a6669ae1a76389d1e69f27f --- /dev/null +++ b/drasyl-core/src/test/java/org/drasyl/handler/remote/UdpBroadcastServerTest.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2020-2022 Heiko Bornholdt and Kevin Röbert + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. + * IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE + * OR OTHER DEALINGS IN THE SOFTWARE. + */ +package org.drasyl.handler.remote; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.channel.socket.DatagramChannel; +import io.netty.channel.socket.DatagramPacket; +import io.netty.util.concurrent.EventExecutor; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.stubbing.Answer; + +import java.net.InetSocketAddress; +import java.net.NetworkInterface; +import java.util.HashSet; +import java.util.Set; +import java.util.function.Supplier; + +import static org.mockito.Answers.RETURNS_DEEP_STUBS; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class UdpBroadcastServerTest { + @Mock(answer = RETURNS_DEEP_STUBS) + private Supplier bootstrapSupplier; + @Mock(answer = RETURNS_DEEP_STUBS) + private Bootstrap bootstrap; + @Mock(answer = RETURNS_DEEP_STUBS) + private DatagramChannel channel; + @Mock + private Set nodes; + + @Nested + class StartServer { + @Test + void shouldStartServerOnChannelActive(@Mock(answer = RETURNS_DEEP_STUBS) final ChannelFuture channelFuture, + @Mock(answer = RETURNS_DEEP_STUBS) final DatagramChannel datagramChannel) { + when(bootstrapSupplier.get()).thenReturn(bootstrap); + when(channelFuture.isSuccess()).thenReturn(true); + when(channelFuture.channel()).thenReturn(datagramChannel); + when(bootstrap.group(any()).channel(any()).handler(any()).bind(anyString(), anyInt()).addListener(any())).then(invocation -> { + final ChannelFutureListener listener = invocation.getArgument(0, ChannelFutureListener.class); + listener.operationComplete(channelFuture); + return null; + }); + when(datagramChannel.localAddress()).thenReturn(new InetSocketAddress(22527)); + when(datagramChannel.joinGroup(any(InetSocketAddress.class), any(NetworkInterface.class)).addListener(any())).then(invocation -> { + final ChannelFutureListener listener = invocation.getArgument(0, ChannelFutureListener.class); + listener.operationComplete(null); + return null; + }); + + final UdpBroadcastServer handler = new UdpBroadcastServer(nodes, bootstrapSupplier, null); + final EmbeddedChannel channel = new EmbeddedChannel(handler); + try { + verify(nodes).add(channel.pipeline().context(handler)); + verify(bootstrap.group(any()).channel(any()).handler(any()), times(2)).bind(anyString(), anyInt()); + } + finally { + channel.close(); + } + } + } + + @Nested + class StopServer { + @Test + void shouldStopServerOnChannelInactive() { + when(nodes.isEmpty()).thenReturn(true); + + final UdpBroadcastServer handler = new UdpBroadcastServer(nodes, bootstrapSupplier, channel); + final EmbeddedChannel channel = new EmbeddedChannel(handler); + try { + channel.pipeline().fireChannelInactive(); + + verify(nodes).remove(channel.pipeline().context(handler)); + verify(UdpBroadcastServerTest.this.channel).close(); + } + finally { + channel.close(); + } + } + } + + @Nested + class MessagePassing { + @Test + @SuppressWarnings("unchecked") + void shouldPassIngoingMessagesToAllPipelines(@Mock final ChannelHandlerContext channelCtx, + @Mock(answer = RETURNS_DEEP_STUBS) final ChannelHandlerContext ctx, + @Mock final EventExecutor eventExecutor) { + when(bootstrapSupplier.get()).thenReturn(bootstrap); + when(bootstrap.group(any()).channel(any()).handler(any())).then((Answer) invocation -> { + final SimpleChannelInboundHandler handler = invocation.getArgument(0, SimpleChannelInboundHandler.class); + handler.channelRead(channelCtx, new DatagramPacket(Unpooled.EMPTY_BUFFER, new InetSocketAddress(22527), new InetSocketAddress(25421))); + return bootstrap; + }); + when(ctx.executor()).thenReturn(eventExecutor); + doAnswer((Answer) invocation -> { + invocation.getArgument(0, Runnable.class).run(); + return null; + }).when(eventExecutor).execute(any()); + + final Set nodes = new HashSet<>(Set.of(ctx)); + final UdpBroadcastServer handler = new UdpBroadcastServer(nodes, bootstrapSupplier, null); + final EmbeddedChannel channel = new EmbeddedChannel(handler); + try { + verify(ctx).fireChannelRead(any()); + } + finally { + channel.releaseInbound(); + channel.close(); + } + } + } +}