ZooKeeper安装与操作实例

上一篇ZooKeeper详解解了ZooKeeper的各个模块及其工作机制,
本文简单介绍一下ZooKeeper的安装,以及怎么通过Java编程方式操作ZooKeeper。

单机安装

下载zookeeper,解压缩至/usr/zookeeper。

复制conf/zoo_sample.cfg重命名为conf/zoo.cfg,执行命令

1
cp conf/zoo_sample.cfg conf/zoo.cfg

修改zoo.cfg文件,编辑为如下:

1
2
3
4
5
6
tickTime=2000						#心跳间隔
dataDir=/home/sulin/zookeeper #数据存放目录
dataLogDir=/usr/zookeeper/logs
#日志存放目录,手动存在。如果没有貌似启动不成功
clientPort=2181 #客户端连接端口
server.1=218.196.207.186:2888:3888 #2888是zookeeper服务之间的通信端口,3888是zookeeper与其他应用程序之间通信端口。

此时已经可以通过“bin/zkServer.sh start”命令启动zookeeper,然后通过shell访问了zookeeper了

最后一行(server.1)配置是为了让其他主机也能访问此zookeeper,也是伪分布式。

手动创建数据存放目录(/home/sulin/zookeeper),然后在目录中创建一个叫做”myid”的文件,文件内容为“1”。此处的1与上面的1对应。

分布式配置

和上面的伪分布式配置是一样的,如果是三台的话,zoo.cfg配置为:

1
2
3
4
5
6
7
tickTime=2000
dataDir=/home/sulin/zookeeper
dataLogDir=/usr/zookeeper/logs
clientPort=2181
server.1=218.196.207.186:2888:3888
server.2=218.196.207.185:2888:3888
server.3=218.196.207.184:2888:3888

然后可以使用rsync指令将第一台机器上的zookeeper文件夹发送至其他两台:

1
2
sudo rsync -a /usr/zookeeper UbuntuB:/usr
sudo rsync -a /usr/zookeeper UbuntuC:/usr

之后在其余两台机器上面手动创建dataDir文件夹,并且新建myid文件,写入自己的序号1/2/3。

然后分别在三台机器上面启动zookeeper服务:

1
bin/zooService.sh start

注意:一定要谨记将logs文件夹重建,因为里面的version-2里面会保存原有记录,此记录有可能与新集群有冲突。
注意:还有dataDir也要重建,原有的节点快照都会导致新集群出现问题。下面就是dataDir忘记重建出现的读取快照失败错误

1
2
ERROR [main:QuorumPeer@453] - Unable to load database on disk
java.io.Exception: The current epoch, 8, is older than the last zxid, 38654705706

下面是一个比较简单的测试程序,此程序改编自ZooKeeper官方文档中的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package net.sulin.hbase.test;  

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.zookeeper.AsyncCallback.StatCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;

/**
*
* @author sulin
* @date 2012-8-2 上午08:59:50
*/
@SuppressWarnings("deprecation")
public class ZooKeeperTest01 implements Watcher, Runnable, StatCallback{

private static String znode = "/test";

public ZooKeeper zk;
public Stat stat;

public ZooKeeperTest01(String hostPort, String znode, String filename) {
try {
zk = new ZooKeeper(hostPort, 5000, this);
} catch (IOException e) {
e.printStackTrace();
}
try {
if(zk.exists(znode, false) == null){
// 创建节点,权限随便设置了。
Id id = new Id("ip", "218.196.207.187");
ACL acl = new ACL(ZooDefs.Perms.ALL, id);
List<ACL> acls = new ArrayList<ACL>();
acls.add(acl);
zk.create(znode, Bytes.toBytes("这是第一次放入的数据"), acls, CreateMode.PERSISTENT);
}
stat = new Stat();
System.out.println("第一次读到的数据: " + Bytes.toString(zk.getData(znode,true,stat)));
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public static void main(String[] args) {
ZooKeeperTest01 temp = new ZooKeeperTest01("218.196.207.185:2181,218.196.207.184:2181,218.196.207.186:2181", znode, "");
new Thread(temp).start();
// 其他线程已启动
try {
temp.zk.setData(znode, Bytes.toBytes("第二次放入的数据"), temp.stat.getVersion());
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}

synchronized (temp) {
try {
temp.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public void run() {
synchronized(this){
try {
wait();
} catch (InterruptedException e) {

}
}
}

public void close(){
synchronized(this){
this.notifyAll();
}
}

/**
* 监视事件被触发时执行此方法。
*/
public void process(WatchedEvent event) {
String path = event.getPath();
if(event.getType() == Event.EventType.None){
// 节点没有发生改变,无节点创建、无接点删除、节点数据未改变、子节点未改变
// 那么说明可能是会话状态发生了改变
switch(event.getState()){
case SyncConnected:
// 此客户端处于连接状态,不需要做任何事
break;
case Expired:
// 会话失效,结束
this.close();
break;
}
}else{
// 状态改变了,检查是否znode节点值改变。如果改变则取出
if(path != null && path.equals(znode)){
zk.exists(znode, true, this, null);
}
}
}

/**
* 状态回调方法,此方法被执行的触发条件是
* 在异步请求exists方法时,如果节点状态已经改变则执行此方法。
*/
public void processResult(int rc, String path, Object ctx, Stat stat) {
boolean exists = false;
/**
* 现在ZooKeeper已经将异常代码换为枚举类型而不是静态int常量
* 可以用KeeperException.Code.get(rc)获取rc的枚举类型。
*/
switch(rc){
case Code.Ok:
// 一切完好
exists = true;
break;
case Code.NoNode:
// 节点不存在
exists = false;
break;
case Code.SessionExpired:
case Code.NoAuth:
// 结束
this.close();
break;
default:
// 其他错误,重新尝试。。。
zk.exists(znode, false, this, null);
return ;
}

byte[] buf = null;
if(exists){
try {
buf = zk.getData(znode, false, null);
} catch (KeeperException e) {
// 前面已经处理了此异常,此处不必处理
} catch (InterruptedException e) {
// 线程中断?事件线程中断?
return;
}
}
// 读到了数据,简单打印看看了事
System.out.println("第二次异步读到的数据:" + Bytes.toString(buf));
}

}

执行的就是创建一个/test节点,之后放入一个数据,读出打印出来,同时设置监视器。

当第二次放入数据时,监视器会被触发,之后异步读取新数据并打印出来:

第一次读到的数据: 这是第一次放入的数据
第二次异步读到的数据:第二次放入的数据