J2EE集群之failover思考

J2EE的服务器集群主要的就是负载均衡和失败转移。负载均衡这个话题都烂大街了,随处可以找到相关文章。但是这些文章中大部分都只谈了负载均衡,顶多再说一下session复制(失败转移的一种解决方案吧)。更有甚者,直接断定“集群中服务器节点宕机丢失的部分session不碍事”。。。

现有方案

像Tomcat的session交叉复制,如果集群中服务器过多对性能的影响肯定非常之大。

像JBoss、WebLogic等的服务器链式session复制,如果一个服务器节点宕机,此节点的下一个服务器节点就得负责两个服务器的用户请求。是不是有点怕人奥。

至于“集群中某个节点宕机就宕机,session丢失无所谓”这样的观点也是挺匪夷所思的,毕竟系统中2000个session同时丢失的后果不是那么容易承担的。

解决思路

为了解决上面这些问题,我自己琢磨了一套方案,感觉挺不错的(不知道网上是不是有类似“轮子”,反正我是没有搜索到)。就共享出来,嘿嘿~

我想到的是缓存服务器和Web服务器双向备份session。

思路实现

双向备份就是集群中多个Web服务器分散保存session,同时在缓存服务器中也分散保存这些session (每个session正常情况下只需要复制一次),至于怎么分散各位且听我慢慢说来。

测试环境

我使用三台server进行模拟测试,分别为ServerA、ServerB、ServerC。三台服务器分别安置tomcat和memcached服务:

  • ServerA:MemcachedA、TomcatA
  • ServerB:MemcachedB、TomcatB
  • ServerC:MemcachedC、TomcatC

session分布

每台server中的tomcat都将自己的session备份在另外两台server中的memcached上:

  • TomcatA的session备份在MemcachedB和MemcachedC中。
  • TomcatB的session备份在MemcachedC和MemcachedA中。
  • TomcatC的session备份在MemcachedB和MemcachedA中。

在集群正常运行情况下,外部ng/apache等采用Stick-Session方式,将携带sessionid的request固定交由同一台tomcat服务处理。如此,tomcat节点的运行就是完全独立的,互不打扰。session复制备份也是隐式的,tomcat通过哈希码将session隐式备份在其他集群节点中的memcached上。

如果Tomcat节点宕机怎么办?

假设TomcatA突然宕机了,那么ng/apache就会无法将请求分发给JSESSIONID相对应的TomcatA上。这时候,ng/apache可以把请求分发给TomcatB或TomcatC,这两台服务器会接收到原属于TomcatA请求。

它们没有原属于TomcatA相应的JSESSIONID,但可以主动从Memcached集群中根据JSESSIONID寻找这个session。如果找到的话,他们便可以把这个session“收录”为自己的,即将JSESSIONID修改为自己的。

这样一来,原属于TomcatA的 会话就可以无缝交由TomcatB/TomcatC提供服务。

如果Memcached节点宕机了怎么办?

假设MemcachedA宕机,那么整个集群中保存在MemcachedA中的Session备份就丢失,那么TomcatB和TomcatC中的Session就没有了备份。

如果这时这两台服务器再挂断的话,Session就真的丢失了。为了保证“整个集群中随时都存有同一个Session的备份”,TomcatB和TomcatC应该为自身的Session负责。他们应该主动把自己的Session再备份一次(可以采用临时新建线程的方式,毕竟宕机不是频繁的么。属于特殊情况),再次备份到其他的可用Memcached节点中即可。

解决它

关于这个想法,网上有一个开源项目叫做memcached-session-manager。我试用了一下它,感觉不怎么好用。它只能说实现了部分我的想法吧,并且测试中仍然存在session。丢失的情况。

除此之外,它也有着令人发指的效率问题!我简单测试了一下,普通的tomcat渲染一个JSP页面处理只耗费62.3ms,而添加上这个插件之后,普通的JSP渲染时间竟然变为2.18s !!!

靠人不如靠自己、我便自己实现了自己的思路。

解决办法

