WIP basic architecture done. Need to fix broken LinearRegressionTests

This commit is contained in:
Alex Selimov 2025-04-02 15:37:30 -04:00
commit 95e549b8f3
10 changed files with 759 additions and 0 deletions

110
pom.xml Normal file
View File

@ -0,0 +1,110 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.aselimov.app</groupId>
<artifactId>SteadyState-Detector</artifactId>
<version>1.0-SNAPSHOT</version>
<name>SteadyState-Detector</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.release>17</maven.compiler.release>
<exec.mainClass>com.aselimov.app.SteadyDetect</exec.mainClass>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>5.11.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<!-- Optionally: parameterized tests support -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<!-- Jackson Core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.15.2</version>
</dependency>
<!-- Jackson Databind (includes jackson-annotations) -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.15.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/args4j/args4j -->
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
<version>2.37</version>
</dependency>
</dependencies>
<build>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<!-- clean lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#clean_Lifecycle -->
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.4.0</version>
</plugin>
<!-- default lifecycle, jar packaging: see https://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_jar_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.3.1</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.13.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>3.3.0</version>
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<version>3.4.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>3.1.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>3.1.2</version>
</plugin>
<!-- site lifecycle, see https://maven.apache.org/ref/current/maven-core/lifecycles.html#site_Lifecycle -->
<plugin>
<artifactId>maven-site-plugin</artifactId>
<version>3.12.1</version>
</plugin>
<plugin>
<artifactId>maven-project-info-reports-plugin</artifactId>
<version>3.6.1</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>

View File

@ -0,0 +1,78 @@
package com.aselimov.app;
import static org.kohsuke.args4j.ExampleMode.ALL;
import java.io.IOException;
import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.CmdLineException;
import org.kohsuke.args4j.CmdLineParser;
import com.aselimov.app.socket.SocketListener;
import com.aselimov.app.steady_state_analysis.LinearRegression;
import com.aselimov.app.steady_state_analysis.SteadyStateCalculator;
import com.aselimov.app.timedata.TimeData;
public class SteadyDetect {
/**
* Main function accepts 1 argument which is the port that should be bound.
*
*/
public static void main(String[] args) throws IOException {
new SteadyDetect().doMain(args);
}
private SocketListener socketListener;
private TimeData data = new TimeData();
private SteadyStateCalculator steadyStateCalc;
@Argument
private Integer port = null;
@Argument
private Double slopeTol = null;
@Argument
private Integer numNeededMatching = null;
@Argument
private Integer windowSize = null;
public void doMain(String[] args) throws IOException {
CmdLineParser parser = new CmdLineParser(this);
parser.setUsageWidth(80);
try {
parser.parseArgument(args);
if (port == null)
throw new CmdLineException(parser, "No argument is given");
} catch (CmdLineException e) {
// if there's a problem in the command line,
// you'll get this exception. this will report
// an error message.
System.err.println(e.getMessage());
System.err.println("java SteadyDetect [options...] port");
// print the list of available options
parser.printUsage(System.err);
System.err.println();
// print option sample. This is useful some time
System.err.println(" Example: java SteadyDetect" + parser.printExample(ALL));
return;
}
System.out.println("Binding SteadyDetect to port " + port);
socketListener = new SocketListener(port, TimeData.TimeDataPoint.class, data);
socketListener.start();
// Initialize the SteadyStateCalc to the LinearRegression calculator
steadyStateCalc = new LinearRegression(slopeTol, numNeededMatching, windowSize);
while (true) {
steadyStateCalc.printIfSteady(data);
}
}
}

View File

