Angular Observable pipe

By Arvind Rai, October 30, 2018
This page will walk through Angular Observable pipe example. RxJS pipe is used to combine functional operators into a chain. pipe is an instance method of Observable as well as a standalone RxJS function. pipe can be used as Observable.pipe or we can use standalone pipe to combine functional operators. The declaration of pipe is as following.
public pipe(operations: ...*): Observable 
pipe accepts operators as arguments such as filter, map, mergeScan etc with comma separated and execute them in a sequence they are passed in as arguments and finally returns Observable instance. To get the result we need to subscribe it. Now let us discuss the complete example.

Technologies Used

Find the technologies being used in our example.
1. Angular 7.0.0
2. Angular CLI 7.0.3
3. TypeScript 3.1.1
4. Node.js 10.3.0
5. NPM 6.1.0
6. RxJS 6.3.3
7. In-Memory Web API 0.6.1

Import RxJS 6 Observable and Operators

In RxJS 6, Observable and operators are imported as following.
1. Import Observable and of from rxjs.
import { Observable, of } from 'rxjs'; 
pipe will be accessed using Observable.pipe. To use standalone pipe, we can also import it.
import { pipe } from 'rxjs'; 
2. Operators are imported from rxjs/operators. Find some operators to import it.
import { mergeMap, switchMap, retry, 
         map, catchError, filter, scan } from 'rxjs/operators'; 

Observable.pipe with filter, map and scan

Here we will use Observable.pipe to combine functions. Suppose we have a service method to get numbers.
getNumbers(): Observable<number> {
   return of(1, 2, 3, 4, 5, 6, 7);
} 
1. Find the code to use pipe with filter operator.
calculateNumbers() {
  this.bookService.getNumbers().pipe(
    filter(n => n % 2 === 1)
  )
  .subscribe(result => console.log(result));
} 
Output will be 1,3,5,7.
We can observe that we are passing filter operator within pipe. Final result of pipe after executing filter, will be instance of Observable. To get the result we need to subscribe it.
2. Find the code to use pipe with filter and map operators.
calculateNumbers() {
  this.bookService.getNumbers().pipe(
    filter(n => n % 2 === 1),
    map(n => n + 10)
  )
  .subscribe(result => console.log(result));
} 
Output will be 11, 13, 15, 17.
In the above code we are passing filter and map operators in pipe function as arguments. filter and map will execute in the order they are passed in. On the Observable instance returned by getNumbers(), filter operator will be executed and on the Observable instance returned by filter, map operator will be executed and the final result returned by pipe will be the result returned by last operator i.e. map in above code.
3. Find the code to use pipe with filter, map and scan operators.
calculateNumbers() {
  this.bookService.getNumbers().pipe(
    filter(n => n % 2 === 1),
    map(n => n + 10),
    scan((sum, n) => sum + n)
  )
  .subscribe(result => console.log(result));
} 
The output will be 11, 24, 39, 56.
In the above code we are using three operators with pipe i.e. filter, map and scan. They will execute in the order they are passed in as arguments. In the above code, first filter will execute then map and then scan. Final output returned by pipe will be Observable returned by scan.

Using Standalone pipe

To use pipe standalone, we need to import it as following.
import { pipe } from 'rxjs'; 
Now find the code snippet to use it.
calculateNumbers() {
  //Create a function to accept Observable instance   
  const calculationFun = pipe(
	filter((n: number) => n % 2 === 1),
	map((n: number) => n + 10),
	scan((sum, n) => sum + n)
  );

  //Instantiate response Observable
  const calculation$ = calculationFun(this.bookService.getNumbers());

  //Subscribe the Observable instance
  calculation$.subscribe(result => console.log(result));
} 
Output will be 11, 24, 39, 56.

Error Handling: pipe with retry and catchError

retry operator reties to access URL for the given number of time before failing. catchError has been introduced in RxJS 6. catchError is used in place of catch. Find the code to use pipe with retry and catchError operator.
getBooks() {
 this.allBooks$ = this.bookService.getBooksFromStore().pipe(
	retry(3), 
	map(books => {
	  if (books.length < 5) {
		throw new Error('Not enough books');
	  }
	  return books;
	}),
	catchError(err => {
	  console.error(err);
	  return of([]);
	})
 );
} 
We can also subscribe the Observable instance.
getBooks() {
 this.bookService.getBooksFromStore().pipe(
	retry(3), 
	map(books => {
	  if (books.length < 5) {
		throw new Error('Not enough books');
	  }
	  return books;
	}),
	catchError(err => {
	  console.error(err);
	  return of([]);
	})
 )
 .subscribe(books => this.allBooks = books);
} 

pipe with mergeMap

Find the code snippet to use pipe with mergeMap operator.
getAllFavBooks() {
   this.bookService.getFavBookFromStore(101).pipe(
     mergeMap(book => { 
      let category = book.category;
      return this.bookService.getBooksByCategoryFromStore(category);
     })
   ).subscribe(books => {
     this.allFavBooks = books;
   });
} 

pipe with switchMap

Find the code snippet to use pipe with switchMap operator.
searchSimilarBooks(id: number) {
  this.bookService.getFavBookFromStore(id).pipe(
    switchMap(book => {
      let category = book.category;
      return this.bookService.getBooksByCategoryFromStore(category);
    }),
    catchError(err => of([]))
  )
  .subscribe(books => {
    this.similarBooks = books;
    console.log(books);
  });
} 

Complete Example

Here we will provide complete demo to use pipe.
book.component.ts
import { Component, OnInit } from '@angular/core';
import { Observable, of, pipe } from 'rxjs';
import { mergeMap, switchMap, retry, 
         map, catchError, filter, scan } from 'rxjs/operators';
