logo头像

From zero to HERO

Java编程中的订阅发布模式

1. 前言

快手前天发布了《看见》一时间好评如潮,盖过了之前的《后浪》。现如今搞内容创作都要开始玩价值观导向了。不过互联网真是一个神奇的东西,我们足不出户就可以看到你想看的东西。不管是时下火热的抖音、快手,还是微信公众号、知乎。你只需要关注订阅你喜欢的领域,你就可以获取你想要的内容,甚至和创作者进行互动。创作者只需要创作的内容发布到对应的平台上,用户只需要在对应的平台上订阅自己喜欢的领域或者作者就可以了。用户和创作者并不认识,但是他们却可以“看见”。从编程范式上来说这就是发布-订阅

2. 发布-订阅

比如我自己也是一个创作者,有时候写点Java,我就打个Java的标签发布出去;有时候也写点其它的,比如Python什么的,打个Python标签发出去。读者只要订阅对应平台的主题(Topic)就能收到文章的推送。

Pub-Sub Pattern

上图就是简单的发布订阅的示意图。发布订阅由以下几种角色组成:

  • Publisher 发布者。消息的创造者,也是发布订阅的源头。
  • Msg 消息体。不但包含消息的基本信息,也包含消息目的地的一些信标(Topic)。
  • Topic 主题。用来建立消息和订阅者的指向关系。
  • Broker 在很多地方也称为 Channel 或者EventBus 。当消息从Publisher发出后,由Broker对消息进行定向转发到主题(Topic),同时维护主题(Topic)和订阅者的关系。Broker将发布者和订阅者进行了彻底的解耦。
  • Subcriber 最终的消费者。消费者从订阅的主题(Topic)中获取消息,获取消息的方式可能是Broker推送或者Subcriber拉取。

订阅发布模式的优点:订阅发布是基于事件驱动的,是具有响应式特点的,可以实现背压,异步。发布者和订阅者双方是完全解耦的。你可以轻松引入新的发布者和新的订阅者而无需修改原有的代码。而且更加适合分布式系统。

当然它也存在着不足:首先订阅者获取消息可能需要通过轮询或者迭代的方式。由于发布者和订阅者是完全解耦的,那么发布者的发布状态无法直接被订阅者获取,订阅者的消费状态也无法直接被发布者获取。其次双方的通信是建立在同一份协议之上,而且需要一个代理来完成,消息规范和相关的规则会给系统增加一些复杂度。

3. 简单实现

这里用Java简单实现上面我们写文章发到各个UGC平台上,然后平台再推给订阅的粉丝。

我们先定义一个事件发布接口:

package cn.felord;

/**
 * 发布接口.
 *
 * @param <E> the type parameter
 * @author felord.cn
 * @since 10 :21
 */
public interface Publisher<E> {
    /**
     *  向某个主题发布事件.
     *
     * @param topic the topic
     * @param event the event
     */
    void publish(String topic, E event);
}

事件的订阅接口:

package cn.felord;

/**
 * 订阅接口.
 *
 * @author a
 * @since 10 :37
 */
public interface Subscriber<E>  {

    /**
     * On event.
     *
     * @param event the event
     */
    void onEvent(E event);

}

然后是Broker代理,需要维护TopicSubcriber的关系,还要进行特定主题的推送广播。这里比较适合单例模式

package cn.felord;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * The interface Broker.
 *
 * @author felord.cn
 * @since 10 :28
 */
public class Broker {

    private final Map<String, Set<Subscriber<String>>> subscribers = new HashMap<>();

    private Broker() {
    }

    private static class Instance {
        private static final Broker INSTANCE = new Broker();
    }

    /**
     * 获取单例.
     *
     * @return the instance
     */
    public static Broker getInstance() {
        return Instance.INSTANCE;
    }

