博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
使用Kazoo操作ZooKeeper服务治理
阅读量:4618 次
发布时间:2019-06-09

本文共 5120 字,大约阅读时间需要 17 分钟。

单机服务的可靠性及可扩展性有限,某台服务宕机可能会影响整个系统的正常使用;分布式服务能够有效地解决这一问题,但同时分布式服务也会带来一些新的问题,如:服务发现(新增或者删除了服务如何确保能让客户端知道),容灾(某些服务出现故障如何让客户端只访问正常的服务);ZooKeeper的提出主要是为了解决分布式服务的治理问题,它在分布式环境中协调和管理服务。

Zookeeper协调管理服务的过程如下图:

 

服务端:每台服务器都要向注册中心Zookeeper进行注册登记,并且保持与Zookeeper的连接,如果服务器与Zookeeper断开了连接,Zookeeper将删除该服务器的地址。

客户端:需要服务的时候先向Zookeeper订阅服务器的地址信息,Zookeeper返回给客户端已注册的服务器信息列表,客户端从服务器信息列表中选择服务器进行服务调用,如果Zookeeper记录的服务器信息发生了变更,服务器会通知客户端变更事件,客户端可以获取最新的服务器信息。
ZooKeeper文件系统的数据结构是个树状结构,它的每个节点(znode)由一个名称标识,并用路径/分割:

 

 ZooKeeper的节点类型有:

  1. 持久节点(ZooKeeper默认的节点类型,创建该节点的客户端断开连接后,持久节点仍然存在)

  2. 顺序节点(将10位的序列号附加到原始名称来设置节点的路径,如:/server0000000001)

  3. 临时节点(当客户端与ZooKeeper断开连接时,临时节点会自动删除)

 

RPC服务注册到ZooKeeper

服务端:

1 import threading 2 import json 3 import socket 4 import sys 5 from kazoo.client import KazooClient 6 from divide_rpc import ServerStub 7 from divide_rpc import InvalidOperation 8  9 10 class ThreadServer(object):11     def __init__(self, host, port, handlers):12         self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)13         self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)14         self.host = host15         self.port = port16         self.sock.bind((host, port))17         self.handlers = handlers18 19     def serve(self):20         """21         开始服务22         """23         self.sock.listen(128)24         self.register_zk()25         print("开始监听")26         while True:27             conn, addr = self.sock.accept()28             print("建立链接%s" % str(addr))29             t = threading.Thread(target=self.handle, args=(conn,))30             t.start()31 32     def handle(self, client):33         stub = ServerStub(client, self.handlers)34         try:35             while True:36                 stub.process()37         except EOFError:38             print("客户端关闭连接")39 40         client.close()41 42     def register_zk(self):43         """44         注册到zookeeper45         """46         self.zk = KazooClient(hosts='127.0.0.1:2181')47         self.zk.start()48         self.zk.ensure_path('/rpc')  # 创建根节点49         value = json.dumps({
'host': self.host, 'port': self.port})50 # 创建服务子节点51 self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)52 53 54 class Handlers:55 @staticmethod56 def divide(num1, num2=1):57 """58 除法59 :param num1:60 :param num2:61 :return:62 """63 if num2 == 0:64 raise InvalidOperation()65 val = num1 / num266 return val67 68 69 if __name__ == '__main__':70 if len(sys.argv) < 3:71 print("usage:python server.py [host] [port]")72 exit(1)73 host = sys.argv[1]74 port = sys.argv[2]75 server = ThreadServer(host, int(port), Handlers)76 server.serve()

服务端通过kazoo连接zookeeper,依次创建根节点和服务的子节点,当启动多线程服务器的时候,会根据ip和端口创建不同的节点,依次启动两个server(8001、8002),查看zookeeper的节点信息:

1 >>> from kazoo.client import KazooClient2 >>> zk = KazooClient(hosts='127.0.0.1:2181')3 >>> zk.start() 4 >>> children = zk.get_children("/rpc")5 >>> print(children)6 ['server0000000001', 'server0000000000']

 

客户端:

1 import random 2 import time 3 import json 4 import socket 5 from divide_rpc import ( 6     ClientStub, InvalidOperation 7 ) 8 from kazoo.client import KazooClient 9 10 11 class DistributedChannel(object):12     def __init__(self):13         self._zk = KazooClient(hosts='127.0.0.1:2181')14         self._zk.start()15         self._get_servers()16 17     def _get_servers(self, event=None):18         """19         从zookeeper获取服务器地址信息列表20         """21         servers = self._zk.get_children('/rpc', watch=self._get_servers)22         print(servers)23         self._servers = []24         for server in servers:25             data = self._zk.get('/rpc/' + server)[0]26             if data:27                 addr = json.loads(data.decode())28                 self._servers.append(addr)29 30     def _get_server(self):31         """32         随机选出一个可用的服务器33         """34         return random.choice(self._servers)35 36     def get_connection(self):37         """38         提供一个可用的tcp连接39         """40         while True:41             server = self._get_server()42             print(server)43             try:44                 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)45                 sock.connect((server['host'], server['port']))46             except ConnectionRefusedError:47                 time.sleep(1)48                 continue49             else:50                 break51         return sock52 53 54 channel = DistributedChannel()55 56 for i in range(50):57     try:58         stub = ClientStub(channel)59         val = stub.divide(i)60     except InvalidOperation as e:61         print(e.message)62     else:63         print(val)64     time.sleep(1)

 

客户端连接zookeeper,通过get_children来获取服务器信息,并watch监听服务器的变化情况,启动客户端会发现它会调用8001端口的server和8002端口的server:

此时服务端新增加一个结点,8003,客户端变化情况:

可以看出zookeeper总共有三个节点了,前面调用的server都是8001和8002,当8003加入后,zookeeper会发现并调用它

此时服务端断开一个server,8001,客户端变化情况:

断开server前客户端会调用8001、8002、8003这三个服务,当断开server 8001以后,zookeeper只会调用8002和8003这两个server了

转载于:https://www.cnblogs.com/FG123/p/10261682.html

你可能感兴趣的文章
2014年10月9日——语言基础2
查看>>
mysql查
查看>>
[正则表达式]难点和误区
查看>>
217. Contains Duplicate
查看>>
hadoop遇到问题总结
查看>>
Windows下手动安装redis服务
查看>>
把 MongoDB 当成是纯内存数据库来使用(Redis 风格)
查看>>
PyTorch 1.0 中文官方教程:使用ONNX将模型从PyTorch传输到Caffe2和移动端
查看>>
LeetCode 4Sum
查看>>
BBC-The Race and a quiz
查看>>
大端小端
查看>>
IntelliJ IDEA 把java项目导出成可执行的jar
查看>>
DynamicReports
查看>>
鼠标经过图像改变实现
查看>>
二分查找法
查看>>
Spring3升级到Spring4时, 运行时出现找不到MappingJacksonHttpMessageConverter的情况
查看>>
详解缓冲区溢出攻击以及防范方法
查看>>
分布式事务解决方案(一) 2阶段提交 & 3阶段提交 & TCC
查看>>
android之网格布局和线性布局实现注册页面
查看>>
BZOJ 1014: [JSOI2008]火星人prefix( splay + hash )
查看>>