Proper Synchronization Of Java Threads Using Wait/notifyAll?
Here is a simplified version of my application showing what I'm doing.
/*
in my app's main():
    Runner run = new Runner();
    run.dowork();
*/
class Runner
{
    private int totalWorkers = 2;
    private int workersDone = 0;

    public synchronized void workerDone()
    {
        workersDone++;
        notifyAll();
    }

    public synchronized void dowork()
    {
        workersDone = 0;
        //<code for opening a file here, other setup here, etc>
        Worker a = new Worker(this);
        Worker b = new Worker(this);
        while ((line = reader.readLine()) != null)
        {
            //<a large amount of processing on 'line'>
            a.setData(line);
            b.setData(line);
            while (workersDone < totalWorkers)
            {
                wait();
            }
        }
    }
}

class Worker implements Runnable
{
    private Runner runner;
    private String data;

    public Worker(Runner r)
    {
        this.runner = r;
        Thread t = new Thread(this);
        t.start();
    }

    public synchronized void setData(String s)
    {
        this.data = s;
        notifyAll();
    }

    public void run()
    {
        while (true)
        {
            synchronized(this)
            {
                wait();
                //<do work with this.data here>
                this.runner.workerDone();
            }
        }
    }
}
The basic concept here is that I have a bunch of workers which all do some processing on an incoming line of data, all independently, and write out the data wherever they like - they do not need to report any data back to the main thread or share data with each other.
The problem that I'm having is that this code deadlocks. I'm reading a file of over 1 million lines and I'm lucky to get 100 lines into it before my app stops responding.
The workers, in reality, all do differing amounts of work so I want to wait until they all complete before moving to the next line.
I cannot let the workers process at different speeds and queue the data internally because the files I am processing are too large for this and won't fit in memory.
I cannot give each worker its own FileReader to independently get 'line', because I do a ton of processing on the line before the workers see it, and do not want to have to re-do the processing in each worker.
I know I'm missing some fairly simple aspect of synchronization in Java, but I'm out of ideas for fixing it. If someone could explain what I'm doing wrong here I would appreciate it.
Answer
Working directly with synchronized, wait(), and notify() is definitely tricky.
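One reason it is tricky: a bare wait() with no condition, like the one in the question's Worker.run(), can miss a notifyAll() that fires before the thread has reached wait(), and the thread then sleeps forever. That is a plausible cause of the hang, though I can't be sure from the simplified snippet. The usual pattern is to keep some state and call wait() in a loop that re-checks it. Here is a minimal sketch of that pattern; the Slot class and its put/take methods are hypothetical names for illustration, not part of your code.

```java
import java.util.ArrayList;
import java.util.List;

class GuardedWaitSketch {
    // Hypothetical one-slot hand-off between a producer and a consumer.
    static class Slot {
        private String data; // null means "empty"

        public synchronized void put(String s) throws InterruptedException {
            while (data != null) { // wait until the previous item has been taken
                wait();
            }
            data = s;
            notifyAll();
        }

        public synchronized String take() throws InterruptedException {
            while (data == null) { // guard: re-check the state after every wake-up
                wait();
            }
            String s = data;
            data = null;
            notifyAll();
            return s;
        }
    }

    static List<String> run(final String[] lines) throws InterruptedException {
        final Slot slot = new Slot();
        final List<String> seen = new ArrayList<String>();
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < lines.length; i++) {
                        seen.add(slot.take());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        for (String line : lines) {
            slot.put(line); // blocks until the consumer has emptied the slot
        }
        consumer.join(); // join() makes the consumer's writes to 'seen' visible here
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(new String[]{"Line 1", "Line 2", "Line 3"}));
    }
}
```

Because the state lives in the data field, it does not matter whether put() or take() runs first: a notification that arrives "too early" is not lost, since the waiting side checks the field before it ever calls wait().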
Fortunately the Java Concurrency API provides some excellent control objects for this sort of thing that are much more intuitive. In particular, look at CyclicBarrier and CountDownLatch; one of them almost certainly will be what you're looking for.
You may also find a ThreadPoolExecutor to be handy for this situation.
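To make the CountDownLatch option concrete, here is a rough sketch of the "wait for all workers before reading the next line" step. The class name LatchPerLineSketch, the process method, and the log list are made up for illustration, and the string concatenation stands in for your real per-line work.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class LatchPerLineSketch {
    static List<String> process(String[] lines, final int totalWorkers)
            throws InterruptedException {
        final List<String> log = new CopyOnWriteArrayList<String>();
        ExecutorService executor = Executors.newFixedThreadPool(totalWorkers);
        try {
            for (final String line : lines) {
                // A latch cannot be reset, so create a fresh one for every line.
                final CountDownLatch done = new CountDownLatch(totalWorkers);
                for (int w = 0; w < totalWorkers; w++) {
                    final int id = w;
                    executor.submit(new Runnable() {
                        public void run() {
                            try {
                                // placeholder for the real work on 'line'
                                log.add("worker " + id + " handled " + line);
                            } finally {
                                done.countDown(); // count down even if the work throws
                            }
                        }
                    });
                }
                done.await(); // blocks until every worker has counted down
                log.add("finished " + line);
            }
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        for (String entry : process(new String[]{"Line 1", "Line 2"}, 2)) {
            System.out.println(entry);
        }
    }
}
```

The main difference from a CyclicBarrier is that the workers never block: they count down and return to the pool, and only the reading thread waits.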
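Here's a simple conversion of your snippet that uses a CyclicBarrier and a fixed thread pool. It produces output along the following lines, without deadlock of course (the exact interleaving of the "Waiting" and "Working" lines can vary between runs):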
Read line: Line 1
Waiting for work to be complete on line: Line 1
Working on line: Line 1
Working on line: Line 1
Read line: Line 2
Waiting for work to be complete on line: Line 2
Working on line: Line 2
Working on line: Line 2
Read line: Line 3
Waiting for work to be complete on line: Line 3
Working on line: Line 3
Working on line: Line 3
All work complete!
import java.io.IOException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Runner
{
    public static void main(String[] args) {
        Runner r = new Runner();
        try {
            r.dowork();
        } catch (IOException e) {
            // handle
            e.printStackTrace();
        }
    }

    CyclicBarrier barrier;
    ExecutorService executor;
    private int totalWorkers = 2;

    public Runner() {
        // One party per worker, plus one for the thread that reads the lines.
        this.barrier = new CyclicBarrier(this.totalWorkers + 1);
        this.executor = Executors.newFixedThreadPool(this.totalWorkers);
    }

    public synchronized void dowork() throws IOException
    {
        //<code for opening a file here, other setup here, etc>
        //BufferedReader reader = null;
        //String line;
        final Worker worker = new Worker();
        for(String line : new String[]{"Line 1", "Line 2", "Line 3"})
        //while ((line = reader.readLine()) != null)
        {
            System.out.println("Read line: " + line);
            //<a large amount of processing on 'line'>
            for(int c = 0; c < this.totalWorkers; c++) {
                final String curLine = line;
                this.executor.submit(new Runnable() {
                    public void run() {
                        worker.doWork(curLine);
                    }
                });
            }
            try {
                System.out.println("Waiting for work to be complete on line: " + line);
                // Blocks until every worker has also reached the barrier.
                this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }
        // Without this the pool's non-daemon threads keep the JVM alive.
        this.executor.shutdown();
        System.out.println("All work complete!");
    }

    class Worker
    {
        public void doWork(String line)
        {
            //<do work with 'line' here>
            System.out.println("Working on line: " + line);
            try {
                // Signal that this worker is done with the current line.
                Runner.this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }
    }
}
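If the workers are plain tasks, the executor can also do the per-line waiting on its own: ExecutorService.invokeAll() does not return until every submitted task has completed. The following is only a sketch of that idea; InvokeAllSketch and process are hypothetical names, and the tasks return a String purely so there is something to show (your workers don't need to report anything back).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class InvokeAllSketch {
    static List<String> process(String[] lines, int totalWorkers)
            throws InterruptedException, ExecutionException {
        List<String> results = new ArrayList<String>();
        ExecutorService executor = Executors.newFixedThreadPool(totalWorkers);
        try {
            for (final String line : lines) {
                List<Callable<String>> tasks = new ArrayList<Callable<String>>();
                for (int w = 0; w < totalWorkers; w++) {
                    final int id = w;
                    tasks.add(new Callable<String>() {
                        public String call() {
                            // placeholder for the real work on 'line'
                            return "worker " + id + " handled " + line;
                        }
                    });
                }
                // invokeAll blocks until all tasks for this line have completed
                // and returns the futures in the same order as 'tasks'.
                for (Future<String> f : executor.invokeAll(tasks)) {
                    results.add(f.get()); // get() rethrows a worker's exception, if any
                }
            }
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        for (String r : process(new String[]{"Line 1", "Line 2"}, 2)) {
            System.out.println(r);
        }
    }
}
```

This trades the explicit barrier for a new batch of task objects per line, which is probably negligible next to the per-line processing you describe.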