Best way to share mutable state across operations with concatMap?


I'm working with an RxJS stream where I need to maintain some shared state across multiple sequential operations:

    valueChange$.pipe(
      concatMap(x => someMethodWhichNeedsToAccessOrModifySharedState(x))
    ).subscribe();

The method needs to both read and modify the state. For example, the first value might set some state, and the second value then needs to read what the first one set.

My first instinct was just using a class property:

    class MyService {
      private sharedState = { value: null as string | null };

      setupStream() {
        return valueChange$.pipe(
          concatMap(x => this.processValue(x))
        );
      }

      private processValue(x: any): Observable<any> {
        const prev = this.sharedState.value;
        this.sharedState.value = x;
        return this.http.post('/api', { current: x, previous: prev });
      }
    }

I know concatMap ensures sequential execution so that part should be fine, but I'm worried about multiple subscriptions interfering with each other since they'd share the same sharedState object. Even if I expect only one subscription, it still feels messy.
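To make the concern concrete, here is a minimal sketch of the leak. It assumes a synchronous `of('a', 'b')` source standing in for `valueChange$`, and a stubbed `processValue` that returns the payload it would post instead of calling `this.http.post`; both are illustrative assumptions, not part of the original service.

```typescript
import { Observable, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

type Payload = { current: string; previous: string | null };

class MyService {
  // One state object per service instance, shared by every subscription.
  private sharedState = { value: null as string | null };

  setupStream(source$: Observable<string>): Observable<Payload> {
    return source$.pipe(concatMap(x => this.processValue(x)));
  }

  // Hypothetical stand-in for the HTTP call: emits the payload it would post.
  private processValue(x: string): Observable<Payload> {
    const previous = this.sharedState.value;
    this.sharedState.value = x;
    return of({ current: x, previous });
  }
}

const leaky$ = new MyService().setupStream(of('a', 'b'));
const firstRun: Payload[] = [];
const secondRun: Payload[] = [];

leaky$.subscribe(p => firstRun.push(p));
// The second subscription starts with whatever the first one left behind.
leaky$.subscribe(p => secondRun.push(p));
```

With this setup the first value seen by the second subscriber reports `previous: 'b'` rather than `null`, because the class property outlives the first subscription.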

Other approaches I considered:

Using the above approach with shareReplay to force a single subscription to the source.

Using a closure, which would eliminate the shared state between subscriptions and callers:

    const createStream = () => {
      return defer(() => {
        const sharedState = { value: null as string | null };
        return valueChange$.pipe(
          concatMap(x => processValue(x, sharedState))
        );
      });
    };
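A quick way to check that the `defer` version isolates state is to subscribe twice and compare the results. The sketch below assumes a synchronous `of('a', 'b')` source in place of `valueChange$` and a stubbed `processValue` that emits the payload it would send; `createStream` takes the source as a parameter here purely so the example is self-contained.

```typescript
import { Observable, defer, of } from 'rxjs';
import { concatMap } from 'rxjs/operators';

type SharedState = { value: string | null };
type Payload = { current: string; previous: string | null };

// Hypothetical stand-in for the HTTP call: emits the payload it would post.
const processValue = (x: string, state: SharedState): Observable<Payload> => {
  const previous = state.value;
  state.value = x;
  return of({ current: x, previous });
};

const createStream = (source$: Observable<string>): Observable<Payload> =>
  defer(() => {
    // The factory runs once per subscription, so each subscriber gets fresh state.
    const sharedState: SharedState = { value: null };
    return source$.pipe(concatMap(x => processValue(x, sharedState)));
  });

const isolated$ = createStream(of('a', 'b'));
const runA: Payload[] = [];
const runB: Payload[] = [];

isolated$.subscribe(p => runA.push(p));
isolated$.subscribe(p => runB.push(p));
```

Both subscribers should see `previous: null` for their first value, since neither can observe the other's state object.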

My question is, are these approaches the right way to do this? Or is there a more idiomatic rxjs pattern where the state flows through the pipeline itself instead of living outside it? That would handle the multiple subscription issue more naturally I think.

I've seen scan mentioned for state management, but it doesn't seem like it would work with concatMap.
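For what it's worth, `scan` and `concatMap` can be chained when the state depends only on the incoming values and not on the response. The following is a rough sketch under that assumption: `scan` pairs each value with the previous one, and `concatMap` then performs the sequential call. `fakePost`, `Step` and `withPrevious` are hypothetical names, and `fakePost` simply echoes its payload instead of calling `this.http.post('/api', ...)`.

```typescript
import { Observable, of } from 'rxjs';
import { concatMap, scan } from 'rxjs/operators';

type Step = { current: string | null; previous: string | null };

// Hypothetical stand-in for the HTTP call: echoes the payload it receives.
const fakePost = (payload: Step): Observable<Step> => of(payload);

// The seed is never mutated; the accumulator returns a new object each time.
const seed: Step = { current: null, previous: null };

const withPrevious = (source$: Observable<string>): Observable<Step> =>
  source$.pipe(
    // scan keeps its accumulator per subscription, so no state lives outside the pipeline.
    scan((acc: Step, x: string): Step => ({ current: x, previous: acc.current }), seed),
    // concatMap still guarantees that the calls run one after another.
    concatMap(step => fakePost(step)),
  );

const posted: Step[] = [];
withPrevious(of('a', 'b', 'c')).subscribe(step => posted.push(step));
```

If the next state has to be derived from the response itself, `mergeScan` with its `concurrent` argument set to 1 may be a closer fit, since its accumulator returns an Observable and the inner subscriptions then run one at a time.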
