人気ブログランキング | 話題のタグを見る

ぬるぽを見かけたら 全力でぶっ叩くのみ


by Denullpo Smasher Hammerson
カレンダー
S M T W T F S
1 2
3 4 5 6 7 8 9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31

PHPでマルチプロセス(俺流)

PHPで複数のプロセスを並列動作させるとき、proc_open‍()を使うというところ
までは容易に調べられるであろう。
(プレビューでopen‍()の部分が引っかかった。なんか面倒な仕様になったもんだ。)
が、実際これで並列動作させるコード書いてみると、大抵ある箇所で壁にぶち当たる。
サブプロセスのstdoutやstderrから結果を拾ってこようとすると、その結果が
得られるまで動作が止まってしまい、非同期処理が実現できなくなる。
この問題を解決する模範解答は存在する(stream_select()デスヨ)のだが、
フレームワーク的に面倒なものがある。

そんなわけで別ルート。
"stdoutやstderrから結果を拾ってくるときに動作が止まる"
という現象さえ回避できれば解決なのだ。
動作が止まるのは、stdoutやstderrに書き込まれた量より多くの出力を得ようとして
指定量に達するまで待ってしまうことにある。即ち、書き込み済みのstdoutやstderrの
量を捕捉できさえすれば、そのぶんだけ指定することで待ち時間をなくすことができる。

そのためには、stdoutやstderrの出力をファイルに書き出せばよい。
そうすればファイルサイズから書き込み済みの量が判明する。
で、PHPにはtmpfile()という便利なファンクションがある。
・重複しない適当な名前でファイルを生成してくれる
・このファイルを読み書き両用でopenしてくれる
・サブプロセスに書き込ませながら結果を読み出すことができる
・用済みになったら自動的にファイルを消してくれる
要件ぴったし。




<?php

// パイプの代用品
class FakePipe{
private $h; // 書き込みハンドル
private $wl; // 書き込み済量
private $rl; // 読み出し済量

public function gethandle(){return $this->h;}

public function __construct(){
$this->rl=0;
$this->wl=0;
$this->h=tmpfile();
// 効果あるかよくわからんけど一応(ぉ
stream_set_blocking($this->h,0);
}

public function __destruct(){
if(!$this->isclosed())$this->close();
}

// close済か調べる
public function isclosed(){
return is_null($this->h);
}

// 用済みになったら呼ぶ
public function close(){
if(!is_null($this->h)){
fclose($this->h);
$this->h=null;
}
}

// 未読部分を読み出す
public function read(){
if(is_null($this->h))return '';

// 終端を調べる
fseek($this->h,0,SEEK_END);
$this->wl=ftell($this->h);
if($this->rl>=$this->wl)return '';
// 未読地点から終端まで読み出す
fseek($this->h,$this->rl,SEEK_SET);
$s=fread($this->h,$this->wl-$this->rl);
$this->rl+=strlen($s);
return $s;
}
}

// サブプロセス
class SubProc{
private $ip; // stdinパイプ
private $op; // stdoutパイプ
private $ep; // stderrパイプ
private $ob; // stdoutバッファ
private $eb; // stderrバッファ
private $proc; // プロセス
private $eoi; // stdin終了済
private $eoo; // stdout,stderr終了済
private $eop; // プロセス終了済

public function __construct($cmd,$inp=false){
$this->eoi=false;
$this->eoo=false;
$this->eop=false;
$this->ob='';
$this->eb='';
$this->op=new FakePipe();
$this->ep=new FakePipe();
$desc=array(
0=>array('FakePipe','r'), // for stdin
1=>$this->op->gethandle(), // for stdout
2=>$this->ep->gethandle(), // for stderr
);
$pipe=array();
$cwd=getcwd();
$this->proc=proc_open‍($cmd,$desc,$pipe,$cwd);
$this->ip=$pipe[0];
// stdinが不要なら、ここで閉じておく
if(!$inp)$this->endwrite();
}

public function __destruct(){
$this->endwrite();
$this->endread();
if(!is_null($this->proc)){
$a=proc_get_status($this->proc);
if($a['running'])proc_terminate($this->proc);
proc_close($this->proc);
}
}

// stdinに渡す
public function writein($s){
if($this->eoi)return;
fwrite($this->ip,$s);
}
// stdinを閉じる
public function endwrite(){
if($this->eoi)return;
$this->eoi=true;
fclose($this->ip);
}

// stdoutから全部読み出す
public function readout(){
$s=$this->ob;
$this->ob='';
return $s;
}
// stderrから全部読み出す
public function readerr(){
$s=$this->eb;
$this->eb='';
return $s;
}
// 1行ずつ読み出す内部処理
private function readline(&$buf){
// \r \n \r\n 全対応ということで
$r=strpos($buf,"\r");
$n=strpos($buf,"\n");
$s=null;
if($r===false){
if($n===false){
// close済のときだけ、改行なしでも最終行として吐き出す
if(!$this->eoo)return null;
$s=$buf;
$buf='';
if($s=='')return null;
}
// \nが改行
$s=substr($buf,0,$n);
$buf=substr($buf,$n+1);
}
else{
if($r+1==$n){
// \r\nが改行
$s=substr($buf,0,$r);
$buf=substr($buf,$n+1);
}
else{
// \rが改行
$s=substr($buf,0,$r);
$buf=substr($buf,$r+1);
}
}
return $s;
}
// stdoutから1行ずつ読み出す
public function readoutline(){return $this->readline($this->ob);}
// stderrから1行ずつ読み出す
public function readerrline(){return $this->readline($this->eb);}
// stdout,stderrを閉じる
public function endread(){
if($this->eoo)return;
$this->op->close();
$this->ep->close();
$this->eoo=true;
}

// プロセス現況確認
public function poll(){
if(!$this->eop){
// stdout,stderr拾ってくる
$this->ob.=$this->op->read();
$this->eb.=$this->ep->read();
// 終わってるか調べる
$a=proc_get_status($this->proc);
if($a['running'])return true;
$this->eop=true;
}
else if(!$this->eoo){
$this->endread();
}
return !$this->eoo;
}
}

