Skip to content

Commit ce59300

Browse files
committed
Add a RdKafka caster to Var-Dumper
1 parent b350c80 commit ce59300

File tree

5 files changed

+482
-0
lines changed

5 files changed

+482
-0
lines changed

.travis.yml

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,20 @@ before_install:
6262
docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 -p 7006:7006 -p 7007:7007 -e "STANDALONE=true" --name redis-cluster grokzen/redis-cluster:5.0.4
6363
export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005'
6464
65+
- |
66+
# Start Kafka and install an up-to-date librdkafka
67+
docker network create kafka_network
68+
docker pull wurstmeister/zookeeper:3.4.6
69+
docker run -d --network kafka_network --name zookeeper wurstmeister/zookeeper:3.4.6
70+
docker pull wurstmeister/kafka:2.12-2.3.1
71+
docker run -d -p 9092:9092 --network kafka_network -e "KAFKA_AUTO_CREATE_TOPICS_ENABLE=true" -e "KAFKA_ADVERTISED_HOST_NAME=kafka" -e "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181" -e "KAFKA_ADVERTISED_PORT=9092" --name kafka wurstmeister/kafka:2.12-2.3.1
72+
export KAFKA_BROKER=kafka:9092
73+
sudo sh -c 'echo "\n127.0.0.1 kafka\n" >> /etc/hosts'
74+
75+
mkdir /tmp/librdkafka
76+
curl https://codeload.github.com/edenhill/librdkafka/tar.gz/v0.11.6 | tar xzf - -C /tmp/librdkafka
77+
(cd /tmp/librdkafka/librdkafka-0.11.6 && ./configure && make && sudo make install)
78+
6579
- |
6680
# General configuration
6781
set -e
@@ -175,6 +189,7 @@ before_install:
175189
tfold ext.igbinary tpecl igbinary-2.0.8 igbinary.so $INI
176190
tfold ext.zookeeper tpecl zookeeper-0.7.1 zookeeper.so $INI
177191
tfold ext.amqp tpecl amqp-1.9.4 amqp.so $INI
192+
tfold ext.rdkafka tpecl rdkafka-4.0.2 rdkafka.so $INI
178193
tfold ext.redis tpecl redis-4.3.0 redis.so $INI "no"
179194
done
180195
- |

src/Symfony/Component/VarDumper/CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
CHANGELOG
22
=========
33

