
How To Use RxJS To Coordinate Both Serial And Parallel Jobs?


With RxJS, I'm struggling to find a way to process a mix of serial and parallel jobs, or even to change the processing concurrency of a queue of jobs.

Here is a basic working example that processes the items of an array with a hardcoded concurrency:

import { from, defer } from 'rxjs'
import { mergeAll } from 'rxjs/operators'

const delay = 1000
const concurrency = 2

const doSomethingSlow = async (val) => {
  await new Promise(resolve => {
    setTimeout(() => {
      console.log(`${val} - Done`)
      resolve()
    }, delay)
  })
}

const jobs = [
  1,2,3,4,5,6,7,8,9,10
]
// defer() makes each job lazy: doSomethingSlow(num) is only called when mergeAll subscribes to it
const observables = from(jobs.map(num => defer(() => doSomethingSlow(num))))

observables
  .pipe(
    mergeAll(concurrency)
  )
  .subscribe()

https://stackblitz.com/edit/rxjs-tyfer3?file=index.ts

What I'd like to do is have the parent-level array be a list of serial tasks, each containing a sub-array of parallel jobs, e.g.:

const tasks = [
  { concurrency: 1, jobs: [1] },
  { concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
  { concurrency: 1, jobs: [12, 13] }
]

The hope is that job 1 is processed first and that nothing else starts until it finishes. Then jobs 2 and 3 are picked up in parallel, and jobs 4 through 11 are processed whenever one of the two concurrency slots opens up. Only when they have all finished is job 12 picked up, followed by job 13.
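To make that intended schedule concrete, here is a small sketch that computes when each job would start, assuming every job takes exactly one "tick" (like the fixed 1000 ms delay in the first example). startTicks is a hypothetical helper for illustration, not part of RxJS.

```javascript
// Hypothetical helper: computes the tick at which each job would start,
// assuming every job takes exactly one tick (equal-duration jobs).
function startTicks(tasks) {
  const starts = {}
  let taskStart = 0
  for (const { concurrency, jobs } of tasks) {
    // With equal durations, slots free up together, so job i of a task
    // starts floor(i / concurrency) ticks after the task begins.
    jobs.forEach((job, i) => {
      starts[job] = taskStart + Math.floor(i / concurrency)
    })
    // The next task starts only after every job in this task has finished.
    taskStart += Math.ceil(jobs.length / concurrency)
  }
  return { starts, totalTicks: taskStart }
}

const tasks = [
  { concurrency: 1, jobs: [1] },
  { concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
  { concurrency: 1, jobs: [12, 13] }
]

const { starts, totalTicks } = startTicks(tasks)
console.log(starts)     // job 1 at 0; jobs 2-3 at 1; ... jobs 10-11 at 5; job 12 at 6; job 13 at 7
console.log(totalTicks) // 8
```

With 1000 ms jobs, that is roughly 8 seconds end to end.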

I can't really visualize the right way to approach this. I attempted to merge separate observables, each with its own pipe(mergeAll(num)), but this didn't seem to work. I have a feeling that switchMap may be the key here, but I'm unsure how exactly to use it in this scenario.

The alternative I'd considered was changing the concurrency value dynamically, but this seems even harder to do and may not give exactly the behavior I'm looking for.

Any help would be greatly appreciated.


Answer

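mergeMap has an optional concurrent parameter that sets how many inner observables it stays subscribed to at a time. In fact, concatMap is just mergeMap with a concurrency of 1. You can use this to your advantage: run the tasks one after another with concatMap, and run each task's jobs with mergeMap using that task's own concurrency: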

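import { from } from 'rxjs';
import { concatMap, mergeMap } from 'rxjs/operators';

const tasks = [
  { concurrency: 1, jobs: [1] },
  { concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
  { concurrency: 1, jobs: [12, 13] }
];

// doSomethingSlow is the async job function from the question
from(tasks)
  .pipe(
    // concatMap starts the next task only after the current task's inner observable completes
    concatMap(task => from(task.jobs).pipe( // single task such as `{ concurrency: 1, jobs: [1] }`
      mergeMap(job => doSomethingSlow(job), task.concurrency) // `task.concurrency` is the important part here
      // maybe `toArray()` to collect all results for this task into a single array
    )),
  )
  .subscribe({ complete: () => console.log('All tasks done') });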
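For comparison, here is the same "serial tasks, parallel jobs" shape written with plain async/await instead of RxJS, which may help to see what concatMap and mergeMap's concurrent argument are doing. This is a swapped-in technique, not the answer's code: runJobs, runTasks and the logging worker are hypothetical helpers, and the 20 ms sleep stands in for doSomethingSlow.

```javascript
// Plain async/await sketch (no RxJS): serial tasks, each running its jobs in parallel.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

// Runs worker(job) for every job with at most `limit` in flight at once,
// similar to what mergeMap's concurrent argument does for a single task.
async function runJobs(jobs, worker, limit) {
  let next = 0
  // Each lane keeps pulling the next unstarted job until the list is exhausted,
  // so a new job starts as soon as any running job finishes.
  const lane = async () => {
    while (next < jobs.length) {
      const job = jobs[next++]
      await worker(job)
    }
  }
  const lanes = Array.from({ length: Math.min(limit, jobs.length) }, () => lane())
  await Promise.all(lanes)
}

// Runs the tasks strictly one after another, like concatMap: a task starts
// only after every job of the previous task has finished.
async function runTasks(tasks, worker) {
  for (const task of tasks) {
    await runJobs(task.jobs, worker, task.concurrency)
  }
}

// Hypothetical worker standing in for doSomethingSlow; it records events
// and tracks how many jobs are running at the same time.
const events = []
let active = 0
let maxActive = 0
const worker = async job => {
  active++
  maxActive = Math.max(maxActive, active)
  events.push(`start ${job}`)
  await sleep(20)
  active--
  events.push(`done ${job}`)
}

const tasks = [
  { concurrency: 1, jobs: [1] },
  { concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
  { concurrency: 1, jobs: [12, 13] }
]

const finished = runTasks(tasks, worker).then(() => {
  console.log(events.join('\n'))
  console.log('max jobs running at once:', maxActive)
})
```

Replacing the for loop in runTasks with a single Promise.all over the tasks would drop the serial guarantee, much like swapping concatMap for an unlimited mergeMap.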

source: stackoverflow.com