Skip to content

[HttpClient] always yield a LastChunk in AsyncResponse on destruction #37482

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 135 additions & 110 deletions src/Symfony/Component/HttpClient/Response/AsyncResponse.php
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
use Symfony\Component\HttpClient\Exception\TransportException;
use Symfony\Contracts\HttpClient\ChunkInterface;
use Symfony\Contracts\HttpClient\Exception\ExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\HttpExceptionInterface;
use Symfony\Contracts\HttpClient\Exception\TransportExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;
Expand Down Expand Up @@ -69,7 +70,7 @@ public function getHeaders(bool $throw = true): array
$headers = $this->response->getHeaders(false);

if ($throw) {
$this->checkStatusCode($this->getInfo('http_code'));
$this->checkStatusCode();
}

return $headers;
Expand Down Expand Up @@ -126,31 +127,44 @@ public function cancel(): void
return;
}

$context = new AsyncContext($this->passthru, $client, $this->response, $this->info, $this->content, $this->offset);
if (null === $stream = ($this->passthru)(new LastChunk(), $context)) {
return;
}
try {
foreach (self::passthru($client, $this, new LastChunk()) as $chunk) {
// no-op
}

if (!$stream instanceof \Iterator) {
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
$this->passthru = null;
} catch (ExceptionInterface $e) {
// ignore any errors when canceling
}
}

