PS: Use the Observer pattern (Queue Manager = Subject, Subscribers = Observers).
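Each subscriber filters by eventType (only process configured types, ignore others).
void addSubscriber(String subscriberId, List[String] eventTypesToProcess)
Registers a subscriber identified by subscriberId.
A message is processed only if its eventType is in eventTypesToProcess, other types are ignored.
void removeSubscriber(String subscriberId)
If subscriberId exists, unsubscribe it.
The subscriber may re-subscribe later via addSubscriber.
void sendMessage(String eventType, String message)
Appends the message (with its eventType) to the global FIFO queue.
Active subscribers process it only if eventType exists in their eventTypesToProcess list.
int countProcessedMessages(String subscriberId)
Returns the number of messages processed by subscriberId.
Only messages with a matching eventType count as processed, ignored types do not increment this count.
sendMessage("ORDER", "pre-1")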
No subscribers yet ⇒ message is appended to the global FIFO queue, nobody processes it (no retroactive delivery).
addSubscriber("S1", ["ORDER","PAYMENT"])
addSubscriber("S2", ["PAYMENT"])
Register two subscribers with overlapping but not identical filters.
sendMessage("ORDER", "o-1")
S1 processes (filter matches). S2 ignores (ORDER not in its filter). Demonstrates filtering.
addSubscriber("S3", ["SHIPMENT"])
Late joiner: S3 will only see messages from now on.
sendMessage("PAYMENT", "p-1")
S1 and S2 both process (PAYMENT is in both filters), S3 ignores.
sendMessage("SHIPMENT", "s-1")
S3 processes (matches filter, arrived after S3 subscribed). S1/S2 ignore (type not in filters).
removeSubscriber("S2")
Unsubscribing immediately stops further processing for S2.
sendMessage("PAYMENT", "p-2")
Only S1 processes (S2 is unsubscribed, S3 ignores). Demonstrates unsubscribe stops processing.
removeSubscriber("S1")
S1 also unsubscribes.
sendMessage("ORDER", "o-2")
Active subscribers: only S3 (but filter is SHIPMENT). Therefore nobody processes this message. Demonstrates “message with no eligible subscribers”.
addSubscriber("S1", ["ORDER","REFUND"])
Re-subscribe the same subscriberId with a different filter (PAYMENT dropped, REFUND added). Processed counts must accumulate across sessions.
sendMessage("REFUND", "r-1")
S1 processes (new filter covers REFUND). S3 ignores.
countProcessedMessages("S1") → 4
S1 processed: o-1, p-1, p-2, r-1 ⇒ cumulative across unsubscribe/resubscribe.
countProcessedMessages("S2") → 1
S2 processed only p-1 (was unsubscribed before p-2).
countProcessedMessages("S3") → 1
S3 processed only s-1 (late subscriber, nothing retroactive, filters enforced).
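The behaviour traced in the example can be sketched in a few lines. This is a minimal, single-threaded sketch in Python, not a reference solution: the class names QueueManager and Subscriber, the on_message hook, and the snake_case method names are illustrative stand-ins for the four methods above. It also assumes that calling add_subscriber for an id that is already active simply replaces its filter, and that an unknown subscriberId has a count of 0; the problem statement does not settle either point.

```python
from collections import defaultdict, deque
from typing import Deque, Dict, Iterable, Tuple


class Subscriber:
    """Observer: accepts only messages whose event type is in its filter."""

    def __init__(self, subscriber_id: str, event_types: Iterable[str]) -> None:
        self.subscriber_id = subscriber_id
        self.event_types = set(event_types)

    def on_message(self, event_type: str, message: str) -> bool:
        # Returns True when the message was processed, False when ignored.
        return event_type in self.event_types


class QueueManager:
    """Subject: owns the global FIFO queue and notifies active subscribers."""

    def __init__(self) -> None:
        # Every message is appended here, even if nobody processes it.
        self.queue: Deque[Tuple[str, str]] = deque()
        # Active subscribers only; removed ids are dropped from this map.
        self.active: Dict[str, Subscriber] = {}
        # Kept separately so counts survive unsubscribe/re-subscribe.
        self.processed: Dict[str, int] = defaultdict(int)

    def add_subscriber(self, subscriber_id: str, event_types: Iterable[str]) -> None:
        # Assumption: re-adding an active id replaces its filter.
        self.active[subscriber_id] = Subscriber(subscriber_id, event_types)

    def remove_subscriber(self, subscriber_id: str) -> None:
        # No-op when the id is not currently subscribed.
        self.active.pop(subscriber_id, None)

    def send_message(self, event_type: str, message: str) -> None:
        self.queue.append((event_type, message))
        # Only subscribers active right now are notified (no retroactive delivery).
        for sub in self.active.values():
            if sub.on_message(event_type, message):
                self.processed[sub.subscriber_id] += 1

    def count_processed_messages(self, subscriber_id: str) -> int:
        # Assumption: unknown ids report 0.
        return self.processed.get(subscriber_id, 0)


# Replay of the example trace above.
qm = QueueManager()
qm.send_message("ORDER", "pre-1")
qm.add_subscriber("S1", ["ORDER", "PAYMENT"])
qm.add_subscriber("S2", ["PAYMENT"])
qm.send_message("ORDER", "o-1")
qm.add_subscriber("S3", ["SHIPMENT"])
qm.send_message("PAYMENT", "p-1")
qm.send_message("SHIPMENT", "s-1")
qm.remove_subscriber("S2")
qm.send_message("PAYMENT", "p-2")
qm.remove_subscriber("S1")
qm.send_message("ORDER", "o-2")
qm.add_subscriber("S1", ["ORDER", "REFUND"])
qm.send_message("REFUND", "r-1")

print(qm.count_processed_messages("S1"))  # 4
print(qm.count_processed_messages("S2"))  # 1
print(qm.count_processed_messages("S3"))  # 1
```

Keeping the processed counts in a map separate from the active-subscriber map is what makes the totals cumulative: removing S1 discards its filter but not its history, so the re-subscribed S1 continues from 3 rather than restarting at 0.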