2015-08-17 17:00:26 -07:00
< ? php
namespace GuzzleHttp ;
2015-08-27 12:03:05 -07:00
use GuzzleHttp\Promise\PromisorInterface ;
use Psr\Http\Message\RequestInterface ;
use GuzzleHttp\Promise\EachPromise ;
2015-08-17 17:00:26 -07:00
/**
* Sends an iterator of requests concurrently using a capped pool size.
*
2015-08-27 12:03:05 -07:00
* The pool will read from an iterator until it is cancelled or until the
* iterator is consumed. When a request is yielded, the request is sent after
* applying the "options" request options (if provided in the ctor).
2015-08-17 17:00:26 -07:00
*
2015-08-27 12:03:05 -07:00
* When a function is yielded by the iterator, the function is provided the
* "options" array that should be merged on top of any existing
* options, and the function MUST then return a wait-able promise.
2015-08-17 17:00:26 -07:00
*/
2015-08-27 12:03:05 -07:00
class Pool implements PromisorInterface
{
    /** @var EachPromise Aggregate built in the constructor from the request generator. */
    private $each;

    /**
     * @param ClientInterface $client   Client used to send the requests.
     * @param array|\Iterator $requests Requests, or callables that accept the
     *                                  request options array and return a
     *                                  promise, to send concurrently.
     * @param array           $config   Associative array of options
     *     - concurrency: (int) Maximum number of requests to send concurrently.
     *       Defaults to 25 when neither "concurrency" nor "pool_size" is set.
     *     - pool_size: (int) Legacy name for "concurrency"; when set it
     *       overrides "concurrency".
     *     - options: Array of request options to apply to each request.
     *     - fulfilled: (callable) Function to invoke when a request completes.
     *     - rejected: (callable) Function to invoke when a request is rejected.
     */
    public function __construct(
        ClientInterface $client,
        $requests,
        array $config = []
    ) {
        // Backwards compatibility: "pool_size" is the old name of "concurrency".
        if (isset($config['pool_size'])) {
            $config['concurrency'] = $config['pool_size'];
        } elseif (!isset($config['concurrency'])) {
            $config['concurrency'] = 25;
        }

        // "options" is consumed here and removed so that it is not forwarded
        // to EachPromise along with the rest of the configuration.
        if (isset($config['options'])) {
            $opts = $config['options'];
            unset($config['options']);
        } else {
            $opts = [];
        }

        $iterable = \GuzzleHttp\Promise\iter_for($requests);

        // Lazily turns each yielded value into a promise. Declared static
        // because the generator never touches $this, so the instance does not
        // need to be bound to the closure.
        $requests = static function () use ($iterable, $client, $opts) {
            foreach ($iterable as $rfn) {
                if ($rfn instanceof RequestInterface) {
                    yield $client->sendAsync($rfn, $opts);
                } elseif (is_callable($rfn)) {
                    yield $rfn($opts);
                } else {
                    throw new \InvalidArgumentException('Each value yielded by '
                        . 'the iterator must be a Psr\Http\Message\RequestInterface '
                        . 'or a callable that returns a promise that fulfills '
                        . 'with a Psr\Http\Message\ResponseInterface object.');
                }
            }
        };

        $this->each = new EachPromise($requests(), $config);
    }

    /**
     * Returns the promise exposed by the underlying EachPromise.
     */
    public function promise()
    {
        return $this->each->promise();
    }

    /**
     * Sends multiple requests concurrently and returns an array of responses
     * and exceptions that uses the same ordering as the provided requests.
     *
     * IMPORTANT: This method keeps every request and response in memory, and
     * as such, is NOT recommended when sending a large number or an
     * indeterminate number of requests concurrently.
     *
     * Any "fulfilled" / "rejected" callbacks supplied in $options are still
     * invoked; they are wrapped so that the value they receive is also
     * recorded in the returned array.
     *
     * @param ClientInterface $client   Client used to send the requests
     * @param array|\Iterator $requests Requests to send concurrently.
     * @param array           $options  Passes through the options available in
     *                                  {@see GuzzleHttp\Pool::__construct}
     *
     * @return array Returns an array containing the response or an exception
     *               in the same order that the requests were sent.
     * @throws \InvalidArgumentException if a value in $requests is neither a
     *                                   request nor a callable.
     */
    public static function batch(
        ClientInterface $client,
        $requests,
        array $options = []
    ) {
        $res = [];
        self::cmpCallback($options, 'fulfilled', $res);
        self::cmpCallback($options, 'rejected', $res);
        $pool = new static($client, $requests, $options);
        // Block until the pool's promise has settled.
        $pool->promise()->wait();
        // Callbacks may fire in any order; sort by key to restore the order
        // of the results.
        ksort($res);

        return $res;
    }

    /**
     * Installs (or wraps) the callback stored under $name in $options so that
     * every value passed to it is also stored in $results under its key.
     *
     * @param array  $options Pool options; modified in place.
     * @param string $name    Option key of the callback ("fulfilled" or "rejected").
     * @param array  $results Collector that receives value-by-key; modified in place.
     */
    private static function cmpCallback(array &$options, $name, array &$results)
    {
        if (!isset($options[$name])) {
            $options[$name] = function ($v, $k) use (&$results) {
                $results[$k] = $v;
            };
        } else {
            // Preserve the caller's callback: run it first, then record the value.
            $currentFn = $options[$name];
            $options[$name] = function ($v, $k) use (&$results, $currentFn) {
                $currentFn($v, $k);
                $results[$k] = $v;
            };
        }
    }
}