The Detectives, the Dispatcher, and the BlockingQueue
iCopyright Discovery is a fairly new Java program we've been working on that scours the web for potential copyright infringement. Because so much of its job is pulling content off the web, refactoring it into a multithreaded application built around a producer/consumer work queue lets us increase throughput dramatically.
If an iCopyright publisher posts a story on the web that some scofflaw copies and pastes into his own website, Discovery pulls down a copy of the copied article and analyzes it for likeness. Discovery runs 24/7, but it spends a remarkable amount of clock time waiting for something else to happen — most of the time waiting on an HTTP GET on the suspicious websites to return. Perhaps a few seconds isn't a long time for a human to wait for a web page to load, but for iCopyright's Linux servers, of course, that's an eternity. By having multiple worker threads each working its own case concurrently, the JVM can simply switch away from a thread when it's waiting for a page to be returned, and give the CPU to a thread that's churning away on some local computation.
The traditional way to solve this problem, of course, is a producer/consumer queue. We'll establish a queue of cases to be worked. The CaseDispatcher thread, the producer in this model, is responsible only for identifying what publisher content should be worked; it places the ID of each piece of content on the back of a work queue. We call each consumer in this model a Detective: it picks the ID of a case off the front of the queue, works the case, and then reports back for more. We want to have multiple Detectives running at once, so while some are doing HTTP GETs, others can be analyzing the content they've already pulled down.
Since a number of threads, both CaseDispatcher and Detective, will be accessing a common data structure, obviously we need to guard against concurrent modification. Happily for us, there's a Java library object that's perfect for what we want: the BlockingQueue, in the java.util.concurrent package. It's obviously designed for implementing producer/consumer queues: both putting on and taking from the queue are fully threadsafe, and they both block (that is, sit there quietly waiting, but without consuming any CPU resources) if the queue is full or empty, respectively.
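To make the "full or empty" conditions concrete, here is a minimal sketch that probes a tiny bounded queue with the non-blocking counterparts of put and take. The class name BoundedQueueSketch and the capacity of 2 are made up for illustration and are not part of Discovery. Wherever offer reports false, put would have waited instead; wherever poll reports null, take would have waited.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not part of Discovery.
class BoundedQueueSketch {
    // Records what the non-blocking calls report at each step.
    static String probe() {
        BlockingQueue<Long> queue = new ArrayBlockingQueue<>(2); // capacity chosen arbitrarily
        StringBuilder trace = new StringBuilder();

        trace.append(queue.poll()).append(' ');    // empty: take() would block here
        trace.append(queue.offer(1L)).append(' '); // room available
        trace.append(queue.offer(2L)).append(' '); // queue is now full
        trace.append(queue.offer(3L)).append(' '); // full: put() would block here
        trace.append(queue.poll()).append(' ');    // elements come off in FIFO order
        trace.append(queue.poll());
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.println(probe()); // prints "null true true false 1 2"
    }
}
```

The blocking put and take calls used in the rest of this article behave the same way, except that they wait for the condition to clear rather than reporting failure.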
Here's the relevant code for the CaseDispatcher:

    import java.util.List;
    import java.util.concurrent.BlockingQueue;

    class CaseDispatcher implements Runnable {
        private final BlockingQueue<Long> caseQueue;

        public CaseDispatcher(BlockingQueue<Long> queue) {
            this.caseQueue = queue;
        }

        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    ContentSelector selector = new ContentSelector();
                    List<Content> contents;
                    do {
                        contents = selector.select();
                        for (Content content : contents) {
                            // This call blocks until there is room on the queue
                            this.caseQueue.put(content.getId());
                        }
                        selector.advance();
                        // Assumption: a pass ends when the selector returns an empty
                        // batch (the loop condition was an undeclared foundContent flag)
                    } while (!contents.isEmpty());
                }
            } catch (InterruptedException e) {
                log.info("Dispatcher: interrupted; shutting down");
            }
        }
    }
Note that there is no explicit synchronization on the call to the case queue. The ContentSelector returns a list of content objects to process, and each is put on the queue in turn for a Detective to process. (The details of ContentSelector aren't relevant here, and they're trade secrets of iCopyright anyway.) The call to put will eventually block, when we've filled up the queue — meaning that the JVM will effectively put the CaseDispatcher to sleep until there's room.
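The effect of that blocking put is backpressure: a fast producer can never run more than one queue's worth of work ahead of its consumers. The following is a rough sketch of that behavior, not Discovery code; the class name BackpressureSketch, the 5 ms sleep standing in for slow work, and the sizes are all assumptions chosen for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of Discovery.
class BackpressureSketch {
    // Pushes `items` IDs through a queue of the given capacity while a
    // deliberately slow consumer drains it; returns the largest backlog observed.
    static int maxBacklog(int capacity, int items) {
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>(capacity);

        Thread producer = new Thread(() -> {
            try {
                for (long id = 0; id < items; id++) {
                    queue.put(id); // waits here whenever the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int max = 0;
        try {
            for (int i = 0; i < items; i++) {
                max = Math.max(max, queue.size());
                Thread.sleep(5); // stand-in for slow work on a case
                queue.take();
            }
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return max;
    }

    public static void main(String[] args) {
        System.out.println("largest backlog: " + maxBacklog(2, 20));
    }
}
```

However fast the producer loop runs, the reported backlog cannot exceed the capacity passed to the queue's constructor.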
The Detective code is gloriously simple too:
    import java.util.concurrent.BlockingQueue;

    class Detective implements Runnable {
        private final BlockingQueue<Long> caseQueue;

        public Detective(BlockingQueue<Long> queue) {
            this.caseQueue = queue;
        }

        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // This call blocks until there is a case to investigate
                    Long id = this.caseQueue.take();
                    Content content = ContentDAO.getById(id);
                    this.investigate(content);
                }
            } catch (InterruptedException e) {
                log.info("Detective: interrupted; shutting down");
            }
        }

        void investigate(Content c) {
            // Interesting but trade-secret stuff here
        }
    }
Like the CaseDispatcher, the Detective works off a single case queue. The call to take will block if there are no cases to work — in other words, the thread will wait for work to show up, but won't waste any computing resources while it's not working. When a piece of content shows up in the queue, the Detective springs into action, loading it up from the database and investigating it. On completion, it grabs another case off the front of the queue.
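Because take removes an element atomically, several consumers can share one queue and each case still goes to exactly one of them. Here is a small sketch that checks this property, with stand-in workers that only record the IDs they receive. The class name SharedQueueSketch, the queue capacity of 10, and the bookkeeping with a latch and counters are assumptions for illustration and are not how Discovery tracks its cases.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of Discovery.
class SharedQueueSketch {
    // Returns true if `workers` threads sharing one queue handled each of
    // `cases` IDs exactly once.
    static boolean eachCaseWorkedOnce(int workers, int cases) {
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>(10);
        Set<Long> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(cases);

        Thread[] pool = new Thread[workers];
        for (int i = 0; i < workers; i++) {
            pool[i] = new Thread(() -> {
                try {
                    while (true) {
                        Long id = queue.take(); // blocks while the queue is empty
                        seen.add(id);
                        handled.incrementAndGet();
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    // Interrupted while waiting for work: exit the thread
                }
            });
            pool[i].start();
        }

        try {
            for (long id = 0; id < cases; id++) {
                queue.put(id);
            }
            done.await(); // wait until every case has been handled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            for (Thread t : pool) {
                t.interrupt(); // wake the idle workers so they can exit
            }
        }
        // Distinct IDs == total handled == cases means no duplicates and no losses
        return seen.size() == cases && handled.get() == cases;
    }

    public static void main(String[] args) {
        System.out.println(eachCaseWorkedOnce(4, 200));
    }
}
```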
Finally, we need to create the case queue itself, spawn the CaseDispatcher and a set of Detectives, and kick off the entire process. We call that block of code the DiscoveryRunner:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    public class DiscoveryRunner {
        private final int detectives;
        private final ExecutorService executor;
        private final BlockingQueue<Long> caseQueue;

        private DiscoveryRunner(int detectives, int queueSize) {
            this.detectives = detectives;
            // One thread per Detective, plus one for the CaseDispatcher
            this.executor = Executors.newFixedThreadPool(detectives + 1);
            this.caseQueue = new LinkedBlockingQueue<Long>(queueSize);
            log.info("DiscoveryRunner starting with " + detectives
                    + " Detectives on a case queue of size " + queueSize);
        }

        private void doWork() {
            // Kick off the Dispatcher
            executor.execute(new CaseDispatcher(caseQueue));
            // Kick off each Detective
            for (int i = 0; i < detectives; i++) {
                Detective d = new Detective(caseQueue);
                executor.execute(d);
            }
        }

        public static void main(String[] args) {
            final DiscoveryRunner runner = new DiscoveryRunner(20, 100);
            runner.doWork();
        }
    }
Here we use an ExecutorService, obtained from the Executors factory class that also lives in the java.util.concurrent package, which takes care of a lot of the busywork in creating and spawning threads. We know precisely how many threads we'll need for the lifetime of the process (one for the CaseDispatcher plus one per Detective), so we fix the pool at that size up front. Then we just hand the tasks to the pool and away they go.
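The code above never shows how Discovery is stopped, but the interrupt handling in the CaseDispatcher and the Detective suggests one plausible route: shutdownNow on the pool interrupts every running worker, which breaks each one out of its blocking put or take. The following is a sketch under that assumption; the class name ShutdownSketch, the worker body, and the five-second timeout are invented for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of Discovery.
class ShutdownSketch {
    // Starts `workers` tasks that block on an empty queue, then stops them.
    // Returns true if every pool thread exited within the timeout.
    static boolean startThenStop(int workers) {
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>(100);
        ExecutorService executor = Executors.newFixedThreadPool(workers);

        for (int i = 0; i < workers; i++) {
            executor.execute(() -> {
                try {
                    while (!Thread.currentThread().isInterrupted()) {
                        queue.take(); // nothing is ever queued, so this blocks
                    }
                } catch (InterruptedException e) {
                    // take() was interrupted by shutdownNow(): fall through and exit
                }
            });
        }

        executor.shutdownNow(); // interrupts every running worker
        try {
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("stopped cleanly: " + startThenStop(3));
    }
}
```

A plain shutdown call would not be enough here, since it only refuses new tasks and leaves workers that are already blocked on the queue waiting indefinitely.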
We're really happy with this design. A multithreaded producer/consumer queue really fits the problem space well, in that there's a lot of network latency inherent to it. And by leveraging the BlockingQueue for our work queue, we're able to concentrate on what makes our app special — the investigation of those cases. A big win for iCopyright.