From 95e549b8f30473f58e683bf7df9a0dfd1ec278d8 Mon Sep 17 00:00:00 2001 From: Alex Selimov Date: Wed, 2 Apr 2025 15:37:30 -0400 Subject: [PATCH] WIP basic architecture done. Need to fix broken LinearRegressionTests --- pom.xml | 110 +++++++++++++ .../java/com/aselimov/app/SteadyDetect.java | 78 +++++++++ .../aselimov/app/socket/SocketListener.java | 154 ++++++++++++++++++ .../LinearRegression.java | 79 +++++++++ .../SteadyStateCalculator.java | 30 ++++ .../com/aselimov/app/timedata/TimeData.java | 94 +++++++++++ .../aselimov/app/utilities/Statistics.java | 60 +++++++ src/test/java/com/aselimov/app/AppTest.java | 19 +++ .../LinearRegressionTest.java | 77 +++++++++ .../app/utilities/StatisticsTest.java | 58 +++++++ 10 files changed, 759 insertions(+) create mode 100644 pom.xml create mode 100644 src/main/java/com/aselimov/app/SteadyDetect.java create mode 100644 src/main/java/com/aselimov/app/socket/SocketListener.java create mode 100644 src/main/java/com/aselimov/app/steady_state_analysis/LinearRegression.java create mode 100644 src/main/java/com/aselimov/app/steady_state_analysis/SteadyStateCalculator.java create mode 100644 src/main/java/com/aselimov/app/timedata/TimeData.java create mode 100644 src/main/java/com/aselimov/app/utilities/Statistics.java create mode 100644 src/test/java/com/aselimov/app/AppTest.java create mode 100644 src/test/java/com/aselimov/app/steady_state_analysis/LinearRegressionTest.java create mode 100644 src/test/java/com/aselimov/app/utilities/StatisticsTest.java diff --git a/pom.xml b/pom.xml new file mode 100644 index 0000000..2a0463f --- /dev/null +++ b/pom.xml @@ -0,0 +1,110 @@ + + + 4.0.0 + + com.aselimov.app + SteadyState-Detector + 1.0-SNAPSHOT + + SteadyState-Detector + + http://www.example.com + + + UTF-8 + 17 + com.aselimov.app.SteadyDetect + + + + + + org.junit + junit-bom + 5.11.0 + pom + import + + + + + + + org.junit.jupiter + junit-jupiter-api + test + + + + org.junit.jupiter + junit-jupiter-params + test + + + + com.fasterxml.jackson.core + jackson-core + 2.15.2 + + + + + com.fasterxml.jackson.core + jackson-databind + 2.15.2 + + + + args4j + args4j + 2.37 + + + + + + + + + maven-clean-plugin + 3.4.0 + + + + maven-resources-plugin + 3.3.1 + + + maven-compiler-plugin + 3.13.0 + + + maven-surefire-plugin + 3.3.0 + + + maven-jar-plugin + 3.4.2 + + + maven-install-plugin + 3.1.2 + + + maven-deploy-plugin + 3.1.2 + + + + maven-site-plugin + 3.12.1 + + + maven-project-info-reports-plugin + 3.6.1 + + + + + diff --git a/src/main/java/com/aselimov/app/SteadyDetect.java b/src/main/java/com/aselimov/app/SteadyDetect.java new file mode 100644 index 0000000..ac01bf4 --- /dev/null +++ b/src/main/java/com/aselimov/app/SteadyDetect.java @@ -0,0 +1,78 @@ +package com.aselimov.app; + +import static org.kohsuke.args4j.ExampleMode.ALL; + +import java.io.IOException; + +import org.kohsuke.args4j.Argument; +import org.kohsuke.args4j.CmdLineException; +import org.kohsuke.args4j.CmdLineParser; + +import com.aselimov.app.socket.SocketListener; +import com.aselimov.app.steady_state_analysis.LinearRegression; +import com.aselimov.app.steady_state_analysis.SteadyStateCalculator; +import com.aselimov.app.timedata.TimeData; + +public class SteadyDetect { + /** + * Main function accepts 1 argument which is the port that should be bound. + * + */ + public static void main(String[] args) throws IOException { + new SteadyDetect().doMain(args); + } + + private SocketListener socketListener; + private TimeData data = new TimeData(); + + private SteadyStateCalculator steadyStateCalc; + + @Argument + private Integer port = null; + + @Argument + private Double slopeTol = null; + + @Argument + private Integer numNeededMatching = null; + + @Argument + private Integer windowSize = null; + + public void doMain(String[] args) throws IOException { + + CmdLineParser parser = new CmdLineParser(this); + parser.setUsageWidth(80); + try { + parser.parseArgument(args); + + if (port == null) + throw new CmdLineException(parser, "No argument is given"); + + } catch (CmdLineException e) { + // if there's a problem in the command line, + // you'll get this exception. this will report + // an error message. + System.err.println(e.getMessage()); + System.err.println("java SteadyDetect [options...] port"); + // print the list of available options + parser.printUsage(System.err); + System.err.println(); + + // print option sample. This is useful some time + System.err.println(" Example: java SteadyDetect" + parser.printExample(ALL)); + + return; + } + + System.out.println("Binding SteadyDetect to port " + port); + socketListener = new SocketListener(port, TimeData.TimeDataPoint.class, data); + socketListener.start(); + + // Initialize the SteadyStateCalc to the LinearRegression calculator + steadyStateCalc = new LinearRegression(slopeTol, numNeededMatching, windowSize); + while (true) { + steadyStateCalc.printIfSteady(data); + } + } +} diff --git a/src/main/java/com/aselimov/app/socket/SocketListener.java b/src/main/java/com/aselimov/app/socket/SocketListener.java new file mode 100644 index 0000000..c5d1daa --- /dev/null +++ b/src/main/java/com/aselimov/app/socket/SocketListener.java @@ -0,0 +1,154 @@ +package com.aselimov.app.socket; + +import java.io.*; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonParser; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + +/** + * Listen for data over a socket + * + */ +public class SocketListener { + + private final int port; + private final Class dataClass; + private final ObjectMapper objectMapper; + private final Consumer dataConsumer; + private ServerSocket serverSocket; + private boolean running = false; + private ExecutorService executorService; + + /** + * Creates a new StreamingSocketJsonListener. + * + * @param port The port to listen on + * @param targetClass The class to parse incoming JSON into + * @param dataConsumer Consumer that processes parsed data objects + */ + public SocketListener(int port, Class dataClass, Consumer dataConsumer) { + this.port = port; + this.dataClass = dataClass; + this.dataConsumer = dataConsumer; + this.objectMapper = new ObjectMapper(); + } + + /** + * Start the SocketListener. + * This creates a threadpool to process the socket. For each piece of data + * received we immediately call the consumer + * function + */ + public void start() { + if (running) { + return; + } + + System.out.println("Started listening on " + port); + running = true; + // Use a new Cached Thread Pool since we expect low numbers of clients + // connecting + executorService = Executors.newCachedThreadPool(); + + executorService.execute(() -> { + try { + serverSocket = new ServerSocket(port); + while (running) { + try { + Socket clientSocket = serverSocket.accept(); + handleConnection(clientSocket); + } catch (IOException e) { + if (running) { + System.err.println("Error accepting connections: " + e.getMessage()); + } + } + } + } catch (IOException e) { + System.err.println("Error starting server because: " + e.getMessage()); + } + + }); + } + + private void handleConnection(Socket clientSocket) { + System.out.println("Opening new client connection"); + executorService.execute(() -> { + try { + BufferedInputStream inputStream = new BufferedInputStream(clientSocket.getInputStream()); + + JsonFactory factory = objectMapper.getFactory(); + JsonParser parser = factory.createParser(inputStream); + + // Set the parser for streaming mode + parser.setCodec(objectMapper); + + while (running && clientSocket.isConnected() && !clientSocket.isClosed()) { + try { + T parsedData = parser.readValueAs(dataClass); + + if (parsedData != null) { + // NOTE: This is going to run pretty slow for multi-client architectures because + // each JSON object in the stream is going to lock the data structure to push a + // new data point. Probably here we should use per thread buffers and then add + // large groups of data points at once + dataConsumer.accept(parsedData); + } + } catch (IOException e) { + + if (clientSocket.isClosed() || !clientSocket.isConnected()) { + break; + } else { + System.err.println("Error parser JSON because " + e.getMessage()); + } + } + } + } catch (IOException e) { + System.err.println("Error reading from socket because " + e.getMessage()); + } finally { + try { + clientSocket.close(); + System.out.println("Closed client connection"); + } catch (IOException e) { + System.err.println("Error closing client socket because " + e.getMessage()); + } + } + + }); + } + + /** + * Stops the listener and cleans up resources. + */ + public void stop() { + running = false; + + System.out.println("Stopping SteadyDetect and freeing port " + port); + + if (serverSocket != null && !serverSocket.isClosed()) { + try { + serverSocket.close(); + } catch (IOException e) { + System.err.println("Error closing server socket: " + e.getMessage()); + } + } + + if (executorService != null) { + executorService.shutdown(); + } + + System.out.println("Streaming socket listener stopped"); + } + + @Override + protected void finalize() { + stop(); + } + +} diff --git a/src/main/java/com/aselimov/app/steady_state_analysis/LinearRegression.java b/src/main/java/com/aselimov/app/steady_state_analysis/LinearRegression.java new file mode 100644 index 0000000..0a507b5 --- /dev/null +++ b/src/main/java/com/aselimov/app/steady_state_analysis/LinearRegression.java @@ -0,0 +1,79 @@ +package com.aselimov.app.steady_state_analysis; + +import com.aselimov.app.timedata.TimeData; +import com.aselimov.app.timedata.TimeData.TimeDataPoints; +import com.aselimov.app.utilities.Statistics; + +/** + * Determine whether a time series is steady state using continuous linear + * regression. + * + * This class performs a basic linear regression on chunks of the input signal. + * If a user specified number of chunks have a slope within a specified + * tolerance, then the signal is labeled as having reached steady state + */ +public class LinearRegression extends SteadyStateCalculator { + private double slopeTol; + private int numNeededMatching; + private int windowSize; + private int numMatching = 0; + private int lastChecked = 0; + + /** + * Create a LinearRegression object + * + * @param slopeTol Tolerance for the slope to be detected as 0 + * @param numNeededMatching the number of consecutive matches before we detect + * the signal as steady state + * @param windowSize The size of the window to perform linear regression + * on + */ + public LinearRegression(double slopeTol, int numNeededMatching, int windowSize) { + this.slopeTol = slopeTol; + this.numNeededMatching = numNeededMatching; + this.windowSize = windowSize; + } + + /** + * Determine whether a signal is steady or not. + * This performs linear regression on the final windowSize number of data + * points and determines whether the signal is steady state or not + */ + @Override + protected SignalStatus isSteady(TimeData inData) { + + // TODO: This is a race condition, in that the time between checking whether we + // need to check steady state can yield additional data points, meaning some + // points aren't getting checked. Probably need ot refactor this code so that + // calling the steady checker happens on the data addition + if ((inData.getNumPnts() - lastChecked) < windowSize) { + return SignalStatus.NOT_CHECKED; + } + TimeDataPoints data = inData.threadsafeClone(); + lastChecked = data.times.size(); + + // Estimate the slope of the fitted line (the intercept doesn't matter) + int endIdx = data.times.size(); + int startIdx = endIdx - windowSize; + double timeMean = Statistics.calcMean(data.times, startIdx, endIdx); + double valueMean = Statistics.calcMean(data.values, startIdx, endIdx); + + double timeVariance = Statistics.calcVariance(data.times, timeMean, startIdx, endIdx); + + double covariance = Statistics.calcCovariance(data.times, data.values, timeMean, valueMean, startIdx, endIdx); + + double slope = covariance / timeVariance; + + if (Math.abs(slope) < slopeTol) { + numMatching += 1; + } else { + numMatching = 0; + } + + if (numMatching >= numNeededMatching) { + return SignalStatus.STEADY; + } else { + return SignalStatus.NOT_STEADY; + } + } +} diff --git a/src/main/java/com/aselimov/app/steady_state_analysis/SteadyStateCalculator.java b/src/main/java/com/aselimov/app/steady_state_analysis/SteadyStateCalculator.java new file mode 100644 index 0000000..89d6c4c --- /dev/null +++ b/src/main/java/com/aselimov/app/steady_state_analysis/SteadyStateCalculator.java @@ -0,0 +1,30 @@ +package com.aselimov.app.steady_state_analysis; + +import com.aselimov.app.timedata.TimeData; + +/** + * A calculator that determines whether input time_series data is steady state. + */ +public abstract class SteadyStateCalculator { + public enum SignalStatus { + STEADY, + NOT_STEADY, + NOT_CHECKED + } + + public void printIfSteady(TimeData data) { + SignalStatus status = isSteady(data); + switch (status) { + case STEADY: + System.out.println("Signal is steady-state"); + break; + case NOT_STEADY: + System.out.println("Signal is not steady-state"); + break; + case NOT_CHECKED: + break; + } + } + + protected abstract SignalStatus isSteady(TimeData data); +} diff --git a/src/main/java/com/aselimov/app/timedata/TimeData.java b/src/main/java/com/aselimov/app/timedata/TimeData.java new file mode 100644 index 0000000..4631f14 --- /dev/null +++ b/src/main/java/com/aselimov/app/timedata/TimeData.java @@ -0,0 +1,94 @@ +package com.aselimov.app.timedata; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; + +import com.aselimov.app.steady_state_analysis.SteadyStateCalculator; + +public class TimeData implements Consumer { + + private TimeDataPoints data = new TimeDataPoints(); + private ReentrantLock mutex = new ReentrantLock(); + + /** + * Single time data point. + * Used for formatted parsing of incoming JSON data + * + */ + public record TimeDataPoint(double time, double value) { + }; + + /** + * Collection of data points + */ + public class TimeDataPoints { + public List times; + public List values; + + /** + * Empty constructor + */ + public TimeDataPoints() { + times = new ArrayList(); + values = new ArrayList(); + } + + /** + * Implement copy constructors for TimeDataPoints + */ + public TimeDataPoints(TimeDataPoints data) { + this(data.times, data.values); + } + + public TimeDataPoints(List times, List values) { + this.times = new ArrayList(times); + this.values = new ArrayList(values); + } + + public void add(double time, double value) { + times.add(time); + values.add(value); + } + }; + + /** + * Allow TimeData to be a consumer of TimeDataPoint. + * Add the TimeDataPoint to the overall time_data + */ + @Override + public void accept(TimeDataPoint arg0) { + mutex.lock(); + try { + data.add(arg0.time(), arg0.value()); + } finally { + mutex.unlock(); + } + + } + + /** + * Get a clone of the TimeDataPoints. + * We get a clone for local operation while ensuring the memory access is + * threadsafe + */ + public TimeDataPoints threadsafeClone() { + mutex.lock(); + try { + return new TimeDataPoints(data.times, data.values); + } finally { + mutex.unlock(); + } + + } + + /** + * Return the number of data points in the class + * + * @return number of data points + */ + public int getNumPnts() { + return data.times.size(); + } +} diff --git a/src/main/java/com/aselimov/app/utilities/Statistics.java b/src/main/java/com/aselimov/app/utilities/Statistics.java new file mode 100644 index 0000000..4a7da73 --- /dev/null +++ b/src/main/java/com/aselimov/app/utilities/Statistics.java @@ -0,0 +1,60 @@ +package com.aselimov.app.utilities; + +import java.util.List; + +public class Statistics { + /** + * Calculate the mean of a set of data + * + * @param data input data + * @param startIdx start idx of data + * @param endIdx end idx of data + * @return + */ + public static double calcMean(List data, int startIdx, int endIdx) { + double sum = 0.0; + for (int i = startIdx; i < endIdx; i++) { + sum += data.get(i); + } + return sum / (endIdx - startIdx); + } + + /** + * Calculate the variance of a set of data + * + * @param data input data + * @param mean mean value of data + * @param startIdx start_idx of data + * @param endIdx end_idx of data + * @return variance of data + */ + public static double calcVariance(List data, double mean, int startIdx, int endIdx) { + double variance = 0.0; + for (int i = startIdx; i < endIdx; i++) { + double xMinusMean = data.get(i) - mean; + variance += xMinusMean * xMinusMean; + } + return variance / (endIdx - startIdx); + } + + /** + * Calculate the covariance of two variables + * + * @param x List of data representing one variable + * @param y List of data representing other variable + * @param xMean mean of x data + * @param yMean mean of y data + * @param startIdx start index for both data + * @param endIdx end index for both data + * @return covariance of x and y + */ + public static double calcCovariance(List x, List y, double xMean, double yMean, int startIdx, + int endIdx) { + double covariance = 0; + for (int i = startIdx; i < endIdx; i++) { + covariance += (x.get(i) - xMean) * (y.get(i) - yMean); + } + return covariance / (endIdx - startIdx - 1); + } + +} diff --git a/src/test/java/com/aselimov/app/AppTest.java b/src/test/java/com/aselimov/app/AppTest.java new file mode 100644 index 0000000..5c508b9 --- /dev/null +++ b/src/test/java/com/aselimov/app/AppTest.java @@ -0,0 +1,19 @@ +package com.aselimov.app; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +/** + * Unit test for simple App. + */ +public class AppTest { + + /** + * Rigorous Test :-) + */ + @Test + public void shouldAnswerWithTrue() { + assertTrue(true); + } +} diff --git a/src/test/java/com/aselimov/app/steady_state_analysis/LinearRegressionTest.java b/src/test/java/com/aselimov/app/steady_state_analysis/LinearRegressionTest.java new file mode 100644 index 0000000..7043414 --- /dev/null +++ b/src/test/java/com/aselimov/app/steady_state_analysis/LinearRegressionTest.java @@ -0,0 +1,77 @@ +package com.aselimov.app.steady_state_analysis; + +import com.aselimov.app.timedata.TimeData; +import com.aselimov.app.timedata.TimeData.TimeDataPoints; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; + +public class LinearRegressionTest { + private LinearRegression linearRegression; + private TimeData timeData; + + @BeforeEach + void setUp() { + // Set up a LinearRegression instance with reasonable default values + linearRegression = new LinearRegression(0.01, 3, 5); // slope tolerance = 0.01, need 3 matches, window size = 5 + timeData = new TimeData(); + } + + @Test + void testNotEnoughData() { + // Test when there's not enough data points + timeData.accept(new TimeData.TimeDataPoint(1.0, 1.0)); + timeData.accept(new TimeData.TimeDataPoint(2.0, 2.0)); + + assertEquals(SteadyStateCalculator.SignalStatus.NOT_CHECKED, linearRegression.isSteady(timeData)); + } + + @Test + void testSteadyStateDetection() { + // Test steady state detection with a flat line + for (int i = 0; i < 10; i++) { + timeData.accept(new TimeData.TimeDataPoint(i * 1.0, 1.0)); // Constant value of 1.0 + } + + assertEquals(SteadyStateCalculator.SignalStatus.STEADY, linearRegression.isSteady(timeData)); + } + + @Test + void testNonSteadyState() { + // Test non-steady state with a linearly increasing signal + for (int i = 0; i < 10; i++) { + timeData.accept(new TimeData.TimeDataPoint(i * 1.0, i * 1.0)); // Linear increase + } + + assertEquals(SteadyStateCalculator.SignalStatus.NOT_STEADY, linearRegression.isSteady(timeData)); + } + + @Test + void testMultipleWindows() { + // Test multiple windows with a signal that becomes steady + // First add non-steady data + for (int i = 0; i < 5; i++) { + timeData.accept(new TimeData.TimeDataPoint(i * 1.0, i * 1.0)); // Linear increase + } + + // Add steady data + for (int i = 0; i < 10; i++) { + timeData.accept(new TimeData.TimeDataPoint(i * 1.0 + 5.0, 5.0)); // Constant value of 5.0 + } + + assertEquals(SteadyStateCalculator.SignalStatus.STEADY, linearRegression.isSteady(timeData)); + } + + @Test + void testSlopeTolerance() { + // Test with a signal that has a small slope within tolerance + linearRegression = new LinearRegression(0.1, 3, 5); // Increase slope tolerance + + for (int i = 0; i < 10; i++) { + // Create a signal with a small slope (0.05) + timeData.accept(new TimeData.TimeDataPoint(i * 1.0, 1.0 + i * 0.05)); + } + + assertEquals(SteadyStateCalculator.SignalStatus.STEADY, linearRegression.isSteady(timeData)); + } +} diff --git a/src/test/java/com/aselimov/app/utilities/StatisticsTest.java b/src/test/java/com/aselimov/app/utilities/StatisticsTest.java new file mode 100644 index 0000000..10841b8 --- /dev/null +++ b/src/test/java/com/aselimov/app/utilities/StatisticsTest.java @@ -0,0 +1,58 @@ +package com.aselimov.app.utilities; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +/** + * Unit test the statistics class + */ +public class StatisticsTest { + List x; + List y; + double xMeanFull; + double xMeanHalf; + double yMeanFull; + double yMeanHalf; + + @BeforeEach + void setup() { + x = new ArrayList(Arrays.asList(1.30656191, -0.9354737, -1.10250799, 1.00903942, 1.37715207, -0.163614, + -1.17995963, 0.52334468, -0.24819095, 0.22252544)); + y = new ArrayList(Arrays.asList(0.56089952, 1.9944321, -0.39985776, 0.60433577, 1.01884808, + 0.39328237, 1.72282243, -1.3340576, -0.12969627, -0.1063279)); + + xMeanFull = Statistics.calcMean(x, 0, x.size()); + xMeanHalf = Statistics.calcMean(x, 5, x.size()); + yMeanFull = Statistics.calcMean(y, 0, y.size()); + yMeanHalf = Statistics.calcMean(y, 5, y.size()); + } + + @Test + void testMean() { + assertEquals(0.08088772442417858, xMeanFull, 1e-8); + assertEquals(0.43246807465934944, yMeanFull, 1e-8); + assertEquals(-0.1691788946337382, xMeanHalf, 1e-8); + assertEquals(0.10920460800791403, yMeanHalf, 1e-8); + } + + @Test + void testVariance() { + assertEquals(0.8451099344901358, Statistics.calcVariance(x, xMeanFull, 0, x.size()), 1e-8); + assertEquals(0.33219455205301196, Statistics.calcVariance(x, xMeanHalf, 5, x.size()), 1e-8); + assertEquals(0.8915865222994588, Statistics.calcVariance(y, yMeanFull, 0, y.size()), 1e-8); + assertEquals(0.9741992662607654, Statistics.calcVariance(y, yMeanHalf, 5, y.size()), 1e-8); + } + + @Test + void testCovariance() { + assertEquals(-0.20175547500418511, Statistics.calcCovariance(x, y, xMeanFull, yMeanFull, 0, x.size()), 1e-8); + assertEquals(-0.6736187544740592, Statistics.calcCovariance(x, y, xMeanHalf, yMeanHalf, 5, x.size()), 1e-8); + } + +}