How To Use RxJS To Coordinate Both Serial And Parallel Jobs?
With RxJS, I'm struggling to find a way to process a mix of serial and parallel jobs. Or even vary/change the processing concurrency of a queue of jobs.
Here is a basic working example of processing items in an array with concurrency hardcoded:
import { from, defer } from 'rxjs'
import { mergeAll } from 'rxjs/operators'
const delay = 1000
const concurrency = 2
const doSomethingSlow = async (val) => {
await new Promise(resolve => {
setTimeout(() => {
console.log(`${val} - Done`)
resolve()
}, delay)
})
}
const jobs = [
1,2,3,4,5,6,7,8,9,10
]
const observables = from(jobs.map(num => defer(() => doSomethingSlow(num))))
observables
.pipe(
mergeAll(concurrency)
)
.subscribe()
https://stackblitz.com/edit/rxjs-tyfer3?file=index.ts
What I'd like to do is have the parent level array as a list of Serial Tasks that contain sub-arrays of Parallel Jobs. e.g..
const tasks = [
{ concurrency: 1, jobs: [1] },
{ concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
{ concurrency: 1, jobs: [12, 13] }
]
The hope would be that job 1
is processed first and that nothing else starts until it finishes. Then jobs 2
and 3
are picked up in parallel and jobs 4
through 11
are processed whenever a slot opens up out of the two available concurrency processors. Only when they are all finished does job 12
get picked up followed by 13
.
I can't really visualize the right way to approach this. I'd attempted to merge
separate obserables
which had their own pipe(mergeAll(num))
in them, but this didn't seem to work. I have a feeling that switchMap
may be the key here, but I'm unsure how exactly to utilize it in this scenario.
The alternative thought I'd had, was attempting to change the concurrency value dynamically, but this seems like it may be even harder to do and may not result in exactly the behavior I'm looking for.
Any help would be greatly appreciated.
Answer
mergeMap
has an optional concurrent
parameter that tells it how many inner observables should it stay subscribed to at a time. In fact, concatMap
is a mergeMap
with 1
concurrency. So you can use this to your advantage:
const tasks = [
{ concurrency: 1, jobs: [1] },
{ concurrency: 2, jobs: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11] },
{ concurrency: 1, jobs: [12, 13] }
];
from(tasks)
.pipe(
concatMap(task => from(task.jobs).pipe( // single task such as `{ concurrency: 1, jobs: [1] }`
mergeMap(job => doSyncJob(job), task.concurrency) // `task.concurrency` is the important part here
// maybe `toArray()` to collect all results for this task into a single array
)),
)
.subscribe(...);
Related Questions
- → How to update data attribute on Ajax complete
- → October CMS - Radio Button Ajax Click Twice in a Row Causes Content to disappear
- → Octobercms Component Unique id (Twig & Javascript)
- → Passing a JS var from AJAX response to Twig
- → Laravel {!! Form::open() !!} doesn't work within AngularJS
- → DropzoneJS & Laravel - Output form validation errors
- → Import statement and Babel
- → Uncaught TypeError: Cannot read property '__SECRET_DOM_DO_NOT_USE_OR_YOU_WILL_BE_FIRED' of undefined
- → React-router: Passing props to children
- → ListView.DataSource looping data for React Native
- → Can't test submit handler in React component
- → React + Flux - How to avoid global variable
- → Webpack, React & Babel, not rendering DOM