Angular Observable pipe

Written by: Arvind Rai,
Last updated:
January 25, 2024
This page will walk through Angular Observable pipe example. RxJS pipe is used to combine functional operators into a chain. pipe is an instance method of Observable as well as a standalone RxJS function. pipe can be used as Observable.pipe or we can use standalone pipe to combine functional operators. The declaration of pipe is as following.
public pipe(operations: ...*): Observable 
pipe accepts operators as arguments such as filter, map, mergeScan etc with comma separated and execute them in a sequence they are passed in as arguments and finally returns Observable instance. To get the result we need to subscribe it. Now let us discuss the complete example.

1. Import RxJS Observable and Operators

In RxJS 6, Observable and operators are imported as following.
1. Import Observable and of from rxjs.
import { Observable, of } from 'rxjs'; 
pipe will be accessed using Observable.pipe. To use standalone pipe, we can also import it.
import { pipe } from 'rxjs'; 
2. Operators are imported from rxjs/operators. Find some operators to import it.
import { mergeMap, switchMap, retry, 
         map, catchError, filter, scan } from 'rxjs/operators'; 

2. Observable.pipe with filter, map and scan

Here we will use Observable.pipe to combine functions. Suppose we have a service method to get numbers.
getNumbers(): Observable<number> {
   return of(1, 2, 3, 4, 5, 6, 7);
} 
1. Find the code to use pipe with filter operator.
calculateNumbers() {
  this.bookService.getNumbers().pipe(
    filter(n => n % 2 === 1)
  )
  .subscribe(result => console.log(result));
} 
Output will be 1,3,5,7.
We can observe that we are passing filter operator within pipe. Final result of pipe after executing filter, will be instance of Observable. To get the result we need to subscribe it.
2. Find the code to use pipe with filter and map operators.
calculateNumbers() {
  this.bookService.getNumbers().pipe(
    filter(n => n % 2 === 1),
    map(n => n + 10)
  )
  .subscribe(result => console.log(result));
} 
Output will be 11, 13, 15, 17.
In the above code we are passing filter and map operators in pipe function as arguments. filter and map will execute in the order they are passed in. On the Observable instance returned by getNumbers(), filter operator will be executed and on the Observable instance returned by filter, map operator will be executed and the final result returned by pipe will be the result returned by last operator i.e. map in above code.
3. Find the code to use pipe with filter, map and scan operators.
calculateNumbers() {
  this.bookService.getNumbers().pipe(
    filter(n => n % 2 === 1),
    map(n => n + 10),
    scan((sum, n) => sum + n)
  )
  .subscribe(result => console.log(result));
} 
The output will be 11, 24, 39, 56.
In the above code we are using three operators with pipe i.e. filter, map and scan. They will execute in the order they are passed in as arguments. In the above code, first filter will execute then map and then scan. Final output returned by pipe will be Observable returned by scan.

3. Using Standalone pipe

To use pipe standalone, we need to import it as following.
import { pipe } from 'rxjs'; 
Now find the code snippet to use it.
calculateNumbers() {
  //Create a function to accept Observable instance   
  const calculationFun = pipe(
	filter((n: number) => n % 2 === 1),
	map((n: number) => n + 10),
	scan((sum, n) => sum + n)
  );

  //Instantiate response Observable
  const calculation$ = calculationFun(this.bookService.getNumbers());

  //Subscribe the Observable instance
  calculation$.subscribe(result => console.log(result));
} 
Output will be 11, 24, 39, 56.

4. Error Handling: pipe with retry and catchError

retry operator reties to access URL for the given number of time before failing. catchError has been introduced in RxJS 6. catchError is used in place of catch. Find the code to use pipe with retry and catchError operator.
getBooks() {
 this.allBooks$ = this.bookService.getBooksFromStore().pipe(
	retry(3), 
	map(books => {
	  if (books.length < 5) {
		throw new Error('Not enough books');
	  }
	  return books;
	}),
	catchError(err => {
	  console.error(err);
	  return of([]);
	})
 );
} 
We can also subscribe the Observable instance.
getBooks() {
 this.bookService.getBooksFromStore().pipe(
	retry(3), 
	map(books => {
	  if (books.length < 5) {
		throw new Error('Not enough books');
	  }
	  return books;
	}),
	catchError(err => {
	  console.error(err);
	  return of([]);
	})
 )
 .subscribe(books => this.allBooks = books);
} 

5. pipe with mergeMap

Find the code snippet to use pipe with mergeMap operator.
getAllFavBooks() {
   this.bookService.getFavBookFromStore(101).pipe(
     mergeMap(book => { 
      let category = book.category;
      return this.bookService.getBooksByCategoryFromStore(category);
     })
   ).subscribe(books => {
     this.allFavBooks = books;
   });
} 

6. pipe with switchMap

Find the code snippet to use pipe with switchMap operator.
searchSimilarBooks(id: number) {
  this.bookService.getFavBookFromStore(id).pipe(
    switchMap(book => {
      let category = book.category;
      return this.bookService.getBooksByCategoryFromStore(category);
    }),
    catchError(err => of([]))
  )
  .subscribe(books => {
    this.similarBooks = books;
    console.log(books);
  });
} 

7. Output

Download source code and run the application. Find the print-screen of the output.
Angular Observable pipe

8. Reference

RxJS Operators

9. Download Source Code

Join the Newsletter

(Subscribe to get our latest content directly into your inbox)

WRITTEN BY
ARVIND RAI
ARVIND RAI









©2024 concretepage.com | Privacy Policy | Contact Us