Commit df770905 authored by bornholdt's avatar bornholdt
Browse files

Merge branch 'master' into throtteling

# Conflicts:
#	drasyl-core/src/main/java/org/drasyl/remote/handler/UdpServer.java
parents 17b236a0 cfe666f2
Pipeline #46277 failed with stages
in 3 minutes and 9 seconds
......@@ -126,11 +126,12 @@ production:
variables:
APP_IMAGE: "${CI_REGISTRY_IMAGE}:latest"
APP_DEPLOY_HOST: "production.${APP_HOST}"
SENTRY_ENVIRONMENT: production
PORT: 22527
DRASYL_NETWORK_ID: 1
DRASYL_PROOF_OF_WORK: 10992904
DRASYL_PUBLIC_KEY: 025fff6f625f5dee816d9f8fe43895479aecfda187cb6a3330894a07e698bc5bd8
# DRASYL_PRIVATE_KEY is defined securely in GitLab CI/CD Settings
SENTRY_ENVIRONMENT: production
environment:
name: production
url: https://$APP_HOST
......@@ -150,6 +151,7 @@ stop_production:
GIT_STRATEGY: none
APP_IMAGE: "${CI_REGISTRY_IMAGE}:latest"
APP_DEPLOY_HOST: "production.${APP_HOST}"
PORT: 22527
SENTRY_ENVIRONMENT: production
DRASYL_NETWORK_ID: 1
DRASYL_PROOF_OF_WORK: 10992904
......
......@@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
-
### Changed
-
- By default, each node now listens on a port in the range 22528 and 65528, which is derived from its identity.
- UDP is now used instead of TCP for communication with remote peers.
-
-
......
......@@ -160,7 +160,7 @@ public abstract class DrasylNode {
this.endpoints = new CopyOnWriteArraySet<>();
this.acceptNewConnections = new AtomicBoolean();
this.started = new AtomicBoolean();
this.pipeline = new DrasylPipeline(this::onEvent, this.config, identity, peersManager, started, LazyBossGroupHolder.INSTANCE, endpoints);
this.pipeline = new DrasylPipeline(this::onEvent, this.config, identity, peersManager, started, LazyBossGroupHolder.INSTANCE);
this.pluginManager = new PluginManager(config, identity, pipeline);
this.startSequence = new CompletableFuture<>();
this.shutdownSequence = completedFuture(null);
......@@ -342,14 +342,14 @@ public abstract class DrasylNode {
*/
public CompletableFuture<Void> shutdown() {
if (startSequence.isDone() && !startSequence.isCompletedExceptionally() && started.compareAndSet(true, false)) {
onInternalEvent(new NodeDownEvent(Node.of(identity, endpoints)));
onInternalEvent(new NodeDownEvent(Node.of(identity)));
LOG.info("Shutdown drasyl Node with Identity '{}'...", identity);
shutdownSequence = new CompletableFuture<>();
pluginManager.beforeShutdown();
startSequence.whenComplete((t, exp) -> getInstanceHeavy().scheduleDirect(() -> {
rejectNewConnections();
onInternalEvent(new NodeNormalTerminationEvent(Node.of(identity, endpoints))).join();
onInternalEvent(new NodeNormalTerminationEvent(Node.of(identity))).join();
LOG.info("drasyl Node with Identity '{}' has shut down", identity);
pluginManager.afterShutdown();
......@@ -389,7 +389,7 @@ public abstract class DrasylNode {
pluginManager.beforeStart();
acceptNewConnections();
try {
onInternalEvent(new NodeUpEvent(Node.of(identity, endpoints))).get();
onInternalEvent(new NodeUpEvent(Node.of(identity))).get();
LOG.info("drasyl Node with Identity '{}' has started", identity);
startSequence.complete(null);
pluginManager.afterStart();
......@@ -400,7 +400,7 @@ public abstract class DrasylNode {
catch (final ExecutionException e) {
LOG.warn("Could not start drasyl Node: {}", e.getCause().getMessage());
pluginManager.beforeShutdown();
onInternalEvent(new NodeUnrecoverableErrorEvent(Node.of(identity, endpoints), e.getCause())).join();
onInternalEvent(new NodeUnrecoverableErrorEvent(Node.of(identity), e.getCause())).join();
pluginManager.afterShutdown();
INSTANCES.remove(DrasylNode.this);
startSequence.completeExceptionally(e);
......
......@@ -22,6 +22,8 @@ import org.drasyl.identity.CompressedPublicKey;
import java.util.Objects;
import static java.util.Objects.requireNonNull;
/**
* This event signals that the node has received a message addressed to it.
* <p>
......@@ -36,10 +38,11 @@ public class MessageEvent implements Event {
*
* @param sender the message's sender
* @param payload content of the message
* @throws NullPointerException if {@code sender} or {@code payload} is {@code null}
*/
public MessageEvent(final CompressedPublicKey sender, final Object payload) {
this.sender = sender;
this.payload = payload;
this.sender = requireNonNull(sender);
this.payload = requireNonNull(payload);
}
/**
......
......@@ -19,10 +19,10 @@
package org.drasyl.event;
import org.drasyl.identity.Identity;
import org.drasyl.peer.Endpoint;
import java.util.Objects;
import java.util.Set;
import static java.util.Objects.requireNonNull;
/**
* Used by {@link Event} to describe an event related to the local Node (e.g. {@link NodeUpEvent},
......@@ -32,16 +32,19 @@ import java.util.Set;
*/
public class Node {
private final Identity identity;
private final Set<Endpoint> endpoints;
private final int port;
Node(final Identity identity, final Set<Endpoint> endpoints) {
this.identity = identity;
this.endpoints = endpoints;
Node(final Identity identity, final int port) {
this.identity = requireNonNull(identity);
if (port < 0) {
throw new IllegalArgumentException("port must be non-negative.");
}
this.port = port;
}
@Override
public int hashCode() {
return Objects.hash(identity, endpoints);
return Objects.hash(identity, port);
}
@Override
......@@ -54,14 +57,14 @@ public class Node {
}
final Node node = (Node) o;
return Objects.equals(identity, node.identity) &&
Objects.equals(endpoints, node.endpoints);
port == port;
}
@Override
public String toString() {
return "Node{" +
"identity=" + identity +
", endpoints=" + endpoints +
", port=" + port +
'}';
}
......@@ -75,19 +78,26 @@ public class Node {
}
/**
* Returns the node's endpoints.
* Returns the node's server port.
*
* @return the node's endpoints
* @return the node's server port
*/
public Set<Endpoint> getEndpoints() {
return endpoints;
public int getPort() {
return port;
}
/**
* @throws NullPointerException if {@code identity} is {@code null}
*/
public static Node of(final Identity identity) {
return of(identity, Set.of());
return of(identity, 0);
}
public static Node of(final Identity identity, final Set<Endpoint> endpoints) {
return new Node(identity, Set.copyOf(endpoints));
/**
* @throws NullPointerException if {@code identity} is {@code null}
* @throws IllegalArgumentException if {@code port} is negative
*/
public static Node of(final Identity identity, final int port) {
return new Node(identity, port);
}
}
\ No newline at end of file
......@@ -25,6 +25,9 @@ package org.drasyl.event;
* This is an immutable object.
*/
public class NodeDownEvent extends AbstractNodeEvent {
/**
* @throws NullPointerException if {@code node} is {@code null}
*/
public NodeDownEvent(final Node node) {
super(node);
}
......
......@@ -25,6 +25,9 @@ package org.drasyl.event;
* This is an immutable object.
*/
public class NodeNormalTerminationEvent extends AbstractNodeEvent {
/**
* @throws NullPointerException if {@code node} is {@code null}
*/
public NodeNormalTerminationEvent(final Node node) {
super(node);
}
......
......@@ -25,6 +25,9 @@ package org.drasyl.event;
* This is an immutable object.
*/
public class NodeOfflineEvent extends AbstractNodeEvent {
/**
* @throws NullPointerException if {@code node} is {@code null}
*/
public NodeOfflineEvent(final Node node) {
super(node);
}
......
......@@ -27,6 +27,9 @@ package org.drasyl.event;
* This is an immutable object.
*/
public class NodeOnlineEvent extends AbstractNodeEvent {
/**
* @throws NullPointerException if {@code node} is {@code null}
*/
public NodeOnlineEvent(final Node node) {
super(node);
}
......
......@@ -21,6 +21,8 @@ package org.drasyl.event;
import java.util.Objects;
import static java.util.Objects.requireNonNull;
/**
* This events signals that the node encountered an unrecoverable error.
* <p>
......@@ -29,9 +31,12 @@ import java.util.Objects;
public class NodeUnrecoverableErrorEvent extends AbstractNodeEvent {
private final Throwable error;
/**
* @throws NullPointerException if {@code node} or {@code error} is {@code null}
*/
public NodeUnrecoverableErrorEvent(final Node node, final Throwable error) {
super(node);
this.error = error;
this.error = requireNonNull(error);
}
/**
......
......@@ -25,6 +25,9 @@ package org.drasyl.event;
* This is an immutable object.
*/
public class NodeUpEvent extends AbstractNodeEvent {
/**
* @throws NullPointerException if {@code node} is {@code null}
*/
public NodeUpEvent(final Node node) {
super(node);
}
......
......@@ -22,6 +22,8 @@ import org.drasyl.identity.CompressedPublicKey;
import java.util.Objects;
import static java.util.Objects.requireNonNull;
/**
* Used by {@link Event} to describe an event related to a Peer (e.g. {@link PeerRelayEvent}, {@link
* PeerDirectEvent}).
......@@ -32,7 +34,7 @@ public class Peer {
private final CompressedPublicKey publicKey;
Peer(final CompressedPublicKey publicKey) {
this.publicKey = publicKey;
this.publicKey = requireNonNull(publicKey);
}
/**
......@@ -68,6 +70,9 @@ public class Peer {
'}';
}
/**
* @throws NullPointerException if {@code publicKey} is {@code null}
*/
public static Peer of(final CompressedPublicKey publicKey) {
return new Peer(publicKey);
}
......
......@@ -25,6 +25,9 @@ package org.drasyl.event;
* This is an immutable object.
*/
public class PeerDirectEvent extends AbstractPeerEvent {
/**
* @throws NullPointerException if {@code peer} is {@code null}
*/
public PeerDirectEvent(final Peer peer) {
super(peer);
}
......
......@@ -25,6 +25,9 @@ package org.drasyl.event;
* This is an immutable object.
*/
public class PeerRelayEvent extends AbstractPeerEvent {
/**
* @throws NullPointerException if {@code peer} is {@code null}
*/
public PeerRelayEvent(final Peer peer) {
super(peer);
}
......
......@@ -25,6 +25,7 @@ import org.drasyl.event.NodeUpEvent;
import org.drasyl.identity.CompressedPublicKey;
import org.drasyl.peer.PeerInformation;
import org.drasyl.pipeline.HandlerContext;
import org.drasyl.pipeline.address.Address;
import org.drasyl.pipeline.message.ApplicationMessage;
import org.drasyl.pipeline.skeleton.SimpleDuplexHandler;
import org.drasyl.util.Pair;
......@@ -43,7 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
* Inspired by: https://github.com/actoron/jadex/blob/10e464b230d7695dfd9bf2b36f736f93d69ee314/platform/base/src/main/java/jadex/platform/service/awareness/IntraVMAwarenessAgent.java
*/
@SuppressWarnings({ "java:S110" })
public class IntraVmDiscovery extends SimpleDuplexHandler<ApplicationMessage, ApplicationMessage, CompressedPublicKey> {
public class IntraVmDiscovery extends SimpleDuplexHandler<ApplicationMessage, ApplicationMessage, Address> {
public static final IntraVmDiscovery INSTANCE = new IntraVmDiscovery();
public static final String INTRA_VM_DISCOVERY = "INTRA_VM_DISCOVERY";
private static final Logger LOG = LoggerFactory.getLogger(IntraVmDiscovery.class);
......@@ -79,7 +80,7 @@ public class IntraVmDiscovery extends SimpleDuplexHandler<ApplicationMessage, Ap
@Override
protected void matchedRead(final HandlerContext ctx,
final CompressedPublicKey sender,
final Address sender,
final ApplicationMessage msg,
final CompletableFuture<Void> future) {
final CompressedPublicKey recipient = msg.getRecipient();
......@@ -102,7 +103,7 @@ public class IntraVmDiscovery extends SimpleDuplexHandler<ApplicationMessage, Ap
@Override
protected void matchedWrite(final HandlerContext ctx,
final CompressedPublicKey recipient,
final Address recipient,
final ApplicationMessage msg,
final CompletableFuture<Void> future) {
final HandlerContext discoveree = discoveries.get(Pair.of(ctx.config().getNetworkId(), msg.getRecipient()));
......
......@@ -27,7 +27,6 @@ import org.drasyl.event.NodeDownEvent;
import org.drasyl.event.NodeUnrecoverableErrorEvent;
import org.drasyl.event.NodeUpEvent;
import org.drasyl.identity.CompressedPublicKey;
import org.drasyl.peer.Endpoint;
import org.drasyl.peer.PeerInformation;
import org.drasyl.peer.PeersManager;
import org.drasyl.pipeline.HandlerContext;
......@@ -43,7 +42,6 @@ import java.nio.file.FileSystem;
import java.nio.file.Path;
import java.nio.file.WatchService;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
......@@ -78,7 +76,6 @@ public class LocalHostDiscovery extends SimpleOutboundHandler<Object, Address> {
private final Path discoveryPath;
private final Duration leaseTime;
private final CompressedPublicKey ownPublicKey;
private final Set<Endpoint> endpoints;
private final AtomicBoolean doScan;
private final Scheduler scheduler;
private Disposable watchDisposable;
......@@ -87,13 +84,11 @@ public class LocalHostDiscovery extends SimpleOutboundHandler<Object, Address> {
private PeerInformation postedPeerInformation;
public LocalHostDiscovery(final DrasylConfig config,
final CompressedPublicKey ownPublicKey,
final Set<Endpoint> endpoints) {
final CompressedPublicKey ownPublicKey) {
this(
config.getLocalHostDiscoveryPath().resolve(String.valueOf(config.getNetworkId())),
config.getLocalHostDiscoveryLeaseTime(),
ownPublicKey,
endpoints,
new AtomicBoolean(),
DrasylScheduler.getInstanceLight(),
null,
......@@ -105,7 +100,6 @@ public class LocalHostDiscovery extends SimpleOutboundHandler<Object, Address> {
LocalHostDiscovery(final Path discoveryPath,
final Duration leaseTime,
final CompressedPublicKey ownPublicKey,
final Set<Endpoint> endpoints,
final AtomicBoolean doScan,
final Scheduler scheduler,
final Disposable watchDisposable,
......@@ -113,7 +107,6 @@ public class LocalHostDiscovery extends SimpleOutboundHandler<Object, Address> {
this.discoveryPath = discoveryPath;
this.leaseTime = leaseTime;
this.ownPublicKey = ownPublicKey;
this.endpoints = endpoints;
this.doScan = doScan;
this.scheduler = scheduler;
this.watchDisposable = watchDisposable;
......@@ -125,7 +118,7 @@ public class LocalHostDiscovery extends SimpleOutboundHandler<Object, Address> {
final Event event,
final CompletableFuture<Void> future) {
if (event instanceof NodeUpEvent) {
startDiscovery(ctx);
startDiscovery(ctx, ((NodeUpEvent) event).getNode().getPort());
}
else if (event instanceof NodeUnrecoverableErrorEvent || event instanceof NodeDownEvent) {
stopDiscovery();
......@@ -147,7 +140,7 @@ public class LocalHostDiscovery extends SimpleOutboundHandler<Object, Address> {
ctx.write(recipient, msg, future);
}
private synchronized void startDiscovery(final HandlerContext ctx) {
private synchronized void startDiscovery(final HandlerContext ctx, final int port) {
LOG.debug("Start Local Host Discovery...");
final File directory = discoveryPath.toFile();
if (!directory.exists() && !directory.mkdirs()) {
......@@ -159,7 +152,7 @@ public class LocalHostDiscovery extends SimpleOutboundHandler<Object, Address> {
else {
tryWatchDirectory();
scan(ctx);
keepOwnInformationUpToDate();
keepOwnInformationUpToDate(port);
}
LOG.debug("Local Host Discovery started.");
}
......@@ -204,7 +197,7 @@ public class LocalHostDiscovery extends SimpleOutboundHandler<Object, Address> {
/**
* Writes periodically the actual own information to {@link #discoveryPath}.
*/
private void keepOwnInformationUpToDate() {
private void keepOwnInformationUpToDate(final int port) {
final Duration refreshInterval;
if (leaseTime.toSeconds() > 5) {
refreshInterval = leaseTime.minus(ofSeconds(5));
......@@ -217,7 +210,7 @@ public class LocalHostDiscovery extends SimpleOutboundHandler<Object, Address> {
if (watchService == null) {
doScan.set(true);
}
postInformation();
postInformation(port);
}, 0, refreshInterval.toMillis(), MILLISECONDS);
}
......@@ -252,8 +245,8 @@ public class LocalHostDiscovery extends SimpleOutboundHandler<Object, Address> {
/**
* Posts own {@link PeerInformation} to {@link #discoveryPath}.
*/
private void postInformation() {
final PeerInformation peerInformation = PeerInformation.of(endpoints);
private void postInformation(final int port) {
final PeerInformation peerInformation = PeerInformation.of(); // TODO: add port here
final Path filePath = discoveryPath.resolve(ownPublicKey.toString() + ".json");
LOG.trace("Post own Peer Information to {}", filePath);
......
......@@ -18,9 +18,9 @@
*/
package org.drasyl.loopback.handler;
import org.drasyl.identity.CompressedPublicKey;
import org.drasyl.pipeline.HandlerContext;
import org.drasyl.pipeline.Stateless;
import org.drasyl.pipeline.address.Address;
import org.drasyl.pipeline.message.ApplicationMessage;
import org.drasyl.pipeline.skeleton.SimpleOutboundHandler;
......@@ -30,7 +30,7 @@ import java.util.concurrent.CompletableFuture;
* This handler processes outbound messages addressed to the local node.
*/
@Stateless
public class LoopbackOutboundMessageSinkHandler extends SimpleOutboundHandler<ApplicationMessage, CompressedPublicKey> {
public class LoopbackOutboundMessageSinkHandler extends SimpleOutboundHandler<ApplicationMessage, Address> {
public static final LoopbackOutboundMessageSinkHandler INSTANCE = new LoopbackOutboundMessageSinkHandler();
public static final String LOOPBACK_OUTBOUND_MESSAGE_SINK_HANDLER = "LOOPBACK_OUTBOUND_MESSAGE_SINK_HANDLER";
......@@ -39,10 +39,10 @@ public class LoopbackOutboundMessageSinkHandler extends SimpleOutboundHandler<Ap
@Override
protected void matchedWrite(final HandlerContext ctx,
final CompressedPublicKey recipient,
final Address recipient,
final ApplicationMessage msg,
final CompletableFuture<Void> future) {
if (ctx.identity().getPublicKey().equals(recipient)) {
if (ctx.identity().getPublicKey().equals(msg.getRecipient())) {
ctx.fireRead(msg.getSender(), msg, future);
}
else {
......
......@@ -28,11 +28,8 @@ import org.drasyl.localhost.LocalHostDiscovery;
import org.drasyl.loopback.handler.LoopbackInboundMessageSinkHandler;
import org.drasyl.loopback.handler.LoopbackOutboundMessageSinkHandler;
import org.drasyl.monitoring.Monitoring;
import org.drasyl.peer.Endpoint;
import org.drasyl.peer.PeersManager;
import org.drasyl.pipeline.codec.ApplicationMessage2ObjectHolderHandler;
import org.drasyl.pipeline.codec.DefaultCodec;
import org.drasyl.pipeline.codec.ObjectHolder2ApplicationMessageHandler;
import org.drasyl.pipeline.codec.TypeValidator;
import org.drasyl.remote.handler.ByteBuf2MessageHandler;
import org.drasyl.remote.handler.ChunkingHandler;
......@@ -46,7 +43,6 @@ import org.drasyl.remote.handler.UdpServer;
import org.drasyl.util.DrasylScheduler;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
......@@ -56,9 +52,7 @@ import static org.drasyl.localhost.LocalHostDiscovery.LOCAL_HOST_DISCOVERY;
import static org.drasyl.loopback.handler.LoopbackInboundMessageSinkHandler.LOOPBACK_INBOUND_MESSAGE_SINK_HANDLER;
import static org.drasyl.loopback.handler.LoopbackOutboundMessageSinkHandler.LOOPBACK_OUTBOUND_MESSAGE_SINK_HANDLER;
import static org.drasyl.monitoring.Monitoring.MONITORING_HANDLER;
import static org.drasyl.pipeline.codec.ApplicationMessage2ObjectHolderHandler.APP_MSG2OBJECT_HOLDER;
import static org.drasyl.pipeline.codec.DefaultCodec.DEFAULT_CODEC;
import static org.drasyl.pipeline.codec.ObjectHolder2ApplicationMessageHandler.OBJECT_HOLDER2APP_MSG;
import static org.drasyl.remote.handler.ByteBuf2MessageHandler.BYTE_BUF_2_MESSAGE_HANDLER;
import static org.drasyl.remote.handler.ChunkingHandler.CHUNKING_HANDLER;
import static org.drasyl.remote.handler.HopCountGuard.HOP_COUNT_GUARD;
......@@ -79,8 +73,7 @@ public class DrasylPipeline extends DefaultPipeline {
final Identity identity,
final PeersManager peersManager,
final AtomicBoolean started,
final EventLoopGroup bossGroup,
final Set<Endpoint> endpoints) {
final EventLoopGroup bossGroup) {
this.handlerNames = new ConcurrentHashMap<>();
this.inboundValidator = TypeValidator.ofInboundValidator(config);
this.outboundValidator = TypeValidator.ofOutboundValidator(config);
......@@ -95,15 +88,13 @@ public class DrasylPipeline extends DefaultPipeline {
// add default codec
addFirst(DEFAULT_CODEC, DefaultCodec.INSTANCE);
addFirst(APP_MSG2OBJECT_HOLDER, ApplicationMessage2ObjectHolderHandler.INSTANCE);
addFirst(OBJECT_HOLDER2APP_MSG, ObjectHolder2ApplicationMessageHandler.INSTANCE);
// local message delivery
addFirst(LOOPBACK_INBOUND_MESSAGE_SINK_HANDLER, new LoopbackInboundMessageSinkHandler(started));
addFirst(LOOPBACK_OUTBOUND_MESSAGE_SINK_HANDLER, LoopbackOutboundMessageSinkHandler.INSTANCE);
if (config.isLocalHostDiscoveryEnabled()) {
addFirst(LOCAL_HOST_DISCOVERY, new LocalHostDiscovery(config, identity.getPublicKey(), endpoints));
addFirst(LOCAL_HOST_DISCOVERY, new LocalHostDiscovery(config, identity.getPublicKey()));
}
// we trust peers within the same jvm. therefore we do not use signatures
......
......@@ -88,7 +88,7 @@ class TailContext extends AbstractEndHandler {
LOG.trace("Event has passed the pipeline: `{}` ", event);
}
else {
LOG.debug("Message '{}' was not written to the application, because the corresponding address was not of type CompressedPublicKey.", msg);
LOG.debug("Message '{}' was not written to the application, because the corresponding address was not of type {} (was type {}).", () -> msg, CompressedPublicKey.class::getSimpleName, sender.getClass()::getSimpleName);
}
}
......
/*
* Copyright (c) 2020.
*
* This file is part of drasyl.
*
* drasyl is free software: you can redistribute it and/or modify
* it under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* drasyl is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with drasyl. If not, see <http://www.gnu.org/licenses/>.
*/
package org.drasyl.pipeline.codec;
import org.drasyl.pipeline.HandlerContext;
import org.drasyl.pipeline.Stateless;
import org.drasyl.pipeline.address.Address;
import org.drasyl.pipeline.message.ApplicationMessage;
import org.drasyl.pipeline.skeleton.SimpleInboundHandler;
import java.util.concurrent.CompletableFuture;