package com.fanqiechaodan.service;
import org.apache.zookeeper.*;
import java.io.IOException;
/**
* @author fanqiechaodan
* @Classname ServiceRegistry
* @Description 服务注册
*/
public class ServiceRegistry {
private static final String SERVICE_REGISTRY_ROOT = "/services";
private static final int SESSION_TIMEOUT = 5000;
private ZooKeeper zooKeeper;
public ServiceRegistry(String zooKeeperAddress) throws IOException {
this.zooKeeper = new ZooKeeper(zooKeeperAddress, SESSION_TIMEOUT, null);
}
/**
* 注册服务
*
* @param serviceName
* @param serviceAddress
* @throws KeeperException
* @throws InterruptedException
*/
public void registryService(String serviceName, String serviceAddress) throws KeeperException, InterruptedException {
String servicePath = SERVICE_REGISTRY_ROOT + "/" + serviceName;
// 如果根节点不存在就创建
if (zooKeeper.exists(SERVICE_REGISTRY_ROOT, false) == null) {
zooKeeper.create(SERVICE_REGISTRY_ROOT, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
String serviceNodePath = zooKeeper.create(servicePath, serviceAddress.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("注册成功;serviceNodePath:" + serviceNodePath);
}
public void close() throws InterruptedException {
zooKeeper.close();
}
}
package com.fanqiechaodan.service;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.List;
/**
* @author fanqiechaodan
* @Classname ServiceConsumer
* @Description
*/
public class ServiceConsumer {
private static final String SERVICE_REGISTRY_ROOT = "/services";
private static final int SESSION_TIMEOUT = 5000;
private ZooKeeper zooKeeper;
public ServiceConsumer(String zooKeeperAddress) throws IOException {
this.zooKeeper = new ZooKeeper(zooKeeperAddress, SESSION_TIMEOUT, null);
}
/**
* 获取服务地址
*
* @param serviceName
* @return
* @throws KeeperException
* @throws InterruptedException
*/
public String consumeService(String serviceName) throws KeeperException, InterruptedException {
String servicePath = SERVICE_REGISTRY_ROOT;
List<String> childrenList = zooKeeper.getChildren(servicePath, watchedEvent -> {
if (watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
System.out.println("serviceNodePath:" + watchedEvent.getPath());
// synchronized为了保证线程安全,确保notifyAll()调用之前不会有人修改ServiceConsumer
// 使用notifyAll()唤醒左右等待中的线程,让他们同时重新获取服务列表,避免单个线程多次获取同一服务节点的情况
synchronized (this) {
notifyAll();
}
}
});
String serviceNode = childrenList.get((int) (Math.random() * childrenList.size()));
String res = new String(zooKeeper.getData(servicePath + "/" + serviceNode, false, null));
System.out.println("serviceNode:" + serviceNode);
System.out.println("serviceAddress:" + res);
return res;
}
public void close() throws InterruptedException {
zooKeeper.close();
}
}
package com.fanqiechaodan;
import com.fanqiechaodan.service.ServiceConsumer;
import com.fanqiechaodan.service.ServiceRegistry;
/**
 * Demo: registers one service instance, looks it up, then keeps both sessions open for a
 * while so the ephemeral znode can be inspected.
 *
 * @author fanqiechaodan
 */
public class Demo {

    private static final String ZOOKEEPER_ADDRESS = "192.168.56.129:2181";

    private static final String SERVICE_NAME = "fanqiechaodan";

    /**
     * Runs the registration and lookup sequence.
     *
     * @param args unused
     * @throws Exception if a ZooKeeper client cannot be created or closed, or the main thread
     *                   is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Create the registry client.
        ServiceRegistry serviceRegistry = new ServiceRegistry(ZOOKEEPER_ADDRESS);
        try {
            // Thread 1 registers the service.
            Thread thread1 = new Thread(() -> {
                try {
                    serviceRegistry.registryService(SERVICE_NAME, "127.0.0.1:8080");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            thread1.start();
            // Wait for the registration attempt to finish rather than sleeping a fixed time.
            thread1.join();

            // Create the consumer client.
            ServiceConsumer serviceConsumer = new ServiceConsumer(ZOOKEEPER_ADDRESS);
            try {
                // Thread 2 looks the service up.
                Thread thread2 = new Thread(() -> {
                    try {
                        String serviceAddress = serviceConsumer.consumeService(SERVICE_NAME);
                        System.out.println("Service consumed: " + serviceAddress);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
                thread2.start();
                // Wait for thread 2 to finish.
                thread2.join();
                // Keep the sessions open for 30 seconds so the znode can be inspected.
                Thread.sleep(30000);
            } finally {
                // Always release the consumer session, even when something above failed.
                serviceConsumer.close();
            }
        } finally {
            // Always release the registry session, even when something above failed.
            serviceRegistry.close();
        }
    }
}
免责声明:
本文系转载,版权归原作者所有,如若侵权请联系我们进行删除!
《数据治理行业实践白皮书》下载地址:https://fs80.cn/4w2atu
《数栈V6.0产品白皮书》下载地址:https://fs80.cn/cw0iw1
想了解或咨询更多有关袋鼠云大数据产品、行业解决方案、客户案例的朋友,浏览袋鼠云官网:https://www.dtstack.com/?src=bbs
同时,欢迎对大数据开源项目有兴趣的同学加入「袋鼠云开源框架钉钉技术群」,交流最新开源技术信息,群号码:30537511,项目地址:https://github.com/DTStack