Angular Rxjs Patterns
RxJS reactive patterns for data fetching, state management, and event handling in Angular
You are an expert in RxJS patterns for building reactive Angular applications with efficient data flows and clean observable composition.
Core Philosophy
RxJS is Angular's tool for managing asynchronous complexity. While signals handle synchronous reactive state elegantly, RxJS excels at async streams: HTTP responses, WebSocket messages, form value changes over time, timer-based polling, and complex event composition with backpressure control. The key insight is choosing the right tool for the job — signals for synchronous derived state, RxJS for async streams and event coordination.
The most important RxJS skill in Angular is choosing the right flattening operator. `switchMap` cancels the previous inner observable, making it a good fit for search typeahead where only the latest query matters. `exhaustMap` ignores new emissions while the current inner observable is still in progress, preventing duplicate login requests from button double-clicks. `concatMap` queues emissions in order for sequential writes. `mergeMap` runs inner observables in parallel for batch processing. Using the wrong operator is not just a performance issue: it causes real bugs such as duplicate form submissions or out-of-order search results.
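The differences between the four operators can be sketched with a small simulation. This is a minimal illustration rather than production code: `simulate`, `Key`, and the `response-a` / `response-b` values are hypothetical names, and each `AsyncSubject` stands in for a one-shot request. Two requests start back to back, and the response for `b` arrives before the response for `a`.

```typescript
import { AsyncSubject, Subject, concatMap, exhaustMap, mergeMap, switchMap } from 'rxjs';
import type { Observable, OperatorFunction } from 'rxjs';

type Key = 'a' | 'b';
type Project = (key: Key) => Observable<string>;

// Hypothetical harness: runs the same request/response timeline through
// whichever flattening operator the caller supplies.
function simulate(flatten: (project: Project) => OperatorFunction<Key, string>): string[] {
  // AsyncSubject emits its last value on completion, like a one-shot HTTP call.
  const responses: Record<Key, AsyncSubject<string>> = {
    a: new AsyncSubject<string>(),
    b: new AsyncSubject<string>(),
  };
  const requests$ = new Subject<Key>();
  const received: string[] = [];

  const subscription = requests$
    .pipe(flatten(key => responses[key]))
    .subscribe(value => received.push(value));

  // Two requests are issued before either response arrives.
  requests$.next('a');
  requests$.next('b');

  // The second request finishes first.
  responses.b.next('response-b');
  responses.b.complete();
  responses.a.next('response-a');
  responses.a.complete();

  subscription.unsubscribe();
  return received;
}

const switchResult = simulate(project => switchMap(project));   // ['response-b']
const exhaustResult = simulate(project => exhaustMap(project)); // ['response-a']
const concatResult = simulate(project => concatMap(project));   // ['response-a', 'response-b']
const mergeResult = simulate(project => mergeMap(project));     // ['response-b', 'response-a']
```

`switchMap` drops the stale request, `exhaustMap` drops the second one, `concatMap` preserves request order, and `mergeMap` delivers results in arrival order.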
Declarative observable chains are the RxJS ideal. Instead of subscribing to an observable, storing the result in a local variable, and binding that variable in the template, you define the observable pipeline declaratively and let the async pipe or toSignal handle subscription lifecycle. This approach eliminates an entire class of bugs (forgotten unsubscribe, stale state, timing errors) and makes the data flow visible in the code.
Anti-Patterns
- **Nested subscriptions.** Subscribing inside a subscribe callback creates a deeply nested, hard-to-read pattern that also leaks subscriptions. Always use flattening operators (`switchMap`, `mergeMap`, etc.) to compose observable chains.
- **`shareReplay` without `refCount`.** Using `shareReplay(1)` without `{ refCount: true }` keeps the subscription to a source that never completes alive permanently, even after all subscribers leave. This causes memory leaks in long-running applications.
- **Swallowing errors silently.** Writing `catchError(() => EMPTY)` without logging or reporting the error. The stream recovers, but the failure is invisible and undiagnosable. Always log or report before recovering.
- **`combineLatest` for one-time parallel requests.** Using `combineLatest` to combine HTTP requests that each emit once, when `forkJoin` is the correct operator for waiting on multiple completable sources. `combineLatest` does not emit until every source has emitted at least once, which is fine, but it also re-emits on every subsequent emission, which is unnecessary for one-shot requests.
- **Manual subscription without cleanup.** Calling `.subscribe()` in a component without `takeUntilDestroyed()`, the `async` pipe, or `toSignal`. This creates memory leaks because router events, form value changes, and custom subjects do not auto-complete.
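The `forkJoin` recommendation above can be sketched as follows. This is a minimal example under stated assumptions: `loadUser`, `loadOrders`, and the `User` / `Order` shapes are hypothetical stand-ins for `HttpClient` calls that emit once and then complete.

```typescript
import { forkJoin, of } from 'rxjs';
import type { Observable } from 'rxjs';

interface User { id: number; name: string }
interface Order { id: number; total: number }

// Hypothetical stand-ins for HttpClient calls: each emits once, then completes.
const loadUser = (): Observable<User> => of({ id: 1, name: 'Ada' });
const loadOrders = (): Observable<Order[]> => of([{ id: 10, total: 42 }]);

const emissions: Array<{ user: User; orders: Order[] }> = [];
let completed = false;

// forkJoin waits for every source to complete, then emits one combined value.
forkJoin({ user: loadUser(), orders: loadOrders() }).subscribe({
  next: vm => emissions.push(vm),
  complete: () => { completed = true; },
});
```

One caveat worth keeping in mind: if any source completes without emitting, `forkJoin` completes without emitting a combined value.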
Overview
RxJS (Reactive Extensions for JavaScript) is deeply integrated into Angular for handling asynchronous operations — HTTP requests, route events, form value changes, WebSockets, and more. While Angular Signals handle synchronous reactive state, RxJS remains essential for async streams, complex event composition, and backpressure management.
Core Concepts
Observable Lifecycle in Angular
Angular manages observable subscriptions through several mechanisms:
```typescript
import { Component, inject } from '@angular/core';
import { takeUntilDestroyed, toSignal } from '@angular/core/rxjs-interop';

// 1. AsyncPipe (auto-subscribes and unsubscribes)
@Component({
  template: `
    @if (user$ | async; as user) {
      <h1>{{ user.name }}</h1>
    }
  `,
})
export class ProfileComponent {
  user$ = inject(UserService).getCurrentUser();
}

// 2. toSignal (subscribes on creation, unsubscribes on destroy)
@Component({
  template: `<h1>{{ user()?.name }}</h1>`,
})
export class ProfileComponent {
  user = toSignal(inject(UserService).getCurrentUser());
}

// 3. DestroyRef + takeUntilDestroyed
@Component({ /* ... */ })
export class ChatComponent {
  constructor() {
    inject(WebSocketService).messages$
      .pipe(takeUntilDestroyed())
      .subscribe(msg => this.handleMessage(msg));
  }
}
```
Key Operators for Angular
```typescript
import {
  switchMap,    // Cancel previous, switch to new (search, navigation)
  concatMap,    // Queue in order (form submissions)
  mergeMap,     // Run in parallel (batch operations)
  exhaustMap,   // Ignore new until current completes (login button)
  distinctUntilChanged,
  debounceTime,
  catchError,
  retry,
  shareReplay,
  combineLatest,
  withLatestFrom,
} from 'rxjs';
```
Implementation Patterns
Typeahead Search
The canonical RxJS pattern — debounce, deduplicate, cancel previous requests:
```typescript
@Component({
  template: `
    <input [formControl]="searchControl" placeholder="Search..." />
    <ul>
      @for (result of results$ | async; track result.id) {
        <li>{{ result.name }}</li>
      }
    </ul>
  `,
})
export class SearchComponent {
  private http = inject(HttpClient);

  // nonNullable keeps valueChanges typed as string instead of string | null
  searchControl = new FormControl('', { nonNullable: true });

  results$ = this.searchControl.valueChanges.pipe(
    debounceTime(300),
    distinctUntilChanged(),
    switchMap(query =>
      query.length < 2
        ? of([])
        // Passing the query via params lets HttpClient URL-encode it
        : this.http.get<Result[]>('/api/search', { params: { q: query } }).pipe(
            catchError(err => {
              console.error('Search request failed', err);
              return of([]);
            }),
          )
    ),
  );
}
```
Caching with shareReplay
Prevent duplicate HTTP requests for data that multiple components need:
```typescript
@Injectable({ providedIn: 'root' })
export class ConfigService {
  private http = inject(HttpClient);

  private config$ = this.http.get<AppConfig>('/api/config').pipe(
    shareReplay({ bufferSize: 1, refCount: true }),
  );

  getConfig(): Observable<AppConfig> {
    return this.config$;
  }
}
```
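`refCount: true` unsubscribes from the source when the subscriber count drops to zero while the request is still in flight; the next subscriber then triggers a fresh request. Once the request has completed, the cached value stays in the replay buffer and is replayed to later subscribers without a new request.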
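The caching effect can be checked with a small sketch. It assumes a hypothetical `fakeConfigRequest$` built with `defer` in place of the real `HttpClient` call, plus a made-up `AppConfig` shape, so that the number of underlying "requests" can be counted.

```typescript
import { defer, of, shareReplay } from 'rxjs';
import type { Observable } from 'rxjs';

// Hypothetical config shape, for illustration only.
interface AppConfig { apiUrl: string }

let requestCount = 0;

// Stand-in for http.get<AppConfig>('/api/config'): counts each subscription
// to the source, then emits once and completes.
const fakeConfigRequest$: Observable<AppConfig> = defer(() => {
  requestCount += 1;
  return of({ apiUrl: '/api' });
});

const config$ = fakeConfigRequest$.pipe(
  shareReplay({ bufferSize: 1, refCount: true }),
);

const seen: string[] = [];
config$.subscribe(config => seen.push(config.apiUrl));
config$.subscribe(config => seen.push(config.apiUrl));
```

Both subscribers receive the config, while the source is subscribed only once.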
Polling with Timer
```typescript
@Injectable({ providedIn: 'root' })
export class StatusService {
  private http = inject(HttpClient);

  pollStatus(intervalMs = 5000): Observable<SystemStatus> {
    return timer(0, intervalMs).pipe(
      switchMap(() => this.http.get<SystemStatus>('/api/status')),
      retry({ delay: 2000 }),
      distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
    );
  }
}
```
Combining Multiple Data Sources
```typescript
@Component({ /* ... */ })
export class DashboardComponent {
  private userService = inject(UserService);
  private orderService = inject(OrderService);
  private notificationService = inject(NotificationService);

  vm$ = combineLatest({
    user: this.userService.currentUser$,
    orders: this.orderService.recentOrders$,
    notifications: this.notificationService.unread$,
  }).pipe(
    map(({ user, orders, notifications }) => ({
      user,
      orders,
      notifications,
      hasUrgent: notifications.some(n => n.priority === 'urgent'),
    })),
  );
}
```
Error Handling Strategies
```typescript
// Retry with exponential backoff
this.http.get<Data>('/api/data').pipe(
  retry({
    count: 3,
    delay: (error, retryCount) => timer(Math.pow(2, retryCount) * 1000),
  }),
  catchError(err => {
    this.errorService.report(err);
    return of(FALLBACK_DATA);
  }),
);

// Per-item error handling in a stream
this.items$.pipe(
  mergeMap(item =>
    this.processItem(item).pipe(
      catchError(err => {
        console.error(`Failed to process item ${item.id}`, err);
        return EMPTY; // Skip this item, continue the stream
      }),
    )
  ),
);
```
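The backoff schedule in the retry example can be made explicit with a small helper. This is a sketch, not part of RxJS: `backoffDelayMs`, `baseMs`, and the `maxMs` cap are hypothetical additions. It relies on the `retryCount` passed to the `delay` callback starting at 1 for the first retry.

```typescript
// Hypothetical helper: exponential backoff with an upper bound.
// retryCount is 1 for the first retry, 2 for the second, and so on.
function backoffDelayMs(retryCount: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(baseMs * 2 ** retryCount, maxMs);
}

// Delays for the three retries allowed by `count: 3`.
const schedule = [1, 2, 3].map(retryCount => backoffDelayMs(retryCount));
// schedule is [2000, 4000, 8000]

// The cap only matters for long retry sequences.
const cappedDelay = backoffDelayMs(10); // 30000
```

Such a helper could be plugged into the retry config as `delay: (_error, retryCount) => timer(backoffDelayMs(retryCount))`, which keeps the schedule testable without running any timers.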
Imperative Action Streams with Subjects
Use a Subject to bridge imperative events into observable chains:
```typescript
@Component({ /* ... */ })
export class PaginatedListComponent {
  private http = inject(HttpClient);

  // BehaviorSubject holds the current page, so refresh() also works
  // before the user has changed pages.
  private page$ = new BehaviorSubject<number>(1);
  private refresh$ = new Subject<void>();

  items$ = merge(
    this.page$,
    this.refresh$.pipe(withLatestFrom(this.page$), map(([, page]) => page)),
  ).pipe(
    switchMap(page =>
      this.http.get<Page<Item>>(`/api/items?page=${page}`)
    ),
    shareReplay({ bufferSize: 1, refCount: true }),
  );

  goToPage(page: number): void {
    this.page$.next(page);
  }

  refresh(): void {
    this.refresh$.next();
  }
}
```
WebSocket Management
```typescript
@Injectable({ providedIn: 'root' })
export class WebSocketService {
  // Replays the most recent socket so send() can reach it
  private socket$ = new ReplaySubject<WebSocket>(1);

  connect(url: string): Observable<MessageEvent> {
    // Explicit type argument so the result matches the declared return type
    return new Observable<MessageEvent>(observer => {
      const ws = new WebSocket(url);
      ws.onmessage = event => observer.next(event);
      ws.onerror = event => observer.error(event);
      ws.onclose = () => observer.complete();
      this.socket$.next(ws);
      // Teardown: close the socket when the subscription ends
      return () => ws.close();
    }).pipe(
      retry({ delay: 3000 }),
      share(),
    );
  }

  send(message: unknown): void {
    this.socket$.pipe(take(1)).subscribe(ws => {
      ws.send(JSON.stringify(message));
    });
  }
}
```
Best Practices
- **Use `takeUntilDestroyed()` for manual subscriptions.** It completes the stream when the component's `DestroyRef` fires. Call it in an injection context (a constructor or field initializer) or pass a `DestroyRef` explicitly. Always prefer the `async` pipe or `toSignal` when possible.
- **Choose the right flattening operator.** `switchMap` for search/navigation, `exhaustMap` for button clicks, `concatMap` for ordered writes, `mergeMap` for parallel work.
- **Use `shareReplay(1)` for shared HTTP data.** Prevents duplicate requests. Always include `{ refCount: true }` unless you specifically need the cached value to persist after all subscribers leave.
- **Prefer declarative streams over imperative subscriptions.** Build observable chains that the template consumes via the `async` pipe rather than subscribing in code and writing to local variables.
- **Keep operators in pipelines focused.** If a pipeline exceeds 8-10 operators, extract intermediate observables or move logic into a service.
- **Use `toSignal` / `toObservable` for interop.** Angular's rxjs-interop package bridges signals and observables cleanly.
Common Pitfalls
- **Memory leaks from unsubscribed observables.** Observables from `HttpClient` auto-complete, but router events, form value changes, and custom subjects do not. Always unsubscribe.
- **Nested subscriptions.** Subscribing inside a subscribe callback is a code smell. Use flattening operators instead:

  ```typescript
  // Bad
  this.route.params.subscribe(params => {
    this.http.get(`/api/${params['id']}`).subscribe(data => { ... });
  });

  // Good
  this.route.params.pipe(
    switchMap(params => this.http.get(`/api/${params['id']}`))
  );
  ```

- **Using `shareReplay` without `refCount`.** Without `refCount: true`, the source subscription is never torn down even when all downstream subscribers unsubscribe.
- **Swallowing errors silently.** `catchError(() => EMPTY)` hides failures. Always log or report errors before recovering.
- **Triggering multiple HTTP calls with `combineLatest`.** When combining observables that emit at different times, `combineLatest` fires on every emission. Use `forkJoin` if you only need the final values from completable sources.
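The "log or report before recovering" advice can be sketched as a small reusable wrapper. The names `reportError`, `reported`, and `reportThenComplete` are hypothetical; in an application the reporter would typically be an injected error service.

```typescript
import { EMPTY, catchError, throwError } from 'rxjs';
import type { Observable } from 'rxjs';

// Hypothetical reporter: collects errors so they stay visible.
const reported: unknown[] = [];
function reportError(error: unknown): void {
  reported.push(error);
}

// Reports the failure first, then recovers by completing the stream.
function reportThenComplete<T>(source$: Observable<T>): Observable<T> {
  return source$.pipe(
    catchError((error: unknown) => {
      reportError(error);
      return EMPTY;
    }),
  );
}

// A stream that fails immediately, standing in for a failed request.
const failing$ = throwError(() => new Error('boom'));

let completedCleanly = false;
reportThenComplete(failing$).subscribe({
  complete: () => { completedCleanly = true; },
});
```

The subscriber still sees a clean completion, but the failure has been recorded before the stream recovers.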