ActiveDev Inc.
  • Home
  • ADCLOUD
    • Getting Started
    • Developing a Microservice
    • Reference
  • Services
  • About
  • Contact
  • Articles

concrete semaphore permit

1/12/2015

 
Throttling is an important concept in multi-threaded applications. It allows us to control throughput so that resources are managed appropriately. This can be achieved in several ways, including a CountDownLatch, a BlockingQueue or a bounded Semaphore. These are great when you need to keep a consumer under control so that it does not start queuing up too much work.

File Poller Example
One example is a directory consumer that uses a ScheduledExecutorService to look for files in a directory every 30 seconds. For each file, the flow may look something like this:
  1. Rename the file indicating that it is being looked at.
  2. A producer creates a Callable object that will process the file.
  3. The producer then places the Callable object on the consumer's queue.
  4. The consumer uses a ThreadPoolExecutor backed by an ArrayBlockingQueue, which picks up the Callable objects and processes them.
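To make the problem concrete, here is a minimal sketch of step 4 on its own. QueueOnlyConsumerDemo and countRejected are hypothetical names, and a CountDownLatch stands in for slow file processing. ThreadPoolExecutor.execute() never waits for room in its queue: once the single worker is busy and the ArrayBlockingQueue is full, the default AbortPolicy throws a RejectedExecutionException back at the producer instead of slowing it down.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of a consumer built only from a ThreadPoolExecutor
// and an ArrayBlockingQueue (no throttling of the producer).
class QueueOnlyConsumerDemo {

    // Submits `tasks` jobs to a one-thread pool whose queue holds
    // `queueCapacity` jobs and returns how many submissions were rejected.
    static int countRejected(int tasks, int queueCapacity) {
        CountDownLatch gate = new CountDownLatch(1); // keeps the worker busy
        ThreadPoolExecutor consumer = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    // execute() never waits for queue space; with the default
                    // AbortPolicy it throws once the worker and queue are full.
                    consumer.execute(() -> {
                        try {
                            gate.await();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            gate.countDown(); // let the accepted tasks finish
            consumer.shutdown();
            try {
                consumer.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return rejected;
    }
}
```

For example, countRejected(6, 3) accepts four tasks (one running, three queued) and rejects the other two.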
The flow above describes a common approach to implementing a producer/consumer, but there is a nagging problem to consider. The BlockingQueue blocks the consumer, not the producer, so how do we block the producer? If a hundred files are dropped, the producer will rename them all and try to queue them all up; ThreadPoolExecutor.execute() never waits for space in the queue, so once it is full the extra tasks are rejected rather than held back. One approach is to use a BoundedExecutor: by using a semaphore, the BoundedExecutor can block the producer when the upper bound is reached. If we set the bound to 3, our directory consumer would rename the first 4 files it finds; the producer would submit 3 of them successfully and wait on the 4th. This seems like a perfect solution, and in most cases it is.
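A minimal sketch of such a BoundedExecutor, along the lines of the one in Java Concurrency in Practice. The semaphore is acquired before the task is handed to the executor and released when the task finishes, so the producer's call to submitTask blocks once `bound` tasks are queued or running. This sketch also rethrows a RejectedExecutionException so the caller knows the task did not run, and availablePermits is a helper added for illustration.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

// A Semaphore caps how many tasks may be queued or running, so the
// submitting (producer) thread blocks once the bound is reached.
class BoundedExecutor {
    private final Executor exec;
    private final Semaphore semaphore;

    BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new Semaphore(bound);
    }

    void submitTask(final Runnable command) throws InterruptedException {
        semaphore.acquire(); // blocks the producer at the bound
        try {
            exec.execute(() -> {
                try {
                    command.run();
                } finally {
                    semaphore.release(); // free the slot when the task finishes
                }
            });
        } catch (RejectedExecutionException e) {
            semaphore.release(); // the task never ran, so give the permit back
            throw e;
        }
    }

    // Helper for illustration: how many more tasks can be submitted without blocking.
    int availablePermits() {
        return semaphore.availablePermits();
    }
}
```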

One Step Further
Let's now expand our example. Our directory consumer has been working fine, but we are CPU-bound and need more throughput. In addition, we have a single point of failure that needs to be addressed. To solve both problems, we can move our directory to an NFS mount and add another application server. Now we can introduce competing directory consumers. Since each directory consumer first tries to rename a file before processing it, the rename effectively locks the file, so we won't have any problems with contention (this assumes that the NFS mount and servers are configured to support it).
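The rename-as-lock idea can be sketched with java.nio's Files.move and the ATOMIC_MOVE option: the rename either happens completely or not at all, so when two pollers race for the same file, the loser gets a NoSuchFileException because the original name is already gone. FileClaimer, tryClaim and the ".processing" suffix are hypothetical names. If the file system cannot perform an atomic rename, Files.move throws AtomicMoveNotSupportedException, which this sketch simply propagates; whether the rename is truly atomic across NFS clients depends on the mount, as the caveat above says.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Claims a file by renaming it atomically, so two pollers cannot both claim it.
class FileClaimer {
    // Hypothetical suffix marking a file as "being looked at".
    static final String CLAIM_SUFFIX = ".processing";

    // Returns the renamed path, or null if another poller renamed it first.
    static Path tryClaim(Path file) throws IOException {
        Path claimed = file.resolveSibling(file.getFileName() + CLAIM_SUFFIX);
        try {
            return Files.move(file, claimed, StandardCopyOption.ATOMIC_MOVE);
        } catch (NoSuchFileException e) {
            return null; // the original name is gone: someone else claimed it
        }
    }
}
```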
Looks like we are done, but not quite. At least one problem still exists. We now have two directory consumers polling for files. Assuming we keep the bound at 3 for each of them, what happens if 4 files are dropped and one of the directory consumers happens to see them first? It will rename all 4 files and start processing 3 of them. This means that 1 file now sits idle when it could have been processed by the other consumer. The problem is that the bound lives in the consumer, so the only way to discover that we have hit the limit is to exceed it. To get around this, the directory consumer needs to be able to reserve a semaphore permit ahead of time. Only once a permit is obtained would the directory consumer rename the file and have the producer submit the task. I have seen people attempt a tryAcquire and, if it succeeds, release the permit and queue up the job. This is just silly, as there is no guarantee that the permit will still be available by the time the producer queues up the task. One solution is a concrete semaphore permit that can be passed to the consumer. This way the permit is held until the task completes.
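Before introducing a permit class, the reserve-first idea can be sketched with a plain java.util.concurrent.Semaphore. The key difference from the tryAcquire-then-release approach is that the permit acquired in the loop is the same one the task releases when it finishes, so it is never given back early. ReservingPoller and its processor callback are hypothetical names, and adding a file to the claimed list stands in for the rename.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

// One polling pass that reserves a permit BEFORE claiming each file and
// hands that same permit to the task, which releases it when processing ends.
class ReservingPoller {
    private final Semaphore permits;          // this server's bound, e.g. new Semaphore(3)
    private final Executor workers;           // the consumer's thread pool
    private final Consumer<String> processor; // hypothetical file-processing step

    ReservingPoller(Semaphore permits, Executor workers, Consumer<String> processor) {
        this.permits = permits;
        this.workers = workers;
        this.processor = processor;
    }

    // Returns the files this pass claimed; the rest are left for other pollers.
    List<String> poll(List<String> visibleFiles) {
        List<String> claimed = new ArrayList<>();
        for (String file : visibleFiles) {
            // Reserve capacity first; if none is left, stop WITHOUT renaming,
            // so a competing poller can pick up the remaining files.
            if (!permits.tryAcquire()) {
                break;
            }
            boolean handedOff = false;
            try {
                claimed.add(file); // stands in for the rename that claims the file
                workers.execute(() -> {
                    try {
                        processor.accept(file);
                    } finally {
                        permits.release(); // the reserved permit lives until the task ends
                    }
                });
                handedOff = true;
            } finally {
                if (!handedOff) {
                    permits.release(); // the task was never queued, so return the permit
                }
            }
        }
        return claimed;
    }
}
```

With a bound of 3 and four visible files, poll claims the first three and stops while those tasks are still running, leaving the fourth file for a competing poller.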

Implementation
The source code can be found here. I'd also like to take this opportunity to apologize for the unimaginative class names. Feel free to rename them as you see fit.
ADSemaphore
This class extends the existing Semaphore class in order to preserve the original functionality. It then adds the following methods:
  • acquireWithPermit will only create an ADPermit object if the semaphore has a permit to give.
  • isValidPermit will authenticate the permit against the semaphore by passing it the permit's identifier. This assures the caller (i.e. a consumer) that the permit was issued by the semaphore it holds.
  • authenticate compares a value to its own GUUID. The use of a GUUID is just some insurance that the permit belongs to the correct semaphore.
ADPermit
This represents a concrete semaphore permit. An ADPermit can only be created if the semaphore and the identifier match, which prevents someone from manually creating a permit with a fake identifier. Another way to protect the class would be to restrict its access modifier, but that places restrictions on how it can be used. The class relies on the following methods:
  • getGUUID returns the identifier and is package-private, so it can only be called by classes in the same package.
  • release releases the permit back to the semaphore and resets the class-level variables.
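Since the source code link is not reproduced here, the following is a hypothetical reconstruction of the two classes based only on the descriptions above. The field names, the use of java.util.UUID for the GUUID, and returning null when no permit is free are assumptions. release() is made idempotent: it returns the permit at most once and then clears its fields, so a released permit no longer authenticates.

```java
import java.util.UUID;
import java.util.concurrent.Semaphore;

// Hypothetical ADSemaphore: a Semaphore that hands out concrete permit
// objects tied to it by a random identifier (the "GUUID").
class ADSemaphore extends Semaphore {
    private final String guuid = UUID.randomUUID().toString();

    ADSemaphore(int permits) {
        super(permits);
    }

    // Creates an ADPermit only if a permit is available; otherwise returns null.
    ADPermit acquireWithPermit() {
        return tryAcquire() ? new ADPermit(this, guuid) : null;
    }

    // True if the permit was issued by this semaphore and not yet released.
    boolean isValidPermit(ADPermit permit) {
        return permit != null && authenticate(permit.getGUUID());
    }

    boolean authenticate(String value) {
        return guuid.equals(value);
    }
}

// Hypothetical ADPermit: a concrete permit that can be passed between threads.
class ADPermit {
    private ADSemaphore semaphore;
    private String guuid;

    // Refuses to create a permit whose identifier does not match the semaphore.
    ADPermit(ADSemaphore semaphore, String guuid) {
        if (semaphore == null || !semaphore.authenticate(guuid)) {
            throw new IllegalArgumentException("identifier does not match semaphore");
        }
        this.semaphore = semaphore;
        this.guuid = guuid;
    }

    // Package-private: only classes in the same package can read the identifier.
    synchronized String getGUUID() {
        return guuid;
    }

    // Returns the permit to its semaphore once, then resets the fields.
    synchronized void release() {
        if (semaphore != null) {
            semaphore.release();
            semaphore = null;
            guuid = null;
        }
    }
}
```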
Using Our Implementation
In order to use our implementation, we need a consumer and a producer that work together. The consumer would create the semaphore with the appropriate bound and expose the ability to get a permit. It would also verify the permit that comes with each task before accepting the task for processing. The producer would need to be able to request permits from the consumer and pass them along when queuing up work.
Producer
Here is an example of how a producer can obtain a permit and use it:
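ADPermit permit = null; // declared outside the try so the finally block can see it
try {
    // Look for files ...
    if (file.exists()) {
        // Obtain a permit from the consumer BEFORE claiming the file
        permit = consumer.getPermit();
        if (permit != null) {
            File renamedFile = fileUtility.rename(...); // assumes rename returns the renamed file
            // Producer creates the object to queue
            ProducerObject producerObject = producer.createObject(renamedFile);
            consumer.submitTask(permit, producerObject);
            // The consumer now owns the permit and releases it when the task completes
            permit = null;
        } // end if permit check
    } // end if file check
}
catch (...) { } // Just for illustration. Please don't leave a catch empty :)
finally {
    // VERY IMPORTANT!!! Release any permit that was not handed off to the consumer
    if (permit != null) { permit.release(); }
}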
Consumer
The BoundedExecutor requires the following modifications:
    • Use ADSemaphore instead of Semaphore, i.e. private final ADSemaphore semaphore;
    • In the constructor, make sure it creates the ADSemaphore instead of Semaphore, i.e. this.semaphore = new ADSemaphore(bound);
    • Add a method that exposes the acquireWithPermit method of the ADSemaphore, i.e. public ADPermit getPermit() { return semaphore.acquireWithPermit(); }
    • Add ADPermit to submitTask, i.e. public void submitTask(ADPermit permit, final Runnable command) ...
    • In submitTask, check that the permit is valid instead of calling acquire on the semaphore. In addition, replace the semaphore.release calls with permit.release to ensure the permit is reset properly for the producer.
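Putting these modifications together, a modified BoundedExecutor might look like the sketch below. It is hypothetical: to compile on its own it includes condensed versions of ADSemaphore and ADPermit written from the descriptions in this article, and rejecting an invalid permit with an IllegalArgumentException is an assumption about how the consumer should respond. Note that submitTask no longer blocks; the blocking (or skipping) now happens when the producer asks for a permit.

```java
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

// Condensed, hypothetical ADSemaphore: hands out permits tied to its identifier.
class ADSemaphore extends Semaphore {
    private final String guuid = UUID.randomUUID().toString();

    ADSemaphore(int permits) { super(permits); }

    // Returns a permit object, or null when no permit is free.
    ADPermit acquireWithPermit() { return tryAcquire() ? new ADPermit(this, guuid) : null; }

    boolean isValidPermit(ADPermit p) { return p != null && authenticate(p.getGUUID()); }

    boolean authenticate(String value) { return guuid.equals(value); }
}

// Condensed, hypothetical ADPermit: release() only returns the permit once.
class ADPermit {
    private ADSemaphore semaphore;
    private String guuid;

    ADPermit(ADSemaphore semaphore, String guuid) {
        if (semaphore == null || !semaphore.authenticate(guuid)) {
            throw new IllegalArgumentException("identifier does not match semaphore");
        }
        this.semaphore = semaphore;
        this.guuid = guuid;
    }

    synchronized String getGUUID() { return guuid; }

    synchronized void release() {
        if (semaphore != null) {
            semaphore.release();
            semaphore = null;
            guuid = null;
        }
    }
}

// The BoundedExecutor after the modifications listed above.
class BoundedExecutor {
    private final Executor exec;
    private final ADSemaphore semaphore;

    BoundedExecutor(Executor exec, int bound) {
        this.exec = exec;
        this.semaphore = new ADSemaphore(bound);
    }

    // Lets producers reserve capacity before they claim any work.
    ADPermit getPermit() { return semaphore.acquireWithPermit(); }

    int availablePermits() { return semaphore.availablePermits(); }

    // Checks the permit instead of acquiring; the permit is released when the
    // task finishes, or immediately if the executor rejects the task.
    void submitTask(ADPermit permit, final Runnable command) {
        if (!semaphore.isValidPermit(permit)) {
            throw new IllegalArgumentException("permit is invalid or already released");
        }
        try {
            exec.execute(() -> {
                try {
                    command.run();
                } finally {
                    permit.release();
                }
            });
        } catch (RejectedExecutionException e) {
            permit.release();
            throw e;
        }
    }
}
```

Because release() is idempotent, a late permit.release() from the producer after the task has finished does not hand out an extra permit.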
Considerations:
    • The consumer has to trust the producer to do the throttling (although the consumer can still protect itself with bounded queues).
    • Multiple producers would need to share the same semaphore to ensure the consumer is throttled properly. In this case, the consumer owns the semaphore and exposes it to the producers via the getPermit method.
    • Producers MUST release any permit they still hold under all circumstances, e.g. in a finally clause. Once submitTask has accepted a permit, the consumer releases it when the task completes.
