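With some free time on my hands I had been studying NIO frameworks, when I suddenly realized that I did not really understand even the most basic multithreaded server. So I wrote a simple example myself.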
package testmutithreadserverold;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

import testmutithreadserveroldthreadpool.ThreadPool;

/**
 * Simple blocking multithreaded server (connections are handled by a thread pool).
 *
 * @author zhangjun
 */
public class Server {

    private int port;
    private ServerSocket serverSocket;
    private ThreadPool threadPool;
    private PortListenThread listener;

    public Server(int port) {
        this.port = port;
        threadPool = new ThreadPool();
    }

    public void start() {
        try {
            serverSocket = new ServerSocket(port);
            listener = new PortListenThread();
            listener.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void shutdown() {
        // Stop accepting before closing the pool, so that no new task is
        // submitted to a closed pool. Closing the socket unblocks accept().
        listener.finish();
        try {
            serverSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        threadPool.shutdown();
    }

    private class PortListenThread extends Thread {

        // volatile so that the accept loop sees the flag set by another thread
        private volatile boolean finish = false;

        @Override
        public void run() {
            while (!finish) {
                try {
                    final Socket socket = serverSocket.accept();
                    threadPool.execute(new Runnable() {
                        @Override
                        public void run() {
                            new TestMessage(socket).execute();
                        }
                    });
                } catch (IOException e) {
                    // accept() throws once the socket is closed during shutdown
                    if (!finish) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public void finish() {
            finish = true;
        }
    }

    public static void main(String[] args) {
        int port = 8080; // placeholder: the port number was lost from the original listing
        System.out.println("server is listening on port: " + port);
        new Server(port).start();
    }
}
This Server uses a simple thread pool that I implemented myself on top of a task queue:
package testmutithreadserveroldthreadpool;

import java.util.LinkedList;

/**
 * Simple thread pool (a synchronized pool built on a work queue).
 *
 * @author zhangjun
 */
public class ThreadPool extends ThreadGroup {

    private final static String THREADPOOL = "thread pool";
    private final static String WORKTHREAD = "work thread ";
    // The addend was lost from the original listing; + 1 is an assumed value.
    private final static int DEFAULTSIZE = Runtime.getRuntime()
            .availableProcessors() + 1;

    private LinkedList<Runnable> taskQueue;
    private boolean isPoolClose = false;

    public ThreadPool() {
        this(DEFAULTSIZE);
    }

    public ThreadPool(int size) {
        super(THREADPOOL);
        setDaemon(true);
        taskQueue = new LinkedList<Runnable>();
        initWorkThread(size);
    }

    private void initWorkThread(int size) {
        for (int i = 0; i < size; i++) {
            new WorkThread(WORKTHREAD + i).start();
        }
        try {
            // Give the workers a moment to start. The per-thread delay was lost
            // from the original listing; 10 ms is an assumed value.
            Thread.sleep(10L * size);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public synchronized void execute(Runnable task) {
        if (isPoolClose) {
            throw new IllegalStateException();
        }
        if (task != null) {
            taskQueue.add(task);
            notify();
        }
    }

    private synchronized Runnable getTask() throws InterruptedException {
        // Loop instead of testing once: wait() can wake up spuriously, and
        // another worker may already have taken the task.
        while (taskQueue.size() == 0) {
            if (isPoolClose) {
                return null;
            }
            wait();
        }
        return taskQueue.removeFirst();
    }

    public void shutdown() {
        waitFinish();
        synchronized (this) {
            isPoolClose = true;
            interrupt();
            taskQueue.clear();
        }
    }

    private void waitFinish() {
        synchronized (this) {
            isPoolClose = true;
            notifyAll();
        }
        Thread[] threads = new Thread[activeCount()];
        enumerate(threads);
        try {
            for (Thread t : threads) {
                // activeCount() is only an estimate, so trailing slots may be null
                if (t != null) {
                    t.join();
                }
            }
        } catch (InterruptedException e) {
            // swallow this
        }
    }

    private class WorkThread extends Thread {

        public WorkThread(String name) {
            super(ThreadPool.this, name);
        }

        @Override
        public void run() {
            while (!isInterrupted()) {
                Runnable task = null;
                try {
                    task = getTask();
                } catch (InterruptedException e) {
                    // swallow this; task stays null and the worker exits
                }
                if (task == null) {
                    return;
                }
                try {
                    task.run();
                } catch (Throwable e) {
                    e.printStackTrace();
                }
            }
        }
    }
}
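The hand-off between execute() and getTask() is the part of the pool that is easiest to get wrong, so here is a stripped-down sketch of just that piece. The class TaskQueueSketch and its methods put, take, close and runDemo are names I made up for illustration; they are not part of the pool above. It uses the same wait/notify idea and drains the tasks that are already queued before a worker sees null.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, minimal version of the pool's task queue (illustration only).
public class TaskQueueSketch {

    private final Queue<Runnable> queue = new ArrayDeque<>();
    private boolean closed = false;

    public synchronized void put(Runnable task) {
        if (closed) {
            throw new IllegalStateException("queue is closed");
        }
        queue.add(task);
        notify(); // wake one waiting worker
    }

    // Blocks until a task is available; returns null once closed and empty.
    public synchronized Runnable take() throws InterruptedException {
        while (queue.isEmpty()) { // loop guards against spurious wakeups
            if (closed) {
                return null;
            }
            wait();
        }
        return queue.remove();
    }

    public synchronized void close() {
        closed = true;
        notifyAll(); // wake every worker so each can observe the closed flag
    }

    // Runs n counting tasks on a single worker and returns how many ran.
    public static int runDemo(int n) {
        TaskQueueSketch q = new TaskQueueSketch();
        AtomicInteger done = new AtomicInteger();
        Thread worker = new Thread(() -> {
            try {
                Runnable task;
                while ((task = q.take()) != null) {
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        for (int i = 0; i < n; i++) {
            q.put(done::incrementAndGet);
        }
        q.close();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks run: " + runDemo(5));
    }
}
```

The while loop around wait() is the important detail: a worker that wakes up without a task simply goes back to waiting instead of exiting.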
Of course, you can also use the thread pool from java.util.concurrent directly; the code barely needs to change:
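package testncurrent;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import testmutithreadserverold.TestMessage;

/**
 * Simple blocking multithreaded server (connections are handled by a thread pool).
 *
 * @author zhangjun
 */
public class Server {

    private int port;
    private ServerSocket serverSocket;
    private ExecutorService threadPool;
    private PortListenThread listener;

    public Server(int port) {
        this.port = port;
        // The pool size argument was lost from the original listing;
        // availableProcessors() + 1 is an assumed value.
        threadPool = Executors.newFixedThreadPool(
                Runtime.getRuntime().availableProcessors() + 1);
    }

    public void start() {
        try {
            serverSocket = new ServerSocket(port);
            listener = new PortListenThread();
            listener.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void shutdown() {
        // Stop accepting before shutting the pool down, so that no task is
        // rejected by a closed pool. Closing the socket unblocks accept().
        listener.finish();
        try {
            serverSocket.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        threadPool.shutdown();
    }

    private class PortListenThread extends Thread {

        // volatile so that the accept loop sees the flag set by another thread
        private volatile boolean finish = false;

        @Override
        public void run() {
            while (!finish) {
                try {
                    final Socket socket = serverSocket.accept();
                    threadPool.execute(new Runnable() {
                        @Override
                        public void run() {
                            new TestMessage(socket).execute();
                        }
                    });
                } catch (IOException e) {
                    // accept() throws once the socket is closed during shutdown
                    if (!finish) {
                        e.printStackTrace();
                    }
                }
            }
        }

        public void finish() {
            finish = true;
        }
    }

    public static void main(String[] args) {
        int port = 8080; // placeholder: the port number was lost from the original listing
        System.out.println("server is listening on port: " + port);
        new Server(port).start();
    }
}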
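One thing the ExecutorService version gets for free is an orderly shutdown. The sketch below shows the usual shutdown() plus awaitTermination() pattern on its own, outside the server. The class PoolShutdownSketch, the method runAndShutdown and the 5-second timeout are my own choices for illustration, not something the Server above does.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper showing an orderly ExecutorService shutdown.
public class PoolShutdownSketch {

    // Submits `tasks` counting tasks to a fixed pool of `threads` workers,
    // shuts the pool down, and returns how many tasks completed.
    public static int runAndShutdown(int tasks, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(done::incrementAndGet);
        }
        pool.shutdown(); // no new tasks; queued tasks still run
        try {
            // Assumed timeout of 5 seconds; cancel whatever is left after that.
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                pool.shutdownNow();
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAndShutdown(10, 4));
    }
}
```

shutdown() alone only stops new submissions; waiting with awaitTermination() is what tells you the queued connections have actually been served.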
Both servers rely on a Message interface that I defined:
package testmutithreadserverold;

/**
 * Generic message interface.
 *
 * @author zhangjun
 */
public interface Message {

    void execute();
}
and on a test message class that implements it:
package testmutithreadserverold;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
 * Test message: greets every line the client sends until it receives "quit".
 *
 * @author zhangjun
 */
public class TestMessage implements Message {

    private Socket socket;

    public TestMessage(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void execute() {
        try {
            BufferedReader in = new BufferedReader(new InputStreamReader(socket
                    .getInputStream()));
            // second argument enables auto-flush on println
            PrintWriter out = new PrintWriter(socket.getOutputStream(), true);
            String s;
            while ((s = in.readLine()) != null) {
                System.out.println("received message: " + s);
                if (s.equals("quit")) {
                    break;
                }
                out.println("hello " + s);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (!socket.isClosed()) {
                    socket.close();
                }
            } catch (IOException e) {
                // nothing useful can be done if close() fails
            }
        }
    }
}
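To try the server out you need a client. Here is a rough sketch of one that speaks the same line protocol: send a line, read the greeting, then send quit. The class HelloClientSketch and its methods are hypothetical names of mine. So that the example runs by itself, roundTrip starts a throwaway in-process stand-in for TestMessage on an ephemeral port; against the real Server you would call ask with its host and port instead. The reply format "hello " plus the message is assumed to match TestMessage.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical client for the line-based protocol (illustration only).
public class HelloClientSketch {

    // Sends one line, reads one reply line, then tells the server to quit.
    public static String ask(String host, int port, String line) throws IOException {
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream()));
             PrintWriter out = new PrintWriter(socket.getOutputStream(), true)) {
            out.println(line);
            String reply = in.readLine();
            out.println("quit");
            return reply;
        }
    }

    // Round trip against a throwaway in-process server on an ephemeral port.
    public static String roundTrip(String line) {
        try (ServerSocket server = new ServerSocket(0)) {
            Thread handler = new Thread(() -> serveOne(server));
            handler.start();
            String reply = ask("127.0.0.1", server.getLocalPort(), line);
            handler.join();
            return reply;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Stand-in for TestMessage.execute(): greet each line until "quit".
    private static void serveOne(ServerSocket server) {
        try (Socket s = server.accept();
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream()));
             PrintWriter out = new PrintWriter(s.getOutputStream(), true)) {
            String msg;
            while ((msg = in.readLine()) != null && !msg.equals("quit")) {
                out.println("hello " + msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("world")); // prints "hello world"
    }
}
```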
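The code is simple enough that it needs no further explanation. As a next step, I plan to write a non-blocking server of my own using NIO.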