Throttling is an important concept in multi-threaded applications. It allows us to control the throughput in order to manage resources appropriately. This can be achieved in several ways including CountDownLatch, BlockingQueue and Bounded Semaphore. These are great when you need to keep a Consumer under control, so that it does not start queuing up too much work.
File Poller Example One example might be with a Directory Consumer mechanism that implements a ScheduledExecutorService that runs every 30 seconds to look for files in a directory. For each file, the flow may look something like this:
One Step Further Let's now expand our example. Our directory consumer has been working fine, but we are CPU bound and need more throughput. In addition, we have a single point of failure that needs to be addressed. To solve our problems, we can move our directory to an nfs mount and add an additional application server. Now we can introduce competing directory consumers. Since our directory consumer first tries to rename a file before processing, it will lock the file so we won't have any problems with contention (This assumes that the nfs mount and server are configured to do so). Looks like we are done, but not quite. At least one problem still exists. We now have two directory consumers polling for files. Assuming we keep our bound setting to 3 for each of them, what happens if 4 files are dropped and one of the directory consumers happens to see them first? It will rename all 4 files and start processing 3 of them. This means that we now have 1 file sitting idle when it could have been processed by the other consumer. The problem is that the bound exists on the consumer so the only way to know when we hit the limit is to exceed it. To get around this, the directory consumer needs to be able to reserve a semaphore permit ahead of time. Only once a permit is obtained would the directory consumer rename the file and have the producer submit the task. I have seen people attempt a tryAcquire and if successful, they release the permit and queue up the job. This is just silly as there is no guarantee that the permit will be available by the time the producer queues up the task. One solution is to have a concrete semaphore permit that can be passed to the consumer. This way the permit is maintained until the task completes. Implementation The source code can be found here. I'd also like to take this opportunity to apologize for the unimaginative class names. Feel free to rename them as you see fit. ADSempaphore This class extends the existing Semaphore class in order to preserve the original functionality. It then adds the following methods:
This represents a concrete semaphore permit. An ADPermit can only be created if the semaphore and the identifier match. This will prevent someone from trying to manually create a permit with a fake identifier. Another way to protect the class is to play with the access modifier, but then there are restrictions on how it's used. The class relies on the following methods:
In order to use our implementation, we need to have a consumer and producer that will work together. The consumer would create the semaphore with the appropriate bound limit and would expose the ability to get a permit. It would also accept and verify permits before accepting tasks to process. The producer would need to be able to request permits from the consumer and pass them along when queuing up work. ProdcuerHere is an example of how a producer can obtain a permit and use it: try { Look for files ... if (file.exists()) { // Obtain permit from the consumer ADPermit permit = consumer.getPermit(); if (permit != null) { fileUtility.rename(...); ProcuderOjbect producerObject = producer.createtObject(renamedfile); // Producer creates object to queue consumer.submitTask(permit, producerObject); } // end if permit check } // end if file check } catch (...) { } // Just for illustration. Please don't leave a catch empty :) finally { if (permit != null) { permit.release(); } // VERY IMPORTANT!!! } ConsumerBoundedExecutor requires the following modifications:
Comments are closed.
|
Visit our ​Code Repository for samples and demos.
ARTICLES
All
|