4+
5.1.0
5+
-----
6+
7+
* added `RdKafka` support
8+
49
4.4.0
510
-----
611

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\VarDumper\Caster;
13+
14+
use RdKafka;
15+
use RdKafka\Conf;
16+
use RdKafka\Exception as RdKafkaException;
17+
use RdKafka\KafkaConsumer;
18+
use RdKafka\Message;
19+
use RdKafka\Metadata\Broker as BrokerMetadata;
20+
use RdKafka\Metadata\Partition as PartitionMetadata;
21+
use RdKafka\Metadata\Topic as TopicMetadata;
22+
use RdKafka\Topic;
23+
use RdKafka\TopicConf;
24+
use RdKafka\TopicPartition;
25+
use Symfony\Component\VarDumper\Cloner\Stub;
26+
27+
/**
28+
* Casts RdKafka related classes to array representation.
29+
*
30+
* @author Romain Neutron <imprec@gmail.com>
31+
*
32+
* @experimental
33+
*/
34+
class RdKafkaCaster
35+
{
36+
public static function castKafkaConsumer(KafkaConsumer $c, array $a, Stub $stub, $isNested)
37+
{
38+
$prefix = Caster::PREFIX_VIRTUAL;
39+
40+
try {
41+
$assignment = $c->getAssignment();
42+
} catch (RdKafkaException $e) {
43+
$assignment = [];
44+
}
45+
46+
$a += [
47+
$prefix.'subscription' => $c->getSubscription(),
48+
$prefix.'assignment' => self::prepareAssignment($assignment),
49+
];
50+
51+
$a += self::extractMetadata($c);
52+
53+
return $a;
54+
}
55+
56+
public static function castTopic(Topic $c, array $a, Stub $stub, $isNested)
57+
{
58+
$prefix = Caster::PREFIX_VIRTUAL;
59+
60+
$a += [
61+
$prefix.'name' => $c->getName(),
62+
];
63+
64+
return $a;
65+
}
66+
67+
public static function castTopicPartition(TopicPartition $c, array $a)
68+
{
69+
$prefix = Caster::PREFIX_VIRTUAL;
70+
71+
$a += [
72+
$prefix.'offset' => $c->getOffset(),
73+
$prefix.'partition' => $c->getPartition(),
74+
$prefix.'topic' => $c->getTopic(),
75+
];
76+
77+
return $a;
78+
}
79+
80+
public static function castMessage(Message $c, array $a, Stub $stub, $isNested)
81+
{
82+
$prefix = Caster::PREFIX_VIRTUAL;
83+
84+
$a += [
85+
$prefix.'errstr' => $c->errstr(),
86+
];
87+
88+
return $a;
89+
}
90+
91+
public static function castConf(Conf $c, array $a, Stub $stub, $isNested)
92+
{
93+
$prefix = Caster::PREFIX_VIRTUAL;
94+
95+
foreach ($c->dump() as $key => $value) {
96+
$a[$prefix.$key] = $value;
97+
}
98+
99+
return $a;
100+
}
101+
102+
public static function castTopicConf(TopicConf $c, array $a, Stub $stub, $isNested)
103+
{
104+
$prefix = Caster::PREFIX_VIRTUAL;
105+
106+
foreach ($c->dump() as $key => $value) {
107+
$a[$prefix.$key] = $value;
108+
}
109+
110+
return $a;
111+
}
112+
113+
public static function castRdKafka(\RdKafka $c, array $a, Stub $stub, $isNested)
114+
{
115+
$prefix = Caster::PREFIX_VIRTUAL;
116+
117+
$a += [
118+
$prefix.'out_q_len' => $c->getOutQLen(),
119+
];
120+
121+
$a += self::extractMetadata($c);
122+
123+
return $a;
124+
}
125+
126+
private static function prepareAssignment(iterable $partitions)
127+
{
128+
return array_map(function (TopicPartition $topicPartition) {
129+
return self::castTopicPartition($topicPartition, []);
130+
}, $partitions);
131+
}
132+
133+
private static function extractMetadata($c)
134+
{
135+
try {
136+
$m = $c->getMetadata(true, null, 500);
137+
} catch (RdKafkaException $e) {
138+
$m = null;
139+
}
140+
141+
if (!$m) {
142+
return [];
143+
}
144+
145+
return [
146+
'orig_broker_id' => $m->getOrigBrokerId(),
147+
'orig_broker_name' => $m->getOrigBrokerName(),
148+
'brokers' => array_map(function (BrokerMetadata $broker) {
149+
return [
150+
'id' => $broker->getId(),
151+
'host' => $broker->getHost(),
152+
'port' => $broker->getPort(),
153+
];
154+
}, iterator_to_array($m->getBrokers())),
155+
'topics' => array_map(function (TopicMetadata $topic) {
156+
return [
157+
'name' => $topic->getTopic(),
158+
'partitions' => array_map(function (PartitionMetadata $partition) {
159+
return [
160+
'id' => $partition->getId(),
161+
'err' => $partition->getErr(),
162+
'leader' => $partition->getLeader(),
163+
];
164+
}, iterator_to_array($topic->getPartitions())),
165+
];
166+
}, iterator_to_array($m->getTopics())),
167+
];
168+
}
169+
}

src/Symfony/Component/VarDumper/Cloner/AbstractCloner.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,14 @@ abstract class AbstractCloner implements ClonerInterface
160160
':persistent stream' => ['Symfony\Component\VarDumper\Caster\ResourceCaster', 'castStream'],
161161
':stream-context' => ['Symfony\Component\VarDumper\Caster\ResourceCaster', 'castStreamContext'],
162162
':xml' => ['Symfony\Component\VarDumper\Caster\XmlResourceCaster', 'castXml'],
163+
164+
'RdKafka' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castRdKafka'],
165+
'RdKafka\Conf' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castConf'],
166+
'RdKafka\KafkaConsumer' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castKafkaConsumer'],
167+
'RdKafka\Message' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castMessage'],
168+
'RdKafka\Topic' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopic'],
169+
'RdKafka\TopicPartition' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicPartition'],
170+
'RdKafka\TopicConf' => ['Symfony\Component\VarDumper\Caster\RdKafkaCaster', 'castTopicConf'],
163171
];
164172

165173
protected $maxItems = 2500;

0 commit comments

Comments
 (0)