服务器

位置:IT落伍者 >> 服务器 >> 浏览文章

简单多线程服务器实现


发布日期:2018年10月12日
 
简单多线程服务器实现

闲来没事，本来是在学习 NIO 框架的，突然发现对最原始的多线程服务器都不是很了解，遂自己写了个简单的例子。

package testmutithreadserverold;

import javaioIOException;

import ServerSocket;

import Socket;

import testmutithreadserveroldthreadpoolThreadPool;

/**

* 简单阻塞式多线程服务器(线程池处理)

*

* @author zhangjun

*

*/

public class Server {

private int port;

private ServerSocket serverSocket;

private ThreadPool threadPool;

private PortListenThread listener;

public Server(int port) {

thisport = port;

threadPool = new ThreadPool();

}

public void start() {

try {

serverSocket = new ServerSocket(port);

listener = new PortListenThread();

listenerstart();

} catch (IOException e) {

eprintStackTrace();

}

}

public void shutdown() {

threadPoolshutdown();

listenerfinish();

}

private class PortListenThread extends Thread {

private Boolean finish = false;

@Override

public void run() {

while (!finish) {

try {

final Socket socket = serverSocketaccept();

threadPoolexecute(new Runnable() {

@Override

public void run() {

new TestMessage(socket)execute();

}

});

} catch (IOException e) {

eprintStackTrace();

}

}

}

public void finish() {

finish = true;

}

}

public static void main(String[] args) {

int port = ;

Systemoutprintln(server is listening on port: + port);

new Server(port)start();

}

}

这个 Server 调用的是自己实现的一个基于任务队列的简单线程池：

package testmutithreadserveroldthreadpool;

import javautilLinkedList;

/**
 * Simple thread pool: a fixed set of worker threads draining a shared, synchronized
 * task queue. The pool is itself the {@link ThreadGroup} that owns its workers.
 *
 * <p>All access to the queue and to the closed flag happens while holding this
 * object's monitor.
 *
 * @author zhangjun
 */
public class ThreadPool extends ThreadGroup {

    private final static String THREADPOOL = "thread pool";

    private final static String WORKTHREAD = "work thread ";

    // NOTE(review): the numeric addend is missing from the original listing; "+ 1" is assumed.
    private final static int DEFAULTSIZE = Runtime.getRuntime()
            .availableProcessors() + 1;

    private final LinkedList<Runnable> taskQueue;

    private boolean isPoolClose = false;

    /** Creates a pool with {@code DEFAULTSIZE} worker threads. */
    public ThreadPool() {
        this(DEFAULTSIZE);
    }

    /**
     * Creates a pool and starts its workers.
     *
     * @param size number of worker threads to start
     */
    public ThreadPool(int size) {
        super(THREADPOOL);
        setDaemon(true);
        taskQueue = new LinkedList<Runnable>();
        initWorkThread(size);
    }

    /**
     * Starts {@code size} workers. No start-up delay is needed: a worker that finds the
     * queue empty simply waits inside {@link #getTask()} until a task is added.
     */
    private void initWorkThread(int size) {
        for (int i = 0; i < size; i++) {
            new WorkThread(WORKTHREAD + i).start();
        }
    }

    /**
     * Queues a task for execution by a worker. A {@code null} task is ignored.
     *
     * @param task the work to run
     * @throws IllegalStateException if the pool has been shut down
     */
    public synchronized void execute(Runnable task) {
        if (isPoolClose) {
            throw new IllegalStateException("thread pool is closed");
        }
        if (task != null) {
            taskQueue.add(task);
            notify();
        }
    }

    /**
     * Blocks until a task is available or the pool is closed.
     *
     * @return the next task, or {@code null} once the pool is closed and the queue is empty
     * @throws InterruptedException if the waiting worker is interrupted
     */
    private synchronized Runnable getTask() throws InterruptedException {
        // Loop, not "if": wait() may return spuriously, and a worker must not exit
        // just because it woke up to a still-empty queue on an open pool.
        while (taskQueue.isEmpty()) {
            if (isPoolClose) {
                return null;
            }
            wait();
        }
        return taskQueue.removeFirst();
    }

    /**
     * Closes the pool: rejects new tasks, lets workers drain the queue and waits for them,
     * then interrupts any thread still alive in the group and discards leftover tasks.
     */
    public void shutdown() {
        waitFinish();
        synchronized (this) {
            isPoolClose = true;
            interrupt();
            taskQueue.clear();
        }
    }

