Browse Source

并发多线程完成

tbphp 2 years ago
parent
commit
7bb1a06817
2 changed files with 29 additions and 20 deletions
  1. 26 17
      Swoole/Service.php
  2. 3 3
      sftp-config.json

+ 26 - 17
Swoole/Service.php

@@ -10,8 +10,8 @@ class Service
 {
     public function run()
     {
-        $this->worker(10);
         $this->master();
+        $this->worker(10);
         while (true) {
             sleep(1000);
         }
@@ -20,18 +20,14 @@ class Service
     private function master()
     {
         $master = new Process(function ($p) {
-            // while (true) {
-
-            for ($i = 1; $i <= 1000; $i++) {
+            for ($i = 1; $i <= 999; $i++) {
                 // 有未处理的事务,阻塞等等所有队列完成
                 while ($p->statQueue()['queue_num'] != 0) {
                     usleep(1);
                 };
-                o($i);
                 $url = 'http://wiki.swoole.com/wiki/page/' . $i . '.html';
                 $p->push($url);
             }
-            // }
         });
         $master->useQueue();
         $master->start();
@@ -39,30 +35,39 @@ class Service
 
     private function worker($work_num = 5)
     {
-        $url = '';
         for ($i = 0; $i < $work_num; $i++) {
             $work = new Process(function ($p) {
+                $max  = 50; // 单进程最大curl并发数
                 $curl = curl_multi_init();
                 while (true) {
-                    while (($exec = curl_multi_exec($curl, $else)) == CURLM_CALL_MULTI_PERFORM);
+
+                    // 如果执行错误,循环阻塞
+                    while (($exec = curl_multi_exec($curl, $else)) == CURLM_CALL_MULTI_PERFORM) {
+                        usleep(1);
+                    };
+
+                    // 错误跳出
                     if ($exec != CURLM_OK) {
                         break;
                     }
 
                     // 数据处理
                     while ($data = curl_multi_info_read($curl)) {
-                        $this->dostr($url, curl_multi_getcontent($data['handle']));
+                        $this->doHandle($data['handle']);
                         curl_multi_remove_handle($curl, $data['handle']);
                     }
 
                     if ($else > 0) {
                         curl_multi_select($curl);
+                        // 小于最大并发,可持续接收任务
+                        if ($else < $max && $p->statQueue()['queue_num'] > 0) {
+                            $this->addUrl($curl, $p->pop());
+                        }
                     } else {
-                        // o('#' . $p->pid . ' 空闲');
-                        $url = $p->pop();
-                        // o('#' . $p->pid . ' 接收到任务:' . $url);
-                        $this->addUrl($curl, $url);
+                        o('#' . $p->pid . ' 空闲');
+                        $this->addUrl($curl, $p->pop());
                     }
+
                     usleep(1000);
                 }
                 curl_multi_close($curl);
@@ -91,11 +96,15 @@ class Service
         curl_multi_add_handle($curl, $c);
     }
 
-    private function dostr($url, $body)
+    private function doHandle($handle)
     {
-        preg_match('#<title>(.*?)</title>#i', $body, $match);
-        if (isset($match[1])) {
-            Log::info(str_pad($url, 43) . ' ' . $match[1]);
+        $body = curl_multi_getcontent($handle);
+        $info = curl_getinfo($handle);
+        if ($info['http_code'] == 200) {
+            preg_match('#<title>(.*?)</title>#i', $body, $match);
+            if (isset($match[1])) {
+                Log::info(str_pad($info['url'], 42) . $match[1]);
+            }
         }
     }
 }

+ 3 - 3
sftp-config.json

@@ -2,10 +2,10 @@
     "confirm_downloads": false,
     "confirm_overwrite_newer": false,
     "confirm_sync": false,
-    "host": "192.168.5.45",
-    "password": "426351Tb",
+    "host": "192.168.1.5",
+    "password": "426351tB",
     "port": "22",
-    "remote_path": "/var/www/html/swoole-spider",
+    "remote_path": "/var/www/swoole-spider",
     "sync_skip_deletes": false,
     "type": "sftp",
     "upload_on_save": true,