Angular + mergeMap
May 05, 2019
The RxJS mergeMap operator projects each source value to an inner Observable and merges the values of all inner Observables into a single output Observable. It behaves like map followed by the RxJS mergeAll operator. If the source Observable emits a new element before the inner Observable for the previous element has completed, mergeMap does not cancel or queue anything. It subscribes to the new inner Observable immediately, keeps every active inner Observable running concurrently, and forwards each value to the output as soon as it is emitted.
Suppose the source Observable emits 3 elements consecutively to mergeMap and every element is processed by an inner Observable inside mergeMap. Now suppose all three elements from the source Observable have entered mergeMap before the inner Observable for the first element has completed. Then the three inner Observables run at the same time and their responses are merged into one output Observable in the order in which they arrive. The output Observable completes only after the source Observable and all three inner Observables have completed.
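The timing described above can be made concrete with a small virtual-time model. The sketch below is not the RxJS implementation and uses no RxJS imports; `Emission`, `modelMergeMap` and the millisecond offsets are hypothetical names and values chosen for illustration. It assumes only that each inner Observable is subscribed at the moment its source element arrives and that every inner value is forwarded when it is emitted.

```typescript
// Toy virtual-time model of mergeMap (an illustration, not RxJS itself).
// An emission is a value plus the time, in ms, at which it is emitted.
interface Emission<T> {
  at: number;
  value: T;
}

// source:  emissions of the outer Observable, with absolute times.
// project: returns the inner emissions for one source value, with times
//          relative to the moment the inner Observable is subscribed.
function modelMergeMap<S, T>(
  source: Emission<S>[],
  project: (value: S) => Emission<T>[]
): Emission<T>[] {
  const merged: { at: number; order: number; value: T }[] = [];
  let order = 0;
  for (const outer of source) {
    // Each inner Observable is subscribed as soon as its source value arrives.
    for (const inner of project(outer.value)) {
      merged.push({ at: outer.at + inner.at, order: order++, value: inner.value });
    }
  }
  // Values reach the output in arrival order; ties keep subscription order.
  merged.sort((a, b) => a.at - b.at || a.order - b.order);
  return merged.map(({ at, value }) => ({ at, value }));
}

// Hypothetical timings: the source emits every 100 ms, and each inner
// Observable emits 1 after 150 ms and 2 after 300 ms.
const sourceEmissions: Emission<string>[] = [
  { at: 0, value: 'x' },
  { at: 100, value: 'y' },
  { at: 200, value: 'z' },
];
const mergedOutput = modelMergeMap(sourceEmissions, el => [
  { at: 150, value: el + 1 },
  { at: 300, value: el + 2 },
]);

console.log(mergedOutput.map(e => e.value).join(', '));
// prints "x1, y1, x2, z1, y2, z2"
```

With these assumed timings the values of different inner Observables interleave, which is why the output order of mergeMap depends on when the inner values arrive and not only on the source order.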
Technologies Used
Find the technologies being used in our example.
1. Angular 7.0.0
2. Angular CLI 7.0.3
3. TypeScript 3.1.1
4. Node.js 10.3.0
5. NPM 6.1.0
6. RxJS 6.3.3
Using RxJS mergeMap
RxJS mergeMap is imported as follows.
import { mergeMap } from 'rxjs/operators';
Example-1:
of('x', 'y', 'z').pipe(
  mergeMap(el => of(1, 2).pipe(
    delay(2000),
    map(num => el + num)
  ))
).subscribe(res => console.log(res));
The source Observable emits 'x', 'y' and 'z' to mergeMap. The inner Observable emits the numbers 1 and 2 for every element of the source Observable. We are delaying the inner Observable by 2 seconds, and of emits its values synchronously, so the 'y' and 'z' elements enter mergeMap before the inner Observable for 'x' has emitted anything. So mergeMap keeps the inner Observables for all three elements active at once and merges their values into one Observable as its output.
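We can understand the merging of the inner Observables into one as follows.
of(x1, x2) + of(y1, y2) + of(z1, z2) = of(x1, x2, y1, y2, z1, z2)
When we subscribe, we get the output x1, x2, y1, y2, z1, z2 after 2 seconds. All three inner Observables use the same delay here, so their values arrive in subscription order.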
If the inner Observable of mergeMap responds for each element emitted by the source Observable before the next element arrives, then only one inner Observable is active at a time and there is nothing to merge.
Example-2:
id = 102;
addMoreBooks() {
  let book1 = new Book(++this.id, "Book-" + this.id);
  let book2 = new Book(++this.id, "Book-" + this.id);
  let book3 = new Book(++this.id, "Book-" + this.id);
  of(book1, book2, book3).pipe(
    // delay() is an operator, so it has to be piped onto the inner Observable;
    // calling it as a bare statement inside the callback has no effect
    mergeMap(book => this.bookService.addBook(book).pipe(delay(2000)))
  ).subscribe(book => {
    console.log(book.id + ":" + book.name);
  });
}
Before addBook returns the response for book1, the source Observable emits book2 and book3 to mergeMap. So the responses of all three inner Observables are merged into the final output Observable of mergeMap. On subscribing to the above code, the console output would be as follows when the three requests take the same time.
103:Book-103
104:Book-104
105:Book-105
mergeMap + catchError
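To handle an error thrown by an inner Observable of mergeMap, we can use catchError as follows. Placed after mergeMap, catchError replaces the whole stream with the fallback value on the first error, so responses from the other inner Observables that have not yet arrived are dropped.
of(book1, book2, book3).pipe(
  // delay() must be piped onto the inner Observable to take effect
  mergeMap(book => this.bookService.addBook(book).pipe(delay(2000))),
  catchError(err => {
    console.error(err.message);
    return of(new Book(100, "Default Book"));
  })
).subscribe(book => {
  console.log(book.id + ":" + book.name);
});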
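Where catchError sits decides how much of the stream survives an error. The sketch below is a toy model rather than RxJS code; `Outcome`, `catchAfterMergeMap`, `catchInsideInner` and the failing second book are hypothetical, and it assumes the inner results arrive in the listed order. In RxJS terms, the second function corresponds to piping catchError onto the Observable returned inside the mergeMap callback instead of placing it after mergeMap.

```typescript
// Toy model of where an error is caught (an illustration, not RxJS itself).
type Outcome =
  | { kind: 'ok'; name: string }
  | { kind: 'error'; message: string };

const FALLBACK_NAME = 'Default Book';

// catchError placed after mergeMap: the first error ends the merged stream,
// the fallback is emitted once, and later inner results are never delivered.
function catchAfterMergeMap(outcomes: Outcome[]): string[] {
  const emitted: string[] = [];
  for (const outcome of outcomes) {
    if (outcome.kind === 'error') {
      emitted.push(FALLBACK_NAME);
      break;
    }
    emitted.push(outcome.name);
  }
  return emitted;
}

// catchError placed on each inner Observable: only the failing inner
// Observable is replaced by the fallback, the others are unaffected.
function catchInsideInner(outcomes: Outcome[]): string[] {
  return outcomes.map(outcome =>
    outcome.kind === 'ok' ? outcome.name : FALLBACK_NAME
  );
}

// Hypothetical run in which saving the second book fails.
const bookOutcomes: Outcome[] = [
  { kind: 'ok', name: 'Book-103' },
  { kind: 'error', message: 'save failed' },
  { kind: 'ok', name: 'Book-105' },
];

console.log(catchAfterMergeMap(bookOutcomes).join(', '));
// prints "Book-103, Default Book"
console.log(catchInsideInner(bookOutcomes).join(', '));
// prints "Book-103, Default Book, Book-105"
```

Handling the error per inner Observable is usually the better fit when one failed request should not discard the responses of the others.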
mergeMap vs switchMap vs concatMap vs exhaustMap
1. mergeMap merges the output of all inner Observables into one Observable and returns it as the response. Values are emitted in the order in which they arrive, so the output of different inner Observables can interleave.
of('x', 'y', 'z').pipe(
  mergeMap(el => of(1, 2).pipe(
    delay(2000),
    map(num => el + num)
  ))
).subscribe(res => console.log(res));
x1, x2, y1, y2, z1, z2
2. switchMap unsubscribes from the previous inner Observable whenever the source emits a new element, so it returns only the output of the latest inner Observable.
of('x', 'y', 'z').pipe(
  switchMap(el => of(1, 2).pipe(
    delay(2000),
    map(num => el + num)
  ))
).subscribe(res => console.log(res));
z1, z2
3. concatMap is similar to the mergeMap operator, but the inner Observables are concatenated instead of merged. Each inner Observable is subscribed only after the previous one has completed, which keeps the source order.
of('x', 'y', 'z').pipe(
  concatMap(el => of(1, 2).pipe(
    delay(2000),
    map(num => el + num)
  ))
).subscribe(res => console.log(res));
x1, x2 (after 2 seconds) y1, y2 (after 2 seconds) z1, z2
4. exhaustMap ignores new source elements while an inner Observable is still active, so here it returns only the response of the oldest inner Observable. In this sense it is the opposite of the switchMap operator.
of('x', 'y', 'z').pipe(
  exhaustMap(el => of(1, 2).pipe(
    delay(2000),
    map(num => el + num)
  ))
).subscribe(res => console.log(res));
x1, x2
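The four behaviours in the list above can be compared side by side with a small virtual-time model. This is a sketch, not RxJS code: `Strategy`, `simulate` and the arrival times are hypothetical, and it assumes that every inner Observable emits a single result `duration` milliseconds after it is subscribed and then completes.

```typescript
// Toy virtual-time comparison of the four flattening strategies
// (an illustration, not RxJS itself).
type Strategy = 'merge' | 'switch' | 'concat' | 'exhaust';

interface Timed {
  value: string;
  at: number; // time in ms
}

// arrivals: source values with arrival times in ms, sorted by time.
// duration: assumed time each inner Observable needs to emit and complete.
function simulate(strategy: Strategy, arrivals: Timed[], duration: number): Timed[] {
  const results: Timed[] = [];
  let busyUntil = -Infinity; // used by concat and exhaust
  arrivals.forEach((arrival, i) => {
    const next = arrivals[i + 1];
    switch (strategy) {
      case 'merge':
        // Every inner Observable starts immediately and runs concurrently.
        results.push({ value: arrival.value, at: arrival.at + duration });
        break;
      case 'switch':
        // An inner Observable is cancelled if a newer value arrives first.
        if (next === undefined || next.at >= arrival.at + duration) {
          results.push({ value: arrival.value, at: arrival.at + duration });
        }
        break;
      case 'concat': {
        // Each inner Observable waits for the previous one to complete.
        const start = Math.max(arrival.at, busyUntil);
        busyUntil = start + duration;
        results.push({ value: arrival.value, at: busyUntil });
        break;
      }
      case 'exhaust':
        // Values arriving while an inner Observable is active are ignored.
        if (arrival.at >= busyUntil) {
          busyUntil = arrival.at + duration;
          results.push({ value: arrival.value, at: busyUntil });
        }
        break;
    }
  });
  return results;
}

// Like of('x', 'y', 'z'): all three values arrive at time 0.
const arrivals: Timed[] = [
  { value: 'x', at: 0 },
  { value: 'y', at: 0 },
  { value: 'z', at: 0 },
];
const strategies: Strategy[] = ['merge', 'switch', 'concat', 'exhaust'];
for (const s of strategies) {
  const line = simulate(s, arrivals, 2000).map(r => r.value + '@' + r.at).join(', ');
  console.log(s + ': ' + line);
}
// merge: x@2000, y@2000, z@2000
// switch: z@2000
// concat: x@2000, y@4000, z@6000
// exhaust: x@2000
```

Spacing the arrival times further apart than `duration` makes all four strategies produce the same result in this model, which matches the case where the inner Observable responds before the next source element arrives.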
Complete Example
book.component.ts
import { Component, OnInit } from '@angular/core';
import { FormControl } from '@angular/forms';
import { Observable, of } from 'rxjs';
import { mergeMap, switchMap, catchError, map, delay } from 'rxjs/operators';

import { BookService } from './book.service';
import { Book } from './book';

@Component({
  selector: 'app-book',
  templateUrl: './book.component.html'
})
export class BookComponent implements OnInit {
  book: Book;
  allBooks$: Observable<Book[]>;
  constructor(private bookService: BookService) { }
  ngOnInit() {
    this.searchBook();
    this.getAllBooks();

    of('x', 'y', 'z').pipe(
      mergeMap(el => of(1, 2).pipe(
        delay(2000),
        map(num => el + num)
      ))
    ).subscribe(res => console.log(res));
  }
  id = 102;
  addMoreBooks() {
    let book1 = new Book(++this.id, "Book-" + this.id);
    let book2 = new Book(++this.id, "Book-" + this.id);
    let book3 = new Book(++this.id, "Book-" + this.id);
    of(book1, book2, book3).pipe(
      // delay() must be piped onto the inner Observable to take effect
      mergeMap(book => this.bookService.addBook(book).pipe(delay(2000))),
      catchError(err => {
        console.error(err.message);
        return of(new Book(100, "Default Book"));
      })
    ).subscribe(book => {
      console.log(book.id + ":" + book.name);
      this.getAllBooks();
    });
  }
  bookId = new FormControl();
  searchBook() {
    this.bookId.valueChanges.pipe(
      switchMap(id => {
        if (id > this.id || id < 101) {
          return of(null);
        }
        // delay() must be piped onto the inner Observable to take effect
        return this.bookService.getBookById(id).pipe(delay(2000));
      })
    ).subscribe(res => this.book = res);
  }
  getAllBooks() {
    this.allBooks$ = this.bookService.getAllBooks();
  }
}
book.component.html
<h3>All Books</h3>
<div *ngFor="let book of allBooks$ | async">
  Id: {{book.id}}, Name: {{book.name}}
</div>
<button (click)="addMoreBooks()">Get More Books</button>

<h3>Search Book</h3>
ID: <input [formControl]="bookId">
<div *ngIf="book">
  Id: {{book.id}}, Name: {{book.name}}
</div>
book.ts
export class Book {
  constructor(public id: number, public name: string) { }
}
book.service.ts
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';

import { Book } from './book';

@Injectable({
  providedIn: 'root'
})
export class BookService {
  bookUrl = "/api/books";

  constructor(private http: HttpClient) { }

  addBook(book: Book): Observable<Book> {
    return this.http.post<Book>(this.bookUrl, book);
  }
  removeBook(id: number) {
    return this.http.delete<any>(this.bookUrl + "/" + id);
  }
  getBookById(id: number): Observable<Book> {
    return this.http.get<Book>(this.bookUrl + "/" + id);
  }
  getAllBooks(): Observable<Book[]> {
    return this.http.get<Book[]>(this.bookUrl);
  }
}
app.component.ts
import { Component } from '@angular/core';

@Component({
  selector: 'app-root',
  template: `
    <app-book></app-book>
  `
})
export class AppComponent {
}
test-data.ts
import { InMemoryDbService } from 'angular-in-memory-web-api';

export class TestData implements InMemoryDbService {
  createDb() {
    let bookDetails = [
      { id: 101, name: 'Angular by Krishna' },
      { id: 102, name: 'Core Java by Vishnu' },
    ];
    return { books: bookDetails };
  }
}
app.module.ts
import { NgModule } from '@angular/core';
import { BrowserModule } from '@angular/platform-browser';
import { ReactiveFormsModule } from '@angular/forms';
import { HttpClientModule } from '@angular/common/http';
import { RouterModule } from '@angular/router';

import { AppComponent } from './app.component';
import { BookComponent } from './book.component';

//For InMemory testing
import { InMemoryWebApiModule } from 'angular-in-memory-web-api';
import { TestData } from './test-data';

@NgModule({
  imports: [
    BrowserModule,
    HttpClientModule,
    ReactiveFormsModule,
    InMemoryWebApiModule.forRoot(TestData, { delay: 2000 })
  ],
  declarations: [
    AppComponent,
    BookComponent
  ],
  providers: [
  ],
  bootstrap: [
    AppComponent
  ]
})
export class AppModule { }
Run Application
To run the application, follow these steps.
1. Download the source code using the download link given below on this page.
2. Use downloaded src in your Angular CLI application. To install Angular CLI, find the link.
3. Run ng serve using command prompt.
4. Access the URL http://localhost:4200