    /**
     * 对主题进行订阅.
     *
     * @param topic      the topic
     * @param subscriber the subscriber
     * @return the boolean
     */
    public boolean addSub(String topic, Subscriber<String> subscriber) {

        if (subscribers.get(topic) == null) {
            Set<Subscriber<String>> objects = new HashSet<>();
            objects.add(subscriber);
            subscribers.put(topic, objects);
        }

        return subscribers.get(topic).add(subscriber);
    }

    /**
     * 取消订阅.
     *
     * @param topic      the topic
     * @param subscriber the subscriber
     * @return the boolean
     */
    public boolean removeSub(String topic, Subscriber<String> subscriber) {
        if (subscribers.get(topic) == null) {
            return true;
        }
        return subscribers.get(topic).remove(subscriber);
    }


    /**
     * 迭代推送事件给订阅者.
     *
     * @param topic the topic
     * @param event the event
     */
    public void broadcasting(String topic, String event) {
        subscribers.getOrDefault(topic,new HashSet<>()).forEach(subscriber -> subscriber.onEvent(event));
    }

}

接下来是发布者的实现:

package cn.felord;

/**
 * @author felord.cn
 */
public class FelordPublisher implements Publisher<String> {
    @Override
    public void publish(String topic, String event) {
        System.out.println("码农小胖哥在 " + topic + " 中发布了一个 " + event + "的事件");
        Broker.getInstance().broadcasting(topic, event);
    }
}

订阅者的实现:

package cn.felord;

import java.util.Objects;

/**
 * 因为需要在Set 中 所以这里最好进行equals hash 覆写
 * 
 * @author felord.cn
 */
public class SomeSubscriber implements Subscriber<String> {
    private final String name;

    public SomeSubscriber(String name) {
        this.name = name;
    }

    @Override
    public void onEvent(String event) {
        System.out.println("粉丝 "+name+"接收到了事件 " + event);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        SomeSubscriber that = (SomeSubscriber) o;
        return Objects.equals(name, that.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(name);
    }
}

然后我们可以写一个小故事了:

public static void main(String[] args) {
    FelordPublisher felordPublisher = new FelordPublisher();
    // 发布了一篇文章
    felordPublisher.publish("Java", "发布订阅模式相关探讨");
    // 然而没有订阅者
    System.out.println("然而没有订阅者");

    System.out.println("张三订阅了 Java,李四订阅了Python");
    Broker.getInstance().addSub("Java", new SomeSubscriber("张三"));
    Broker.getInstance().addSub("Python", new SomeSubscriber("李四"));

    felordPublisher.publish("Java", "Java 真的很难学吗");
    System.out.println("王五 订阅了Java");
    Broker.getInstance().addSub("Java", new SomeSubscriber("王五"));
    felordPublisher.publish("Java", "新鲜资讯可访问felord.cn");
    // 发布了Python 文章
    felordPublisher.publish("Python", "Python 一天入门");
}

打印结果:

码农小胖哥在 Java 中发布了一个 发布订阅模式相关探讨的事件
然而没有订阅者
张三订阅了 Java,李四订阅了Python
码农小胖哥在 Java 中发布了一个 Java 真的很难学吗的事件
粉丝 张三接收到了事件 Java 真的很难学吗
王五 订阅了Java
码农小胖哥在 Java 中发布了一个 新鲜资讯可访问felord.cn的事件
粉丝 张三接收到了事件 新鲜资讯可访问felord.cn
粉丝 王五接收到了事件 新鲜资讯可访问felord.cn
码农小胖哥在 Python 中发布了一个 Python 一天入门的事件
粉丝 李四接收到了事件 Python 一天入门

在实际使用中,应该注意Broker的并发安全问题。

5. 总结

今天介绍了订阅发布的一些相关知识,我们平常使用的一些消息中间件KafkaRabbitMQ等都是订阅发布的体现,另外最新的响应式编程中也有相关的思想。所以还是有必要看一看的。有很多文章说观察者模式和订阅发布是一个东西;还有的说不是一个东西;众说纷纭,不知道你怎么看,欢迎关注留言。

评论系统未开启,无法评论!