
How to implement reactive polling in Java




Suppose we need to stream data from an external source, but the data source does not support push natively, so we have to resort to periodic polling. How can we implement this in Java, in the simplest possible way, while remaining responsive?

General idea

Before we dive into the code, let us first discuss the requirements and the general idea. What we basically need to do is trigger a fetch operation at a fixed rate, while blocking the client until new data is available. In addition, we want to stay responsive, so we should not block indefinitely but unblock after a certain maximum waiting time has elapsed, after which the client can react accordingly (retry, abort, or do something else).

To meet these requirements, we will implement a variant of the Token Bucket algorithm, which is commonly used for traffic shaping. In this algorithm, a fixed number of tokens is periodically placed into a virtual bucket of a given capacity. Concurrently, another thread waiting to perform some operation (for example, sending a data packet over the network) checks the contents of the bucket, and if it contains enough tokens, removes them from the bucket and performs the operation. In this article, we will simplify the algorithm by simulating a bucket with a capacity of one and using only a single consuming thread.
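To make the general algorithm a bit more concrete before we simplify it, here is a minimal, purely illustrative sketch of a token bucket with configurable capacity and refill rate. The class and its names are hypothetical and are not part of the implementation developed below:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of the general Token Bucket idea:
// tokens are added at a fixed rate up to a maximum capacity,
// and an operation may proceed only if a token can be taken.
class SimpleTokenBucket {
    private final int capacity;
    private int tokens;

    SimpleTokenBucket(int capacity, long refillIntervalMs) {
        this.capacity = capacity;
        ScheduledExecutorService es = Executors.newScheduledThreadPool(1);
        es.scheduleAtFixedRate(this::addToken, refillIntervalMs, refillIntervalMs, TimeUnit.MILLISECONDS);
    }

    private synchronized void addToken() {
        if (tokens < capacity) {
            tokens++; // the bucket never holds more than 'capacity' tokens
        }
    }

    // Returns true if a token was available and consumed, false otherwise.
    synchronized boolean tryConsume() {
        if (tokens > 0) {
            tokens--;
            return true;
        }
        return false;
    }
}

In what follows, the capacity is fixed at one, so the whole bucket collapses into a single boolean flag.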

Implementation

Since our bucket has a capacity of one, it only has two states (full and empty). This can be represented by a single boolean value, with true meaning full and false meaning empty:

private boolean fetch = true; // let's start fetching immediately

Furthermore, we need to schedule a task that will "fill the bucket" periodically at a fixed rate. This is done using a ScheduledExecutorService:

void start() {
    ScheduledExecutorService es = Executors.newScheduledThreadPool(1);
    es.scheduleAtFixedRate(this::scheduleFetch, FETCH_INTERVAL, FETCH_INTERVAL, TimeUnit.MILLISECONDS);
}

What does scheduleFetch look like? It simply sets the fetch variable to true (fills the bucket) and notifies another (fetching) thread, which might at that moment be waiting for our bucket state to change. For a discussion of why the next two methods need to be synchronized, see this Stack Overflow question.

synchronized void scheduleFetch() {
    fetch = true;
    notify();
}

Next, we provide an operation that returns immediately if the bucket is full, or otherwise blocks for a limited period of time waiting for it to become full, returning the latest bucket state and emptying the bucket at the end:

synchronized boolean awaitFetch() throws InterruptedException {
    if (!fetch)
        wait(WAIT_LIMIT);
    try {
        return fetch;
    } finally {
        fetch = false;
    }
}

Since we never wait longer than WAIT_LIMIT, this method is guaranteed to return in at most WAIT_LIMIT milliseconds. We need this guarantee to ensure responsiveness, as we will see shortly. In total, the operation tells the caller whether a fetch is allowed, returning in no more than WAIT_LIMIT milliseconds.

With this in place, and assuming that the actual fetch operation (sending a request over the network, interpreting the response, etc.) is implemented in a doFetch method, we can finally implement our blocking poll method:

List poll() throws InterruptedException {
    return awaitFetch() ? doFetch() : null;
}

Here, null signals to the client that no new data is available yet. In fact, this is the exact protocol required by source connectors in Kafka Connect, and the implementation described here is used in the PLC4X source connector.
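To illustrate how a client could consume this blocking poll method, here is a minimal, hypothetical consumer loop; the handleRecords method is an assumption made purely for the sake of the example and is not part of the original implementation:

// Hypothetical client loop: poll() blocks for at most WAIT_LIMIT milliseconds,
// so the loop regains control promptly even when no new data arrives.
void run() throws InterruptedException {
    while (!Thread.currentThread().isInterrupted()) {
        List records = poll();
        if (records != null) {
            handleRecords(records); // process the freshly fetched data
        }
        // if records is null, nothing arrived within WAIT_LIMIT;
        // the client is free to retry, back off, or do something else
    }
}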

Remarks

There are two main parameters in this program: WAIT_LIMIT and FETCH_INTERVAL. The former controls the responsiveness of the client: the lower WAIT_LIMIT is, the faster control is returned to the client in case no new data is available.

The latter controls the maximum fetch (sampling) rate. It is in fact an upper limit, because the effective sampling rate may be lower, e.g. when the fetch operation takes longer than FETCH_INTERVAL.
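For illustration only, assumed values for the two parameters might look like this (the actual values depend on the data source and the desired responsiveness, and are not taken from the original implementation):

// Assumed example values: the client regains control at least every 50 ms,
// while a fetch is attempted at most once per second.
private static final long WAIT_LIMIT = 50;       // milliseconds
private static final long FETCH_INTERVAL = 1000; // milliseconds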

Alternatives

Although the proposed solution works, there are alternatives. One of them is to fetch the data directly in the scheduled periodic task instead of notifying the fetching (client) thread. However, since we need to block the client thread while it waits for new data, the fetched results must be handed over from the periodic task to the client, for example through a blocking queue.
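A minimal sketch of this alternative, assuming the same doFetch, WAIT_LIMIT and FETCH_INTERVAL as above (the field and queue capacity are assumptions for illustration), could look roughly like this:

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: the scheduled task fetches and enqueues the results,
// while the client blocks on the queue for at most WAIT_LIMIT milliseconds.
private final BlockingQueue<List> results = new ArrayBlockingQueue<>(1);

void start() {
    ScheduledExecutorService es = Executors.newScheduledThreadPool(1);
    // offer() drops the result if the previous one has not been consumed yet,
    // mirroring the bucket capacity of one
    es.scheduleAtFixedRate(() -> results.offer(doFetch()),
            FETCH_INTERVAL, FETCH_INTERVAL, TimeUnit.MILLISECONDS);
}

List poll() throws InterruptedException {
    // returns null if no results arrived within WAIT_LIMIT, as before
    return results.poll(WAIT_LIMIT, TimeUnit.MILLISECONDS);
}

Note that with this design the fetch operation itself runs on the executor thread rather than the client thread.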

Another alternative is to use a ready-made utility class for this kind of task, such as RateLimiter from the Google Guava library. This would simplify the implementation even further. However, it would add another library dependency to your project, which, depending on the circumstances, may or may not be acceptable.
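For example, a rough sketch based on Guava's RateLimiter, assuming the same FETCH_INTERVAL, WAIT_LIMIT and doFetch as above, might look like this:

import java.util.List;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.RateLimiter;

// Hypothetical sketch: the RateLimiter issues permits at the desired fetch rate,
// and tryAcquire waits at most WAIT_LIMIT milliseconds for a permit.
private final RateLimiter limiter = RateLimiter.create(1000.0 / FETCH_INTERVAL);

List poll() {
    return limiter.tryAcquire(WAIT_LIMIT, TimeUnit.MILLISECONDS) ? doFetch() : null;
}

No scheduled executor and no explicit wait/notify are needed in this variant, since the rate limiting is handled entirely by the library.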

Conclusion

Simple reactive polling can be implemented in a surprisingly simple way using a variant of the Token Bucket algorithm, built on two low-level synchronization primitives of the Java platform: wait and notify. Although common wisdom says you should never mess with basic synchronization primitives and should instead use the abstractions in java.util.concurrent, this example shows that it is sometimes fine to break the rules if they get the job done.


