Combine Framework: Value Lane vs. Failure Lane Operators
Conceptual Model
┌─────────────────────────────────────────────────────────┐
│ Publisher Stream │
├─────────────────────────────────────────────────────────┤
│ │
│ ═══╦══════════════════════════════════════╦═══ │
│ ║ VALUE LANE (Output) ║ │
│ ║ ──●────●────●────●────●────●──▶ ║ │
│ ╠══════════════════════════════════════╣ │
│ ║ FAILURE LANE (Error) ║ │
│ ║ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ✗─▶ ║ │
│ ═══╩══════════════════════════════════════╩═══ │
│ │
└─────────────────────────────────────────────────────────┘
Operators That ONLY Touch the Value Lane
These transform/filter values and pass errors through unchanged:
| Operator | What It Does on Values |
|---|---|
map { } |
Transforms each value |
filter { } |
Conditionally passes values through |
compactMap { } |
Transforms + unwraps optionals |
flatMap { } |
Maps to a new publisher, flattens |
scan(initial) { } |
Accumulates values (like reduce but emits each step) |
reduce(initial) { } |
Accumulates into a single final value |
removeDuplicates() |
Suppresses consecutive equal values |
replaceEmpty(with:) |
Emits a default if no values arrived |
replaceNil(with:) |
Substitutes nil with a value |
collect() |
Buffers all values into one array |
first() / last() |
Takes first/last value only |
dropFirst(n) |
Skips the first N values |
drop(while:) |
Skips values while predicate is true |
prefix(n) |
Takes only the first N values |
prepend(...) |
Inserts values before the stream |
append(...) |
Adds values after completion |
min() / max() |
Emits the min/max value |
count() |
Emits the count of values |
contains { } |
Emits Bool if match found |
allSatisfy { } |
Emits Bool if all match |
output(at:) |
Emits only the value at index N |
debounce(for:) |
Waits for a pause in values |
throttle(for:) |
Rate-limits values |
delay(for:) |
Delays each value in time |
measureInterval |
Emits time between values |
removeDuplicates(by:) |
Custom equality suppression |
Values: ──●────●────●──▶ map { $0 * 2 } ──●────●────●──▶
Errors: ─ ─ ─ ─ ─ ─ ✗─▶ (pass-through) ─ ─ ─ ─ ─ ─ ✗─▶
Operators That ONLY Touch the Failure Lane
These handle/transform errors and pass values through unchanged:
| Operator | What It Does on Errors |
|---|---|
mapError { } |
Transforms the error type/value |
catch { } |
Replaces the error with a new publisher |
retry(n) |
Re-subscribes on failure (up to N times) |
replaceError(with:) |
Substitutes error with a default value + finishes |
setFailureType(to:) |
Adds a failure type to a Never-failing publisher |
Values: ──●────●────●──▶ mapError { } ──●────●────●──▶
Errors: ─ ─ ─ ─ ─ ─ ✗─▶ (transforms ✗) ─ ─ ─ ─ ─ ─ ✗─▶
Detailed Behavior:
// mapError — changes the error type, values pass through untouched
publisher
.mapError { error -> MyAppError in
.networkFailure(error) // only runs if an error occurs
}
// catch — swaps in a recovery publisher when error occurs
publisher
.catch { error in
Just(fallbackValue) // replaces failed stream with this
}
// retry — values pass through; on error, resubscribes
publisher
.retry(3) // tries up to 3 more times on failure
// replaceError — converts error into one final value
publisher
.replaceError(with: defaultValue) // Error → value + finished
Visual Summary
┌──────────────────────────────────────────────────────────────┐
│ │
│ VALUE-LANE OPERATORS FAILURE-LANE OPERATORS │
│ ════════════════════ ═══════════════════════ │
│ │
│ ┌─────────────────┐ ┌─────────────────────┐ │
│ │ map │ │ mapError │ │
│ │ filter │ │ catch │ │
│ │ compactMap │ │ retry │ │
│ │ flatMap │ │ replaceError │ │
│ │ scan / reduce │ │ setFailureType │ │
│ │ removeDuplicates│ └─────────────────────┘ │
│ │ collect │ │
│ │ first / last │ │
│ │ drop / prefix │ BOTH LANES (side effects) │
│ │ prepend / append│ ═════════════════════════ │
│ │ debounce │ ┌─────────────────────┐ │
│ │ throttle │ │ handleEvents │ │
│ │ delay │ │ print │ │
│ │ replaceEmpty │ │ breakpoint │ │
│ │ replaceNil │ │ receive(on:) │ │
│ │ min / max │ │ subscribe(on:) │ │
│ │ count │ └─────────────────────┘ │
│ └─────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────┘
Key Insight: The try Variants Bridge Lanes
Operators prefixed with try operate on the value lane but can throw into the failure lane:
// tryMap operates on values but can PRODUCE errors
publisher
.tryMap { value -> NewType in
guard isValid(value) else {
throw MyError.invalid // ← jumps to failure lane!
}
return transform(value)
}
Values: ──●────●────●──▶ ──●────●──▶
tryMap { }
Errors: ─ ─ ─ ─ ─ ─ ─ ▶ ─ ─ ─ ─ ─ ✗──▶
(value→error possible)
| try Variant | Base Operator |
|---|---|
tryMap |
map |
tryFilter |
filter |
tryCompactMap |
compactMap |
tryScan |
scan |
tryReduce |
reduce |
tryRemoveDuplicates |
removeDuplicates |
tryAllSatisfy |
allSatisfy |
tryContains |
contains |
tryDrop(while:) |
drop(while:) |
tryPrefix(while:) |
prefix(while:) |
These are value-lane operators with an escape hatch into the failure lane.
Combine Pipeline Learning Notes
Table of Contents
- Failure vs Error
- Operator Details
- Combining merge + removeDuplicates
- Two Lanes: Values vs Failures
- Operator Order Matters
- .sink Overloads
- Sources of Failure
- Completion vs Cancellation
- Synchronous vs Asynchronous Execution
- Real Example: SessionService Pipeline
Failure vs Error
This is a crucial distinction in Combine:
Failure (Combine-specific term)
- The type-level error that a publisher can produce
- Every publisher has a
Failureassociated type:Publisher<Output, Failure> - When a publisher “fails,” it sends a
.failure(error)completion event and the stream is permanently terminated - It’s part of the type system:
Failure == Nevermeans the publisher can never fail
Error (Swift general term)
- Any value conforming to the
Errorprotocol (likeNSError, custom enums, etc.) - It’s what gets wrapped inside a
.failurecompletion
How they relate:
// Publisher completion is an enum:
enum Subscribers.Completion<Failure: Error> {
case finished // Normal end, no problem
case failure(Failure) // Terminated with an error value
}
“Failure” is the event type (the envelope), “error” is the value inside it.
Practical example:
let subject = PassthroughSubject<String, URLError>()
// Output ^ ^ Failure type
subject.send("hello") // value event
subject.send(completion: .finished) // normal completion
subject.send(completion: .failure(URLError(.timedOut))) // failure completion
After either completion (.finished or .failure), the stream is dead — no more values can ever flow.
An enum case named .error is NOT a failure!
enum SessionVerificationResult {
case success
case error // This is just DATA, a label — not a Combine failure!
}
SessionVerificationResult.error is just a value flowing through the value lane. You could rename it to .denied or .banana and it would behave identically. Combine doesn’t care what you name your enum cases.
Operator Details
.filter
Purpose
Only lets values through that satisfy a condition. Values that don’t match are silently dropped.
Type signature
func filter(_ isIncluded: @escaping (Output) -> Bool) -> Publishers.Filter<Self>
- Output type: unchanged
- Failure type: unchanged
What flows through:
| Event type | Behavior |
|---|---|
| Value that passes predicate | Passes downstream |
| Value that fails predicate | Dropped silently |
.finished completion |
Passes downstream |
.failure completion |
Passes downstream |
Key insight
.filter never sees or touches failures. Failures always bypass it. Only values are tested.
[1, 2, 3, 4, 5].publisher
.filter { $0 > 3 }
.sink { print($0) }
// Prints: 4, 5
In context:
.filter { $0 == sessionId }
$0 is each UUID emitted by the upstream publisher. Only the UUID matching the current session passes through. All others are silently ignored — the timeout clock keeps ticking while waiting for a matching UUID.
.timeout
Purpose
Enforces a time limit. If the upstream publisher doesn’t emit a value within the specified interval, the timeout takes action.
Type signature
func timeout<S: Scheduler>(
_ interval: S.SchedulerTimeType.Stride,
scheduler: S,
options: S.SchedulerOptions? = nil,
customError: (() -> Failure)? = nil
) -> Publishers.Timeout<Self, S>
Two modes:
customError |
When timeout fires |
|---|---|
| Provided (closure) | Sends .failure(yourError) — stream dies with error |
| nil (default) | Sends .finished — stream ends normally, silently |
Timer behavior:
- Timer starts when subscribed
- Resets every time a value passes through
- Fires only when NO value arrives for the full interval
What flows through:
| Event type | Behavior |
|---|---|
| Value (before timeout) | Passes downstream, timer resets |
.finished (before timeout) |
Passes downstream, timer cancelled |
.failure (before timeout) |
Passes downstream, timer cancelled |
| Timer fires (with customError) | Sends .failure(customError()) downstream |
| Timer fires (without customError) | Sends .finished downstream |
Example:
.timeout(.milliseconds(100), scheduler: DispatchQueue.main,
customError: { NSError(domain: "CustomTimeoutError", code: -1) })
“If no matching UUID arrives within 100ms, terminate the stream with a .failure containing that NSError.”
.map
Purpose
Transforms each value into a different value. Does not affect failures/completions.
Type signature
func map<T>(_ transform: @escaping (Output) -> T) -> Publishers.Map<Self, T>
- Output type: changes to
T - Failure type: unchanged
What flows through:
| Event type | Behavior |
|---|---|
| Value | Transformed by closure, new value passes downstream |
.finished |
Passes through untouched |
.failure |
Passes through untouched — .map closure is NOT called |
Critical insight
Failures bypass .map entirely. The transform closure never executes for failures.
subject
.map { value in
print("Transform called") // NEVER prints if a failure arrives
return value * 2
}
.handleEvents
Purpose
Observe (but never modify) events at any stage of the pipeline. Used for side effects.
Type signature
func handleEvents(
receiveSubscription: ((Subscription) -> Void)? = nil,
receiveOutput: ((Output) -> Void)? = nil,
receiveCompletion: ((Subscribers.Completion<Failure>) -> Void)? = nil,
receiveCancel: (() -> Void)? = nil,
receiveRequest: ((Subscribers.Demand) -> Void)? = nil
) -> Publishers.HandleEvents<Self>
- Output type: unchanged
- Failure type: unchanged
All hooks:
| Hook | When it fires |
|---|---|
receiveSubscription |
When a subscriber subscribes |
receiveOutput |
Each time a value passes through |
receiveCompletion |
When the publisher completes (.finished or .failure) |
receiveCancel |
When the subscription is cancelled |
receiveRequest |
When demand is requested by downstream |
Critical insight about receiveOutput vs failures
receiveOutput is only called for values, NOT for failures. If a .failure arrives:
receiveOutput-> NOT calledreceiveCompletion-> IS called (with.failure(error))
.catch
Purpose
Intercept a failure and replace the failed publisher with a new publisher. This is error recovery.
Type signature
func catch<P: Publisher>(_ handler: @escaping (Failure) -> P) -> Publishers.Catch<Self, P>
where P.Output == Output
- Output type: unchanged (must match)
- Failure type: changes to the replacement publisher’s Failure type
What flows through:
| Event type | Behavior |
|---|---|
| Value | Passes through, .catch does nothing |
.finished |
Passes through, .catch does nothing |
.failure(error) |
Catches it! Original upstream is dead. Handler returns a new publisher |
Key insight
.catch only activates on failure. Normal completion and values pass through invisible.
What happens when it catches:
- Upstream publisher is terminated (dead, can never emit again)
- Your closure receives the error value
- You return a new publisher — becomes the new source
- Downstream subscribes to the new publisher
Type change with Just:
Just has Failure == Never, so after .catch { Just(...) }, the downstream pipeline has Failure == Never — it can never fail.
Two Lanes: Values vs Failures
A Combine pipeline has two separate lanes:
+-----------------------------------------------------+
| VALUE LANE (carries data) |
| |
| UUID -> filter -> timeout -> .map -> .handleEvents --> sink receives value
| | |
| SessionVerificationResult |
| .error (just data!) |
| |
+-----------------------------------------------------+
| FAILURE LANE (carries termination signal) |
| |
| timeout fires -> .failure(NSError) ------------------> .catch intercepts
| (bypasses .map!) |
| (bypasses .handleEvents!) |
| |
+-----------------------------------------------------+
.mapoperates on the value lane only.catchoperates on the failure lane only- They never interact with each other’s lane
Operator Order Matters
The order of operators fundamentally changes behavior. Each operator only sees what the previous one outputs.
Example 1: Swap .filter and .timeout
Current: .filter -> .timeout
.filter { $0 == sessionId } // Drop non-matching UUIDs
.timeout(.milliseconds(100), ...) // Timer resets ONLY when matching UUID arrives
Timer fires if no matching UUID arrives in 100ms.
Swapped: .timeout -> .filter
.timeout(.milliseconds(100), ...) // Timer resets on ANY UUID
.filter { $0 == sessionId } // Then check if it matches
Any UUID resets the timer — even wrong ones. A wrong UUID resets the clock but gets filtered out. You’d wait forever without a value or timeout!
Example 2: Swap .map and .catch
Current: .map -> .catch
.map { _ in SessionVerificationResult.error } // Value -> .error
.catch { _ in Just(.success) } // Failure -> .success
Two distinct paths, two distinct outcomes.
Swapped: .catch -> .map
.catch { _ in Just(.success) } // Failure -> .success VALUE
.map { _ in SessionVerificationResult.error } // ALL values -> .error
Failures get caught -> become a .success value -> then .map converts ALL values to .error. Login ALWAYS fails!
Example 3: Swap .map and .timeout
Current: .timeout -> .map
- Value arrives:
.mapconverts to.error - Timeout fires:
.failurebypasses.map
Swapped: .map -> .timeout
- UUID arrives ->
.mapconverts to.errorvalue ->.timeoutsees a value, resets timer - Same final outcomes but types change (
.timeoutnow operates onSessionVerificationResultinstead ofUUID)
Mental Model
Think of it as a literal assembly line:
Raw Material -> Station 1 -> Station 2 -> Station 3 -> Final Product
Each station only receives what the previous station outputs. Rearranging stations = completely different results.
.sink Overloads
Version 1: Two closures (for publishers that CAN fail)
.sink(
receiveCompletion: { completion in ... }, // Called on .finished or .failure
receiveValue: { value in ... } // Called on each value
)
Version 2: One closure (ONLY for Failure == Never)
.sink { value in ... } // Called ONLY on values — completions are silently ignored
When a publisher has Failure == Never (like after .catch { Just(...) }), the single-closure version is used. Completions are silently ignored — there’s no closure to handle them.
If completion arrives without a value first:
With single-closure .sink:
- The callback never fires
- The subscription silently dies
- No result is ever reported
Sources of Failure
Failures don’t only come from .send(completion:). They can originate from:
| Source | Example |
|---|---|
| Explicitly sent by a Subject | subject.send(completion: .failure(myError)) |
| Generated by an operator | .timeout(...) fires -> creates a .failure internally |
| Returned by a closure | tryMap { throw MyError() } -> failure |
| From an upstream system | URLSession.dataTaskPublisher -> network error |
From Future |
Future { promise in promise(.failure(error)) } |
A failure is any .failure(Error) completion event flowing through the pipeline — regardless of who created it.
Completion vs Cancellation
| Completion | Cancellation | |
|---|---|---|
| Who initiates | Upstream (publisher/operator) | Downstream (subscriber) |
| Direction | Flows downstream | Flows upstream |
| Callbacks fired | receiveCompletion is called |
receiveCancel (in handleEvents) |
| End result | Same: subscription is dead | Same: subscription is dead |
| How it happens | subject.send(completion:) or operator |
cancellable.cancel() or AnyCancellable dealloc |
Both terminate the subscription, but from opposite ends:
Cancellation (upstream <-)
+---------------------------+
| |
Publisher ---- Operators ---- Subscriber
| |
+---------------------------+
Completion (downstream ->)
After any completion (.finished or .failure):
receiveCompletionfires one final time- No more values will ever flow
- The subscription is effectively released
- Calling
.cancel()afterward is redundant but harmless
Synchronous vs Asynchronous Execution
Button(action: {
// 1. verify() is called — sets up subscription, returns IMMEDIATELY
sessionService.verify { ... }
// 2. This runs RIGHT AFTER verify() returns (nanoseconds later)
auth.passwordAttempts += 1
// 3. Button action closure ends. Control returns to the run loop.
// ... time passes ...
// 4. MUCH LATER: completion callback fires (after UUID arrives or 100ms timeout)
})
The @escaping keyword is the hint:
func verify(completion: @escaping (SessionVerificationResult) -> Void)
// ^^^^^^^^
// "escaping" means: this closure will be called AFTER the function returns
verify sets up a Combine subscription and returns immediately. The completion callback fires asynchronously — when the pipeline produces a value.
Real Example: SessionService Pipeline
self
.filter { $0 == sessionId }
.timeout(
.milliseconds(100),
scheduler: DispatchQueue.main,
customError: { NSError(domain: "CustomTimeoutError", code: -1) }
)
.map { _ in SessionVerificationResult.error }
.handleEvents(receiveOutput: { result in
if case .error = result {
verificationSubject.send(completion: .finished)
}
})
.catch { error in
Just(SessionVerificationResult.success)
}
.eraseToAnyPublisher()
Scenario A: UUID arrives within 100ms
verificationSubject emits UUID
-> .filter: UUID matches sessionId? YES -> passes through
-> .timeout: value arrived, no timeout fires
-> .map: UUID becomes .error (just a value label!)
-> .handleEvents: sees .error value, shuts down verificationSubject
-> .catch: no failure to catch, does nothing
-> downstream receives: .error
-> Login FAILS
Scenario B: No UUID arrives within 100ms
verificationSubject emits nothing
-> .filter: nothing to filter
-> .timeout: 100ms passes, fires NSError as failure
-> .map: SKIPPED (failures bypass .map)
-> .handleEvents: SKIPPED (failures bypass receiveOutput)
-> .catch: catches NSError, returns Just(.success)
-> downstream receives: .success
-> Login SUCCEEDS
The Inverted Logic (by design):
UUID arrives within 100ms -> .map -> .error -> Login FAILS
No UUID within 100ms -> timeout -> .catch -> .success -> Login SUCCEEDS
The Bug (in context):
When Login is clicked:
verify()sets up subscription (100ms clock starts)auth.passwordAttempts += 1mutates@Publishedproperty- SwiftUI re-renders
SecurityBannerView .execute { initializeVerificationProcess() }fires- UUID is sent to
verificationSubjectwithin ~1ms - UUID arrives before 100ms timeout ->
.map->.error-> Login always fails
The Fix:
Remove @Published from passwordAttempts -> no re-render -> no UUID sent -> timeout fires -> .success -> Login works.
Operator Details (continued)
.merge
Purpose
Combines multiple publishers of the same Output and Failure type into a single stream, emitting values from all sources in the order they arrive (interleaved by time).
Type signature
func merge(with other: P) -> Publishers.Merge<Self, P>
where P: Publisher, Self.Failure == P.Failure, Self.Output == P.Output
Variants exist up to Merge8, plus MergeMany for dynamic collections of publishers.
How it works:
Publisher A: --1------3------5-->
Publisher B: ----2------4-------->
Merged: --1-2----3-4----5-->
- Subscribes to all upstream publishers simultaneously
- Forwards every value from any upstream as soon as it arrives — no buffering, no reordering
- Completes only when all upstream publishers have completed
- Fails immediately if any upstream emits a failure (cancels others)
What flows through:
| Event type | Behavior |
|---|---|
| Value from any upstream | Forwarded downstream immediately |
.finished from one upstream |
That upstream is done; merge waits for others |
.finished from ALL upstreams |
.finished sent downstream |
.failure from any upstream |
.failure sent downstream immediately, others cancelled |
Key characteristics:
| Property | Behavior |
|---|---|
| Demand management | Requests from downstream are forwarded to all upstreams |
| Backpressure | Each upstream independently respects demand |
| Ordering | Temporal — whoever emits first is forwarded first |
| Completion | Waits for all upstreams to complete |
| Error | Fails fast — first error terminates the merged stream |
Example:
let localUpdates = database.publisher(for: User.self) // Publisher<User, Never>
let remoteUpdates = api.userStream() // Publisher<User, Never>
localUpdates
.merge(with: remoteUpdates)
.sink { user in
print("User updated: \(user)")
}
// Receives values from BOTH sources as they arrive
Key insight: merge vs combineLatest vs zip
| Operator | Output | When it emits |
|---|---|---|
merge |
Same type as inputs (single value) | Every time ANY upstream emits |
combineLatest |
Tuple of latest from each | Every time ANY upstream emits (after all have emitted at least once) |
zip |
Tuple pairing values 1-to-1 | When ALL upstreams have a new value to pair |
// merge: "give me everything from both"
A: --1----3--> merged: --1-2-3-4-->
B: ---2----4->
// combineLatest: "give me latest from each whenever either changes"
A: --1----3--> combined: --(1,2)-(3,2)-(3,4)-->
B: ---2----4->
// zip: "pair them up in order"
A: --1----3--> zipped: --(1,2)----(3,4)-->
B: ---2----4->
.removeDuplicates
Purpose
Suppresses consecutive duplicate values, only forwarding a value if it differs from the immediately preceding one.
Type signatures
// For Equatable outputs:
func removeDuplicates() -> Publishers.RemoveDuplicates<Self>
where Self.Output: Equatable
// With custom predicate:
func removeDuplicates(by predicate: @escaping (Output, Output) -> Bool)
-> Publishers.RemoveDuplicates<Self>
- Output type: unchanged
- Failure type: unchanged
How it works:
Input: --1--1--2--2--1--1--3-->
Output: --1-----2-----1-----3-->
- Stores the last emitted value (exactly one element of state)
- For each new value from upstream, compares it to the stored value using
==(or the custom predicate) - If equal → the value is dropped (not forwarded downstream)
- If different → the value is forwarded and becomes the new stored value
- The first value is always forwarded (there’s nothing to compare against)
What flows through:
| Event type | Behavior |
|---|---|
| Value (different from previous) | Passes downstream, becomes new “previous” |
| Value (same as previous) | Dropped silently |
| First value ever | Always passes (nothing to compare against) |
.finished completion |
Passes downstream |
.failure completion |
Passes downstream |
Critical insight: consecutive only, NOT global
[1, 1, 2, 2, 3, 3, 1, 1].publisher
.removeDuplicates()
.sink { print($0) }
// Output: 1, 2, 3, 1
// ^ emitted again! Only consecutive duplicates are suppressed
This is NOT a “unique” filter — it only compares against the immediately preceding value.
Custom predicate example:
struct UserLocation {
let lat: Double
let lon: Double
}
locationPublisher
.removeDuplicates { prev, current in
// Return true = "these are duplicates" = SUPPRESS
// Return false = "these are different" = PASS THROUGH
distance(prev, current) < 10.0
}
.sink { location in
updateMap(location) // Only fires when user moves > 10 meters
}
Predicate semantics (easy to confuse!):
| Predicate returns | Meaning | Action |
|---|---|---|
true |
“These are equal/duplicates” | Suppress (drop the new value) |
false |
“These are different” | Pass through |
This is the opposite of .filter! In .filter, true = keep. In .removeDuplicates(by:), true = drop.
Common use with @Published properties:
class ViewModel: ObservableObject {
@Published var searchText = ""
}
viewModel.$searchText
.removeDuplicates() // Don't re-search if text hasn't actually changed
.debounce(for: .milliseconds(300), scheduler: RunLoop.main)
.sink { text in
performSearch(text)
}
For global uniqueness (NOT what removeDuplicates does):
If you need to filter out ALL previously seen values (not just consecutive):
.scan((Set<Output>(), nil as Output?)) { state, value in
var (seen, _) = state
if seen.contains(value) { return (seen, nil) }
seen.insert(value)
return (seen, value)
}
.compactMap { $0.1 }
Combining merge + removeDuplicates
A common real-world pattern — merge multiple sources and deduplicate:
let local = database.publisher(for: User.self) // Publisher<User, Never>
let remote = api.fetchUser() // Publisher<User, Never>
local
.merge(with: remote)
.removeDuplicates() // suppress if both sources emit the same User
.assign(to: \.user, on: viewModel)
Execution flow:
local: --UserV1------------------->
remote: ----------UserV1---UserV2-->
merged: --UserV1--UserV1---UserV2-->
removeDuplicates: --UserV1----------UserV2-->
^ suppressed (consecutive duplicate)
This works because after merging, if both sources happen to emit the same value in sequence, removeDuplicates catches the redundancy.