    /** Marks the pool closed, wakes idle workers and joins every live worker thread. */
    private void waitFinish() {
        synchronized (this) {
            isPoolClose = true;
            notifyAll();
        }
        Thread[] threads = new Thread[activeCount()];
        // Workers may exit between activeCount() and enumerate(); only the first
        // "count" slots are filled, the remainder stay null.
        int count = enumerate(threads);
        Thread self = Thread.currentThread();
        try {
            for (int i = 0; i < count; i++) {
                // Joining ourselves would never return (shutdown() called from a task).
                if (threads[i] != self) {
                    threads[i].join();
                }
            }
        } catch (InterruptedException e) {
            // Stop waiting but keep the interrupt visible to the caller.
            Thread.currentThread().interrupt();
        }
    }

    /** Worker thread: repeatedly takes a task from the queue and runs it. */
    private class WorkThread extends Thread {

        public WorkThread(String name) {
            super(ThreadPool.this, name);
        }

        @Override
        public void run() {
            while (!isInterrupted()) {
                Runnable task = null;
                try {
                    task = getTask();
                } catch (InterruptedException e) {
                    // Interrupted while idle: task stays null and the worker exits below.
                }
                if (task == null) {
                    return;
                }
                try {
                    task.run();
                } catch (Throwable e) {
                    // A failing task must not kill the worker.
                    e.printStackTrace();
                }
            }
        }
    }
}

当然，也可以直接使用 java.util.concurrent 的线程池，代码几乎不用改变：

package testncurrent;

import javaioIOException;

import ServerSocket;

import Socket;

import ncurrentExecutorService;

import ncurrentExecutors;

import testmutithreadserveroldTestMessage;

/**

* 简单阻塞式多线程服务器(线程池处理)

*

* @author zhangjun

*

*/

public class Server {

private int port;

private ServerSocket serverSocket;

private ExecutorService threadPool;

private PortListenThread listener;

public Server(int port) {

thisport = port;

threadPool = ExecutorsnewFixedThreadPool();

}

public void start() {

try {

serverSocket = new ServerSocket(port);

listener = new PortListenThread();

listenerstart();

} catch (IOException e) {

eprintStackTrace();

}

}

public void shutdown() {

threadPoolshutdown();

listenerfinish();

}

private class PortListenThread extends Thread {

private Boolean finish = false;

@Override

public void run() {

while (!finish) {

try {

final Socket socket = serverSocketaccept();

threadPoolexecute(new Runnable() {

@Override

public void run() {

new TestMessage(socket)execute();

}

});

} catch (IOException e) {

eprintStackTrace();

}

}

}

public void finish() {

finish = true;

}

}

public static void main(String[] args) {

int port = ;

Systemoutprintln(server is listening on port: + port);

new Server(port)start();

}

}

里边我构造了一个 Message 接口：

package testmutithreadserverold;

/**
 * General-purpose message contract: a unit of work that is run by calling
 * {@link #execute()}.
 *
 * @author zhangjun
 */
public interface Message {

    /** Carries out whatever processing this message stands for. */
    void execute();

}

以及实现了一个测试消息类：

package testmutithreadserverold;

import javaioBufferedReader;

import javaioIOException;

import javaioInputStreamReader;

import javaioPrintWriter;

import Socket;

/**

* 测试消息

*

* @author zhangjun

*

*/

public class TestMessage implements Message {

private Socket socket;

public TestMessage(Socket socket) {

thissocket = socket;

}

@Override

public void execute() {

try {

BufferedReader in = new BufferedReader(new InputStreamReader(socket

getInputStream()));

PrintWriter out = new PrintWriter(socketgetOutputStream() true);

String s;

while ((s = inreadLine()) != null) {

Systemoutprintln(received message: + s);

if (sequals(quit)) {

break;

}

outprintln(hello + s);

}

} catch (IOException e) {

eprintStackTrace();

} finally {

try {

if (!socketisClosed()) {

socketclose();

}

} catch (IOException e) {

}

}

}

}

代码很简单，就不用多解释什么了。下一步打算用 NIO 再自己写个非阻塞的服务器。

               

上一篇:一个用JAVA写测算服务器响应速度程序

下一篇:用Java实现多线程服务器程序