Proper Synchronization Of Java Threads Using Wait/notifyAll?

Here is a simplified version of my application showing what I'm doing.

/*
in my app's main():

    Runner run = new Runner();

    run.dowork();

*/

class Runner
{
    private int totalWorkers = 2;
    private int workersDone = 0;

    public synchronized void workerDone()
    {
        workersDone++;
        notifyAll();
    }

    public synchronized void dowork()
    {
        workersDone = 0;

        //<code for opening a file here, other setup here, etc>

        Worker a = new Worker(this);
        Worker b = new Worker(this);

        while ((line = reader.readLine()) != null)
        {
            //<a large amount of processing on 'line'>

            a.setData(line);
            b.setData(line);

            while (workersDone < totalWorkers)
            {
                wait();
            }               
        }
    }
}

class Worker implements Runnable
{
    private Runner runner;
    private String data;

    public Worker(Runner r)
    {
        this.runner = r;
        Thread t = new Thread(this);
        t.start();
    }

    public synchronized void setData(String s)
    {
        this.data = s;
        notifyAll();
    }

    public void run()
    {
        while (true)
        {
            synchronized(this)
            {
                wait();

                //<do work with this.data here>

                this.runner.workerDone();
            }
        }
    }
}

The basic idea is that I have several workers, each of which independently processes every incoming line of data and writes its output wherever it likes; they do not need to report anything back to the main thread or share data with each other.

The problem that I'm having is that this code deadlocks. I'm reading a file of over 1 million lines and I'm lucky to get 100 lines into it before my app stops responding.

In reality the workers each do a different amount of work, so I want to wait until all of them have finished before moving on to the next line.

I cannot let the workers process at different speeds and queue the data internally, because the files I am processing are too large to fit in memory.

I cannot give each worker its own FileReader to independently get 'line', because I do a ton of processing on the line before the workers see it, and do not want to have to re-do the processing in each worker.

I know I'm missing some fairly simple aspect of synchronization in Java, but I'm stuck and out of ideas for fixing it. If someone could explain what I'm doing wrong here, I would appreciate it.

Answer

Working directly with synchronized, wait(), and notify() is definitely tricky.
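
For comparison, the hand-rolled approach can be made to work. In the question's code, several problems appear to combine: workersDone is never reset between lines, so after the first line the main loop stops waiting; Runner.dowork() holds the Runner monitor while calling Worker.setData() (which needs the Worker monitor), while each Worker holds its own monitor while calling Runner.workerDone() (which needs the Runner monitor), a classic lock-ordering deadlock; and each worker calls wait() unconditionally, so a notifyAll() that fires before the worker is actually waiting is simply lost. The following is a minimal sketch, not the asker's code: the class LineHandoff and its methods publishAndWait, awaitLine, workerDone and finish are hypothetical names, and log.add(line) stands in for the real per-worker processing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: all shared state sits behind ONE monitor (this object),
// the real work runs outside that monitor, and every wait() re-checks a condition.
public class LineHandoff {
    private final int totalWorkers;
    private String currentLine;
    private long generation = 0;   // bumped once per published line
    private int workersDone = 0;   // reset for every line
    private boolean finished = false;

    public LineHandoff(int totalWorkers) { this.totalWorkers = totalWorkers; }

    // Reader: publish a line, then block until every worker has called workerDone().
    public synchronized void publishAndWait(String line) throws InterruptedException {
        currentLine = line;
        workersDone = 0;
        generation++;
        notifyAll();
        while (workersDone < totalWorkers) {
            wait();                // loop also guards against spurious wakeups
        }
    }

    // Reader: no more input; wake the workers so they can exit.
    public synchronized void finish() {
        finished = true;
        notifyAll();
    }

    // Worker: block until a line newer than 'seen' exists; null means "no more input".
    // Because this checks state rather than relying on the notification itself,
    // a line published before the worker starts waiting is not missed.
    public synchronized String awaitLine(long seen) throws InterruptedException {
        while (generation == seen && !finished) {
            wait();
        }
        return generation == seen ? null : currentLine;
    }

    // Worker: report that it has finished the current line.
    public synchronized void workerDone() {
        workersDone++;
        notifyAll();
    }

    // Runs the pipeline and returns the order in which lines were processed.
    public static List<String> run(List<String> lines, int totalWorkers) throws InterruptedException {
        LineHandoff handoff = new LineHandoff(totalWorkers);
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < totalWorkers; i++) {
            Thread t = new Thread(() -> {
                long seen = 0;
                try {
                    String line;
                    while ((line = handoff.awaitLine(seen)) != null) {
                        seen++;
                        log.add(line);          // stand-in for the real work, done outside the monitor
                        handoff.workerDone();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            t.start();
            threads.add(t);
        }
        for (String line : lines) {
            handoff.publishAndWait(line);
        }
        handoff.finish();
        for (Thread t : threads) {
            t.join();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [Line 1, Line 1, Line 2, Line 2, Line 3, Line 3]
        System.out.println(run(Arrays.asList("Line 1", "Line 2", "Line 3"), 2));
    }
}
```

Since only one lock exists, there is no ordering between locks to get wrong, and the generation counter lets a late-starting worker pick up a line it was never explicitly notified about.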

Fortunately, the concurrency utilities in java.util.concurrent provide some excellent synchronizers for this sort of thing that are much more intuitive. In particular, look at CyclicBarrier and CountDownLatch; one of them is almost certainly what you're looking for.
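
As a rough sketch of the CountDownLatch variant: a latch cannot be reset once it reaches zero, so this version creates a fresh one for every line, and each worker task counts it down once when it finishes. The class name LatchPerLine and the log.add(line) placeholder for the real per-worker processing are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: one CountDownLatch per line; the reader waits on it
// before moving on, so only one line is ever in flight.
public class LatchPerLine {
    public static List<String> run(List<String> lines, int totalWorkers) throws InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(totalWorkers);
        // Records which line each task processed (stand-in for the real work).
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        try {
            for (String line : lines) {
                CountDownLatch done = new CountDownLatch(totalWorkers);
                for (int w = 0; w < totalWorkers; w++) {
                    executor.submit(() -> {
                        try {
                            log.add(line);      // hypothetical per-worker processing
                        } finally {
                            done.countDown();   // count down even if the work throws
                        }
                    });
                }
                done.await();  // block until every worker has finished this line
            }
        } finally {
            executor.shutdown();  // let the pool threads exit so the JVM can terminate
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) throws InterruptedException {
        // prints [Line 1, Line 1, Line 2, Line 2, Line 3, Line 3]
        System.out.println(run(Arrays.asList("Line 1", "Line 2", "Line 3"), 2));
    }
}
```

Unlike a CyclicBarrier, the workers here never block waiting for each other; only the reading thread waits, which also means the pool could be larger than the number of workers per line without risk of deadlock.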

You may also find an ExecutorService (for example, a ThreadPoolExecutor) handy in this situation.
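
One way an ExecutorService alone could cover the "wait for all workers" step is ExecutorService.invokeAll(), which returns only after every task in the collection has completed, with the futures in the same order as the tasks. This is a sketch under assumptions: InvokeAllPerLine and the work() method are hypothetical, and work() returns a string only so the result is visible.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: invokeAll() blocks until all tasks for a line are done,
// so no barrier or latch is needed.
public class InvokeAllPerLine {
    // Hypothetical worker; a real one would do its own processing of 'line'.
    static String work(int id, String line) {
        return "worker " + id + " handled " + line;
    }

    public static List<String> run(List<String> lines, int totalWorkers)
            throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newFixedThreadPool(totalWorkers);
        List<String> results = new ArrayList<>();
        try {
            for (String line : lines) {
                List<Callable<String>> tasks = new ArrayList<>();
                for (int w = 0; w < totalWorkers; w++) {
                    final int id = w;
                    tasks.add(() -> work(id, line));
                }
                // Returns only after every task has completed; futures are in task order.
                for (Future<String> f : executor.invokeAll(tasks)) {
                    results.add(f.get());  // rethrows a worker's exception as ExecutionException
                }
            }
        } finally {
            executor.shutdown();
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        for (String r : run(Arrays.asList("Line 1", "Line 2"), 2)) {
            System.out.println(r);
        }
    }
}
```

If the workers have nothing to return, a Callable<Void> that returns null works the same way; the tradeoff is that a new batch of task objects is created for every line.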

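Here's a simple conversion of your snippet (using three hard-coded lines in place of the file) that produces output like the following, without deadlock. The exact interleaving of the "Waiting..." and "Working..." lines within each group can vary from run to run: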

Read line: Line 1
Waiting for work to be complete on line: Line 1
Working on line: Line 1
Working on line: Line 1
Read line: Line 2
Waiting for work to be complete on line: Line 2
Working on line: Line 2
Working on line: Line 2
Read line: Line 3
Waiting for work to be complete on line: Line 3
Working on line: Line 3
Working on line: Line 3
All work complete!

import java.io.IOException;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Runner
{

    public static void main(String args[]) {
        Runner r = new Runner();
        try {
            r.dowork();
        } catch (IOException e) {
            // handle
            e.printStackTrace();
        }
    }

    CyclicBarrier barrier;
    ExecutorService executor;
    private int totalWorkers = 2;

    public Runner() {
        // Barrier parties: every worker plus the reading thread itself.
        this.barrier = new CyclicBarrier(this.totalWorkers + 1);
        // The pool needs at least totalWorkers threads, because every task blocks on the barrier.
        this.executor = Executors.newFixedThreadPool(this.totalWorkers);
    }

    public synchronized void dowork() throws IOException
    {
        //<code for opening a file here, other setup here, etc>
        //BufferedReader reader = null;
        //String line;

        final Worker worker = new Worker();

        for(String line : new String[]{"Line 1", "Line 2", "Line 3"})
        //while ((line = reader.readLine()) != null)
        {
            System.out.println("Read line: " + line);
            //<a large amount of processing on 'line'>

            // Hand the same line to every worker.
            for(int c = 0; c < this.totalWorkers; c++) {
                final String curLine = line;
                this.executor.submit(new Runnable() {
                    public void run() {
                        worker.doWork(curLine);
                    }
                });
            }

            // Block until all workers have reached the barrier for this line.
            try {
                System.out.println("Waiting for work to be complete on line: " + line);
                this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }

        // Stop the pool's (non-daemon) threads; otherwise the JVM never exits.
        this.executor.shutdown();
        System.out.println("All work complete!");
    }

    class Worker
    {
        public void doWork(String line)
        {
            //<do work with 'line' here>
            System.out.println("Working on line: " + line);

            // Signal that this worker is done with the current line.
            try {
                Runner.this.barrier.await();
            } catch (InterruptedException e) {
                // handle
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                // handle
                e.printStackTrace();
            }
        }
    }
}
source: stackoverflow.com