java

位置:IT落伍者 >> java >> 浏览文章

Java Socket通信技术收发线程互斥的解决方法


发布日期:2019年06月28日
 
Java Socket通信技术收发线程互斥的解决方法

Java Socket通信技术在很长的时间里都在使用在不少的程序员眼中都有很多高的评价那么下面我们就看看如何才能掌握这门复杂的编程语言希望大家在今后的Java <> Socket通信技术使用中有所收获

下面就是Java Socket通信技术在解决收发线程互斥的代码介绍

package combillsvr;

import javaioIOException;

import javaioInputStream;

import javaioOutputStream;

import InetSocketAddress;

import Socket;

import SocketException;

import SocketTimeoutException;

import javatextSimpleDateFormat;

import javautilDate;

import javautilProperties;

import javautilTimer;

import javautilTimerTask;

import ncurrentConcurrentHashMap;

import ncurrentTimeUnit;

import ncurrentlocksCondition;

import ncurrentlocksReentrantLock;

import orgapachelogjLogger;

/**

*<p>title: socket通信包装类</p>

*<p>Description: </p>

*<p>CopyRight: CopyRight (c) </p>

*<p>Company: </p>

*<p>Create date: </P>

*author sunnylocus<A mailto:>

</A> * v 初类

* v 对命令收发逻辑及收发线程互斥机制进行了优化

处理命令速度由原来~个/秒提高到~个/秒

*/ public class SocketConnection {

private volatile Socket socket;

private int timeout = *; //超时时间初始值

private boolean isLaunchHeartcheck = false;//是否已启动心跳检测

private boolean isNetworkConnect = false; //网络是否已连接

private static String host = ;

private static int port;

static InputStream inStream = null;

static OutputStream outStream = null;

private static Logger log =LoggergetLogger

(SocketConnectionclass);

private static SocketConnection socketConnection = null;

private static javautilTimer heartTimer=null;

//private final Map<String Object> recMsgMap= Collections

synchronizedMap(new HashMap<String Object>());

private final ConcurrentHashMap<String Object> recMsgMap

= new ConcurrentHashMap<String Object>();

private static Thread receiveThread = null;

private final ReentrantLock lock = new ReentrantLock();

private SocketConnection(){

Properties conf = new Properties();

try {

nfload(SocketConnectionclassgetResourceAsStream

(nf));

thistimeout = IntegervalueOf(confgetProperty(timeout));

init(confgetProperty(ip)IntegervalueOf

(confgetProperty(port)));

} catch(IOException e) {

logfatal(socket初始化异常!e);

throw new RuntimeException(socket初始化异常请检查配置参数);

}

}

/**

* 单态模式

*/

public static SocketConnection getInstance() {

if(socketConnection==null) {

synchronized(SocketConnectionclass) {

if(socketConnection==null) {

socketConnection = new SocketConnection();

return socketConnection;

}

}

}

return socketConnection;

}

private void init(String hostint port) throws IOException {

InetSocketAddress addr = new InetSocketAddress(hostport);

socket = new Socket();

synchronized (this) {

(【准备与+addr+建立连接】);

nnect(addr timeout);

(【与+addr+连接已建立】);

inStream = socketgetInputStream();

outStream = socketgetOutputStream();

socketsetTcpNoDelay(true);//数据不作缓沖立即发送

socketsetSoLinger(true );//socket关闭时立即释放资源

socketsetKeepAlive(true);

socketsetTrafficClass(x|x);//高可靠性和最小延迟传输

isNetworkConnect=true;

receiveThread = new Thread(new ReceiveWorker());

receiveThreadstart();

SocketConnectionhost=host;

SocketConnectionport=port;

if(!isLaunchHeartcheck)

launchHeartcheck();

}

}

/**

* 心跳包检测

*/

private void launchHeartcheck() {

if(socket == null)

throw new IllegalStateException(socket is not

established!);

heartTimer = new Timer();

isLaunchHeartcheck = true;

heartTimerschedule(new TimerTask() {

public void run() {

String msgStreamNo = StreamNoGeneratorgetStreamNo(kq);

int mstType =;//心跳包请求

SimpleDateFormat dateformate = new SimpleDateFormat

(yyyyMMddHHmmss);

String msgDateTime = dateformateformat(new Date());

int msgLength =;//消息头长度

String commandstr = +msgLength + mstType + msgStreamNo;

(心跳检测包 > IVR +commandstr);

int reconnCounter = ;

while(true) {

String responseMsg =null;

try {

responseMsg = readReqMsg(commandstr);

} catch (IOException e) {

logerror(IO流异常e);

reconnCounter ++;

}

if(responseMsg!=null) {

(心跳响应包 < IVR +responseMsg);

reconnCounter = ;

break;

} else {

reconnCounter ++;

}

if(reconnCounter >) {//重连次数已达三次判定网络连接中断

重新建立连接连接未被建立时不释放锁

reConnectToCTCC(); break;

}

}

}

} * ***);

}

/**

* 重连与目标IP建立重连

*/

private void reConnectToCTCC() {

new Thread(new Runnable(){

public void run(){

(重新建立与+host+:+port+的连接);

//清理工作中断计时器中断接收线程恢复初始变量

heartTimercancel();

isLaunchHeartcheck=false;

isNetworkConnect = false;

receiveThreadinterrupt();

try {

socketclose();

} catch (IOException e) {logerror(重连时关闭socket连

接发生IO流异常e);}

//

synchronized(this){

for(; ;){

try {

ThreadcurrentThread();

Threadsleep( * );

init(hostport);

thisnotifyAll();

break ;

} catch (IOException e) {

logerror(重新建立连接未成功e);

} catch (InterruptedException e){

logerror(重连线程中断e);

}

}

}

}

})start();

}

/**

* 发送命令并接受响应

* @param requestMsg

* @return

* @throws SocketTimeoutException

* @throws IOException

*/

public String readReqMsg(String requestMsg) throws IOException {

if(requestMsg ==null) {

return null;

}

if(!isNetworkConnect) {

synchronized(this){

try {

thiswait(*); //等待如果网络还没有恢复抛出IO流异常

if(!isNetworkConnect) {

throw new IOException(网络连接中断!);

}

} catch (InterruptedException e) {

logerror(发送线程中断e);

}

}

}

String msgNo = requestMsgsubstring( + );//读取流水号

outStream = socketgetOutputStream();

outStreamwrite(requestMsggetBytes());

outStreamflush();

Condition msglock = locknewCondition(); //消息锁

//注册等待接收消息

recMsgMapput(msgNo msglock);

try {

locklock();

msglockawait(timeoutTimeUnitMILLISECONDS);

} catch (InterruptedException e) {

logerror(发送线程中断e);

} finally {

lockunlock();

}

Object respMsg = recMsgMapremove(msgNo); //响应信息

if(respMsg!=null &&(respMsg != msglock)) {

//已经接收到消息注销等待成功返回消息

return (String) respMsg;

} else {

logerror(msgNo+ 超时未收到响应消息);

throw new SocketTimeoutException(msgNo+ 超时未收到响应消息);

}

}

public void finalize() {

if (socket != null) {

try {

socketclose();

} catch (IOException e) {

eprintStackTrace();

}

}

}

//消息接收线程

private class ReceiveWorker implements Runnable {

String intStr= null;

public void run() {

while(!Threadinterrupted()){

try {

byte[] headBytes = new byte[];

if(inStreamread(headBytes)==){

logwarn(读到流未尾对方已关闭流!);

reConnectToCTCC();//读到流未尾对方已关闭流

return;

}

byte[] tmp =new byte[];

tmp = headBytes;

String tempStr = new String(tmp)trim();

if(tempStr==null || tempStrequals()) {

logerror(received message is null);

ntinue;

}

intStr = new String(tmp);

int totalLength =IntegerparseInt(intStr);

//

byte[] msgBytes = new byte[totalLength];

inStreamread(msgBytes);

String resultMsg = new String(headBytes)+ new

String(msgBytes);

//抽出消息ID

String msgNo = resultMsgsubstring( + );

Condition msglock =(Condition) recMsgMapget(msgNo);

if(msglock ==null) {

logwarn(msgNo+序号可能已被注销!响应消息丢弃);

recMsgMapremove(msgNo);

ntinue;

}

recMsgMapput(msgNo resultMsg);

try{

locklock();

msglocksignalAll();

}finally {

lockunlock();

}

}catch(SocketException e){

logerror(服务端关闭sockete);

reConnectToCTCC();

} catch(IOException e) {

logerror(接收线程读取响应数据时发生IO流异常e);

} catch(NumberFormatException e){

logerror(收到没良心包String转int异常异常字符:+intStr);

}

}

}

}

}

以上就是对Java Socket通信技术中收发线程互斥的详细解决方法希望大家有所领悟

               

上一篇:通过注解简化spring aop织入点的指定

下一篇:用Solstice Enterprise Manager建立Java网络管理应用程序