网页主动探测工具-使用Reactor模式

接前文
http://blog.itpub.net/29254281/viewspace-1344706/
http://blog.itpub.net/29254281/viewspace-1347985/
http://blog.itpub.net/29254281/viewspace-2134876/

之前的代码被大神怼了..被怒批 杂乱无章,误人子弟
我估计主要是因为面向对象的程度不够.过程化太明显了

在网上找了一个Reactor模式的例子,又改了改自己的程序.
因为Oracle太笨重了,这回干脆换了MySQL好了

改写之后的程序,在我的电脑上,使用 2线程,最大500连接的配置,性能最好。

  1. import java.io.IOException;  
  2. import java.net.InetSocketAddress;  
  3. import java.net.SocketAddress;  
  4. import java.nio.ByteBuffer;  
  5. import java.nio.channels.SelectionKey;  
  6. import java.nio.channels.Selector;  
  7. import java.nio.channels.SocketChannel;  
  8. import java.nio.charset.Charset;  
  9. import java.sql.Connection;  
  10. import java.sql.DriverManager;  
  11. import java.sql.PreparedStatement;  
  12. import java.sql.SQLException;  
  13. import java.sql.Timestamp;  
  14. import java.util.ArrayList;  
  15. import java.util.HashSet;  
  16. import java.util.Iterator;  
  17. import java.util.List;  
  18. import java.util.Set;  
  19. import java.util.concurrent.BlockingQueue;  
  20. import java.util.concurrent.LinkedBlockingQueue;  
  21. import java.util.concurrent.atomic.AtomicInteger;  
  22. import java.util.regex.Matcher;  
  23. import java.util.regex.Pattern;  
  24.   
  25. class Reactor implements Runnable {  
  26.     public static int GETCOUNT() {  
  27.         return COUNT.get();  
  28.   
  29.     }  
  30.   
  31.     public static int getQueueSize() {  
  32.         return QUEUE.size();  
  33.     }  
  34.   
  35.     private static final AtomicInteger COUNT = new AtomicInteger();  
  36.     private static final AtomicInteger TASKCOUNT = new AtomicInteger();  
  37.   
  38.     public int startTask() {  
  39.         return TASKCOUNT.incrementAndGet();  
  40.     }  
  41.   
  42.     public int finishTask() {  
  43.         return TASKCOUNT.decrementAndGet();  
  44.     }  
  45.   
  46.     public int incrementAndGet() {  
  47.         return COUNT.incrementAndGet();  
  48.     }  
  49.   
  50.     public final Selector selector;  
  51.     private static BlockingQueue QUEUE = new LinkedBlockingQueue();  
  52.   
  53.     public void addTask(Task task) {  
  54.         try {  
  55.             QUEUE.put(task);  
  56.         } catch (InterruptedException e) {  
  57.             e.printStackTrace();  
  58.         }  
  59.     }  
  60.   
  61.     public Reactor() throws IOException {  
  62.         selector = Selector.open();  
  63.     }  
  64.   
  65.     @Override  
  66.     public void run() {  
  67.         try {  
  68.             while (!Thread.interrupted()) {  
  69.                 int maxClient = 500;  
  70.                 Task task = null;  
  71.                 if (TASKCOUNT.get() < maxClient) {  
  72.                     while ((task = (Task) QUEUE.poll()) != null) {  
  73.                         new Connector(this, task).run();  
  74.                         if (TASKCOUNT.get() > maxClient) {  
  75.                             break;  
  76.                         }  
  77.                     }  
  78.                 }  
  79.   
  80.                 selector.select();  
  81.   
  82.                 Set selectionKeys = selector.selectedKeys();  
  83.                 Iterator it = selectionKeys.iterator();  
  84.                 // Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。  
  85.                 while (it.hasNext()) {  
  86.                     // 来一个事件 第一次触发一个accepter线程  
  87.                     // 以后触发SocketReadHandler  
  88.                     SelectionKey selectionKey = it.next();  
  89.                     dispatch(selectionKey);  
  90.                 }  
  91.                 selectionKeys.clear();  
  92.             }  
  93.         } catch (IOException e) {  
  94.             e.printStackTrace();  
  95.         }  
  96.     }  
  97.   
  98.     /** 
  99.      * 运行Acceptor或SocketReadHandler 
  100.      *  
  101.      * @param key 
  102.      */  
  103.     void dispatch(SelectionKey key) {  
  104.         Runnable r = (Runnable) (key.attachment());  
  105.         if (r != null) {  
  106.             r.run();  
  107.         }  
  108.     }  
  109.   
  110. }  
  111.   
  112. class Connector implements Runnable {  
  113.     private Reactor reactor;  
  114.     private Task task;  
  115.   
  116.     public Connector(Reactor reactor, Task task) {  
  117.         this.reactor = reactor;  
  118.         this.task = task;  
  119.     }  
  120.   
  121.     @Override  
  122.     public void run() {  
  123.         try {  
  124.             reactor.startTask();  
  125.             task.setStarttime(System.currentTimeMillis());  
  126.             SocketAddress addr = new InetSocketAddress(task.getHost(), 80);  
  127.             SocketChannel socketChannel = SocketChannel.open();  
  128.             socketChannel.configureBlocking(false);  
  129.             socketChannel.connect(addr);  
  130.   
  131.             BaseHandler base = new BaseHandler();  
  132.             base.setTask(task);  
  133.             base.setSelector(reactor.selector);  
  134.             base.setSocketChannel(socketChannel);  
  135.             base.setReactor(reactor);  
  136.             if (socketChannel != null)// 调用Handler来处理channel  
  137.                 socketChannel.register(reactor.selector, SelectionKey.OP_CONNECT, new SocketWriteHandler(base));  
  138.         } catch (IOException e) {  
  139.             e.printStackTrace();  
  140.         }  
  141.     }  
  142. }  
  143.   
  144. class BaseHandler {  
  145.     private Selector selector;  
  146.     private SocketChannel socketChannel;  
  147.     private Task task;  
  148.     private ByteBuffer byteBuffer = ByteBuffer.allocate(2400);  
  149.     private Reactor reactor;  
  150.   
  151.     public Reactor getReactor() {  
  152.         return reactor;  
  153.     }  
  154.   
  155.     public void setReactor(Reactor reactor) {  
  156.         this.reactor = reactor;  
  157.     }  
  158.   
  159.     public Selector getSelector() {  
  160.         return selector;  
  161.     }  
  162.   
  163.     public void setSelector(Selector selector) {  
  164.         this.selector = selector;  
  165.     }  
  166.   
  167.     public SocketChannel getSocketChannel() {  
  168.         return socketChannel;  
  169.     }  
  170.   
  171.     public void setSocketChannel(SocketChannel socketChannel) {  
  172.         this.socketChannel = socketChannel;  
  173.     }  
  174.   
  175.     public Task getTask() {  
  176.         return task;  
  177.     }  
  178.   
  179.     public void setTask(Task task) {  
  180.         this.task = task;  
  181.     }  
  182.   
  183.     public ByteBuffer getByteBuffer() {  
  184.         return byteBuffer;  
  185.     }  
  186. }  
  187.   
  188. class SocketWriteHandler implements Runnable {  
  189.     BaseHandler baseHandler;  
  190.   
  191.     public SocketWriteHandler(BaseHandler baseHandler) {  
  192.         this.baseHandler = baseHandler;  
  193.         ByteBuffer byteBuffer = baseHandler.getByteBuffer();  
  194.         Task task = baseHandler.getTask();  
  195.         try {  
  196.             byteBuffer.put(("GET " + task.getCurrentPath() + " HTTP/1.0\r\n").getBytes("utf8"));  
  197.             byteBuffer.put(("HOST:" + task.getHost() + "\r\n").getBytes("utf8"));  
  198.             byteBuffer.put(("Accept:*/*\r\n").getBytes("utf8"));  
  199.             byteBuffer.put(("\r\n").getBytes("utf8"));  
  200.             byteBuffer.flip();  
  201.         } catch (IOException e) {  
  202.             e.printStackTrace();  
  203.         }  
  204.     }  
  205.   
  206.     @Override  
  207.     public void run() {  
  208.         try {  
  209.             while (!baseHandler.getSocketChannel().finishConnect()) {  
  210.                 System.out.println("Waiting Connected");  
  211.             }  
  212.   
  213.             baseHandler.getSocketChannel().write(baseHandler.getByteBuffer());  
  214.   
  215.             if (baseHandler.getByteBuffer().hasRemaining()) {  
  216.                 baseHandler.getByteBuffer().compact();  
  217.                 baseHandler.getSocketChannel().register(baseHandler.getSelector(), SelectionKey.OP_WRITE, this);  
  218.                 System.out.println("Continue Write");  
  219.             } else {  
  220.                 baseHandler.getSocketChannel().register(baseHandler.getSelector(), SelectionKey.OP_READ,  
  221.                         new SocketReadHandler(baseHandler));  
  222.                 baseHandler.getByteBuffer().clear();  
  223.             }  
  224.         } catch (IOException e) {  
  225.             e.printStackTrace();  
  226.         }  
  227.     }  
  228.   
  229. }  
  230.   
  231. class SocketReadHandler implements Runnable {  
  232.     Charset charset = Charset.forName("utf8");  
  233.     Charset gbkcharset = Charset.forName("gbk");  
  234.     BaseHandler baseHandler;  
  235.   
  236.     public SocketReadHandler(BaseHandler baseHandler) {  
  237.         this.baseHandler = baseHandler;  
  238.     }  
  239.   
  240.     @Override  
  241.     public void run() {  
  242.         try {  
  243.             SocketChannel channel = baseHandler.getSocketChannel();  
  244.             ByteBuffer byteBuffer = baseHandler.getByteBuffer();  
  245.             Task task = baseHandler.getTask();  
  246.   
  247.             int length;  
  248.             while ((length = channel.read(byteBuffer)) > 0) {  
  249.                 byteBuffer.flip();  
  250.                 task.getContent().append(charset.decode(charset.encode(gbkcharset.decode(byteBuffer))).toString());  
  251.   
  252.                 byteBuffer.compact();  
  253.             }  
  254.             if (length == -1) {  
  255.                 channel.close();  
  256.                 task.setEndtime(System.currentTimeMillis());  
  257.                 baseHandler.getReactor().incrementAndGet();  
  258.                 baseHandler.getReactor().finishTask();  
  259.                 new ParseHandler(task, baseHandler.getReactor()).run();  
  260.   
  261.             } else {  
  262.                 baseHandler.getSocketChannel().register(baseHandler.getSelector(), SelectionKey.OP_READ, this);  
  263.             }  
  264.         } catch (IOException e) {  
  265.             e.printStackTrace();  
  266.         }  
  267.   
  268.     }  
  269.   
  270. }  
  271.   
  272. public class Probe {  
  273.     public static void main(String[] args) throws IOException, InterruptedException {  
  274.         for (int i = 0; i <2; i++) {  
  275.             Reactor reactor = new Reactor();  
  276.             reactor.addTask(new Task("news.163.com", 80, "/index.html"));  
  277.             new Thread(reactor, "ReactorThread_" + i).start();  
  278.         }  
  279.         long start = System.currentTimeMillis();  
  280.         while (true) {  
  281.             Thread.sleep(1000);  
  282.             long end = System.currentTimeMillis();  
  283.             float interval = ((end - start) / 1000);  
  284.             int connectTotal = Reactor.GETCOUNT();  
  285.   
  286.             int persistenceTotal = PersistenceHandler.GETCOUNT();  
  287.   
  288.             int connectps = Math.round(connectTotal / interval);  
  289.             int persistenceps = Math.round(persistenceTotal / interval);  
  290.             System.out.print("\r连接总数:" + connectTotal + " \t每秒连接:" + connectps + "\t连接队列剩余:" + Reactor.getQueueSize()  
  291.                     + " \t持久化总数:" + persistenceTotal + " \t每秒持久化:" + persistenceps + "\t持久化队列剩余:"  
  292.                     + PersistenceHandler.getInstance().getSize());  
  293.         }  
  294.     }  
  295.   
  296. }  
  297.   
  298. class Task {  
  299.     private String host;  
  300.     private int port;  
  301.     private String currentPath;  
  302.     private long starttime;  
  303.     private long endtime;  
  304.   
  305.     private String type;  
  306.     private StringBuilder content = new StringBuilder(2400);  
  307.     private int state;  
  308.     private boolean isValid = true;  
  309.   
  310.     public Task() {  
  311.     }  
  312.   
  313.     public Task(String host, int port, String path) {  
  314.         init(host, port, path);  
  315.     }  
  316.   
  317.     public void init(String host, int port, String path) {  
  318.         this.setCurrentPath(path);  
  319.         this.host = host;  
  320.         this.port = port;  
  321.     }  
  322.   
  323.     public long getStarttime() {  
  324.         return starttime;  
  325.     }  
  326.   
  327.     public void setStarttime(long starttime) {  
  328.         this.starttime = starttime;  
  329.     }  
  330.   
  331.     public long getEndtime() {  
  332.         return endtime;  
  333.     }  
  334.   
  335.     public void setEndtime(long endtime) {  
  336.         this.endtime = endtime;  
  337.     }  
  338.   
  339.     public boolean isValid() {  
  340.         return isValid;  
  341.     }  
  342.   
  343.     public void setValid(boolean isValid) {  
  344.         this.isValid = isValid;  
  345.     }  
  346.   
  347.     public int getState() {  
  348.         return state;  
  349.     }  
  350.   
  351.     public void setState(int state) {  
  352.         this.state = state;  
  353.     }  
  354.   
  355.     public String getCurrentPath() {  
  356.         return currentPath;  
  357.     }  
  358.   
  359.     public void setCurrentPath(String currentPath) {  
  360.         this.currentPath = currentPath;  
  361.         int i = 0;  
  362.         if (currentPath.indexOf("?") != -1) {  
  363.             i = currentPath.indexOf("?");  
  364.         } else {  
  365.             if (currentPath.indexOf("#") != -1) {  
  366.                 i = currentPath.indexOf("#");  
  367.             } else {  
  368.                 i = currentPath.length();  
  369.             }  
  370.         }  
  371.         this.type = currentPath.substring(currentPath.indexOf(".") + 1, i);  
  372.     }  
  373.   
  374.     public long getTaskTime() {  
  375.         return getEndtime() - getStarttime();  
  376.     }  
  377.   
  378.     public String getType() {  
  379.         return type;  
  380.     }  
  381.   
  382.     public void setType(String type) {  
  383.         this.type = type;  
  384.     }  
  385.   
  386.     public String getHost() {  
  387.         return host;  
  388.     }  
  389.   
  390.     public int getPort() {  
  391.         return port;  
  392.     }  
  393.   
  394.     public StringBuilder getContent() {  
  395.         return content;  
  396.     }  
  397.   
  398.     public void setContent(StringBuilder content) {  
  399.         this.content = content;  
  400.     }  
  401.   
  402. }  
  403.   
  404. class ParseHandler implements Runnable {  
  405.     private static final Set SET = new HashSet();  
  406.   
  407.     PersistenceHandler persistencehandler = PersistenceHandler.getInstance();  
  408.   
  409.     List domainlist = new ArrayList();  
  410.   
  411.     Task task;  
  412.   
  413.     private interface Filter {  
  414.         void doFilter(Task fatherTask, Task newTask, String path, Filter chain);  
  415.     }  
  416.   
  417.     private class FilterChain implements Filter {  
  418.         private List list = new ArrayList();  
  419.   
  420.         {  
  421.             addFilter(new TwoLevel());  
  422.             addFilter(new OneLevel());  
  423.             addFilter(new FullPath());  
  424.             addFilter(new Root());  
  425.             addFilter(new Default());  
  426.         }  
  427.   
  428.         private void addFilter(Filter filter) {  
  429.             list.add(filter);  
  430.         }  
  431.   
  432.         private Iterator it = list.iterator();  
  433.   
  434.         @Override  
  435.         public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  
  436.             if (it.hasNext()) {  
  437.                 ((Filter) it.next()).doFilter(fatherTask, newTask, path, chain);  
  438.             }  
  439.         }  
  440.   
  441.     }  
  442.   
  443.     private class TwoLevel implements Filter {  
  444.   
  445.         @Override  
  446.         public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  
  447.             if (path.startsWith("../../")) {  
  448.                 String prefix = getPrefix(fatherTask.getCurrentPath(), 3);  
  449.                 newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../../", prefix));  
  450.             } else {  
  451.                 chain.doFilter(fatherTask, newTask, path, chain);  
  452.             }  
  453.   
  454.         }  
  455.     }  
  456.   
  457.     private class OneLevel implements Filter {  
  458.   
  459.         @Override  
  460.         public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  
  461.             if (path.startsWith("../")) {  
  462.                 String prefix = getPrefix(fatherTask.getCurrentPath(), 2);  
  463.                 newTask.init(fatherTask.getHost(), fatherTask.getPort(), path.replace("../", prefix));  
  464.             } else {  
  465.                 chain.doFilter(fatherTask, newTask, path, chain);  
  466.             }  
  467.   
  468.         }  
  469.   
  470.     }  
  471.   
  472.     private class FullPath implements Filter {  
  473.   
  474.         @Override  
  475.         public void doFilter(Task fatherTask, Task newTask, String path, Filter chain) {  
  476.             if (path.startsWith("http://")) {  
  477.                 Iterator it = domainlist.iterator();  
  478.                 boolean flag = false;  
  479.                 while (it.hasNext()) {  
  480.                     
    新闻标题:网页主动探测工具-使用Reactor模式
    文章链接:http://pcwzsj.com/article/ipsjcp.html