
Angular + mergeMap

By Arvind Rai, May 05, 2019
RxJS mergeMap operator projects each source value to an inner Observable and merges the values emitted by all inner Observables into a single output Observable. It behaves like map followed by the RxJS mergeAll operator. mergeMap does not wait for one inner Observable to complete before it subscribes to the next: if the source emits a new element while earlier inner Observables are still running, a new inner Observable is subscribed immediately and all of them run concurrently.
Suppose the source Observable emits 3 elements consecutively and every element is processed by an inner Observable inside mergeMap. If all three elements arrive before the inner Observable for the first element has completed, then three inner Observables are active at the same time. mergeMap emits each value as soon as any inner Observable produces it, so values from different inner Observables can interleave. The output Observable completes only after the source and all three inner Observables have completed.
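The concurrent behaviour can be made visible without timers. The following is a minimal sketch, not part of the demo application: the names inner, source and received are hypothetical, and each Subject stands in for a slow inner Observable whose emissions we trigger by hand.

```typescript
import { Subject } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical stand-ins for slow inner work, one Subject per source element.
const inner = {
  x: new Subject<number>(),
  y: new Subject<number>()
};
const source = new Subject<'x' | 'y'>();
const received: string[] = [];

source.pipe(
  mergeMap(el => inner[el].pipe(map(num => el + num)))
).subscribe(value => received.push(value));

source.next('x');   // inner 'x' is subscribed
source.next('y');   // inner 'y' is subscribed while 'x' is still open
inner.y.next(1);    // 'y' responds first and is emitted immediately
inner.x.next(1);
inner.x.next(2);
inner.y.next(2);

console.log(received.join(', ')); // y1, x1, x2, y2
```

The values appear in the order the inner Observables produce them, not in the order of the source elements.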


Technologies Used

Find the technologies being used in our example.
1. Angular 7.0.0
2. Angular CLI 7.0.3
3. TypeScript 3.1.1
4. Node.js 10.3.0
5. NPM 6.1.0
6. RxJS 6.3.3

Using RxJS mergeMap

RxJS mergeMap is imported as follows.
import { mergeMap } from 'rxjs/operators'; 
Find the sample examples.
Example-1:
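import { of } from 'rxjs';
import { delay, map } from 'rxjs/operators';

of('x', 'y', 'z').pipe(
   mergeMap(el => of(1, 2).pipe(
       delay(2000),                // hold back the inner values for 2 seconds
       map(num => el + num)        // 'x' + 1 gives 'x1'
     )
   )
).subscribe(res => console.log(res)); 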
The source Observable emits 'x', 'y' and 'z' to mergeMap one after another without any pause. For every source element, the inner Observable emits the numbers 1 and 2 after a delay of 2 seconds. Because of that delay, the inner Observable for 'x' is still pending when 'y' and 'z' enter mergeMap, so all three inner Observables run at the same time. mergeMap forwards their values into one output Observable as they arrive.
We can picture the merging of the inner Observables as follows.

of(x1) + of(x2) + of(y1) + of(y2) + of(z1) + of(z2) = of(x1, x2, y1, y2, z1, z2)

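When we subscribe, we get the output x1, x2, y1, y2, z1, z2 after about 2 seconds. The order follows the source here only because every inner Observable uses the same delay.
If each inner Observable completes before the source emits its next element, the inner Observables never overlap and their values simply appear one group after another.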
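The relationship between mergeMap and mergeAll can be checked directly. The sketch below assumes synchronous inner Observables (no delay) so that the result is available immediately; the names project, viaMergeMap and viaMergeAll are hypothetical.

```typescript
import { Observable, of } from 'rxjs';
import { map, mergeAll, mergeMap } from 'rxjs/operators';

// Projection used by both pipelines: 'x' becomes an Observable of 'x1', 'x2'.
const project = (el: string): Observable<string> =>
  of(1, 2).pipe(map(num => el + num));

// Variant 1: mergeMap projects and flattens in one step.
const viaMergeMap: string[] = [];
of('x', 'y', 'z').pipe(
  mergeMap(project)
).subscribe(v => viaMergeMap.push(v));

// Variant 2: map builds an Observable of Observables, mergeAll flattens it.
const higherOrder: Observable<Observable<string>> =
  of('x', 'y', 'z').pipe(map(project));
const viaMergeAll: string[] = [];
higherOrder.pipe(mergeAll()).subscribe(v => viaMergeAll.push(v));

console.log(viaMergeMap.join(', ')); // x1, x2, y1, y2, z1, z2
console.log(viaMergeAll.join(', ')); // x1, x2, y1, y2, z1, z2
```

Both pipelines give the same values in the same order, which is why mergeMap is usually preferred as the shorter form.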

Example-2:
id = 102;
addMoreBooks() {
 let book1 = new Book(++this.id, "Book-"+ this.id);
 let book2 = new Book(++this.id, "Book-"+ this.id);
 let book3 = new Book(++this.id, "Book-"+ this.id);

 of(book1, book2, book3).pipe(
	// delay is an operator, so it is applied through pipe
	mergeMap(book => this.bookService.addBook(book).pipe(delay(2000)))
 ).subscribe(book=>{
	console.log(book.id+":"+book.name);
  });
} 
In the above code, source Observable emits book2 and book3 to mergeMap before addBook returns the response for book1. So all three requests are in flight at the same time, and mergeMap emits each response as soon as it arrives. When the requests take the same time, the output in console would be as follows.
103:Book-103
104:Book-104
105:Book-105 
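Note that delay is an operator: it only has an effect when it is passed to pipe. Calling delay(2000) as a bare statement inside the mergeMap callback creates an operator function and discards it. The following sketch illustrates the difference under virtual time, assuming RxJS's VirtualTimeScheduler so that nothing actually waits; the names scheduler and arrivals are hypothetical.

```typescript
import { of, VirtualTimeScheduler } from 'rxjs';
import { delay, mergeMap } from 'rxjs/operators';

const scheduler = new VirtualTimeScheduler();
const arrivals: string[] = [];

// delay() called as a statement: the returned operator is never used.
of('a').pipe(
  mergeMap(v => {
    delay(2000, scheduler);
    return of(v);
  })
).subscribe(v => arrivals.push('statement:' + v + '@' + scheduler.frame));

// delay() applied through pipe(): the value is held back for 2000 ms.
of('b').pipe(
  mergeMap(v => of(v).pipe(delay(2000, scheduler)))
).subscribe(v => arrivals.push('piped:' + v + '@' + scheduler.frame));

scheduler.flush(); // run all scheduled work in virtual time

console.log(arrivals.join(', ')); // statement:a@0, piped:b@2000
```

The number after @ is the virtual time in milliseconds at which the value reached the subscriber.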

mergeMap + catchError

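To handle an error raised by the source Observable or by any inner Observable of mergeMap, we can use catchError after mergeMap as follows. Once catchError runs, the original stream has terminated, so the replacement Observable takes over and the remaining source elements are not processed.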
of(book1, book2, book3).pipe(
	// delay is an operator, so it is applied through pipe
	mergeMap(book => this.bookService.addBook(book).pipe(delay(2000))),
	catchError(err => {
	  console.error(err.message);
	  return of(new Book(100, "Default Book"));
	})
).subscribe(book=>{
	console.log(book.id+":"+book.name);
}); 
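Where catchError is placed decides how much of the stream survives an error. The sketch below compares catchError after mergeMap with catchError inside the inner Observable. fakeAddBook is a hypothetical stand-in for bookService.addBook that works on plain ids and fails for id 2; it is an assumption for illustration only.

```typescript
import { Observable, of, throwError } from 'rxjs';
import { catchError, mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for bookService.addBook: saving id 2 fails.
function fakeAddBook(id: number): Observable<number> {
  return id === 2 ? throwError(new Error('save failed')) : of(id);
}

// catchError after mergeMap: the error ends the whole stream,
// the fallback value is emitted and id 3 is never processed.
const outer: number[] = [];
of(1, 2, 3).pipe(
  mergeMap(id => fakeAddBook(id)),
  catchError(() => of(-1))
).subscribe(id => outer.push(id));

// catchError inside the inner Observable: only the failing request
// is replaced, the remaining ids are still processed.
const perItem: number[] = [];
of(1, 2, 3).pipe(
  mergeMap(id => fakeAddBook(id).pipe(catchError(() => of(-1))))
).subscribe(id => perItem.push(id));

console.log(outer.join(', '));   // 1, -1
console.log(perItem.join(', ')); // 1, -1, 3
```

If every book should be attempted even when one request fails, handling the error inside the inner Observable is usually the better fit.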

mergeMap vs switchMap vs concatMap vs exhaustMap

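1. mergeMap subscribes to every inner Observable as soon as its source value arrives and merges their output into one Observable. Values are emitted in the order the inner Observables produce them, so the output order is not guaranteed to follow the source order.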
of('x', 'y', 'z').pipe(
   mergeMap(el => of(1, 2).pipe(
	   delay(2000),
	   map(num => el+num)        
	 )
   )
).subscribe(res => console.log(res)); 
Output
x1, x2, y1, y2, z1, z2

2. switchMap unsubscribes from the previous inner Observable whenever the source emits a new value, so only the latest inner Observable contributes to the output.
of('x', 'y', 'z').pipe(
   switchMap(el => of(1, 2).pipe(
	   delay(2000),
	   map(num => el+num)        
	 )
   )
).subscribe(res => console.log(res)); 
Output
z1, z2

3. concatMap subscribes to the inner Observables one at a time. The next inner Observable starts only after the previous one completes, so the output keeps the source order.
of('x', 'y', 'z').pipe(
   concatMap(el => of(1, 2).pipe(
	   delay(2000),
	   map(num => el+num)        
	 )
   )
).subscribe(res => console.log(res)); 
Output
x1, x2 (after 2 seconds), y1, y2 (after 4 seconds), z1, z2 (after 6 seconds)

4. exhaustMap ignores new source values while an inner Observable is still active, so only the first inner Observable contributes here. In that sense it is the opposite of switchMap operator.
of('x', 'y', 'z').pipe(
   exhaustMap(el => of(1, 2).pipe(
	   delay(2000),
	   map(num => el+num)        
	 )
   )
).subscribe(res => console.log(res)); 
Output
x1, x2
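The four operators differ most clearly when the inner Observables take different amounts of time. The following sketch runs the same pipeline with each operator under virtual time, assuming RxJS's VirtualTimeScheduler. The helper names run and latencyOf and the latency values are hypothetical, chosen so that 'x' is the slowest element.

```typescript
import { Observable, OperatorFunction, VirtualTimeScheduler, of } from 'rxjs';
import { concatMap, delay, exhaustMap, map, mergeMap, switchMap } from 'rxjs/operators';

type Project = (el: string) => Observable<string>;
type Flatten = (project: Project) => OperatorFunction<string, string>;

// Hypothetical latencies in virtual milliseconds: 'x' is the slowest.
function latencyOf(el: string): number {
  return el === 'x' ? 30 : el === 'y' ? 10 : 20;
}

// Runs of('x', 'y', 'z') through the given flattening operator.
function run(flatten: Flatten): string[] {
  const scheduler = new VirtualTimeScheduler();
  const out: string[] = [];
  of('x', 'y', 'z').pipe(
    flatten(el => of(1, 2).pipe(
      delay(latencyOf(el), scheduler),
      map(num => el + num)
    ))
  ).subscribe(res => out.push(res));
  scheduler.flush(); // execute all delayed work without real waiting
  return out;
}

const merged = run(p => mergeMap(p));
const switched = run(p => switchMap(p));
const concatenated = run(p => concatMap(p));
const exhausted = run(p => exhaustMap(p));

console.log(merged.join(', '));       // y1, y2, z1, z2, x1, x2
console.log(switched.join(', '));     // z1, z2
console.log(concatenated.join(', ')); // x1, x2, y1, y2, z1, z2
console.log(exhausted.join(', '));    // x1, x2
```

With unequal latencies, mergeMap delivers the fastest inner Observable first, while concatMap still keeps the source order.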

Complete Example

book.component.ts
import { Component, OnInit } from '@angular/core';
import { FormControl } from '@angular/forms';
import { Observable, of } from 'rxjs';
import { mergeMap, switchMap, catchError, map, delay } from 'rxjs/operators';
import { BookService } from './book.service';
import { Book } from './book';

@Component({
   selector: 'app-book',
   templateUrl: './book.component.html'
})
export class BookComponent implements OnInit { 
   book: Book;
   allBooks$: Observable<Book[]>;

   constructor(private bookService: BookService) { }
   ngOnInit() {
     this.searchBook();
     this.getAllBooks();

     of('x', 'y', 'z').pipe(
      mergeMap(el => of(1, 2).pipe(
           delay(2000),
           map(num => el+num)        
         )
       )
     ).subscribe(res => console.log(res));

   }

   id = 102;
   addMoreBooks() {
     let book1 = new Book(++this.id, "Book-"+ this.id);
     let book2 = new Book(++this.id, "Book-"+ this.id);
     let book3 = new Book(++this.id, "Book-"+ this.id);
     of(book1, book2, book3).pipe(
        // delay is an operator, so it is applied through pipe
        mergeMap(book => this.bookService.addBook(book).pipe(delay(2000))),
        catchError(err => {
          console.error(err.message);
          return of(new Book(100, "Default Book"));
        })
     ).subscribe(book=>{
        console.log(book.id+":"+book.name);
        this.getAllBooks();
      });
   }
   bookId = new FormControl(); 
   searchBook() {
    this.bookId.valueChanges.pipe(
      switchMap(id => {
        if (id > this.id || id < 101) {
          return of(null);
        }
        // delay is an operator, so it is applied through pipe
        return this.bookService.getBookById(id).pipe(delay(2000));
      })
    ).subscribe(res => this.book = res);
   }
   getAllBooks() {
      this.allBooks$ = this.bookService.getAllBooks();
   }
} 
book.component.html
<h3>All Books</h3>
<div *ngFor="let book of allBooks$ | async">
  Id: {{book.id}}, Name: {{book.name}}
</div>
<button (click)="addMoreBooks()">Get More Books</button>
<h3>Search Book</h3>
  ID: <input [formControl]="bookId">
<div *ngIf="book">
  Id: {{book.id}}, Name: {{book.name}}
</div> 
book.ts
export class Book {
   constructor(public id: number, public name: string) { }
} 
book.service.ts
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
import { Book } from './book';

@Injectable({
    providedIn: 'root'
})
export class BookService {
    bookUrl = "/api/books";	

    constructor(private http: HttpClient) { }
    addBook(book: Book): Observable<Book> {
      return this.http.post<Book>(this.bookUrl, book);
    }
    removeBook(id: number) {
      return this.http.delete<any>(this.bookUrl + "/" + id);
    }
    getBookById(id: number): Observable<Book> {
      return this.http.get<Book>(this.bookUrl + "/" + id);
    }    
    getAllBooks(): Observable<Book[]> {
      return this.http.get<Book[]>(this.bookUrl);
    }        
} 
app.component.ts
import { Component } from '@angular/core';

@Component({
   selector: 'app-root',
   template: `
    <app-book></app-book>
   `
})
export class AppComponent { 
} 
test-data.ts
import { InMemoryDbService } from 'angular-in-memory-web-api';

export class TestData implements InMemoryDbService {
  createDb() {
    let bookDetails = [
      { id: 101, name: 'Angular by Krishna' },
      { id: 102, name: 'Core Java by Vishnu' },
    ];
    return { books: bookDetails };
  }
} 
app.module.ts
import { NgModule } from '@angular/core';
import { BrowserModule } from '@angular/platform-browser';
import { ReactiveFormsModule } from '@angular/forms';
import { HttpClientModule } from '@angular/common/http';
import { RouterModule }   from '@angular/router';

import { AppComponent }  from './app.component';
import { BookComponent }  from './book.component';

//For InMemory testing
import { InMemoryWebApiModule } from 'angular-in-memory-web-api';
import { TestData } from './test-data';

@NgModule({
  imports: [     
      BrowserModule,
      HttpClientModule,
      ReactiveFormsModule,
      InMemoryWebApiModule.forRoot(TestData, {delay: 2000})		
  ],
  declarations: [
      AppComponent,
      BookComponent
  ],
  providers: [
  ],
  bootstrap: [
      AppComponent
  ]
})
export class AppModule { } 

Run Application

To run the application, follow these steps.
1. Download source code using download link given below on this page.
2. Use the downloaded src in your Angular CLI application. Install Angular CLI first if it is not already available.
3. Run ng serve using command prompt.
4. Access the URL http://localhost:4200
[Screenshot of the output: Angular + mergeMap]


Download Source Code
