Limiting Concurrent Subscriptions With Rx

This post was prompted by a question on the RxHttp project. Others may be looking for a similar solution.

Sometimes when using Rx you may wish to limit the number of concurrent subscriptions.

Take the following example (RxPHP and RxHttp):

    use Rx\Observable;

    Observable::fromArray(
        [
            'http://www.example.com/',
            // ... lots more urls here
            'http://www.example.com/',
        ]
    )
        ->flatMap(function ($url) {
            // flatMap subscribes to each request observable as soon as it is returned
            return \Rx\React\Http::get($url);
        })

If there are 100 URLs in the source array, `flatMap` subscribes to every request observable as soon as it is emitted, so you create 100 nearly simultaneous HTTP requests. That is a heavy load for your own system and, if the URLs all point to the same site, for that server too.

You can limit the number of concurrent subscriptions by distributing the observables across a fixed number of concatenated queues. Each queue holds its observables and subscribes to them one at a time, so the number of queues is the number of concurrent subscriptions. Here is the code:

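    use Rx\Observable;

    Observable::fromArray(
        [
            'http://www.example.com/',
            // ... lots more urls here
            'http://www.example.com/',
        ]
    )
        ->map(function ($url) {
            // map emits the request observable itself without subscribing to it
            return \Rx\React\Http::get($url);
        })
        ->groupBy(function () {
            // round-robin key: 0, 1, 2, 3, 0, 1, ...
            static $index = 0;
            return $index++ % 4;
        })
        ->flatMap(function (Observable\GroupedObservable $go) {
            // each group subscribes to its observables one at a time
            return $go->concatAll();
        })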

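Note that the `flatMap` has been changed to `map`. With `map`, the request observables are passed along as values instead of being subscribed to immediately. The subscription happens later, in `concatAll`, one observable at a time per group.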

The `4` in the `groupBy` key selector determines the number of concurrent subscriptions, because each distinct key gets its own queue.
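
To see how the key selector spreads items across queues, here is a minimal sketch in plain PHP, with no Rx involved. The `$keySelector` closure is the same as the one passed to `groupBy`; the `url-N` strings and the `$queues` array are made up for illustration and only stand in for what `groupBy` does internally.

```php
<?php
// Same key selector as in the groupBy call: a static counter modulo 4.
$keySelector = function () {
    static $index = 0;
    return $index++ % 4;
};

// Hypothetical stand-in for groupBy: collect items into arrays by key.
$queues = [];
foreach (range(1, 10) as $n) {
    $queues[$keySelector()][] = "url-$n";
}

// Print which items ended up in which queue.
foreach ($queues as $key => $items) {
    echo $key, ': ', implode(', ', $items), PHP_EOL;
}
```

Ten items end up in four queues, assigned in round-robin order. Changing the `4` changes the number of queues, and with it the number of observables that can be subscribed at once.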

One drawback of this solution is that observables are assigned to a queue up front and wait there. If an observable is waiting in one queue when another queue becomes idle, it cannot switch to the idle queue. Everything will still be processed, but the pipeline may not always be running at “full capacity”.
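
The effect can be illustrated with a rough back-of-the-envelope simulation in plain PHP. The durations below are invented, and the "shared slots" variant is only a point of comparison where each item goes to whichever slot frees up first; it is not something the `groupBy` approach does.

```php
<?php
// Hypothetical request durations, in arbitrary time units.
$durations = [5, 1, 1, 1, 5, 1, 1, 1];
$concurrency = 4;

// Fixed queues: item i always goes to queue i % $concurrency,
// and each queue runs its items back to back.
$queueTime = array_fill(0, $concurrency, 0);
foreach ($durations as $i => $d) {
    $queueTime[$i % $concurrency] += $d;
}
$fixedFinish = max($queueTime);

// For comparison: each item goes to the slot that becomes free first.
$slotTime = array_fill(0, $concurrency, 0);
foreach ($durations as $d) {
    $slot = array_search(min($slotTime), $slotTime);
    $slotTime[$slot] += $d;
}
$sharedFinish = max($slotTime);

echo "fixed queues finish at t=$fixedFinish", PHP_EOL;
echo "shared slots finish at t=$sharedFinish", PHP_EOL;
```

With these numbers both slow items land in the same fixed queue, so that queue finishes at t=10 while the other three sit idle from t=2 onward. The shared variant finishes at t=6.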

This strategy is not just for large arrays. It can also be used to smooth out bursts of data that would otherwise create undesired amounts of traffic or system load from downstream observables.