@ -0,0 +1,154 @@
package com.aselimov.app.socket;
import java.io.*;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
/**
* Listen for data over a socket
*
*/
public class SocketListener<T> {
private final int port;
private final Class<T> dataClass;
private final ObjectMapper objectMapper;
private final Consumer<T> dataConsumer;
private ServerSocket serverSocket;
private boolean running = false;
private ExecutorService executorService;
/**
* Creates a new StreamingSocketJsonListener.
*
* @param port The port to listen on
* @param targetClass The class to parse incoming JSON into
* @param dataConsumer Consumer that processes parsed data objects
*/
public SocketListener(int port, Class<T> dataClass, Consumer<T> dataConsumer) {
this.port = port;
this.dataClass = dataClass;
this.dataConsumer = dataConsumer;
this.objectMapper = new ObjectMapper();
}
/**
* Start the SocketListener.
* This creates a threadpool to process the socket. For each piece of data
* received we immediately call the consumer
* function
*/
public void start() {
if (running) {
return;
}
System.out.println("Started listening on " + port);
running = true;
// Use a new Cached Thread Pool since we expect low numbers of clients
// connecting
executorService = Executors.newCachedThreadPool();
executorService.execute(() -> {
try {
serverSocket = new ServerSocket(port);
while (running) {
try {
Socket clientSocket = serverSocket.accept();
handleConnection(clientSocket);
} catch (IOException e) {
if (running) {
System.err.println("Error accepting connections: " + e.getMessage());
}
}
}
} catch (IOException e) {
System.err.println("Error starting server because: " + e.getMessage());
}
});
}
private void handleConnection(Socket clientSocket) {
System.out.println("Opening new client connection");
executorService.execute(() -> {
try {
BufferedInputStream inputStream = new BufferedInputStream(clientSocket.getInputStream());
JsonFactory factory = objectMapper.getFactory();
JsonParser parser = factory.createParser(inputStream);
// Set the parser for streaming mode
parser.setCodec(objectMapper);
while (running && clientSocket.isConnected() && !clientSocket.isClosed()) {
try {
T parsedData = parser.readValueAs(dataClass);
if (parsedData != null) {
// NOTE: This is going to run pretty slow for multi-client architectures because
// each JSON object in the stream is going to lock the data structure to push a
// new data point. Probably here we should use per thread buffers and then add
// large groups of data points at once
dataConsumer.accept(parsedData);
}
} catch (IOException e) {
if (clientSocket.isClosed() || !clientSocket.isConnected()) {
break;
} else {
System.err.println("Error parser JSON because " + e.getMessage());
}
}
}
} catch (IOException e) {
System.err.println("Error reading from socket because " + e.getMessage());
} finally {
try {
clientSocket.close();
System.out.println("Closed client connection");
} catch (IOException e) {
System.err.println("Error closing client socket because " + e.getMessage());
}
}
});
}
/**
* Stops the listener and cleans up resources.
*/
public void stop() {
running = false;
System.out.println("Stopping SteadyDetect and freeing port " + port);
if (serverSocket != null && !serverSocket.isClosed()) {
try {
serverSocket.close();
} catch (IOException e) {
System.err.println("Error closing server socket: " + e.getMessage());
}
}
if (executorService != null) {
executorService.shutdown();
}
System.out.println("Streaming socket listener stopped");
}
@Override
protected void finalize() {
stop();
}
}

View File

@ -0,0 +1,79 @@
package com.aselimov.app.steady_state_analysis;
import com.aselimov.app.timedata.TimeData;
import com.aselimov.app.timedata.TimeData.TimeDataPoints;
import com.aselimov.app.utilities.Statistics;
/**
* Determine whether a time series is steady state using continuous linear
* regression.
*
* This class performs a basic linear regression on chunks of the input signal.
* If a user specified number of chunks have a slope within a specified
* tolerance, then the signal is labeled as having reached steady state
*/
public class LinearRegression extends SteadyStateCalculator {
private double slopeTol;
private int numNeededMatching;
private int windowSize;
private int numMatching = 0;
private int lastChecked = 0;
/**
* Create a LinearRegression object
*
* @param slopeTol Tolerance for the slope to be detected as 0
* @param numNeededMatching the number of consecutive matches before we detect
* the signal as steady state
* @param windowSize The size of the window to perform linear regression
* on
*/
public LinearRegression(double slopeTol, int numNeededMatching, int windowSize) {
this.slopeTol = slopeTol;
this.numNeededMatching = numNeededMatching;
this.windowSize = windowSize;
}
/**
* Determine whether a signal is steady or not.
* This performs linear regression on the final windowSize number of data
* points and determines whether the signal is steady state or not
*/
@Override
protected SignalStatus isSteady(TimeData inData) {
// TODO: This is a race condition, in that the time between checking whether we
// need to check steady state can yield additional data points, meaning some
// points aren't getting checked. Probably need ot refactor this code so that
// calling the steady checker happens on the data addition
if ((inData.getNumPnts() - lastChecked) < windowSize) {
return SignalStatus.NOT_CHECKED;
}
TimeDataPoints data = inData.threadsafeClone();
lastChecked = data.times.size();
// Estimate the slope of the fitted line (the intercept doesn't matter)
int endIdx = data.times.size();
int startIdx = endIdx - windowSize;
double timeMean = Statistics.calcMean(data.times, startIdx, endIdx);
double valueMean = Statistics.calcMean(data.values, startIdx, endIdx);
double timeVariance = Statistics.calcVariance(data.times, timeMean, startIdx, endIdx);
double covariance = Statistics.calcCovariance(data.times, data.values, timeMean, valueMean, startIdx, endIdx);
double slope = covariance / timeVariance;
if (Math.abs(slope) < slopeTol) {
numMatching += 1;
} else {
numMatching = 0;
}
if (numMatching >= numNeededMatching) {
return SignalStatus.STEADY;
} else {
return SignalStatus.NOT_STEADY;
}
}
}

