
Angular Rxjs Patterns

RxJS reactive patterns for data fetching, state management, and event handling in Angular


RxJS Patterns — Angular

You are an expert in RxJS patterns for building reactive Angular applications with efficient data flows and clean observable composition.

Core Philosophy

RxJS is Angular's tool for managing asynchronous complexity. While signals handle synchronous reactive state elegantly, RxJS excels at async streams: HTTP responses, WebSocket messages, form value changes over time, timer-based polling, and complex event composition with backpressure control. The key insight is choosing the right tool for the job — signals for synchronous derived state, RxJS for async streams and event coordination.

The most important RxJS skill in Angular is choosing the right flattening operator. switchMap cancels the previous inner observable, making it perfect for search typeahead where only the latest query matters. exhaustMap ignores new emissions while the current one is in progress, preventing duplicate login requests from button double-clicks. concatMap queues emissions in order for sequential writes. mergeMap runs in parallel for batch processing. Using the wrong operator is not just a performance issue — it causes real bugs like duplicate form submissions or lost search results.
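
The difference between the four operators is easiest to see when two requests overlap. The following is a framework-free sketch, assuming RxJS 7 (operators imported from 'rxjs'). The helper `runOverlapping`, the request ids, and the `:done` suffix are hypothetical names used only for illustration. Request "a" starts, request "b" arrives while "a" is still pending, and "b" answers first.

```typescript
import {
  Observable, OperatorFunction, Subject,
  concatMap, exhaustMap, mergeMap, switchMap,
} from 'rxjs';

type Project = (id: string) => Observable<string>;
type Flatten = (project: Project) => OperatorFunction<string, string>;

// Starts request "a", then request "b" while "a" is still pending.
// "b" answers first, then "a". Returns what the subscriber received.
export function runOverlapping(flatten: Flatten): string[] {
  const trigger$ = new Subject<string>();
  const pending = new Map<string, Subject<string>>();
  const received: string[] = [];

  // Each request is a Subject that is answered by hand to control timing.
  const request: Project = id => {
    const response$ = new Subject<string>();
    pending.set(id, response$);
    return response$;
  };

  // Answers a request if it was started and has not been answered yet.
  const respond = (id: string): void => {
    const response$ = pending.get(id);
    if (!response$) return;
    pending.delete(id);
    response$.next(`${id}:done`);
    response$.complete();
  };

  trigger$.pipe(flatten(request)).subscribe(value => received.push(value));

  trigger$.next('a');
  trigger$.next('b');
  respond('b');
  respond('a');
  respond('b'); // only matters when "b" was queued until "a" finished

  return received;
}

console.log(runOverlapping(p => switchMap(p)));  // ['b:done']            "a" was cancelled
console.log(runOverlapping(p => exhaustMap(p))); // ['a:done']            "b" was ignored
console.log(runOverlapping(p => concatMap(p)));  // ['a:done', 'b:done']  "b" waited for "a"
console.log(runOverlapping(p => mergeMap(p)));   // ['b:done', 'a:done']  both ran in parallel
```

The same input produces four different results, which is why picking the operator is a correctness decision rather than a tuning choice.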

Declarative observable chains are the RxJS ideal. Instead of subscribing to an observable, storing the result in a local variable, and binding that variable in the template, you define the observable pipeline declaratively and let the async pipe or toSignal handle subscription lifecycle. This approach eliminates an entire class of bugs (forgotten unsubscribe, stale state, timing errors) and makes the data flow visible in the code.

Anti-Patterns

  • Nested Subscriptions — subscribing inside a subscribe callback, creating a deeply nested, hard-to-read pattern that also leaks subscriptions. Always use flattening operators (switchMap, mergeMap, etc.) to compose observable chains.

  • shareReplay Without refCount — using shareReplay(1) without { refCount: true }, which keeps the source subscription alive permanently even after all subscribers leave. This causes memory leaks in long-running applications.

  • Swallowing Errors Silently — writing catchError(() => EMPTY) without logging or reporting the error. The stream recovers, but the failure is invisible and undiagnosable. Always log or report before recovering.

  • combineLatest for One-Time Parallel Requests — using combineLatest to combine HTTP requests that each emit once, when forkJoin is the correct operator for waiting on multiple completable sources. combineLatest will not emit until every source has emitted at least once, which is fine, but it also re-emits on every subsequent emission, which is unnecessary for one-shot requests.

  • Manual Subscription Without Cleanup — calling .subscribe() in a component without takeUntilDestroyed(), the async pipe, or toSignal. This creates memory leaks because router events, form value changes, and custom subjects do not auto-complete.
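
The combineLatest versus forkJoin distinction above can be made visible with sources that emit more than once. This is a minimal sketch assuming RxJS 7; `collect`, `user$`, and `orderCount$` are hypothetical names, and the sources use `of` with two values purely so that the extra emissions show up.

```typescript
import { Observable, combineLatest, forkJoin, of } from 'rxjs';

// Subscribes and returns everything the observable emitted synchronously.
export function collect<T>(source$: Observable<T>): T[] {
  const values: T[] = [];
  source$.subscribe(value => values.push(value));
  return values;
}

// Hypothetical sources that each emit twice before completing.
const user$ = of('guest', 'ada');
const orderCount$ = of(0, 3);

// forkJoin waits for completion and emits the last value of each source once.
export const joined = collect(forkJoin({ user: user$, orderCount: orderCount$ }));

// combineLatest emits as soon as every source has a value, then on each change.
export const combined = collect(combineLatest({ user: user$, orderCount: orderCount$ }));

console.log(joined);   // [{ user: 'ada', orderCount: 3 }]
console.log(combined); // [{ user: 'ada', orderCount: 0 }, { user: 'ada', orderCount: 3 }]
```

For sources that emit exactly once, both operators deliver the same single value; forkJoin simply states the "wait for all, emit once" intent directly.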

Overview

RxJS (Reactive Extensions for JavaScript) is deeply integrated into Angular for handling asynchronous operations — HTTP requests, route events, form value changes, WebSockets, and more. While Angular Signals handle synchronous reactive state, RxJS remains essential for async streams, complex event composition, and backpressure management.

Core Concepts

Observable Lifecycle in Angular

Angular manages observable subscriptions through several mechanisms:

// 1. AsyncPipe (auto-subscribes and unsubscribes)
@Component({
  template: `
    @if (user$ | async; as user) {
      <h1>{{ user.name }}</h1>
    }
  `,
})
export class ProfileComponent {
  user$ = inject(UserService).getCurrentUser();
}

// 2. toSignal (subscribes on creation, unsubscribes on destroy)
@Component({
  template: `<h1>{{ user()?.name }}</h1>`,
})
export class ProfileComponent {
  user = toSignal(inject(UserService).getCurrentUser());
}

// 3. DestroyRef + takeUntilDestroyed
@Component({ /* ... */ })
export class ChatComponent {
  constructor() {
    inject(WebSocketService).messages$
      .pipe(takeUntilDestroyed())
      .subscribe(msg => this.handleMessage(msg));
  }
}
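
The third mechanism behaves like the plain RxJS operator takeUntil with a notifier tied to the component's destruction. The sketch below shows that underlying idea without Angular, assuming RxJS 7. The `destroy$` subject is a stand-in for DestroyRef and `messages$` for a long-lived stream; both are assumptions for illustration, not Angular APIs.

```typescript
import { Subject, takeUntil } from 'rxjs';

const messages$ = new Subject<string>(); // stand-in for a long-lived stream
const destroy$ = new Subject<void>();    // stand-in for the component's DestroyRef

export const handled: string[] = [];
export let completed = false;

messages$.pipe(takeUntil(destroy$)).subscribe({
  next: msg => handled.push(msg),
  complete: () => { completed = true; },
});

messages$.next('hello');
destroy$.next();           // "component destroyed": the subscription completes here
messages$.next('ignored'); // no longer delivered

console.log(handled, completed); // ['hello'] true
```

In a component, takeUntilDestroyed() removes the need to create and fire the notifier by hand.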

Key Operators for Angular

import {
  switchMap,    // Cancel previous, switch to new (search, navigation)
  concatMap,    // Queue in order (form submissions)
  mergeMap,     // Run in parallel (batch operations)
  exhaustMap,   // Ignore new until current completes (login button)
  distinctUntilChanged,
  debounceTime,
  catchError,
  retry,
  shareReplay,
  combineLatest,
  withLatestFrom,
} from 'rxjs';

Implementation Patterns

Typeahead Search

The canonical RxJS pattern — debounce, deduplicate, cancel previous requests:

@Component({
  template: `
    <input [formControl]="searchControl" placeholder="Search..." />
    <ul>
      @for (result of results$ | async; track result.id) {
        <li>{{ result.name }}</li>
      }
    </ul>
  `,
})
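export class SearchComponent {
  private http = inject(HttpClient);
  // nonNullable keeps the value typed as string, so query.length is safe
  searchControl = new FormControl('', { nonNullable: true });

  results$ = this.searchControl.valueChanges.pipe(
    debounceTime(300),        // wait for a pause in typing
    distinctUntilChanged(),   // skip if the query did not change
    switchMap(query =>
      query.length < 2
        ? of([] as Result[])
        // Passing the query via params lets HttpClient URL-encode it
        : this.http.get<Result[]>('/api/search', { params: { q: query } }).pipe(
            catchError(err => {
              console.error('Search request failed', err); // report before recovering
              return of([] as Result[]);
            }),
          )
    ),
  );
}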

Caching with shareReplay

Prevent duplicate HTTP requests for data that multiple components need:

@Injectable({ providedIn: 'root' })
export class ConfigService {
  private http = inject(HttpClient);

  private config$ = this.http.get<AppConfig>('/api/config').pipe(
    shareReplay({ bufferSize: 1, refCount: true }),
  );

  getConfig(): Observable<AppConfig> {
    return this.config$;
  }
}

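With refCount: true, the source subscription is torn down when the subscriber count drops to zero before the source has completed, and the next subscriber triggers a fresh request. Once the request has completed, the buffered value is replayed to later subscribers without another request.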
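
The effect of refCount on a long-lived source can be observed by counting source subscriptions and teardowns. This is a sketch assuming RxJS 7; `trackSource` and its counters are hypothetical, and the source deliberately never completes so that the teardown behaviour is visible.

```typescript
import { Observable, shareReplay } from 'rxjs';

export function trackSource(refCount: boolean): { subscriptions: number; teardowns: number } {
  let subscriptions = 0;
  let teardowns = 0;

  // A long-lived source: emits once and never completes.
  const source$ = new Observable<number>(subscriber => {
    subscriptions++;
    subscriber.next(subscriptions);
    return () => { teardowns++; };
  });

  const shared$ = source$.pipe(shareReplay({ bufferSize: 1, refCount }));

  const first = shared$.subscribe();
  const second = shared$.subscribe(); // shares the first source subscription
  first.unsubscribe();
  second.unsubscribe();               // subscriber count is now zero

  shared$.subscribe().unsubscribe();  // a late subscriber arrives and leaves

  return { subscriptions, teardowns };
}

console.log(trackSource(true));  // { subscriptions: 2, teardowns: 2 }
console.log(trackSource(false)); // { subscriptions: 1, teardowns: 0 }
```

With refCount: false the single source subscription is never released, which is the leak described in the anti-patterns when the source is a socket, a store, or another stream that does not complete.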

Polling with Timer

@Injectable({ providedIn: 'root' })
export class StatusService {
  private http = inject(HttpClient);

  pollStatus(intervalMs = 5000): Observable<SystemStatus> {
    return timer(0, intervalMs).pipe(
      switchMap(() => this.http.get<SystemStatus>('/api/status')),
      retry({ delay: 2000 }),
      distinctUntilChanged((a, b) => JSON.stringify(a) === JSON.stringify(b)),
    );
  }
}

Combining Multiple Data Sources

@Component({ /* ... */ })
export class DashboardComponent {
  private userService = inject(UserService);
  private orderService = inject(OrderService);
  private notificationService = inject(NotificationService);

  vm$ = combineLatest({
    user: this.userService.currentUser$,
    orders: this.orderService.recentOrders$,
    notifications: this.notificationService.unread$,
  }).pipe(
    map(({ user, orders, notifications }) => ({
      user,
      orders,
      notifications,
      hasUrgent: notifications.some(n => n.priority === 'urgent'),
    })),
  );
}
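
The view-model pattern above can be tried outside Angular. The sketch below assumes RxJS 7 and replaces the injected services with two BehaviorSubjects; `UnreadNotification`, `user$`, `unread$`, and `unreadCount` are hypothetical names for illustration.

```typescript
import { BehaviorSubject, combineLatest, map } from 'rxjs';

interface UnreadNotification {
  id: number;
  priority: 'normal' | 'urgent';
}

// Stand-ins for the service streams.
const user$ = new BehaviorSubject('ada');
const unread$ = new BehaviorSubject<UnreadNotification[]>([]);

// One stream that carries everything the template needs, including derived flags.
const vm$ = combineLatest({ user: user$, notifications: unread$ }).pipe(
  map(({ user, notifications }) => ({
    user,
    unreadCount: notifications.length,
    hasUrgent: notifications.some(n => n.priority === 'urgent'),
  })),
);

export const snapshots: { user: string; unreadCount: number; hasUrgent: boolean }[] = [];
vm$.subscribe(vm => snapshots.push(vm));

unread$.next([{ id: 1, priority: 'normal' }]);
unread$.next([{ id: 1, priority: 'normal' }, { id: 2, priority: 'urgent' }]);

console.log(snapshots.map(s => s.unreadCount)); // [0, 1, 2]
console.log(snapshots.map(s => s.hasUrgent));   // [false, false, true]
```

Every change in any source produces a fresh, consistent view model, so the template never reads a half-updated combination.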

Error Handling Strategies

// Retry with exponential backoff
this.http.get<Data>('/api/data').pipe(
  retry({
    count: 3,
    delay: (error, retryCount) => timer(Math.pow(2, retryCount) * 1000),
  }),
  catchError(err => {
    this.errorService.report(err);
    return of(FALLBACK_DATA);
  }),
);

// Per-item error handling in a stream
this.items$.pipe(
  mergeMap(item =>
    this.processItem(item).pipe(
      catchError(err => {
        console.error(`Failed to process item ${item.id}`, err);
        return EMPTY; // Skip this item, continue the stream
      }),
    )
  ),
);
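
Both strategies above can be checked in isolation. The following sketch assumes RxJS 7. `backoffDelayMs` reproduces the 2^retryCount schedule from the retry example; its 30-second cap is an assumption added here, not part of the original snippet. `processItem` is a hypothetical operation that fails for one id.

```typescript
import { EMPTY, Observable, catchError, from, mergeMap, of, throwError } from 'rxjs';

// Delay before retry number `retryCount` (1-based, as passed to retry's delay callback).
// The cap (maxMs) is an assumption added for this sketch.
export function backoffDelayMs(retryCount: number, baseMs = 1000, maxMs = 30000): number {
  return Math.min(baseMs * 2 ** retryCount, maxMs);
}

// Hypothetical per-item operation that fails for id 2.
function processItem(id: number): Observable<string> {
  return id === 2
    ? throwError(() => new Error(`item ${id} failed`))
    : of(`item ${id} ok`);
}

export const processed: string[] = [];
export const failures: string[] = [];

from([1, 2, 3]).pipe(
  mergeMap(id =>
    processItem(id).pipe(
      catchError((err: Error) => {
        failures.push(err.message); // record the failure before recovering
        return EMPTY;               // skip this item, keep the outer stream alive
      }),
    ),
  ),
).subscribe(result => processed.push(result));

console.log([1, 2, 3].map(n => backoffDelayMs(n))); // [2000, 4000, 8000]
console.log(processed); // ['item 1 ok', 'item 3 ok']
console.log(failures);  // ['item 2 failed']
```

Because catchError sits on the inner observable, one failing item does not terminate the outer stream. The delay function could be plugged into the retry configuration as `delay: (_err, n) => timer(backoffDelayMs(n))`.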

Imperative Action Streams with Subjects

Use a Subject to bridge imperative events into observable chains:

@Component({ /* ... */ })
export class PaginatedListComponent {
  private http = inject(HttpClient);

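  // BehaviorSubject seeds page 1, so refresh() works before any page change
  private pageChange$ = new BehaviorSubject<number>(1);
  private refresh$ = new Subject<void>();

  items$ = merge(
    this.pageChange$,
    this.refresh$.pipe(withLatestFrom(this.pageChange$), map(([, page]) => page)),
  ).pipe(
    switchMap(page =>
      this.http.get<Page<Item>>(`/api/items?page=${page}`)
    ),
    // refCount releases the source once no subscriber is left
    shareReplay({ bufferSize: 1, refCount: true }),
  );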

  goToPage(page: number): void {
    this.pageChange$.next(page);
  }

  refresh(): void {
    this.refresh$.next();
  }
}
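
The action-stream idea can be exercised without Angular or HTTP. This sketch assumes RxJS 7 and holds the current page in a BehaviorSubject so that a refresh has a page to reload even before any navigation; `createPager` and the fake synchronous fetch are hypothetical and exist only for the demonstration.

```typescript
import {
  BehaviorSubject, Observable, Subject,
  map, merge, of, switchMap, withLatestFrom,
} from 'rxjs';

export function createPager<T>(fetchPage: (page: number) => Observable<T>) {
  const page$ = new BehaviorSubject<number>(1); // current page, starts at 1
  const refresh$ = new Subject<void>();

  const items$ = merge(
    page$,
    // A refresh re-emits whatever page is current.
    refresh$.pipe(withLatestFrom(page$), map(([, page]) => page)),
  ).pipe(switchMap(page => fetchPage(page)));

  return {
    items$,
    goToPage: (page: number): void => page$.next(page),
    refresh: (): void => refresh$.next(),
  };
}

// Usage with a fake, synchronous fetch that records which pages were requested.
export const requested: number[] = [];
const pager = createPager(page => {
  requested.push(page);
  return of(`items for page ${page}`);
});

pager.items$.subscribe();
pager.refresh();   // reloads page 1 even though goToPage was never called
pager.goToPage(2);
pager.refresh();   // reloads page 2

console.log(requested); // [1, 1, 2, 2]
```

The component methods stay one-liners that push into a subject, while all loading logic lives in a single pipeline.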

WebSocket Management

@Injectable({ providedIn: 'root' })
export class WebSocketService {
  private socket$ = new ReplaySubject<WebSocket>(1);

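  connect(url: string): Observable<MessageEvent> {
    return new Observable<MessageEvent>(observer => {
      const ws = new WebSocket(url);
      // Publish the socket only once it is open; send() throws while connecting
      ws.onopen = () => this.socket$.next(ws);
      ws.onmessage = event => observer.next(event);
      ws.onerror = event => observer.error(event);
      ws.onclose = () => observer.complete();
      // Teardown: close the socket when the last subscriber leaves or on retry
      return () => ws.close();
    }).pipe(
      retry({ delay: 3000 }), // reconnect 3 s after an error
      share(),                // one socket for all subscribers
    );
  }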

  send(message: unknown): void {
    this.socket$.pipe(take(1)).subscribe(ws => {
      ws.send(JSON.stringify(message));
    });
  }
}

Best Practices

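  1. Use takeUntilDestroyed() for manual subscriptions. It completes the stream when the component's DestroyRef fires. Called without arguments, it must run in an injection context (a constructor or field initializer); elsewhere, such as ngOnInit, pass an injected DestroyRef. Prefer the async pipe or toSignal when possible.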

  2. Choose the right flattening operator. switchMap for search/navigation, exhaustMap for button clicks, concatMap for ordered writes, mergeMap for parallel work.

  3. Use shareReplay for shared HTTP data. It prevents duplicate requests. Pass { bufferSize: 1, refCount: true } unless you specifically need the source subscription to stay alive after all subscribers leave.

  4. Prefer declarative streams over imperative subscriptions. Build observable chains that the template consumes via async pipe rather than subscribing in code and writing to local variables.

  5. Keep operators in pipelines focused. If a pipeline exceeds 8-10 operators, extract intermediate observables or move logic into a service.

  6. Use toSignal / toObservable for interop. Both are exported from @angular/core/rxjs-interop and bridge signals and observables cleanly.

Common Pitfalls

  • Memory leaks from unsubscribed observables. Observables from HttpClient auto-complete, but router events, form value changes, and custom subjects do not. Always unsubscribe.

  • Nested subscriptions. Subscribing inside a subscribe callback is a code smell. Use flattening operators instead:

    // Bad
    this.route.params.subscribe(params => {
      this.http.get(`/api/${params['id']}`).subscribe(data => { ... });
    });
    
    // Good
    this.route.params.pipe(
      switchMap(params => this.http.get(`/api/${params['id']}`))
    );
    
  • Using shareReplay without refCount. Without refCount: true, the source subscription is never torn down even when all downstream subscribers unsubscribe.

  • Swallowing errors silently. catchError(() => EMPTY) hides failures. Always log or report errors before recovering.

  • Triggering multiple HTTP calls with combineLatest. When combining observables that emit at different times, combineLatest fires on every emission. Use forkJoin if you only need the final values from completable sources.