// stdinを使うプロセスのテスト
$pitest=new SubProc('sort',true);
// いろいろ並列動作テスト
$procs=array(
new SubProc('ping www.excite.co.jp'),
new SubProc('ping www.llanfairpwllgwyngyllgogerychwyrndrobwllllantysiliogogogoch.com'),
new SubProc('xcopy'),
$pitest,
);

$q=12;
do{
// stdinに書き込んでみる
if($q>0){
$pitest->writein(strval($q)."\n");
$q--;
// 完了したらパイプを閉じないとプロセスが終わらないことに注意
if($q==0)$pitest->endwrite();
}

// 各プロセスのstdout,stderr受け取って表示
$f=false;
foreach($procs as $n=>$p){
if($p->poll())$f=true;
// stdoutから読み出して表示
while(1){
$o=$p->readoutline();
if(is_null($o))break;
if($o!='')echo "[$n:$o]\n";
}
// stderrから読み出して表示
while(1){
$e=$p->readerrline();
if(is_null($e))break;
if($e!='')echo "($n:$e)\n";
}
}

// メインスレッドはある程度休ませとかないとサブプロセスに
// 実行権が渡りにくくなるので適当に
usleep(250000);
}while($f); // 1つでも未完了のサブプロセスがある限り繰り返し

echo "- Finish -\n";
?>


これを実行すると、こんな感じ。
4つのプロセスが同時進行し、出力をリアルタイムで受け取れていることがわかる。


[0:www.excite.co.jp [180.235.96.18]に ping を送信しています 32 バイトのデータ:]
[0:180.235.96.18 からの応答: バイト数 =32 時間 =12ms TTL=113]
[2:0 個のファイルをコピーしました]
(2:無効なパラメーターの数です)
[1:www.llanfairpwllgwyngyllgogerychwyrndrobwllllantysiliogogogoch.com [62.128.198.95]に ping を送信しています 32 バイトのデータ:]
[1:62.128.198.95 からの応答: バイト数 =32 時間 =282ms TTL=241]
[0:180.235.96.18 からの応答: バイト数 =32 時間 =12ms TTL=113]
[1:62.128.198.95 からの応答: バイト数 =32 時間 =278ms TTL=241]
[0:180.235.96.18 からの応答: バイト数 =32 時間 =12ms TTL=113]
[1:62.128.198.95 からの応答: バイト数 =32 時間 =284ms TTL=241]
[3:1]
[3:10]
[3:11]
[3:12]
[3:2]
[3:3]
[3:4]
[3:5]
[3:6]
[3:7]
[3:8]
[3:9]
[0:180.235.96.18 からの応答: バイト数 =32 時間 =12ms TTL=113]
[0:180.235.96.18 の ping 統計:]
[0: パケット数: 送信 = 4、受信 = 4、損失 = 0 (0% の損失)、]
[0:ラウンド トリップの概算時間 (ミリ秒):]
[0: 最小 = 12ms、最大 = 12ms、平均 = 12ms]
[1:62.128.198.95 からの応答: バイト数 =32 時間 =276ms TTL=241]
[1:62.128.198.95 の ping 統計:]
[1: パケット数: 送信 = 4、受信 = 4、損失 = 0 (0% の損失)、]
[1:ラウンド トリップの概算時間 (ミリ秒):]
[1: 最小 = 276ms、最大 = 284ms、平均 = 280ms]
- Finish -

by denullpo | 2011-01-27 03:42 | こっち関係