View File

@ -0,0 +1,30 @@
package com.aselimov.app.steady_state_analysis;
import com.aselimov.app.timedata.TimeData;
/**
* A calculator that determines whether input time_series data is steady state.
*/
public abstract class SteadyStateCalculator {
public enum SignalStatus {
STEADY,
NOT_STEADY,
NOT_CHECKED
}
public void printIfSteady(TimeData data) {
SignalStatus status = isSteady(data);
switch (status) {
case STEADY:
System.out.println("Signal is steady-state");
break;
case NOT_STEADY:
System.out.println("Signal is not steady-state");
break;
case NOT_CHECKED:
break;
}
}
protected abstract SignalStatus isSteady(TimeData data);
}

View File

@ -0,0 +1,94 @@
package com.aselimov.app.timedata;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import com.aselimov.app.steady_state_analysis.SteadyStateCalculator;
public class TimeData implements Consumer<TimeData.TimeDataPoint> {
private TimeDataPoints data = new TimeDataPoints();
private ReentrantLock mutex = new ReentrantLock();
/**
* Single time data point.
* Used for formatted parsing of incoming JSON data
*
*/
public record TimeDataPoint(double time, double value) {
};
/**
* Collection of data points
*/
public class TimeDataPoints {
public List<Double> times;
public List<Double> values;
/**
* Empty constructor
*/
public TimeDataPoints() {
times = new ArrayList();
values = new ArrayList();
}
/**
* Implement copy constructors for TimeDataPoints
*/
public TimeDataPoints(TimeDataPoints data) {
this(data.times, data.values);
}
public TimeDataPoints(List<Double> times, List<Double> values) {
this.times = new ArrayList(times);
this.values = new ArrayList(values);
}
public void add(double time, double value) {
times.add(time);
values.add(value);
}
};
/**
* Allow TimeData to be a consumer of TimeDataPoint.
* Add the TimeDataPoint to the overall time_data
*/
@Override
public void accept(TimeDataPoint arg0) {
mutex.lock();
try {
data.add(arg0.time(), arg0.value());
} finally {
mutex.unlock();
}
}
/**
* Get a clone of the TimeDataPoints.
* We get a clone for local operation while ensuring the memory access is
* threadsafe
*/
public TimeDataPoints threadsafeClone() {
mutex.lock();
try {
return new TimeDataPoints(data.times, data.values);
} finally {
mutex.unlock();
}
}
/**
* Return the number of data points in the class
*
* @return number of data points
*/
public int getNumPnts() {
return data.times.size();
}
}

View File

@ -0,0 +1,60 @@
package com.aselimov.app.utilities;
import java.util.List;
public class Statistics {
/**
* Calculate the mean of a set of data
*
* @param data input data
* @param startIdx start idx of data
* @param endIdx end idx of data
* @return
*/
public static double calcMean(List<Double> data, int startIdx, int endIdx) {
double sum = 0.0;
for (int i = startIdx; i < endIdx; i++) {
sum += data.get(i);
}
return sum / (endIdx - startIdx);
}
/**
* Calculate the variance of a set of data
*
* @param data input data
* @param mean mean value of data
* @param startIdx start_idx of data
* @param endIdx end_idx of data
* @return variance of data
*/
public static double calcVariance(List<Double> data, double mean, int startIdx, int endIdx) {
double variance = 0.0;
for (int i = startIdx; i < endIdx; i++) {
double xMinusMean = data.get(i) - mean;
variance += xMinusMean * xMinusMean;
}
return variance / (endIdx - startIdx);
}
/**
* Calculate the covariance of two variables
*
* @param x List of data representing one variable
* @param y List of data representing other variable
* @param xMean mean of x data
* @param yMean mean of y data
* @param startIdx start index for both data
* @param endIdx end index for both data
* @return covariance of x and y
*/
public static double calcCovariance(List<Double> x, List<Double> y, double xMean, double yMean, int startIdx,
int endIdx) {
double covariance = 0;
for (int i = startIdx; i < endIdx; i++) {
covariance += (x.get(i) - xMean) * (y.get(i) - yMean);
}
return covariance / (endIdx - startIdx - 1);
}
}

View File

@ -0,0 +1,19 @@
package com.aselimov.app;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;
/**
* Unit test for simple App.
*/
public class AppTest {
/**
* Rigorous Test :-)
*/
@Test
public void shouldAnswerWithTrue() {
assertTrue(true);
}
}

View File