import { BookService } from './book.service';
import { Book } from './book';

@Component({
   selector: 'app-book',
   templateUrl: './book.component.html'
})
export class BookComponent implements OnInit { 
   softBooks: Book[];
   allFavBooks: Book[];
   similarBooks: Book[];

   constructor(private bookService: BookService) { }
   
   ngOnInit() {
      this.calculateNumbers1();
      this.calculateNumbers2();
      this.getSoftBooks();
      this.getAllFavBooks();
   }

   //Using Observable.pipe with filter, map and scan
   calculateNumbers1() {
    this.bookService.getNumbers().pipe(
      filter(n => n % 2 === 1),
      map(n => n + 10),
      scan((sum, n) => sum + n)
    )
    .subscribe(result => console.log(result));
   }

   //Using Standalone pipe
   calculateNumbers2() {
    //Create a function to accept Observable instance   
    const calculationFun = pipe(
        filter((n: number) => n % 2 === 1),
        map((n: number) => n + 10),
        scan((sum, n) => sum + n)
      );

    //Instantiate response Observable
    const calculation$ = calculationFun(this.bookService.getNumbers());

    //Subscribe the Observable instance
    calculation$.subscribe(result => console.log(result));
   }
 
   //Using retry() and catchError operator
   getSoftBooks() {
      this.bookService.getBooksFromStore().pipe(
          retry(3), 
          map(books => {
            if (books.length < 5) {  //It will throw error in console
              throw new Error('Not enough books');
            }
            return books;
          }),
          catchError(err => {
            console.error(err);
            return of([]);
          })
      )
      .subscribe(books => this.softBooks = books);
   }

   //Using mergeMap
   getAllFavBooks() {
      this.bookService.getFavBookFromStore(101).pipe(
          mergeMap(book => { 
            let category = book.category;
            return this.bookService.getBooksByCategoryFromStore(category);
          })
      ).subscribe(books => {
            this.allFavBooks = books;
      });
   }   
   
   //Using switchMap
   searchSimilarBooks(id: number) {
      this.bookService.getFavBookFromStore(id).pipe(
          switchMap(book => {
              let category = book.category;
              return this.bookService.getBooksByCategoryFromStore(category);
          }),
          catchError(err => of([]))
      )
      .subscribe(books => {
          this.similarBooks = books;
          console.log(books);
      });
   }
} 
book.component.html
<b>All Favorite Books</b><br/>
<ul>
  <li *ngFor="let book of allFavBooks" >
    Id: {{book.id}}, Name: {{book.name}}, Category: {{book.category}}
  </li>
</ul>

<b>Search Similar Books</b><br/><br/>
  
ID: <input [(ngModel)]="bookId" (input)="searchSimilarBooks(bookId)">

<ul>
  <li *ngFor="let book of similarBooks" >
    Id: {{book.id}}, Name: {{book.name}}, Category: {{book.category}}
  </li>
</ul> 
book.service.ts
import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable, of } from 'rxjs';
import { Book } from './book';

@Injectable({
    providedIn: 'root'
})
export class BookService {
    bookUrl = "/api/books";	

    constructor(private http: HttpClient) { }
    
    getNumbers(): Observable<number> {
      return of(1, 2, 3, 4, 5, 6, 7);
    }
    getBooksFromStore(): Observable<Book[]> {
      return this.http.get<Book[]>(this.bookUrl);
    }
    getFavBookFromStore(id: number): Observable<Book> {
      return this.http.get<Book>(this.bookUrl + "/" + id);
    }    
    getBooksByCategoryFromStore(category: string): Observable<Book[]> {
      return this.http.get<Book[]>(this.bookUrl + "?category=" + category);
    }        
} 
book.ts
export interface Book {
   id: number;
   name: string;
   category: string;
} 
app.component.ts
import { Component } from '@angular/core';

@Component({
   selector: 'app-root',
   template: `
	<app-book></app-book>
    `
})
export class AppComponent { 
} 
test-data.ts
import { InMemoryDbService } from 'angular-in-memory-web-api';

export class TestData implements InMemoryDbService {
  createDb() {
    let bookDetails = [
      { id: 101, name: 'Angular by Krishna', category: 'Angular' },
      { id: 102, name: 'Core Java by Vishnu', category: 'Java' },
      { id: 103, name: 'NgRx by Rama', category: 'Angular' }
    ];
    return { books: bookDetails };
  }
} 
app.module.ts
import { NgModule } from '@angular/core';
import { BrowserModule } from '@angular/platform-browser';
import { FormsModule } from '@angular/forms';
import { HttpClientModule } from '@angular/common/http';

import { AppComponent }  from './app.component';
import { BookComponent }  from './book.component';
import { BookService } from './book.service';

//For InMemory testing
import { InMemoryWebApiModule } from 'angular-in-memory-web-api';
import { TestData } from './test-data';

@NgModule({
  imports: [     
      BrowserModule,
      HttpClientModule,
      FormsModule,
      InMemoryWebApiModule.forRoot(TestData)		
  ],
  declarations: [
      AppComponent,
      BookComponent
  ],
  providers: [
  ],
  bootstrap: [
      AppComponent
  ]
})
export class AppModule { } 

Run Application

To run the application, find the steps.
1. Download source code using download link given below on this page.
2. Use downloaded src in your Angular CLI application. To install Angular CLI, find the link.
3. Install angular-in-memory-web-api@0.6.1
4. Run ng serve using command prompt.
5. Access the URL http://localhost:4200
Find the print screen of the output.
Angular Observable pipe

References

RxJS Observable
Angular: The RxJS library
Pipeable Operators

Download Source Code

POSTED BY
ARVIND RAI
ARVIND RAI








©2023 concretepage.com | Privacy Policy | Contact Us