Skip to content

Commit bb9251a

Browse files
committed
bug #32413 [Messenger] fix publishing headers set on AmqpStamp (Tobion)
This PR was merged into the 4.3 branch. Discussion ---------- [Messenger] fix publishing headers set on AmqpStamp | Q | A | ------------- | --- | Branch? | 4.3 | Bug fix? | yes | New feature? | no <!-- please update src/**/CHANGELOG.md files --> | BC breaks? | no <!-- see https://symfony.com/bc --> | Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files --> | Tests pass? | yes <!-- please add some, will be required by reviewers --> | Fixed tickets | | License | MIT | Doc PR | Dispatching a message using an AmqpStamp and setting extra headers with it didn't work. We need to merge the `$attribute['headers']`, not the attributes itself as that will ignore the stamp headers because it's not recursive. I also made the AmqpStamp a NonSendableStampInterface because it's pointless to serialize the stamp because the stamp already set's the attributes for publishing. Commits ------- 50b3ec4 [Messenger] fix publishing headers set on AmqpStamp
2 parents 330b77c + 50b3ec4 commit bb9251a

File tree

3 files changed

+24
-10
lines changed

3 files changed

+24
-10
lines changed

src/Symfony/Component/Messenger/Tests/Transport/AmqpExt/ConnectionTest.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -418,6 +418,21 @@ public function testObfuscatePasswordInDsn()
418418
$connection->channel();
419419
}
420420

421+
public function testAmqpStampHeadersAreUsed()
422+
{
423+
$factory = new TestAmqpFactory(
424+
$this->createMock(\AMQPConnection::class),
425+
$this->createMock(\AMQPChannel::class),
426+
$this->createMock(\AMQPQueue::class),
427+
$amqpExchange = $this->createMock(\AMQPExchange::class)
428+
);
429+
430+
$amqpExchange->expects($this->once())->method('publish')->with('body', null, AMQP_NOPARAM, ['headers' => ['Foo' => 'X', 'Bar' => 'Y']]);
431+
432+
$connection = Connection::fromDsn('amqp://localhost', [], $factory);
433+
$connection->publish('body', ['Foo' => 'X'], 0, new AmqpStamp(null, AMQP_NOPARAM, ['headers' => ['Bar' => 'Y']]));
434+
}
435+
421436
public function testItCanPublishWithTheDefaultRoutingKey()
422437
{
423438
$factory = new TestAmqpFactory(

src/Symfony/Component/Messenger/Transport/AmqpExt/AmqpStamp.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@
1111

1212
namespace Symfony\Component\Messenger\Transport\AmqpExt;
1313

14-
use Symfony\Component\Messenger\Stamp\StampInterface;
14+
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;
1515

1616
/**
1717
* @author Guillaume Gammelin <ggammelin@gmail.com>
1818
* @author Samuel Roze <samuel.roze@gmail.com>
1919
*
2020
* @experimental in 4.3
2121
*/
22-
final class AmqpStamp implements StampInterface
22+
final class AmqpStamp implements NonSendableStampInterface
2323
{
2424
private $routingKey;
2525
private $flags;

src/Symfony/Component/Messenger/Transport/AmqpExt/Connection.php

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,7 @@ public function publish(string $body, array $headers = [], int $delay = 0, AmqpS
191191
$this->exchange(),
192192
$body,
193193
$this->getRoutingKeyForMessage($amqpStamp),
194-
[
195-
'headers' => $headers,
196-
],
194+
$headers,
197195
$amqpStamp
198196
);
199197
}
@@ -223,20 +221,21 @@ private function publishWithDelay(string $body, array $headers, int $delay, Amqp
223221
$this->getDelayExchange(),
224222
$body,
225223
$this->getRoutingKeyForDelay($delay, $routingKey),
226-
[
227-
'headers' => $headers,
228-
],
224+
$headers,
229225
$amqpStamp
230226
);
231227
}
232228

233-
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $attributes = [], AmqpStamp $amqpStamp = null)
229+
private function publishOnExchange(\AMQPExchange $exchange, string $body, string $routingKey = null, array $headers = [], AmqpStamp $amqpStamp = null)
234230
{
231+
$attributes = $amqpStamp ? $amqpStamp->getAttributes() : [];
232+
$attributes['headers'] = array_merge($headers, $attributes['headers'] ?? []);
233+
235234
$exchange->publish(
236235
$body,
237236
$routingKey,
238237
$amqpStamp ? $amqpStamp->getFlags() : AMQP_NOPARAM,
239-
array_merge($amqpStamp ? $amqpStamp->getAttributes() : [], $attributes)
238+
$attributes
240239
);
241240
}
242241

0 commit comments

Comments
 (0)