try {
foreach ($stream as $chunk) {
if ($chunk->isLast()) {
break;
}
public function __destruct()
{
$httpException = null;

if ($this->initializer && null === $this->getInfo('error')) {
try {
$this->getHeaders(true);
} catch (HttpExceptionInterface $httpException) {
// no-op
}
}

$stream->next();
if ($this->passthru && null === $this->getInfo('error')) {
$this->info['canceled'] = true;
$this->info['error'] = 'Response has been canceled.';

if ($stream->valid()) {
throw new \LogicException('A chunk passthru cannot yield after the last chunk.');
try {
foreach (self::passthru($this->client, $this, new LastChunk()) as $chunk) {
// no-op
}
} catch (ExceptionInterface $e) {
// ignore any errors when destructing
}
}

$stream = $this->passthru = null;
} catch (ExceptionInterface $e) {
// ignore any errors when canceling
if (null !== $httpException) {
throw $httpException;
}
}

Expand Down Expand Up @@ -201,124 +215,135 @@ public static function stream(iterable $responses, float $timeout = null, string
continue;
}

$context = new AsyncContext($r->passthru, $r->client, $r->response, $r->info, $r->content, $r->offset);
if (null === $stream = ($r->passthru)($chunk, $context)) {
if ($r->response === $response && (null !== $chunk->getError() || $chunk->isLast())) {
throw new \LogicException('A chunk passthru cannot swallow the last chunk.');
}
foreach (self::passthru($r->client, $r, $chunk, $asyncMap) as $chunk) {
yield $r => $chunk;
}

continue;
if ($r->response !== $response && isset($asyncMap[$response])) {
break;
}
$chunk = null;
}

if (!$stream instanceof \Iterator) {
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
if (null === $chunk->getError() && !$chunk->isLast() && $r->response === $response && null !== $r->client) {
throw new \LogicException('A chunk passthru must yield an "isLast()" chunk before ending a stream.');
}

$responses = [];
foreach ($asyncMap as $response) {
$r = $asyncMap[$response];

if (null !== $r->client) {
$responses[] = $asyncMap[$response];
}
}
}
}

while (true) {
try {
if (null !== $chunk) {
$stream->next();
}

if (!$stream->valid()) {
break;
}
} catch (\Throwable $e) {
$r->info['error'] = $e->getMessage();
$r->response->cancel();

yield $r => $chunk = new ErrorChunk($r->offset, $e);
$chunk->didThrow() ?: $chunk->getContent();
unset($asyncMap[$response]);
break;
}
private static function passthru(HttpClientInterface $client, self $r, ChunkInterface $chunk, \SplObjectStorage $asyncMap = null): \Generator
{
$response = $r->response;
$context = new AsyncContext($r->passthru, $client, $r->response, $r->info, $r->content, $r->offset);
if (null === $stream = ($r->passthru)($chunk, $context)) {
if ($r->response === $response && (null !== $chunk->getError() || $chunk->isLast())) {
throw new \LogicException('A chunk passthru cannot swallow the last chunk.');
}

$chunk = $stream->current();
return;
}
$chunk = null;

if (!$chunk instanceof ChunkInterface) {
throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
}
if (!$stream instanceof \Iterator) {
throw new \LogicException(sprintf('A chunk passthru must return an "Iterator", "%s" returned.', get_debug_type($stream)));
}

if (null !== $chunk->getError()) {
// no-op
} elseif ($chunk->isFirst()) {
$e = $r->openBuffer();

yield $r => $chunk;

if (null === $e) {
continue;
}

$r->response->cancel();
$chunk = new ErrorChunk($r->offset, $e);
} elseif ('' !== $content = $chunk->getContent()) {
if (null !== $r->shouldBuffer) {
throw new \LogicException('A chunk passthru must yield an "isFirst()" chunk before any content chunk.');
}

if (null !== $r->content && \strlen($content) !== fwrite($r->content, $content)) {
$chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
$r->info['error'] = $chunk->getError();
$r->response->cancel();
}
}
while (true) {
try {
if (null !== $chunk) {
$stream->next();
}

if (null === $chunk->getError()) {
$r->offset += \strlen($content);
if (!$stream->valid()) {
break;
}
} catch (\Throwable $e) {
$r->info['error'] = $e->getMessage();
$r->response->cancel();

yield $r => $chunk;
yield $r => $chunk = new ErrorChunk($r->offset, $e);
$chunk->didThrow() ?: $chunk->getContent();
unset($asyncMap[$response]);
break;
}

if (!$chunk->isLast()) {
continue;
}
$chunk = $stream->current();

$stream->next();
if (!$chunk instanceof ChunkInterface) {
throw new \LogicException(sprintf('A chunk passthru must yield instances of "%s", "%s" yielded.', ChunkInterface::class, get_debug_type($chunk)));
}

if ($stream->valid()) {
throw new \LogicException('A chunk passthru cannot yield after an "isLast()" chunk.');
}
if (null !== $chunk->getError()) {
// no-op
} elseif ($chunk->isFirst()) {
$e = $r->openBuffer();

$r->passthru = null;
} else {
if ($chunk instanceof ErrorChunk) {
$chunk->didThrow(false);
} else {
try {
$chunk = new ErrorChunk($chunk->getOffset(), !$chunk->isTimeout() ?: $chunk->getError());
} catch (TransportExceptionInterface $e) {
$chunk = new ErrorChunk($chunk->getOffset(), $e);
}
}
yield $r => $chunk;

yield $r => $chunk;
$chunk->didThrow() ?: $chunk->getContent();
}
if ($r->initializer && null === $r->getInfo('error')) {
// Ensure the HTTP status code is always checked
$r->getHeaders(true);
}

unset($asyncMap[$response]);
break;
if (null === $e) {
continue;
}

$stream = $context = null;
$r->response->cancel();
$chunk = new ErrorChunk($r->offset, $e);
} elseif ('' !== $content = $chunk->getContent()) {
if (null !== $r->shouldBuffer) {
throw new \LogicException('A chunk passthru must yield an "isFirst()" chunk before any content chunk.');
}

if ($r->response !== $response && isset($asyncMap[$response])) {
break;
if (null !== $r->content && \strlen($content) !== fwrite($r->content, $content)) {
$chunk = new ErrorChunk($r->offset, new TransportException(sprintf('Failed writing %d bytes to the response buffer.', \strlen($content))));
$r->info['error'] = $chunk->getError();
$r->response->cancel();
}
}

if (null === $chunk->getError() && !$chunk->isLast() && $r->response === $response && null !== $r->client) {
throw new \LogicException('A chunk passthru must yield an "isLast()" chunk before ending a stream.');
}
if (null === $chunk->getError()) {
$r->offset += \strlen($content);

$responses = [];
foreach ($asyncMap as $response) {
$r = $asyncMap[$response];
yield $r => $chunk;

if (null !== $r->client) {
$responses[] = $asyncMap[$response];
if (!$chunk->isLast()) {
continue;
}

$stream->next();

if ($stream->valid()) {
throw new \LogicException('A chunk passthru cannot yield after an "isLast()" chunk.');
}

$r->passthru = null;
} else {
if ($chunk instanceof ErrorChunk) {
$chunk->didThrow(false);
} else {
try {
$chunk = new ErrorChunk($chunk->getOffset(), !$chunk->isTimeout() ?: $chunk->getError());
} catch (TransportExceptionInterface $e) {
$chunk = new ErrorChunk($chunk->getOffset(), $e);
}
}

yield $r => $chunk;
$chunk->didThrow() ?: $chunk->getContent();
}

unset($asyncMap[$response]);
break;
}
}

Expand Down
19 changes: 19 additions & 0 deletions src/Symfony/Component/HttpClient/Tests/AsyncDecoratorTraitTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Symfony\Component\HttpClient\Response\AsyncContext;
use Symfony\Component\HttpClient\Response\AsyncResponse;
use Symfony\Contracts\HttpClient\ChunkInterface;
use Symfony\Contracts\HttpClient\Exception\ClientExceptionInterface;
use Symfony\Contracts\HttpClient\HttpClientInterface;
use Symfony\Contracts\HttpClient\ResponseInterface;

Expand Down Expand Up @@ -163,4 +164,22 @@ public function testProcessingHappensOnce()
$this->assertTrue($chunk->isLast());
$this->assertSame(1, $lastChunks);
}

public function testLastChunkIsYieldOnHttpExceptionAtDestructTime()
{
$lastChunk = null;
$client = $this->getHttpClient(__FUNCTION__, function (ChunkInterface $chunk, AsyncContext $context) use (&$lastChunk) {
$lastChunk = $chunk;

yield $chunk;
});

try {
$client->request('GET', 'http://localhost:8057/404');
$this->fail(ClientExceptionInterface::class.' expected');
} catch (ClientExceptionInterface $e) {
}

$this->assertTrue($lastChunk->isLast());
}
}