vb编程设计计算器:Mina编程的两个注意点

来源:百度文库 编辑:中财网 时间:2024/04/29 11:46:06
1. 首先,这是一个nio的框架,仍然是采用reactor模式,知道这一点后,那么编程就没有什么难的。nio的编程,无外乎就是这些套路, 再进一步说,网络编程,也就是这些套路了。
2. 那么剩下编程的注意点,也就是编解码的处理以及最后的业务逻辑的处理。
2.1 编解码的注意点:因为在网络编程中,client和server之间,往往需要完整的接收到一条消息的后,才交给业务逻辑处理。具体可以参看http://jimmee.iteye.com/blog/617544,其中,我们常常是继承自CumulativeProtocolDecoder来实现自己的解码器,主要的docode的方法,其作用是将本次数据和上次接收到的数据(如果doDecode方法没有处理完的话),统一放到一个buffer中,之后扔给 doDecode方法处理,若处理完之后,还有剩下的数据,则继续缓存。
Java代码 ',1)"> 
/**
* Cumulates content of in into internal buffer and forwards
* decoding request to {@link #doDecode(IoSession, IoBuffer, ProtocolDecoderOutput)}.
doDecode() is invoked repeatedly until it returns false
* and the cumulative buffer is compacted after decoding ends.
*
* @throws IllegalStateException if your doDecode() returned
*                               true not consuming the cumulative buffer.
*/
public void decode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
if (!session.getTransportMetadata().hasFragmentation()) {
while (in.hasRemaining()) {
if (!doDecode(session, in, out)) {
break;
}
}
return;
}
boolean usingSessionBuffer = true;
IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);
// If we have a session buffer, append data to that; otherwise
// use the buffer read from the network directly.
if (buf != null) {
boolean appended = false;
// Make sure that the buffer is auto-expanded.
if (buf.isAutoExpand()) {
try {
buf.put(in);
appended = true;
} catch (IllegalStateException e) {
// A user called derivation method (e.g. slice()),
// which disables auto-expansion of the parent buffer.
} catch (IndexOutOfBoundsException e) {
// A user disabled auto-expansion.
}
}
if (appended) {
buf.flip();
} else {
// Reallocate the buffer if append operation failed due to
// derivation or disabled auto-expansion.
buf.flip();
IoBuffer newBuf = IoBuffer.allocate(
buf.remaining() + in.remaining()).setAutoExpand(true);
newBuf.order(buf.order());
newBuf.put(buf);
newBuf.put(in);
newBuf.flip();
buf = newBuf;
// Update the session attribute.
session.setAttribute(BUFFER, buf);
}
} else {
buf = in;
usingSessionBuffer = false;
}
// 上面操作完后,得到的buf是包含以前积累的数据(如果没有读取处理掉),再加上本次得到
// 的数据,最后扔给doDecode处理。
for (;;) {
int oldPos = buf.position();
boolean decoded = doDecode(session, buf, out);
if (decoded) {
if (buf.position() == oldPos) {
throw new IllegalStateException(
"doDecode() can't return true when buffer is not consumed.");
}
if (!buf.hasRemaining()) {
break;
}
} else {
break;
}
}
// if there is any data left that cannot be decoded, we store
// it in a buffer in the session and next time this decoder is
// invoked the session buffer gets appended to
if (buf.hasRemaining()) {
if (usingSessionBuffer && buf.isAutoExpand()) {
buf.compact();
} else {
storeRemainingInSession(buf, session);
}
} else {
if (usingSessionBuffer) {
removeSessionBuffer(session);
}
}
}
可以看一个具体的实现:PrefixedStringDecoder的 doDecode方法,这个decode是消息长度+具体字节流的消息格式。
Java代码 ',2)"> 
protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
if (in.prefixedDataAvailable(prefixLength, maxDataLength)) {
String msg = in.getPrefixedString(prefixLength, charset.newDecoder());
out.write(msg);
return true;
}
return false;
}
其中, prefixedDataAvailable方法是判断得到IoBuffer里的数据是否满足一条消息了,如果再进去看这个方法的实现化,读取是,使用ByteBuffer的绝对位置的读取方法(这种读取不影响position的值的);当已经有一条完整的消息时,则用getPrefixedString读取(使用的是ByteBuffer的相对位置的读取方法,这会影响position的值,从而实际的消费掉数据).
2.2 业务逻辑的处理注意点,一般都会使用一个线程池处理。这里就有一个问题,有时候同一个连接的消息处理,是希望按照顺序来进行的。这也很简单,不需要保证一个连接的所有的业务处理都限定在一个固定的线程中,但是需要保证当有消息需要处理时,这些消息的处理,都在同一个线程中完成。当然了,mina中也有了相应的实现OrderedThreadPoolExecutor。实现原理很简单:
一个连接(session)对应一个消息队列,同时此session也放到一个队列中,说明这个session有消息需要处理。具体来说,就是使用SessionTasksQueue来表示一个Session的要处理的消息的队列;
使用
Java代码 ',3)"> 
/** A queue used to store the available sessions */
private final BlockingQueue waitingSessions = new LinkedBlockingQueue();
waitingSessions表示有消息需要处理的session的队列。
Java代码 ',4)"> 
/**
* {@inheritDoc}
*/
@Override
public void execute(Runnable task) {
if (shutdown) {
rejectTask(task);
}
// Check that it's a IoEvent task
checkTaskType(task);
IoEvent event = (IoEvent) task;
// Get the associated session
IoSession session = event.getSession();
// 得到保存session消息的队列
SessionTasksQueue sessionTasksQueue = getSessionTasksQueue(session);
Queue tasksQueue = sessionTasksQueue.tasksQueue;
boolean offerSession;
// propose the new event to the event queue handler. If we
// use a throttle queue handler, the message may be rejected
// if the maximum size has been reached.
boolean offerEvent = eventQueueHandler.accept(this, event);
if (offerEvent) {
// Ok, the message has been accepted
synchronized (tasksQueue) {
// Inject the event into the executor taskQueue
tasksQueue.offer(event);
// 如果session中的消息已经处理完了,说明没有线程在处理这个session,
// 重新提交给线程池处理
if (sessionTasksQueue.processingCompleted) {
sessionTasksQueue.processingCompleted = false;
offerSession = true;
} else {
offerSession = false;
}
if (LOGGER.isDebugEnabled()) {
print(tasksQueue, event);
}
}
} else {
offerSession = false;
}
if (offerSession) {
// As the tasksQueue was empty, the task has been executed
// immediately, so we can move the session to the queue
// of sessions waiting for completion.
waitingSessions.offer(session);
}
addWorkerIfNecessary();
if (offerEvent) {
eventQueueHandler.offered(this, event);
}
对应的,看一下:
Java代码 ',5)"> 
private void runTasks(SessionTasksQueue sessionTasksQueue) {
for (;;) {
Runnable task;
Queue tasksQueue = sessionTasksQueue.tasksQueue;
synchronized (tasksQueue) {
task = tasksQueue.poll();
// 当已经没有任务时,处理此session的线程结束掉
if (task == null) {
sessionTasksQueue.processingCompleted = true;
break;
}
}
eventQueueHandler.polled(OrderedThreadPoolExecutor.this, (IoEvent) task);
runTask(task);
}
}
小结:个人认为,只要对nio编程熟悉,对mina框架,只要明白了以上两点,就不再有什么大问题了。