我通过替换了Tomcat原有的StandardManager(MemcachedManager替换)、StandardSession(MemcachedSession替换)来实现session的集群复制,然后加入了隐式的HttpSession 备份操作。同时也可以根据需要,从Memcached中获取其他节点现有的session。

我还在Context中加入了一个Valve,该Valve会在每次请求之后根据session的属性是否被修改来判断是否更新缓存(如果session未被修改,那么就完全不必要更新Memcached中缓存的session咯)。 不涉及setAttrubute、removeAttribute的操作是不会触发缓存更新的,并且setAttribute如果设置重复值也是不会引起缓存更新的。

在XMemcached中我添加了DisConnect事件的监听器、如果某条connect断开(Memcached服务器节点宕机)的话,监听器会主动将Session再次更新入其他正常运行的Memcached节点中。

实现代码

Memcached客户端运行期监听器。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package net.sulin.mem;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientStateListener;

/**
* Memcached客户端运行期监听器,专门用于监听连接断开事件。
* 如果某条连接断开、则通知相关组件将与此Memcached节点的相关缓存信息重新分布。
* 以防止出现集群中的重要数据(Session)没有备份。
* @Copyright(C) 2011-8-20 sulin
* @Version 1.0
* @Filename MemcachedListener.java
* @author sulin
* @Update sulin
*/
public class MemcachedListener implements MemcachedClientStateListener {

private MemcachedTool tool;

private List<InetSocketAddress> disConnectionAddrs;

public MemcachedListener(MemcachedTool tool) {
this.tool = tool;
this.disConnectionAddrs = new ArrayList<InetSocketAddress>();
}

public void onDisconnected(MemcachedClient client, InetSocketAddress addr) {
// 防止同一Memcached节点的线程池被重复修复
if(!disConnectionAddrs.contains(addr)){
tool.repairCache();
disConnectionAddrs.add(addr);
}
}

public void onConnected(MemcachedClient client, InetSocketAddress addr) {
// 如果Memcached节点被修复,则将此节点从故障标记中移除
disConnectionAddrs.remove(addr);
}

public void onException(MemcachedClient client, Throwable exception) {
}

public void onShutDown(MemcachedClient arg0) {
}

public void onStarted(MemcachedClient arg0) {
}

}

替换Tomcat默认StandardManager的MemcachedManager

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
package net.sulin.mem;

import java.io.IOException;
import java.util.Enumeration;
import java.util.concurrent.TimeoutException;

import net.rubyeye.xmemcached.exception.MemcachedException;

import org.apache.catalina.Container;
import org.apache.catalina.Session;
import org.apache.catalina.session.ManagerBase;
import org.apache.catalina.session.StandardSession;

