_getSubscribers(topic) {
const subscribers = this._topics.get(topic);
- if (typeof subscribers === 'undefined') {
+ if (subscribers === undefined) {
throw new Error(`Topic:"${topic}" doesn't exist.`);
}
return subscribers;
}
- _getSubscriber(topic, address) {
+ _getInboxes(topic, address) {
const subscribers = this._getSubscribers(topic);
- const subscriber = subscribers.get(address);
- if (typeof subscriber === 'undefined') {
- throw new Error(`Subscriber on topic:"${topic}" at address:"${address}" doesn't exist.`);
+ const inboxes = subscribers.get(address);
+ if (inboxes === undefined) {
+ throw new Error(`Inbox on topic:"${topic}" at address:"${address}" doesn't exist.`);
}
- return subscriber;
+ return inboxes;
}
hasTopic(topic) {
- return typeof this._topics.get(topic) !== 'undefined';
+ return this._topics.get(topic) !== undefined;
}
hasSubscriber(topic, address) {
const subscribers = this._getSubscribers(topic);
- return typeof subscribers.get(address) !== 'undefined';
+ return subscribers.get(address) !== undefined;
}
hasTopicAndSubscriber(topic, address) {
throw new TypeError('Inbox must be a function.');
}
- if (typeof this._topics.get(topic) === 'undefined') {
+ if (this._topics.has(topic) === false) {
this._topics.set(topic, new Map());
}
const subscribers = this._topics.get(topic);
- if (subscribers.get(address)) {
- throw new Error(`Subscription on topic:"${topic}" at address:"${address}" already exist.`);
- }
- subscribers.set(address, inbox);
+
+ const inboxes = subscribers.get(address) ?? new Set();
+ inboxes.add(inbox);
+ subscribers.set(address, inboxes);
+
+ return () => this.unsubscribe(topic, address, inbox);
}
- unsubscribe(topic, address) {
+ unsubscribe(topic, address, inbox) {
const subscribers = this._getSubscribers(topic);
- if (subscribers.has(address)) {
- subscribers.delete(address);
- } else throw new Error(`Unable to unsubscribe. Subscriber on topic:"${topic}" at address:"${address}" doesn't exist`);
- if (subscribers.size === 0) {
- this._topics.delete(topic);
- }
+ if (!subscribers) throw new Error(`Unable to unsubscribe. Topic: "${topic}" doesn't exist.`);
+
+ const inboxes = subscribers.get(address);
+ if (!inboxes) throw new Error(`Unable to unsubscribe. Subscriber on topic:"${topic}" at address:"${address}" doesn't exist`);
+
+ if (!inboxes.delete(inbox)) throw new Error('Unable to unsubscribe. Inbox doesn\'t exist');
+
+ if (inboxes.size === 0) subscribers.delete(address);
+ if (subscribers.size === 0) this._topics.delete(topic);
}
/**
* @param {*} data - Data to deliver to subscriber
*/
post(topic, address, data) {
- const sendPost = (subscriber, addr) => {
- if (typeof subscriber === 'undefined') {
+ const sendPost = (inboxes, addr) => {
+ if (inboxes === undefined) {
throw new Error(`Unable to post on topic:"${topic}" at address:"${addr}". Subscriber doesn't exist.`);
}
- subscriber(data);
+ inboxes.forEach((inbox) => inbox(data));
};
- if (Array.isArray(address)) {
- const subscribers = this._getSubscribers(topic);
- address.forEach((addr) => {
- sendPost(subscribers.get(addr), addr);
- });
+
+ if (typeof address === 'string') {
+ sendPost(this._getInboxes(topic, address), address);
return;
}
- sendPost(this._getSubscriber(topic, address), address);
+ const subscribers = this._getSubscribers(topic);
+ address.forEach((addr) => {
+ sendPost(subscribers.get(addr), addr);
+ });
}
}