Commit 742ac7d7 authored by 7degener's avatar 7degener 🥃
Browse files

Add WotThingsProducer & WotMeasurementsProducer to collect data from exposed things

parent 58df891a
Loading
Loading
Loading
Loading
+8 −0
Original line number Diff line number Diff line
@@ -10,6 +10,7 @@ public class CollectorConfig {
    private ProducerConfig sample;
    private ProducerConfig klimabotschafter;
    private ProducerConfig luftdaten;
    private WotProducerConfig wot;

    public ProducerConfig getSample() {
        return sample;
@@ -35,4 +36,11 @@ public class CollectorConfig {
        this.luftdaten = luftdaten;
    }

    public WotProducerConfig getWot() {
        return wot;
    }

    public void setWot(WotProducerConfig wot) {
        this.wot = wot;
    }
}
+26 −0
Original line number Diff line number Diff line
package city.sane.collector.config;

import java.util.Collections;
import java.util.List;

public class WotProducerConfig extends ProducerConfig {

    private List<String> whitelist = Collections.emptyList();
    private List<String> blacklist = Collections.emptyList();

    public List<String> getWhitelist() {
        return whitelist;
    }

    public void setWhitelist(List<String> whitelist) {
        this.whitelist = whitelist;
    }

    public List<String> getBlacklist() {
        return blacklist;
    }

    public void setBlacklist(List<String> blacklist) {
        this.blacklist = blacklist;
    }
}
+54 −0
Original line number Diff line number Diff line
package city.sane.collector.producer;

import city.sane.collector.model.Measurement;
import city.sane.wot.thing.ConsumedThing;
import city.sane.wot.thing.property.ConsumedThingProperty;
import org.apache.commons.collections4.map.LRUMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Map;
import java.util.concurrent.ExecutionException;

public class WotMeasurementsProducer extends AbstractMeasurementsProducer<ConsumedThing> {

    private static final Logger LOG = LoggerFactory.getLogger(WotMeasurementsProducer.class);

    private final LRUMap<String, OffsetDateTime> timestampMap = new LRUMap<>(500);

    @Override
    public void accept(ConsumedThing consumedThing) {
        try {
            String thingId = consumedThing.getId();

            Map<String, Object> properties = consumedThing.readProperties().get();

            final String timestamp = (String) properties.remove("timestamp");
            final OffsetDateTime time = OffsetDateTime.parse(timestamp, DateTimeFormatter.ISO_OFFSET_DATE_TIME);

            if (time.isAfter(timestampMap.getOrDefault(thingId, OffsetDateTime.MIN))) {
                timestampMap.put(thingId, time);
            } else {
                LOG.info("Thing '{}' timestamp hasn't changed -> skip.", thingId);
                return;
            }

            final double latitude = (double) properties.remove("latitude");
            final double longitude = (double) properties.remove("longitude");

            properties.forEach((key, value) -> {
                ConsumedThingProperty thingProperty = consumedThing.getProperty(key);
                String expandedType = consumedThing.getExpandedObjectType(thingProperty.getObjectType());

                subject.next(new Measurement(thingId, time, latitude, longitude, expandedType, value));
            });

        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            LOG.error("Failed to read properties: ", e);
        }
    }
}
+62 −0
Original line number Diff line number Diff line
package city.sane.collector.producer;

import city.sane.wot.Wot;
import city.sane.wot.thing.ConsumedThing;
import city.sane.wot.thing.Thing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class WotThingsProducer extends AbstractThingsProducer<ConsumedThing> {

    private static final Logger LOG = LoggerFactory.getLogger(WotThingsProducer.class);

    private final Wot wot;

    private final List<String> blacklist = new ArrayList<>();
    private final List<String> whitelist = new ArrayList<>();

    public WotThingsProducer(Wot wot) {
        this.wot = wot;
    }

    public void setBlacklist(List<String> blacklist) {
        this.blacklist.clear();
        this.blacklist.addAll(blacklist);
    }

    public void setWhitelist(List<String> whitelist) {
        this.whitelist.clear();
        this.whitelist.addAll(whitelist);
    }

    @Override
    protected void onProduce() {
        LOG.info("Discover Things...");

        try {
            Collection<Thing> things = wot.discover().get();
            things.stream()
                    // Ignore things without properties
                    .filter(t -> !t.getProperties().isEmpty())
                    // White- & blacklist
                    .filter(t -> whitelist.isEmpty() || whitelist.contains(t.getId()))
                    .filter(t -> blacklist.isEmpty() || !blacklist.contains(t.getId()))
                    // Take only things with locations & timestamp
                    .filter(t -> t.getProperty("latitude") != null)
                    .filter(t -> t.getProperty("longitude") != null)
                    .filter(t -> t.getProperty("timestamp") != null)
                    .map(wot::consume)
                    .forEach(subject::next);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            LOG.error("Failed: ", e);
        }
    }

}
+18 −0
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@ package city.sane.collector.service;
import city.sane.apithing.sample.WeatherSensor;
import city.sane.collector.config.CollectorConfig;
import city.sane.collector.config.ProducerConfig;
import city.sane.collector.config.WotProducerConfig;
import city.sane.collector.model.Measurement;
import city.sane.collector.producer.*;
import city.sane.wot.Wot;
@@ -30,11 +31,28 @@ public class CollectorService implements Consumer<Measurement> {
        this.wot = wot;
        this.measurementService = measurementService;

        startWotThingsProducer(config.getWot(), clientWot);
        startSampleProducer(config.getSample());
        startKlimabotschafterProducer(config.getKlimabotschafter());
        startLuftdatenProducer(config.getLuftdaten());
    }

    private void startWotThingsProducer(WotProducerConfig config, Optional<Wot> clientWot) {
        if (clientWot.isPresent() && config.isEnabled()) {

            LOG.info("Starting Wot producer...");

            WotMeasurementsProducer wotMeasurementsProducer = new WotMeasurementsProducer();
            wotMeasurementsProducer.subscribe(this);

            WotThingsProducer wotThingsProducer = new WotThingsProducer(clientWot.get());
            wotThingsProducer.setBlacklist(config.getBlacklist());
            wotThingsProducer.setWhitelist(config.getWhitelist());
            wotThingsProducer.subscribe(wotMeasurementsProducer);
            wotThingsProducer.produce();
        }
    }

    protected void startKlimabotschafterProducer(ProducerConfig config) {
        if (!config.isEnabled()) {
            return;
Loading