Monday, March 9, 2015

Run computation on only the most recent, asynchronously updating data

Java Wizards!


I'd like to implement the following requirements as efficiently as possible in Java. Every microsecond counts.


The tl;dr version: I have some computation that needs to run on fresh data. As soon as the data changes, the computation needs to run. If the data changes before the computation finishes, the computation needs to be cancelled and restarted on the most up-to-date data.


In detail:



  • I have N sources of new data that update asynchronously. Call them instances of the class DataSource (DataSource ds1 = new DataSource(); DataSource ds2 = new DataSource(); and so on).

  • The public method getNewData() of a DataSource returns new data if it is available, otherwise the thread blocks until there is new data.

  • Let GlobalState be the snapshot of all the streams' states at any given moment. Any time any of the streams updates, GlobalState changes. In other words, GlobalState always has the most up-to-date information on all the streams' data. If Java were pass-by-reference, one might imagine instantiating a GlobalState as follows: GlobalState gs = new GlobalState(ds1.datum, ds2.datum, ...);

  • As soon as GlobalState changes (because of one of the streams updating), a job is kicked off, which may take some amount of time. If the job finishes before GlobalState changes again, great, we save the result, and then wait for it to change and then do work on the new state, ad infinitum. If it doesn't finish before GlobalState changes again, then the job is cancelled, and a new job is started for the new state.
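One way to get the GlobalState semantics described above without polling is to store the latest datum from each source behind a lock, bump a version counter on every update, and let the consumer block on a Condition until the version differs from the last one it saw. The sketch below assumes that design. Snapshot, publish, awaitNewerThan and attach are hypothetical names, and the generic DataSource<T> interface is only a stand-in for the question's DataSource, assuming its getNewData() blocks until data is available.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for the question's DataSource: getNewData() blocks until new data exists. */
interface DataSource<T> {
    T getNewData() throws InterruptedException;
}

/** Immutable copy of every source's latest datum, tagged with the version it was taken at. */
final class Snapshot<T> {
    final long version;
    final List<T> data; // data.get(i) is the latest datum from source i (null until it first reports)

    Snapshot(long version, List<T> data) {
        this.version = version;
        this.data = data;
    }
}

/** Sketch of GlobalState: sources publish into it, the consumer waits for a newer version. */
final class GlobalState<T> {
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition changed = lock.newCondition();
    private final List<T> latest = new ArrayList<>(); // guarded by lock
    private long version = 0;                         // guarded by lock; bumped on every update

    GlobalState(int numSources) {
        for (int i = 0; i < numSources; i++) latest.add(null);
    }

    /** Called by a source thread; holds the lock briefly and never waits for the consumer. */
    void publish(int sourceIndex, T datum) {
        lock.lock();
        try {
            latest.set(sourceIndex, datum);
            version++;
            changed.signalAll();
        } finally {
            lock.unlock();
        }
    }

    /** Blocks until the version differs from lastSeen, then returns a copy of the current state. */
    Snapshot<T> awaitNewerThan(long lastSeen) throws InterruptedException {
        lock.lock();
        try {
            while (version == lastSeen) changed.await();
            return new Snapshot<>(version, Collections.unmodifiableList(new ArrayList<>(latest)));
        } finally {
            lock.unlock();
        }
    }

    /** Starts a daemon thread that pumps ds.getNewData() into slot sourceIndex until interrupted. */
    Thread attach(DataSource<T> ds, int sourceIndex) {
        Thread t = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    publish(sourceIndex, ds.getNewData());
                }
            } catch (InterruptedException e) {
                // interrupted while blocked in getNewData(): exit the pump
            }
        });
        t.setDaemon(true);
        t.start();
        return t;
    }
}
```

A consumer would keep the last version it processed (starting at 0) and loop on awaitNewerThan(seen). Any updates that arrive while it is busy are merged into a single newer snapshot, so it never works through a backlog of stale states.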


My best guess:



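    import java.util.concurrent.CancellationException;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;

    public class App {
        public static void main(String[] args) throws InterruptedException {

            DataSource ds1 = new DataSource(...);
            DataSource ds2 = new DataSource(...);
            GlobalState gs = new GlobalState(ds1, ds2);

            ds1.start(); // runs and updates its data asynchronously
            ds2.start(); // runs and updates its data asynchronously

            Worker worker = new Worker();

            while (true) {
                try {
                    GlobalDataState gds = gs.getState(); // blocks if the state hasn't changed since the last call
                    Future<?> result = worker.doWork(gds); // the work happens on a different thread
                    // Blocks until the result is available or the job is cancelled. Note: while this
                    // thread waits here it never calls doWork() again, so newer data cannot trigger
                    // the cancellation in Worker.doWork().
                    System.out.println("Result is: " + result.get());
                } catch (CancellationException ce) {
                    System.err.println("Workers too slow! Starting over on new data.");
                } catch (ExecutionException ee) {
                    System.err.println("Job failed: " + ee.getCause());
                }
            }
        }
    }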

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class Worker {

        private Future<?> pendingResult; // the most recently submitted job, if any
        private final ExecutorService exec;

        public Worker() {
            this.exec = Executors.newFixedThreadPool(2);
        }

        public Future<?> doWork(GlobalDataState gds) { // GlobalDataState implements Callable
            // Cancel the previous job if it hasn't finished yet; cancel(true) interrupts its thread.
            if (pendingResult != null && !pendingResult.isDone()) {
                pendingResult.cancel(true);
            }
            pendingResult = exec.submit(gds);
            return pendingResult;
        }
    }
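In the loop above, the main thread waits in result.get(), so it never reaches doWork() again while a job is running. One way around this is to let the job deliver its own result through a callback. The submitting thread is then free to wait only for new state and to cancel the running job as soon as that state arrives. The sketch below assumes that approach. RestartingWorker, submit and onResult are hypothetical names, and it assumes the computation responds to interruption, because Future.cancel(true) only interrupts the worker thread and does not stop it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Hypothetical cancel-and-restart runner. The caller never blocks on Future.get(); the job hands
 * its result to a callback, so the calling thread stays free to submit newer state.
 */
final class RestartingWorker<S, R> {
    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    private final Function<S, R> job;   // the computation; should check Thread.interrupted() in long loops
    private final Consumer<R> onResult; // invoked only if the job was not interrupted
    private Future<?> pending;          // accessed only from the submitting thread

    RestartingWorker(Function<S, R> job, Consumer<R> onResult) {
        this.job = job;
        this.onResult = onResult;
    }

    /** Cancels the job that is still running or queued (if any) and starts one on the given state. */
    void submit(S state) {
        if (pending != null) {
            pending.cancel(true); // does nothing if the previous job already completed
        }
        pending = exec.submit(() -> {
            R result = job.apply(state);
            if (!Thread.currentThread().isInterrupted()) {
                onResult.accept(result);
            }
        });
    }

    void shutdown() {
        exec.shutdownNow();
    }
}
```

The driving loop would just be "block until the state changes, then call submit()". The single-thread executor means a job that ignores interruption delays its successor instead of running alongside it. Also, a cancellation that lands between the interrupt check and the callback can still let one stale result through, so tagging results with the state's version is a sensible extra guard if that matters.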


The major problem I'm having is figuring out how to implement GlobalState in a way that doesn't require me to poll for new data in a loop. I'm thinking the way to do this is with a blocking queue with a capacity of 1 (SynchronousQueue, ArrayBlockingQueue(1), ...?), but I only want take() to block, not put(). If the main thread is blocked on the gs.getState() call, that must not block the part of the program that adds a new GlobalDataState to this single-element queue. On the other hand, if data is updating much faster than my worker can handle it, I don't want old data waiting in this queue. If a GlobalDataState object is sitting in the queue and another one is offered, the new one needs to evict the old one and take its place. That way, whenever the main thread does get around to calling gs.getState(), it is guaranteed to receive the most up-to-date info.
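The "put never blocks, newest value wins, take blocks while empty" behaviour is small enough to write directly with a monitor. SynchronousQueue is probably not a fit here: it has no capacity at all, so offer() succeeds only if another thread is already waiting to receive. With a standard ArrayBlockingQueue of capacity 1, an overwriting put can be approximated by retrying offer() and dropping the stale head whenever the queue is full. Both variants below are sketches. LatestValueSlot and ConflatingQueues.offerLatest are hypothetical names, and the slot assumes null is never a legitimate value because it uses null to mean "empty".

```java
import java.util.concurrent.BlockingQueue;

/** Hypothetical single-slot mailbox: put() overwrites and never blocks; take() blocks while empty. */
final class LatestValueSlot<T> {
    private T value; // guarded by this; null means "nothing new since the last take()"

    synchronized void put(T newValue) {
        if (newValue == null) throw new NullPointerException("null is reserved for 'empty'");
        value = newValue; // silently evicts an older, unconsumed value
        notifyAll();
    }

    synchronized T take() throws InterruptedException {
        while (value == null) {
            wait(); // releases the monitor, so producers can still put() meanwhile
        }
        T v = value;
        value = null; // empty the slot so the next take() waits for fresh data
        return v;
    }
}

/** The same "evict, then insert" behaviour approximated on a standard capacity-1 queue. */
final class ConflatingQueues {
    static <T> void offerLatest(BlockingQueue<T> queue, T item) {
        // offer() returns false instead of blocking when the queue is full;
        // drop the stale head and retry until the new item is in.
        while (!queue.offer(item)) {
            queue.poll();
        }
    }
}
```

With several producers, the order in which their concurrent puts win is unspecified in either variant. That fits the requirement as long as every put carries a complete, current snapshot rather than a partial update.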


I've also considered using a Phaser to manage the go-ahead on computations, but every one of my attempts seems messy.


So that's my best guess. I'd appreciate any suggestions about data structures and/or design patterns that best accomplish the application's goal. Remember, every microsecond counts.


Thanks!

