博客 基于 Zookeeper 实现服务注册和服务发现

基于 Zookeeper 实现服务注册和服务发现

   数栈君   发表于 2023-09-15 10:31  254  0

简介

ZooKeeper官网

在分布式系统中,服务注册与发现是一项重要的技术,本文提供Java代码基于ZooKeeper来实现的服务注册与发现的功能.

服务注册

package com.fanqiechaodan.service;

import org.apache.zookeeper.*;

import java.io.IOException;

/**
 * @author fanqiechaodan
 * @Classname ServiceRegistry
 * @Description 服务注册
 */
public class ServiceRegistry {

    private static final String SERVICE_REGISTRY_ROOT = "/services";

    private static final int SESSION_TIMEOUT = 5000;

    private ZooKeeper zooKeeper;

    public ServiceRegistry(String zooKeeperAddress) throws IOException {
            this.zooKeeper = new ZooKeeper(zooKeeperAddress, SESSION_TIMEOUT, null);
    }

    /**
     * 注册服务
     *
     * @param serviceName
     * @param serviceAddress
     * @throws KeeperException
     * @throws InterruptedException
      */
    public void registryService(String serviceName, String serviceAddress) throws KeeperException, InterruptedException {
        String servicePath = SERVICE_REGISTRY_ROOT + "/" + serviceName;

    // 如果根节点不存在就创建
    if (zooKeeper.exists(SERVICE_REGISTRY_ROOT, false) == null) {
        zooKeeper.create(SERVICE_REGISTRY_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
    }

    String serviceNodePath = zooKeeper.create(servicePath, serviceAddress.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("注册成功;serviceNodePath:" + serviceNodePath);
    }

    public void close() throws InterruptedException {
                zooKeeper.close();
    }
}

服务消费

package com.fanqiechaodan.service;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import java.io.IOException;
import java.util.List;

/**
 * @author fanqiechaodan
 * @Classname ServiceConsumer
 * @Description
 */
public class ServiceConsumer {

    private static final String SERVICE_REGISTRY_ROOT = "/services";

    private static final int SESSION_TIMEOUT = 5000;

    private ZooKeeper zooKeeper;

    public ServiceConsumer(String zooKeeperAddress) throws IOException {
        this.zooKeeper = new ZooKeeper(zooKeeperAddress, SESSION_TIMEOUT, null);
    }

    /**
     * 获取服务地址
     *
     * @param serviceName
     * @return
     * @throws KeeperException
     * @throws InterruptedException
     */
    public String consumeService(String serviceName) throws KeeperException, InterruptedException {
        String servicePath = SERVICE_REGISTRY_ROOT;
        List<String> childrenList = zooKeeper.getChildren(servicePath, watchedEvent -> {
            if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
                System.out.println("serviceNodePath:" + watchedEvent.getPath());
                // synchronized为了保证线程安全,确保notifyAll()调用之前不会有人修改ServiceConsumer
                // 使用notifyAll()唤醒左右等待中的线程,让他们同时重新获取服务列表,避免单个线程多次获取同一服务节点的情况
                synchronized (this) {
                    notifyAll();
                }
            }
        });
        String serviceNode = childrenList.get((int) (Math.random() * childrenList.size()));
        String res = new String(zooKeeper.getData(servicePath + "/" + serviceNode, false, null));
        System.out.println("serviceNode:" + serviceNode);
        System.out.println("serviceAddress:" + res);
        return res;
    }

    public void close() throws InterruptedException {
            zooKeeper.close();
    }
}

测试
package com.fanqiechaodan;

import com.fanqiechaodan.service.ServiceConsumer;
import com.fanqiechaodan.service.ServiceRegistry;

/**
 * @author fanqiechaodan
 * @Classname Demo
 * @Description
 */
public class Demo {

    private static final String ZOOKEEPER_ADDRESS = "192.168.56.129:2181";
    private static final String SERVICE_NAME = "fanqiechaodan";

    public static void main(String[] args) throws Exception {
        // 创建服务注册对象
        ServiceRegistry serviceRegistry = new ServiceRegistry(ZOOKEEPER_ADDRESS);

        // 启动线程1,进行服务的注册
        Thread thread1 = new Thread(() -> {
            try {
                serviceRegistry.registryService(SERVICE_NAME, "127.0.0.1:8080");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        thread1.start();
                                                                                                                                                                                                                                                                          //让线程1先执行一会儿,以确保服务已经被注册
    Thread.sleep(3000);
    // 创建服务消费对象
    ServiceConsumer serviceConsumer = new ServiceConsumer(ZOOKEEPER_ADDRESS);
    // 启动线程2,进行服务的消费
    Thread thread2 = new Thread(() -> {
        try {
            String serviceAddress = serviceConsumer.consumeService(SERVICE_NAME);
            System.out.println("Service consumed: " + serviceAddress);
        } catch (Exception e) {
            e.printStackTrace();
            }
        });
        thread2.start();

        // 等待线程2执行完成
        thread2.join();

        // 睡眠30秒查看ZooKeeper节点是否存在
        Thread.sleep(30000);
        // 关闭服务注册和服务消费对象
        serviceRegistry.close();
        serviceConsumer.close();
    }    

}


http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/0908bed4cf8692a71b9bbbdc5ee2313d..jpg
  
 
http://dtstack-static.oss-cn-hangzhou.aliyuncs.com/2021bbs/files_user1/article/9da053992ab893ecda08e8523f17bcff..jpg
  

总结

上面Java代码实现了服务注册和消费的基本流程,首先服务提供方通过ZooKeeper将服务节点注册到服务注册中心,然后再服务消费方通过ZooKeeper获取服务节点列表,并从中随机的选择一个服务节点进行调用,再实际应用中,还需要对服务节点进行心跳检测,负载均衡等处理,以保证服务的高可用性和稳定性.




免责申明:


本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!

《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu

《数栈V6.0产品白皮书》下载地址:
https://fs80.cn/cw0iw1

想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:
https://www.dtstack.com/?src=bbs

同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:
https://github.com/DTStack

0条评论
社区公告
  • 大数据领域最专业的产品&技术交流社区,专注于探讨与分享大数据领域有趣又火热的信息,专业又专注的数据人园地

最新活动更多
微信扫码获取数字化转型资料
钉钉扫码加入技术交流群