/**
* 此类用于代替Tomcat默认的Manager,它专门用于使用Memcached存储备份Session。
* 同时Tomcat本身的Session内存存储功能仍然保留(findSession()方法首先从内存
* 中获取Session,然后再从Memcached集群中获取Session)
* @Copyright(C) 2011-8-20 sulin
* @Version 1.0
* @Filename MemcachedManager.java
* @author sulin
* @Update sulin
*/
public class MemcachedManager extends ManagerBase {

// Memcached更新队列最大长度
private int queueSize = 200;

// Memcached服务器节点地址【ip:port】
private String addr;

// XMemcached客户端连接池大小
private int poolSize;

// XMemcached支持的协议
private String protocol;

// 当前可用Memcached客户端对象
private MemcachedTool tool = null;

/**
* 构造方法
*/
public MemcachedManager() {
setDistributable(true); // attribute必须可序列化
this.protocol = "text";
}

/**
* 获取Session
*/
public Session findSession(String id) throws IOException {
if(id == null)
return null;
Session result = super.findSession(id);
// 如果result为null、尝试从Memcached集群中获取此Session
if(result == null){
try {
result = getTool().getFromCache(id, 1000);
// 如果获取到Session,将它转换为当前服务器节点Session
if(result != null)
if(result instanceof MemcachedSession){
MemcachedSession newSession = (MemcachedSession) createSession(generateSessionId());
Enumeration<?> names = ((MemcachedSession) result).getAttributeNames();
while(names.hasMoreElements()){
String key = (String) names.nextElement();
newSession.setAttribute(key, ((MemcachedSession) result).getAttribute(key));
}
add(newSession);
result = newSession;
}
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (MemcachedException e) {
e.printStackTrace();
}
}
return result;
}

/**
* 自用的findSession、防止从缓存中读取数据
* @param id
* @param ext
* @return
*/
public Session findSession(String id, boolean ext) {
if(ext){
try {
return super.findSession(id);
} catch (IOException e) {
e.printStackTrace();
}
}else{
try {
return findSession(id);
} catch (IOException e) {
e.printStackTrace();
}
}
return null;
}

/**
* 删除Session
*/
public void remove(Session session) {
super.remove(session);
/** 此处已被优化省略
if(session.isValid()){
try {
getTool().deleteFromCache(session.getId(), 1000);
} catch (TimeoutException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (MemcachedException e) {
e.printStackTrace();
}
}
*/
}

/**
* 新建Session
*/
protected StandardSession getNewSession() {
return new MemcachedSession(this);
}

/**
* 将内置阀门添加入管道中
*/
public void setContainer(Container container) {
container.getPipeline().addValve(new MemcachedValve());
super.setContainer(container);
}

/**
* 执行Manager关闭工作
*/
public void destroy() {
if(this.tool != null){
try {
tool.close();
} catch (IOException e) {
e.printStackTrace();
}
}
super.destroy();
}

/** 尚未覆写方法 */
public int getRejectedSessions() {
return 0;
}
public void setRejectedSessions(int rejectedSessions) {
}
public void load() throws ClassNotFoundException, IOException {
}
public void unload() throws IOException {
}

/****************************setter、getter**************************/
public int getQueueSize() {
return queueSize;
}
public void setQueueSize(int queueSize) {
this.queueSize = queueSize;
}
public String getAddr() {
return addr;
}
public void setAddr(String addr) {
this.addr = addr;
}
public int getPoolSize() {
return poolSize;
}
public void setPoolSize(int poolSize) {
this.poolSize = poolSize;
}
public MemcachedTool getTool() {
// 判断tool是否为null,如果为null则创建这个tool
if(tool == null){
try {
tool = new MemcachedTool(this);
} catch (IOException e) {
e.printStackTrace();
}
}
return tool;
}
public void setMaxInactiveInterval(int maxInactiveInterval) {
// 设置Session最大有效期
this.maxInactiveInterval = maxInactiveInterval*60;
}
public void setProtocol(String protocol) {
if(!"text".equals(protocol) && !"binary".equals(protocol)){
throw new RuntimeException("protocol["+protocol+"] does not exist");
}
this.protocol = protocol;
}
public String getProtocol() {
return protocol;
}

}

替换Tomcat自带StandardSession的MemcachedSession

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package net.sulin.mem;

import org.apache.catalina.Manager;
import org.apache.catalina.session.StandardSession;

/**
* 覆写StandardSession类,将Tomcat的Session实现类中添加一个
* isChange标志位,在其他地方需要根据这个isChange标志位判断是否需要
* 将此Session对象更新入Memcached服务器集群中。
* @Copyright(C) 2011-8-20 sulin
* @Version 1.0
* @Filename MemcachedSession.java
* @author sulin
* @Update sulin
*/
public class MemcachedSession extends StandardSession {

private static final long serialVersionUID = 8052215026550611559L;

// 当前Request中此Session是否被修改
private transient boolean isChange;

/**
* 构造方法
*/
public MemcachedSession(Manager manager) {
super(manager);
}

/**
* 修改属性
*/
public void setAttribute(String name, Object value) {
Object obj = getAttribute(name);
if(obj!=null)
if(obj.equals(value))
return; // attribute值相同、不需要修改
super.setAttribute(name, value);
setChange(true);
}

/**
* 删除属性
*/
public void removeAttribute(String name) {
System.out.println(name);
Object obj = getAttribute(name);
if(obj==null)
return; // attribute不存在、不需要修改
super.removeAttribute(name);
setChange(true);
}

/****************** getter、setter *****************/
public void setChange(boolean isChange) {
this.isChange = isChange;
}
public boolean isChange() {
return isChange;
}

}

Tomcat与Memcached的连接桥梁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package net.sulin.mem;

import java.io.IOException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import javax.servlet.http.HttpSession;

import org.apache.catalina.Session;

import com.google.code.yanf4j.core.impl.StandardSocketOption;

import net.rubyeye.xmemcached.MemcachedClient;
import net.rubyeye.xmemcached.MemcachedClientBuilder;
import net.rubyeye.xmemcached.XMemcachedClientBuilder;
import net.rubyeye.xmemcached.command.BinaryCommandFactory;
import net.rubyeye.xmemcached.exception.MemcachedException;
import net.rubyeye.xmemcached.utils.AddrUtil;

/**
* Tomcat应用程序与Memcached集群的连接桥梁。
* 用于提供相关的添加数据、修改数据、删除数据操作。
* 并且它也负责在某一天连接意外断开时通过重置Session分布
* 来避免数据出现没有备份(备份机宕机)的情况。
* @Copyright(C) 2011-8-20 sulin
* @Version 1.0
* @Filename MemcachedTool.java
* @author sulin
* @Update sulin
*/
public class MemcachedTool {

// 更新缓存操作队列
private BlockingQueue<HttpSession> queue;

// Session在缓存中的过期时间(分钟)
private int expire = 30;

// 客户端对象
private MemcachedClient client = null;

// Manager对象
private MemcachedManager manager;

/**
* 初始化Memcached客户端,并设置相关参数
* 这些参数需要通过MemcachedManager配置
* @param manager MemcachedManager对象
* @throws IOException
*/
public MemcachedTool(MemcachedManager manager) throws IOException{
this.manager = manager;
// 初始化操作队列
this.queue = new ArrayBlockingQueue<HttpSession>(this.manager.getQueueSize());
// 设置Session在缓存中的超时时间
this.expire = this.manager.getMaxInactiveInterval();
// 初始化MemcachedClient
MemcachedClientBuilder builder = new XMemcachedClientBuilder(
AddrUtil.getAddresses(this.manager.getAddr()));
builder.setConnectionPoolSize(this.manager.getPoolSize());
builder.setSocketOption(StandardSocketOption.SOSNDBUF, 16*1024);
builder.setSocketOption(StandardSocketOption.TCPNODELAY, true);
builder.addStateListener(new MemcachedListener(this));
if("binary".equals(manager.getProtocol())){
builder.setCommandFactory(new BinaryCommandFactory());
}
this.client = builder.build();
this.client.setEnableHeartBeat(false);
// 启动备份Session线程
backupProcess();
}

/**
* 将Session放入更新队列【Session更新入缓存为异步操作】
* @param session 需要放入队列的Session
* @param timeout 超时时间、秒
* @throws InterruptedException
*/
public void saveToCache(HttpSession session, int timeout)
throws InterruptedException{
queue.offer(session, timeout, TimeUnit.SECONDS);
}

/**
* 尝试从缓存中取出指定sessionId的Session【同步操作】
* @param sessionId 取出的Session的id
* @param timeout 超时时间、秒
* @throws TimeoutException
* @throws InterruptedException
* @throws MemcachedException
*/
public Session getFromCache(String sessionId, int timeout)
throws TimeoutException, InterruptedException, MemcachedException {
return client.get(sessionId, timeout);
}

/**
* 尝试从缓存中删除指定sessionId的Session【同步操作】
* @param sessionId
* @throws TimeoutException
* @throws InterruptedException
* @throws MemcachedException
*/
public void deleteFromCache(String sessionId, long timeout)
throws TimeoutException, InterruptedException, MemcachedException{
client.delete(sessionId, timeout);
}

/**
* 当某个连接断开时、可以通过此方法将所有Session重新缓存
* 特殊情况特殊对待,此处忽略新建线程带来的压力(这个方法一般很少需要调用)。
*/
protected void repairCache(){
new Thread(new Runnable() {
public void run() {
Session[] sessions = manager.findSessions();
for(Session session : sessions){
try {
client.set(session.getId(), manager.getMaxInactiveInterval(), session);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}).start();
}

/**
* 关闭当前Memcached客户端
* @throws IOException
*/
public void close() throws IOException{
client.shutdown();
}

// 更新Session队列的备份线程
private void backupProcess(){
new Thread(new Runnable() {
public void run() {
while(true){
// 获取队列中的Session
HttpSession session = null;
synchronized(queue){
try {
session = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(session == null)
continue;
// 将取出的Session保存或更新入 Memcached 中
try {
MemcachedSession temp = (MemcachedSession) session;
if(temp.isChange()){
client.set(session.getId(), expire*60, session);
}else{
if("binary".equals(manager.getProtocol())){
client.touch(session.getId(), expire*60);
}
}
temp.setChange(false);
// 存入失败,将Session重新放入队列。等待下次循环
} catch (Exception e) {
synchronized(queue){
queue.offer(session);
}
e.printStackTrace();
}
}
}
}).start();
}

}

插入Tomcat的Valve

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package net.sulin.mem;

import java.io.IOException;

import javax.servlet.ServletException;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpSession;

import org.apache.catalina.Manager;
import org.apache.catalina.Session;
import org.apache.catalina.connector.Request;
import org.apache.catalina.connector.Response;
import org.apache.catalina.valves.ValveBase;

/**
* Memcached阀门,用于在请求收尾时将此请求相关Session进行更新操作
* @Copyright(C) 2011-8-20 sulin
* @Version 1.0
* @Filename MemcachedValve.java
* @author sulin
* @Update sulin
*/
public class MemcachedValve extends ValveBase {

public void invoke(Request request, Response response)
throws IOException, ServletException {
getNext().invoke(request, response);
afterInvoke(request, response);
}

/**
* Request处理后、进行Session的更新缓存处理
* @param request
*/
public void afterInvoke(Request request, Response response){
// 获取sessionId
Session session = request.getSessionInternal();
if(session == null)
return;
// Session不为空、判断是否需要更新入缓存并执行相应操作
if(session instanceof MemcachedSession){
MemcachedSession temp = (MemcachedSession) session;
if(!temp.getId().equals(request.getRequestedSessionId())){
Cookie cookie = new Cookie("JSESSIONID", session.getId());
cookie.setPath("/");
response.addCookie(cookie);
}
Manager manager = session.getManager();
if(manager instanceof MemcachedManager){
try {
((MemcachedManager)manager).getTool().saveToCache((HttpSession)session, 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

}

配置Tomcat

将以上代码打包后放入lib中,然后在context.xml中添加如下配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
<Manager 	className="net.sulin.mem.MemcachedManager"
queueSize="300"
addr="127.0.0.1:11211 127.0.0.1:11212 127.0.0.1:11213"
poolSize="3"
protocol="text"
maxInactiveInterval="30"/>
<!--
className : 替换的Manager
queueSize : 我采用了异步队列更新 HttpSession 缓存,队列用于存储需要更新的HttpSession对象,如果值太小的话。队列空间不够用
addr : 就是 Memcached 的服务器地址和端口了,每个之间用空格隔开(XMemcached就是这样处理的,我不想费事)
poolSize : XMemcached 客户端与 Memcached 服务器的连接池,据XMemcached 作者说即便是高并发也 30 之内吧,因为这是用NIO处理的,不需要太多连接。
protocol : XMemcached 客户端与 Memcached 服务器之间的数据传输协议,可以是“text”也可以是“binary”, binary 的话它可以使用 XMemcached 的touch方法等, 但是需要 1.4 以上版本的XMemcached。 我没使用
maxInactiveInterval :HttpSession过期时间, 分钟为单位
-->

大功告成~~~

总结

添加了这个session复制功能之后的Tomcat居然比正常的Tomcat处理JSP页面还快。处理一个正常同样JSP页面只需要57.5ms,哈哈,快了几毫秒(应该是测试数据误差)。

在测试中,我胡乱关闭Tomcat服务和Memcached服务。但每次请求的session都没有遇见丢失的情况。嘿嘿,感觉挺不错的,性能上也是看不到影响。

但是就目前而言,这是没有经受过任何生产环境考验的!