👉 Codes: finish custom observables.
- An observable = a data source.
- Observable pattern: observable → stream → observer
- Observer (in the previous code, you subscribe to the events emitted by the observable) → receives 3 types of notifications (3 hooks): handle data, handle error, handle completion.
- Observables are contracts to which you subscribe to be informed about changes in data.
There are several ways to create an observable with the rxjs package.
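To make the observable → stream → observer idea concrete, here is a minimal sketch without any library. `SimpleObserver` and `SimpleObservable` are hypothetical names used only for illustration; they are not the RxJS types, which do much more (unsubscription, operators, error safety).

```typescript
// Hypothetical minimal types, only to illustrate the pattern (not the RxJS API).
interface SimpleObserver<T> {
  next: (value: T) => void;    // hook 1: handle data
  error: (err: Error) => void; // hook 2: handle error
  complete: () => void;        // hook 3: handle completion
}

// The data source: subscribing runs the producer for that observer.
class SimpleObservable<T> {
  private producer: (observer: SimpleObserver<T>) => void;

  constructor(producer: (observer: SimpleObserver<T>) => void) {
    this.producer = producer;
  }

  subscribe(observer: SimpleObserver<T>): void {
    this.producer(observer);
  }
}

const log: string[] = [];

// The stream: two values, then completion.
const numbers = new SimpleObservable<number>(observer => {
  observer.next(1);
  observer.next(2);
  observer.complete();
});

// The observer: one function per hook.
numbers.subscribe({
  next: value => log.push(`data: ${value}`),
  error: err => log.push(`error: ${err.message}`),
  complete: () => log.push('completed'),
});

console.log(log.join(', ')); // data: 1, data: 2, completed
```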
```typescript
// home.component.ts
import { OnDestroy, OnInit } from '@angular/core';
import { interval, Subscription } from 'rxjs';

export class ... implements OnInit, OnDestroy {
  private firstObsSub: Subscription;

  ngOnInit() {
    this.firstObsSub = interval(1000) // every second, a new event is emitted
      .subscribe(count => {
        console.log(count); // log to the console
      });
  }

  // We have to .unsubscribe() -> if not, we get more and more running
  // subscriptions whenever we come back to home.
  // ngOnDestroy() runs whenever we leave the component; it returns nothing (void).
  ngOnDestroy(): void {
    this.firstObsSub.unsubscribe(); // prevent memory leak
  }
}
```
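When a component opens several subscriptions, keeping one field per subscription gets repetitive. A possible alternative, sketched here in plain RxJS outside Angular, is to collect them in one parent `Subscription` and unsubscribe once. The names `subscriptions`, `first` and `second` are made up for this example.

```typescript
import { interval, Subscription } from 'rxjs';

// One parent subscription that collects everything the component opens.
const subscriptions = new Subscription();

const first = interval(1000).subscribe(count => console.log('first', count));
const second = interval(2500).subscribe(count => console.log('second', count));

subscriptions.add(first);
subscriptions.add(second);

// In a component, this single call would sit in ngOnDestroy().
subscriptions.unsubscribe();

// Both child subscriptions are closed, so no value is ever logged.
console.log(first.closed, second.closed); // true true
```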
Observables provided and managed by Angular (e.g. the route params) → no need to import from "rxjs" + no need to unsubscribe(), Angular cleans them up for you.
```typescript
customIntervalObs.subscribe(
  <function_for_next>,    // performed on every emitted value
  <function_for_error>,   // performed when there is an error
  <function_for_complete> // performed when the observable completes
);
```
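The same three hooks can also be passed as a single observer object, which newer RxJS versions recommend over separate error and complete callbacks. A small sketch using `of()`, which emits its arguments synchronously and then completes; the `received` array is only there to make the order visible.

```typescript
import { of } from 'rxjs';

const received: string[] = [];

of(1, 2, 3).subscribe({
  next: value => received.push(`next: ${value}`),           // on every emitted value
  error: (err: Error) => received.push(`error: ${err.message}`), // on error
  complete: () => received.push('complete'),                // when the stream finishes
});

console.log(received.join(', ')); // next: 1, next: 2, next: 3, complete
```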
You rarely build your own observable → use the ones provided by Angular or by other libraries. What follows is only to better understand how they work, in case you want to build one!
Create manually
interval()
```typescript
import { Observable, Subscription } from 'rxjs';

export ... {
  private customObsSub: Subscription;

  ngOnInit() {
    // new Observable(...) replaces the deprecated Observable.create(...)
    const customIntervalObs = new Observable<number>(observer => {
      let count = 0;
      const timer = setInterval(() => {
        observer.next(count);
        // ^ method to emit a new value; there are also .error() and .complete()
        count++;
      }, 1000);
      // teardown: stop the timer when the subscriber unsubscribes
      return () => clearInterval(timer);
    });

    this.customObsSub = customIntervalObs.subscribe(data => {
      console.log(data);
    });
  }

  ngOnDestroy(): void {
    this.customObsSub.unsubscribe();
  }
}
```
If we send an HTTP request → we sometimes get an error back. A custom observable can report an error with observer.error().
```typescript
// using the same structure as the previous code box
setInterval(() => {
  observer.next(count);
  if (count > 3) {
    observer.error(new Error('Count is greater than 3!'));
  } // the error ends the subscription -> the last value emitted is 4
  count++;
}, 1000);

this.customObsSub = customIntervalObs.subscribe(data => {
  console.log(data);
}, error => {
  console.log(error); // or you could send the error to the backend
  alert(error.message); // "Count is greater than 3!"
});
```
Some observables can complete → what do we do when that happens?
Being canceled due to an error IS DIFFERENT from completing! → the complete function in `.subscribe()` is not executed when there is an error!

```typescript
setInterval(() => {
  observer.next(count);
  if (count === 2) {
    observer.complete(); // complete the observable before count = 3
  }
  if (count > 3) {
    observer.error(new Error('Count is greater than 3!'));
  }
  count++;
}, 1000);

this.customObsSub = customIntervalObs.subscribe(data => {
  console.log(data);
}, error => {
  console.log(error); // or you could send the error to the backend
  alert(error.message); // "Count is greater than 3!"
}, () => {
  console.log("Completed!"); // you don't need to unsubscribe if your observable
                             // did complete, BUT there is nothing wrong with
                             // still calling .unsubscribe()
});
```
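The difference between erroring and completing can be checked with two tiny synchronous observables. This is only a sketch: `record`, `failing` and `completing` are hypothetical names, and the values are emitted immediately instead of on a timer so the result is easy to inspect.

```typescript
import { Observable } from 'rxjs';

// Subscribes to the source and returns every notification it received.
function record(source: Observable<number>): string[] {
  const events: string[] = [];
  source.subscribe({
    next: value => events.push(`next ${value}`),
    error: (err: Error) => events.push(`error ${err.message}`),
    complete: () => events.push('complete'),
  });
  return events;
}

const failing = new Observable<number>(observer => {
  observer.next(0);
  observer.error(new Error('boom'));
  observer.next(1); // ignored: an error closes the stream
});

const completing = new Observable<number>(observer => {
  observer.next(0);
  observer.complete();
  observer.next(1); // ignored: completion closes the stream too
});

const failingEvents = record(failing);       // ['next 0', 'error boom']
const completingEvents = record(completing); // ['next 0', 'complete']

console.log(failingEvents, completingEvents);
```

The error case never reaches the complete handler, and the completed case never reaches the error handler.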
```typescript
import { filter, map } from 'rxjs/operators';
```
Operators are the magic feature of the RxJS library: they turn observables → awesome constructs!
Example: we want to add "Round" before the emitted number (Round 1, Round 2, ...) → the old option is to add "Round" inside the `.subscribe()` function. → We can do that before the subscription using operators! ← this is where `.pipe()` comes in!

```typescript
this.firstObsSubscription = customIntervalObservable.pipe(filter(data => {
  return data > 0; // drop the first value (0)
}), map((data: number) => {
  return 'Round: ' + (data + 1);
})).subscribe(data => {
  console.log(data);
}, ...);
```
We need to transform the received data before subscribing → use operators.
With `.pipe()`, we can apply 1 or more operators inside it!

```typescript
// import operators from rxjs/operators first!
.pipe(<operator>, <operator>, ...)
```
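A self-contained sketch of the same `filter` + `map` chain, using `of()` as a stand-in source so that it runs without a timer. The `rounds` array is an assumption of this example, added only to collect the output.

```typescript
import { of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const rounds: string[] = [];

of(0, 1, 2, 3)
  .pipe(
    filter(count => count > 0),            // operators run in the order listed
    map(count => 'Round: ' + (count + 1)), // turns the number into a label
  )
  .subscribe(label => rounds.push(label));

console.log(rounds.join(', ')); // Round: 2, Round: 3, Round: 4
```

The subscriber only ever sees the transformed values; the source observable itself is not modified.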
Instead of using `EventEmitter` (from `@angular/core`) to emit and subscribe to events between components, we can use a subject!

```typescript
// user.service.ts
import { Injectable } from '@angular/core';
import { Subject } from 'rxjs';

@Injectable({providedIn: 'root'})
export class UserService {
  activatedEmitter = new Subject<boolean>();
}

// compare with using EventEmitter (imported from '@angular/core')
@Injectable()
export ... {
  activatedEmitter = new EventEmitter<boolean>();
}
```
```typescript
// user.component.ts
onActivate() {
  this.userService.activatedEmitter.next(true);
}

// compare with EventEmitter
onActivate() {
  this.userService.activatedEmitter.emit(true);
}
```
```typescript
// app.component.ts
// we use the same code for both EventEmitter and Subject
ngOnInit() {
  this.userService.activatedEmitter.subscribe(...);
}
```
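The three snippets above can be tried together outside Angular. In this sketch a plain class stands in for the injectable service (no decorator, no dependency injection), and the `seen` array is a made-up helper to show what the subscriber received.

```typescript
import { Subject } from 'rxjs';

// Stand-in for the Angular service; only the Subject matters here.
class UserService {
  activatedEmitter = new Subject<boolean>();
}

const userService = new UserService();
const seen: boolean[] = [];

// "app.component": subscribe and keep the subscription for cleanup.
const activatedSub = userService.activatedEmitter.subscribe(active => seen.push(active));

// "user.component": emit from outside the subject.
userService.activatedEmitter.next(true);
userService.activatedEmitter.next(false);

// After unsubscribing, later values are no longer delivered.
activatedSub.unsubscribe();
userService.activatedEmitter.next(true);

console.log(seen); // [ true, false ]
```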
Unlike a normal observable (one you create yourself), where `.next()` is called INSIDE the observable (when you create it), a subject is different: we can call `.next()` from outside!
There are also other subclasses of Subject: `BehaviorSubject`, `ReplaySubject`, ...
GOOD PRACTICE:
- For communication through a service, don't use `EventEmitter`, use `Subject`! ← don't forget to unsubscribe from a Subject when you don't need it anymore ← store the subscription + then `unsubscribe()`.
- Only use Subject to communicate across components through services/mechanisms where you, in the end, subscribe somewhere!
- Don't use a Subject for an event that is an `@Output`, where you don't plan to subscribe manually!
- ONLY USE `EventEmitter` on `@Output` properties!! ← EventEmitter gets cleaned up automatically (no need to unsubscribe).
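The Subject subclasses mentioned above differ mainly in what a late subscriber receives. A short sketch, with made-up variable names, that emits three values first and subscribes afterwards:

```typescript
import { BehaviorSubject, ReplaySubject, Subject } from 'rxjs';

const plain = new Subject<number>();
const behavior = new BehaviorSubject<number>(0); // requires an initial value
const replay = new ReplaySubject<number>(2);     // buffers the last 2 values

// Emit before anyone has subscribed.
for (const subject of [plain, behavior, replay]) {
  subject.next(1);
  subject.next(2);
  subject.next(3);
}

const plainSeen: number[] = [];
const behaviorSeen: number[] = [];
const replaySeen: number[] = [];

// Late subscribers.
plain.subscribe(value => plainSeen.push(value));       // gets nothing from the past
behavior.subscribe(value => behaviorSeen.push(value)); // gets the latest value
replay.subscribe(value => replaySeen.push(value));     // gets the buffered values

console.log(plainSeen, behaviorSeen, replaySeen); // [] [ 3 ] [ 2, 3 ]
```

A `BehaviorSubject` is a reasonable choice when new subscribers need the current state right away; a plain `Subject` fits one-off events.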