Java Socket通信技术在很长的时间里都在使用在不少的程序员眼中都有很多高的评价那么下面我们就看看如何才能掌握这门复杂的编程语言希望大家在今后的Java <> Socket通信技术使用中有所收获
下面就是Java Socket通信技术在解决收发线程互斥的代码介绍
package combillsvr;
import javaioIOException;
import javaioInputStream;
import javaioOutputStream;
import InetSocketAddress;
import Socket;
import SocketException;
import SocketTimeoutException;
import javatextSimpleDateFormat;
import javautilDate;
import javautilProperties;
import javautilTimer;
import javautilTimerTask;
import ncurrentConcurrentHashMap;
import ncurrentTimeUnit;
import ncurrentlocksCondition;
import ncurrentlocksReentrantLock;
import orgapachelogjLogger;
/**
*<p>title: socket通信包装类</p>
*<p>Description: </p>
*<p>CopyRight: CopyRight (c) </p>
*<p>Company: </p>
*<p>Create date: </P>
*author sunnylocus<A mailto:>
</A> * v 初类
* v 对命令收发逻辑及收发线程互斥机制进行了优化
处理命令速度由原来~个/秒提高到~个/秒
*/ public class SocketConnection {
private volatile Socket socket;
private int timeout = *; //超时时间初始值秒
private boolean isLaunchHeartcheck = false;//是否已启动心跳检测
private boolean isNetworkConnect = false; //网络是否已连接
private static String host = ;
private static int port;
static InputStream inStream = null;
static OutputStream outStream = null;
private static Logger log =LoggergetLogger
(SocketConnectionclass);
private static SocketConnection socketConnection = null;
private static javautilTimer heartTimer=null;
//private final Map<String Object> recMsgMap= Collections
synchronizedMap(new HashMap<String Object>());
private final ConcurrentHashMap<String Object> recMsgMap
= new ConcurrentHashMap<String Object>();
private static Thread receiveThread = null;
private final ReentrantLock lock = new ReentrantLock();
private SocketConnection(){
Properties conf = new Properties();
try {
nfload(SocketConnectionclassgetResourceAsStream
(nf));
thistimeout = IntegervalueOf(confgetProperty(timeout));
init(confgetProperty(ip)IntegervalueOf
(confgetProperty(port)));
} catch(IOException e) {
logfatal(socket初始化异常!e);
throw new RuntimeException(socket初始化异常请检查配置参数);
}
}
/**
* 单态模式
*/
public static SocketConnection getInstance() {
if(socketConnection==null) {
synchronized(SocketConnectionclass) {
if(socketConnection==null) {
socketConnection = new SocketConnection();
return socketConnection;
}
}
}
return socketConnection;
}
private void init(String hostint port) throws IOException {
InetSocketAddress addr = new InetSocketAddress(hostport);
socket = new Socket();
synchronized (this) {
(【准备与+addr+建立连接】);
nnect(addr timeout);
(【与+addr+连接已建立】);
inStream = socketgetInputStream();
outStream = socketgetOutputStream();
socketsetTcpNoDelay(true);//数据不作缓沖立即发送
socketsetSoLinger(true );//socket关闭时立即释放资源
socketsetKeepAlive(true);
socketsetTrafficClass(x|x);//高可靠性和最小延迟传输
isNetworkConnect=true;
receiveThread = new Thread(new ReceiveWorker());
receiveThreadstart();
SocketConnectionhost=host;
SocketConnectionport=port;
if(!isLaunchHeartcheck)
launchHeartcheck();
}
}
/**
* 心跳包检测
*/
private void launchHeartcheck() {
if(socket == null)
throw new IllegalStateException(socket is not
established!);
heartTimer = new Timer();
isLaunchHeartcheck = true;
heartTimerschedule(new TimerTask() {
public void run() {
String msgStreamNo = StreamNoGeneratorgetStreamNo(kq);
int mstType =;//心跳包请求
SimpleDateFormat dateformate = new SimpleDateFormat
(yyyyMMddHHmmss);
String msgDateTime = dateformateformat(new Date());
int msgLength =;//消息头长度
String commandstr = +msgLength + mstType + msgStreamNo;
(心跳检测包 > IVR +commandstr);
int reconnCounter = ;
while(true) {
String responseMsg =null;
try {
responseMsg = readReqMsg(commandstr);
} catch (IOException e) {
logerror(IO流异常e);
reconnCounter ++;
}
if(responseMsg!=null) {
(心跳响应包 < IVR +responseMsg);
reconnCounter = ;
break;
} else {
reconnCounter ++;
}
if(reconnCounter >) {//重连次数已达三次判定网络连接中断
重新建立连接连接未被建立时不释放锁
reConnectToCTCC(); break;
}
}
}
} * ***);
}
/**
* 重连与目标IP建立重连
*/
private void reConnectToCTCC() {
new Thread(new Runnable(){
public void run(){
(重新建立与+host+:+port+的连接);
//清理工作中断计时器中断接收线程恢复初始变量
heartTimercancel();
isLaunchHeartcheck=false;
isNetworkConnect = false;
receiveThreadinterrupt();
try {
socketclose();
} catch (IOException e) {logerror(重连时关闭socket连
接发生IO流异常e);}
//
synchronized(this){
for(; ;){
try {
ThreadcurrentThread();
Threadsleep( * );
init(hostport);
thisnotifyAll();
break ;
} catch (IOException e) {
logerror(重新建立连接未成功e);
} catch (InterruptedException e){
logerror(重连线程中断e);
}
}
}
}
})start();
}
/**
* 发送命令并接受响应
* @param requestMsg
* @return
* @throws SocketTimeoutException
* @throws IOException
*/
public String readReqMsg(String requestMsg) throws IOException {
if(requestMsg ==null) {
return null;
}
if(!isNetworkConnect) {
synchronized(this){
try {
thiswait(*); //等待秒如果网络还没有恢复抛出IO流异常
if(!isNetworkConnect) {
throw new IOException(网络连接中断!);
}
} catch (InterruptedException e) {
logerror(发送线程中断e);
}
}
}
String msgNo = requestMsgsubstring( + );//读取流水号
outStream = socketgetOutputStream();
outStreamwrite(requestMsggetBytes());
outStreamflush();
Condition msglock = locknewCondition(); //消息锁
//注册等待接收消息
recMsgMapput(msgNo msglock);
try {
locklock();
msglockawait(timeoutTimeUnitMILLISECONDS);
} catch (InterruptedException e) {
logerror(发送线程中断e);
} finally {
lockunlock();
}
Object respMsg = recMsgMapremove(msgNo); //响应信息
if(respMsg!=null &&(respMsg != msglock)) {
//已经接收到消息注销等待成功返回消息
return (String) respMsg;
} else {
logerror(msgNo+ 超时未收到响应消息);
throw new SocketTimeoutException(msgNo+ 超时未收到响应消息);
}
}
public void finalize() {
if (socket != null) {
try {
socketclose();
} catch (IOException e) {
eprintStackTrace();
}
}
}
//消息接收线程
private class ReceiveWorker implements Runnable {
String intStr= null;
public void run() {
while(!Threadinterrupted()){
try {
byte[] headBytes = new byte[];
if(inStreamread(headBytes)==){
logwarn(读到流未尾对方已关闭流!);
reConnectToCTCC();//读到流未尾对方已关闭流
return;
}
byte[] tmp =new byte[];
tmp = headBytes;
String tempStr = new String(tmp)trim();
if(tempStr==null || tempStrequals()) {
logerror(received message is null);
ntinue;
}
intStr = new String(tmp);
int totalLength =IntegerparseInt(intStr);
//
byte[] msgBytes = new byte[totalLength];
inStreamread(msgBytes);
String resultMsg = new String(headBytes)+ new
String(msgBytes);
//抽出消息ID
String msgNo = resultMsgsubstring( + );
Condition msglock =(Condition) recMsgMapget(msgNo);
if(msglock ==null) {
logwarn(msgNo+序号可能已被注销!响应消息丢弃);
recMsgMapremove(msgNo);
ntinue;
}
recMsgMapput(msgNo resultMsg);
try{
locklock();
msglocksignalAll();
}finally {
lockunlock();
}
}catch(SocketException e){
logerror(服务端关闭sockete);
reConnectToCTCC();
} catch(IOException e) {
logerror(接收线程读取响应数据时发生IO流异常e);
} catch(NumberFormatException e){
logerror(收到没良心包String转int异常异常字符:+intStr);
}
}
}
}
}
以上就是对Java Socket通信技术中收发线程互斥的详细解决方法希望大家有所领悟