如何解决JavaSocket通信技术收发线程互斥的问题

本篇内容介绍了“如何解决Java Socket通信技术收发线程互斥的问题”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

创新互联长期为超过千家客户提供的网站建设服务,团队从业经验10年,关注不同地域、不同群体,并针对不同对象提供差异化的产品和服务;打造开放共赢平台,与合作伙伴共同营造健康的互联网生态环境。为曲阜企业提供专业的成都网站制作、成都网站设计,曲阜网站改版等技术服务。拥有十余年丰富建站经验和众多成功案例,为您定制开发。

Java Socket通信技术在很长的时间里都在使用,在不少的程序员眼中都有很多高的评价。那么下面我们就看看如何才能掌握这门复杂的编程语言,希望大家在今后的Java Socket通信技术使用中有所收获。

下面就是Java Socket通信技术在解决收发线程互斥的代码介绍。

  1. package com.bill99.svr;   

  2. import java.io.IOException;   

  3. import java.io.InputStream;   

  4. import java.io.OutputStream;   

  5. import java.net.InetSocketAddress;   

  6. import java.net.Socket;   

  7. import java.net.SocketException;   

  8. import java.net.SocketTimeoutException;   

  9. import java.text.SimpleDateFormat;   

  10. import java.util.Date;   

  11. import java.util.Properties;   

  12. import java.util.Timer;   

  13. import java.util.TimerTask;   

  14. import java.util.concurrent.ConcurrentHashMap;   

  15. import java.util.concurrent.TimeUnit;   

  16. import java.util.concurrent.locks.Condition;   

  17. import java.util.concurrent.locks.ReentrantLock;   

  18. import org.apache.log4j.Logger;   

  19. /**   

  20. *

    title: socket通信包装类

       

  21. *

    Description: 

       

  22. *

    CopyRight: CopyRight (c) 2009

       

  23. *

    Company: 99bill.com

       

  24. *

    Create date: 2009-10-14

       

  25. *author sunnylocus   

  26.  * v0.10 2009-10-14 初类   

  27. * v0.11 2009-11-12 对命令收发逻辑及收发线程互斥机制进行了优化,
    处理命令速度由原来8~16个/秒提高到25~32个/秒   

  28. */ public class SocketConnection {   

  29. private volatile Socket socket;   

  30. private int timeout = 1000*10; //超时时间,初始值10秒   

  31. private boolean isLaunchHeartcheck = false;//是否已启动心跳检测   

  32. private boolean isNetworkConnect = false; //网络是否已连接   

  33. private static String host = "";   

  34. private static int port;   

  35. static InputStream inStream = null;   

  36. static OutputStream outStream = null;   

  37. private static Logger log =Logger.getLogger
    (SocketConnection.class);   

  38. private static SocketConnection socketConnection = null;   

  39. private static java.util.Timer heartTimer=null;     

  40. //private final Map recMsgMap= Collections.
    synchronizedMap(new HashMap());   

  41. private final ConcurrentHashMap recMsgMap 
    = new ConcurrentHashMap();   

  42. private static Thread receiveThread = null;   

  43. private final ReentrantLock lock = new ReentrantLock();   

  44. private SocketConnection(){   

  45. Properties conf = new Properties();   

  46. try {   

  47. conf.load(SocketConnection.class.getResourceAsStream
    ("test.conf"));   

  48. this.timeout = Integer.valueOf(conf.getProperty("timeout"));   

  49. init(conf.getProperty("ip"),Integer.valueOf
    (conf.getProperty("port")));   

  50. } catch(IOException e) {   

  51. log.fatal("socket初始化异常!",e);   

  52. throw new RuntimeException("socket初始化异常,请检查配置参数");   

  53. }   

  54. }   

  55. /**   

  56. * 单态模式   

  57. */   

  58. public static SocketConnection getInstance() {   

  59. if(socketConnection==null) {   

  60. synchronized(SocketConnection.class) {   

  61. if(socketConnection==null) {   

  62. socketConnection = new SocketConnection();   

  63. return socketConnection;   

  64. }   

  65. }   

  66. }   

  67. return socketConnection;   

  68. }   

  69. private void init(String host,int port) throws IOException {   

  70. InetSocketAddress addr = new InetSocketAddress(host,port);   

  71. socket = new Socket();   

  72. synchronized (this) {   

  73. log.info("【准备与"+addr+"建立连接】");   

  74. socket.connect(addr, timeout);   

  75. log.info("【与"+addr+"连接已建立】");   

  76. inStream = socket.getInputStream();   

  77. outStream = socket.getOutputStream();   

  78. socket.setTcpNoDelay(true);//数据不作缓冲,立即发送   

  79. socket.setSoLinger(true, 0);//socket关闭时,立即释放资源   

  80. socket.setKeepAlive(true);   

  81. socket.setTrafficClass(0x04|0x10);//高可靠性和最小延迟传输   

  82. isNetworkConnect=true;   

  83. receiveThread = new Thread(new ReceiveWorker());   

  84. receiveThread.start();   

  85. SocketConnection.host=host;   

  86. SocketConnection.port=port;   

  87. if(!isLaunchHeartcheck)   

  88. launchHeartcheck();   

  89. }   

  90. }   

  91. /**   

  92. * 心跳包检测   

  93. */   

  94. private void launchHeartcheck() {   

  95. if(socket == null)   

  96. throw new IllegalStateException("socket is not 
    established!");   

  97. heartTimer = new Timer();   

  98. isLaunchHeartcheck = true;   

  99. heartTimer.schedule(new TimerTask() {   

  100. public void run() {   

  101. String msgStreamNo = StreamNoGenerator.getStreamNo("kq");   

  102. int mstType =9999;//999-心跳包请求   

  103. SimpleDateFormat dateformate = new SimpleDateFormat
    ("yyyyMMddHHmmss");   

  104. String msgDateTime = dateformate.format(new Date());   

  105. int msgLength =38;//消息头长度   

  106. String commandstr = "00" +msgLength + mstType + msgStreamNo;   

  107. log.info("心跳检测包 -> IVR "+commandstr);   

  108. int reconnCounter = 1;   

  109. while(true) {   

  110. String responseMsg =null;   

  111. try {   

  112. responseMsg = readReqMsg(commandstr);   

  113. } catch (IOException e) {   

  114. log.error("IO流异常",e);   

  115. reconnCounter ++;   

  116. }   

  117. if(responseMsg!=null) {   

  118. log.info("心跳响应包 <- IVR "+responseMsg);   

  119. reconnCounter = 1;   

  120. break;   

  121. } else {   

  122. reconnCounter ++;   

  123. }   

  124. if(reconnCounter >3) {//重连次数已达三次,判定网络连接中断,
    重新建立连接。连接未被建立时不释放锁   

  125. reConnectToCTCC(); break;   

  126. }   

  127. }   

  128. }   

  129. },1000 * 60*1,1000*60*2);   

  130. }   

  131. /**   

  132. * 重连与目标IP建立重连   

  133. */   

  134. private void reConnectToCTCC() {   

  135. new Thread(new Runnable(){   

  136. public void run(){   

  137. log.info("重新建立与"+host+":"+port+"的连接");   

  138. //清理工作,中断计时器,中断接收线程,恢复初始变量   

  139. heartTimer.cancel();   

  140. isLaunchHeartcheck=false;   

  141. isNetworkConnect = false;   

  142. receiveThread.interrupt();   

  143. try {   

  144. socket.close();   

  145. } catch (IOException e1) {log.error("重连时,关闭socket连
    接发生IO流异常",e1);}   

  146. //----------------   

  147. synchronized(this){   

  148. for(; ;){   

  149. try {   

  150. Thread.currentThread();   

  151. Thread.sleep(1000 * 1);   

  152. init(host,port);   

  153. this.notifyAll();   

  154. break ;   

  155. } catch (IOException e) {   

  156. log.error("重新建立连接未成功",e);   

  157. } catch (InterruptedException e){   

  158. log.error("重连线程中断",e);   

  159. }   

  160. }   

  161. }   

  162. }   

  163. }).start();   

  164. }   

  165. /**   

  166. * 发送命令并接受响应   

  167. * @param requestMsg   

  168. * @return   

  169. * @throws SocketTimeoutException   

  170. * @throws IOException   

  171. */   

  172. public String readReqMsg(String requestMsg) throws IOException {   

  173. if(requestMsg ==null) {   

  174. return null;   

  175. }   

  176. if(!isNetworkConnect) {   

  177. synchronized(this){   

  178. try {   

  179. this.wait(1000*5); //等待5秒,如果网络还没有恢复,抛出IO流异常   

  180. if(!isNetworkConnect) {   

  181. throw new IOException("网络连接中断!");   

  182. }   

  183. } catch (InterruptedException e) {   

  184. log.error("发送线程中断",e);   

  185. }   

  186. }   

  187. }   

  188. String msgNo = requestMsg.substring(8, 8 + 24);//读取流水号   

  189. outStream = socket.getOutputStream();   

  190. outStream.write(requestMsg.getBytes());   

  191. outStream.flush();   

  192. Condition msglock = lock.newCondition(); //消息锁   

  193. //注册等待接收消息   

  194. recMsgMap.put(msgNo, msglock);   

  195. try {   

  196. lock.lock();   

  197. msglock.await(timeout,TimeUnit.MILLISECONDS);   

  198. } catch (InterruptedException e) {   

  199. log.error("发送线程中断",e);   

  200. } finally {   

  201. lock.unlock();   

  202. }   

  203. Object respMsg = recMsgMap.remove(msgNo); //响应信息   

  204. if(respMsg!=null &&(respMsg != msglock)) {   

  205. //已经接收到消息,注销等待,成功返回消息   

  206. return (String) respMsg;   

  207. } else {   

  208. log.error(msgNo+" 超时,未收到响应消息");   

  209. throw new SocketTimeoutException(msgNo+" 超时,未收到响应消息");   

  210. }   

  211. }   

  212. public void finalize() {   

  213. if (socket != null) {   

  214. try {   

  215. socket.close();   

  216. } catch (IOException e) {   

  217. e.printStackTrace();   

  218. }   

  219. }   

  220. }   

  221. //消息接收线程   

  222. private class ReceiveWorker implements Runnable {   

  223. String intStr= null;   

  224. public void run() {   

  225. while(!Thread.interrupted()){   

  226. try {   

  227. byte[] headBytes = new byte[4];   

  228. if(inStream.read(headBytes)==-1){   

  229. log.warn("读到流未尾,对方已关闭流!");   

  230. reConnectToCTCC();//读到流未尾,对方已关闭流   

  231. return;   

  232. }   

  233. byte[] tmp =new byte[4];   

  234. tmp = headBytes;   

  235. String tempStr = new String(tmp).trim();   

  236. if(tempStr==null || tempStr.equals("")) {   

  237. log.error("received message is null");   

  238. continue;   

  239. }   

  240. intStr = new String(tmp);   

  241. int totalLength =Integer.parseInt(intStr);   

  242. //----------------   

  243. byte[] msgBytes = new byte[totalLength-4];   

  244. inStream.read(msgBytes);   

  245. String resultMsg = new String(headBytes)+ new 
    String(msgBytes);   

  246. //抽出消息ID   

  247. String msgNo = resultMsg.substring(8, 8 + 24);   

  248. Condition msglock =(Condition) recMsgMap.get(msgNo);   

  249. if(msglock ==null) {   

  250. log.warn(msgNo+"序号可能已被注销!响应消息丢弃");   

  251. recMsgMap.remove(msgNo);   

  252. continue;   

  253. }   

  254. recMsgMap.put(msgNo, resultMsg);   

  255. try{   

  256. lock.lock();   

  257. msglock.signalAll();   

  258. }finally {   

  259. lock.unlock();   

  260. }   

  261. }catch(SocketException e){   

  262. log.error("服务端关闭socket",e);   

  263. reConnectToCTCC();   

  264. } catch(IOException e) {   

  265. log.error("接收线程读取响应数据时发生IO流异常",e);   

  266. } catch(NumberFormatException e){   

  267. log.error("收到没良心包,String转int异常,异常字符:"+intStr);   

  268. }   

  269. }   

  270. }   

  271. }   

“如何解决Java Socket通信技术收发线程互斥的问题”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!


文章标题:如何解决JavaSocket通信技术收发线程互斥的问题
URL标题:http://pcwzsj.com/article/pjdgsc.html