@ -0,0 +1,77 @@
package com.aselimov.app.steady_state_analysis;
import com.aselimov.app.timedata.TimeData;
import com.aselimov.app.timedata.TimeData.TimeDataPoints;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*;
public class LinearRegressionTest {
private LinearRegression linearRegression;
private TimeData timeData;
@BeforeEach
void setUp() {
// Set up a LinearRegression instance with reasonable default values
linearRegression = new LinearRegression(0.01, 3, 5); // slope tolerance = 0.01, need 3 matches, window size = 5
timeData = new TimeData();
}
@Test
void testNotEnoughData() {
// Test when there's not enough data points
timeData.accept(new TimeData.TimeDataPoint(1.0, 1.0));
timeData.accept(new TimeData.TimeDataPoint(2.0, 2.0));
assertEquals(SteadyStateCalculator.SignalStatus.NOT_CHECKED, linearRegression.isSteady(timeData));
}
@Test
void testSteadyStateDetection() {
// Test steady state detection with a flat line
for (int i = 0; i < 10; i++) {
timeData.accept(new TimeData.TimeDataPoint(i * 1.0, 1.0)); // Constant value of 1.0
}
assertEquals(SteadyStateCalculator.SignalStatus.STEADY, linearRegression.isSteady(timeData));
}
@Test
void testNonSteadyState() {
// Test non-steady state with a linearly increasing signal
for (int i = 0; i < 10; i++) {
timeData.accept(new TimeData.TimeDataPoint(i * 1.0, i * 1.0)); // Linear increase
}
assertEquals(SteadyStateCalculator.SignalStatus.NOT_STEADY, linearRegression.isSteady(timeData));
}
@Test
void testMultipleWindows() {
// Test multiple windows with a signal that becomes steady
// First add non-steady data
for (int i = 0; i < 5; i++) {
timeData.accept(new TimeData.TimeDataPoint(i * 1.0, i * 1.0)); // Linear increase
}
// Add steady data
for (int i = 0; i < 10; i++) {
timeData.accept(new TimeData.TimeDataPoint(i * 1.0 + 5.0, 5.0)); // Constant value of 5.0
}
assertEquals(SteadyStateCalculator.SignalStatus.STEADY, linearRegression.isSteady(timeData));
}
@Test
void testSlopeTolerance() {
// Test with a signal that has a small slope within tolerance
linearRegression = new LinearRegression(0.1, 3, 5); // Increase slope tolerance
for (int i = 0; i < 10; i++) {
// Create a signal with a small slope (0.05)
timeData.accept(new TimeData.TimeDataPoint(i * 1.0, 1.0 + i * 0.05));
}
assertEquals(SteadyStateCalculator.SignalStatus.STEADY, linearRegression.isSteady(timeData));
}
}

View File

@ -0,0 +1,58 @@
package com.aselimov.app.utilities;
import static org.junit.jupiter.api.Assertions.assertEquals;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
/**
* Unit test the statistics class
*/
public class StatisticsTest {
List<Double> x;
List<Double> y;
double xMeanFull;
double xMeanHalf;
double yMeanFull;
double yMeanHalf;
@BeforeEach
void setup() {
x = new ArrayList(Arrays.asList(1.30656191, -0.9354737, -1.10250799, 1.00903942, 1.37715207, -0.163614,
-1.17995963, 0.52334468, -0.24819095, 0.22252544));
y = new ArrayList(Arrays.asList(0.56089952, 1.9944321, -0.39985776, 0.60433577, 1.01884808,
0.39328237, 1.72282243, -1.3340576, -0.12969627, -0.1063279));
xMeanFull = Statistics.calcMean(x, 0, x.size());
xMeanHalf = Statistics.calcMean(x, 5, x.size());
yMeanFull = Statistics.calcMean(y, 0, y.size());
yMeanHalf = Statistics.calcMean(y, 5, y.size());
}
@Test
void testMean() {
assertEquals(0.08088772442417858, xMeanFull, 1e-8);
assertEquals(0.43246807465934944, yMeanFull, 1e-8);
assertEquals(-0.1691788946337382, xMeanHalf, 1e-8);
assertEquals(0.10920460800791403, yMeanHalf, 1e-8);
}
@Test
void testVariance() {
assertEquals(0.8451099344901358, Statistics.calcVariance(x, xMeanFull, 0, x.size()), 1e-8);
assertEquals(0.33219455205301196, Statistics.calcVariance(x, xMeanHalf, 5, x.size()), 1e-8);
assertEquals(0.8915865222994588, Statistics.calcVariance(y, yMeanFull, 0, y.size()), 1e-8);
assertEquals(0.9741992662607654, Statistics.calcVariance(y, yMeanHalf, 5, y.size()), 1e-8);
}
@Test
void testCovariance() {
assertEquals(-0.20175547500418511, Statistics.calcCovariance(x, y, xMeanFull, yMeanFull, 0, x.size()), 1e-8);
assertEquals(-0.6736187544740592, Statistics.calcCovariance(x, y, xMeanHalf, yMeanHalf, 5, x.size()), 1e-8);
}
}