---
slug: service-script-kafka
title: Service script pattern in Windmill using Kafka
authors: [guillaumebouv]
tags: ['Windmill', 'Kafka', 'Developer tools']
image: ./wm_kafka.png
description: "This example shows how to use a perpetual script to implement a service in Windmill leveraging Kafka."
---

import DocCard from '@site/src/components/DocCard';

This example shows how to use a [perpetual script](/docs/script_editor/perpetual_scripts) to implement a service in Windmill leveraging [Apache Kafka](https://kafka.apache.org/). Services are processes that listen for certain events and trigger actions based on the events they receive, and this is now easily achievable in Windmill.

First, we need a messaging service to listen to. Here we will use Kafka, but the pattern can easily be adapted to other message brokers. In Windmill, we are going to implement a perpetual script that listens to events coming from a Kafka topic. For every event received, the perpetual script will spin off a Windmill job, passing the content of the event as an argument to the job.

For this blog post, the consumer of the event will only print the event content, but you can make it do whatever you want with it (ping a Slack channel, update a database table, etc.)

## Setup

First, we're going to set up a stack with the following:
- [Kafka](https://kafka.apache.org/) + [Zookeeper](https://zookeeper.apache.org/) to have a working Kafka instance to play with.
- A Windmill cluster composed of one server and 2 [workers](/docs/core_concepts/worker_groups). We need 2 workers here to be able to run multiple jobs in parallel (the listener and the producer). If you are fine sending messages to Kafka using the [CLI](/docs/advanced/cli), then one worker is enough.

We wrote a [docker-compose.yml](https://github.com/windmill-labs/windmill/blob/main/examples/deploy/kafka-job-adapter/docker-compose.yml) to easily build this stack:

```bash
docker compose up -d
```

## Create a Kafka topic

The easiest way is to do it via Windmill, but you can also do it with the Kafka CLI. Go to [your local Windmill](http://localhost:8000) and create a [Python script](/docs/getting_started/scripts_quickstart/python) with the following content. It simply creates the topic in Kafka and returns.

```python
from confluent_kafka.admin import AdminClient, NewTopic

def main(topic_name: str = "windmill-events"):
    admin_client = AdminClient({'bootstrap.servers': 'kafka:9092'})

    new_topic = NewTopic(topic_name, num_partitions=1, replication_factor=1)
    topic_created = admin_client.create_topics([new_topic])

    for topic, response in topic_created.items():
        try:
            response.result()
            print("Topic {} created".format(topic))
        except Exception as e:
            raise Exception("Failed to create topic {}: {}".format(topic, e))
```

You can then run this script with a topic name of your choice. For the rest of this post, we will use the topic `windmill-events`.

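As an aside, Kafka is strict about topic names: only letters, digits, `.`, `_` and `-` are allowed, up to 249 characters, and `.` / `..` are reserved. Since the script above takes the topic name as an argument, a small validation helper (purely illustrative, not part of the original script) can fail fast with a clear error before calling the admin API:

```python
import re

# Kafka topic names may only contain alphanumerics, '.', '_' and '-',
# and must be at most 249 characters long.
TOPIC_NAME_RE = re.compile(r"^[a-zA-Z0-9._-]{1,249}$")

def validate_topic_name(topic_name: str) -> str:
    # '.' and '..' are reserved and cannot be used as topic names
    if topic_name in (".", ".."):
        raise ValueError("'{}' is a reserved topic name".format(topic_name))
    if not TOPIC_NAME_RE.match(topic_name):
        raise ValueError("Invalid Kafka topic name: {}".format(topic_name))
    return topic_name
```

Calling this at the top of `main` turns a cryptic broker-side error into an immediate, readable one.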
<details>
<summary>Want to do it from the terminal?</summary>
Run the following command to create the topic from within the Kafka container:

```bash
docker exec -it $KAFKA_CONTAINER_ID kafka-topics.sh --create --topic windmill-events --bootstrap-server localhost:9092
```

</details>

## Create a topic listener in Windmill

As said in the intro, the purpose of this perpetual script is to listen to the `windmill-events` topic and trigger a new Windmill job whenever a message is received. The content is quite simple:

```python
from confluent_kafka import Consumer
import wmill

MSG_CONSUMING_JOB_PATH = "u/admin/consume_message"

def main(kafka_topic: str = "windmill-events"):
    client = Consumer({
        'bootstrap.servers': 'kafka:9092',
        'group.id': 'windmill',
        'auto.offset.reset': 'earliest'
    })

    client.subscribe([kafka_topic])

    # The counter i is here to force the perpetual script to exit (and be auto-restarted by
    # Windmill) after some time, no matter how many messages it has processed. It's good
    # practice to time-bound jobs in general, and in this particular case it also avoids
    # hitting the maximum log size
    i = 0
    while i < 10000:
        i += 1
        msg = client.poll(timeout=30)  # timeout of 30 seconds

        if msg is None:
            # print("No message after timeout. Looping")
            continue
        if msg.error():
            raise Exception("Consumer error: {}".format(msg.error()))

        payload = msg.value().decode('utf-8')
        print('Message received ({}). Scheduling consuming job'.format(payload))
        wmill.run_script_async(hash_or_path=MSG_CONSUMING_JOB_PATH, args={"msg": payload})

    client.close()
```

Before [deploying the script](/docs/core_concepts/draft_and_deploy), don't forget to enable the "Perpetual Script" toggle in the [script settings](/docs/script_editor/settings). When a script is perpetual, Windmill makes sure to start a new job every time one finishes. Here is a short tutorial on how to enable perpetual scripts:

<iframe
    style={{ aspectRatio: '16/9' }}
    src="https://www.youtube.com/embed/5uw3JWiIFp0?vq=hd1080"
    title="Perpetual Scripts"
    frameBorder="0"
    allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
    allowFullScreen
    className="border-2 rounded-xl object-cover w-full dark:border-gray-800"
></iframe>

<br/>

Lastly, we need to create the `u/admin/consume_message` script. As said previously, here it only prints the message content:

```python
def main(msg: str):
    print("A message has been received: {}".format(msg))
```

The listener script can now be started. It will run perpetually.

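In a real service, the payload is often structured JSON rather than a plain string. A minimal variant of the consuming script (purely illustrative, assuming the producer sends JSON) could parse the payload defensively before acting on it:

```python
import json

def main(msg: str):
    # Try to decode the payload as JSON; fall back to treating it as plain text
    try:
        event = json.loads(msg)
    except json.JSONDecodeError:
        event = {"raw": msg}
    print("A message has been received: {}".format(event))
    return event
```

Returning the parsed event also makes it visible in the job's result in the Windmill UI, which is handy when debugging.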
## Publish messages to the Kafka topic

Finally, to prove that the above works, we need to publish messages to the Kafka topic. It can be done with the Kafka CLI, but why not do it in Windmill? Here is a script that will publish 10 messages with random sleeps in between:

```python
from confluent_kafka import Producer
import random
import time

NUMBER_OF_MSGS = 10
MAX_SLEEP_SECS = 10

def delivery_callback(err, msg):
    if err is not None:
        raise Exception('Publishing message failed: {}'.format(err))
    else:
        print('Message delivered')

def main(kafka_topic: str = "windmill-events", msg: str = "Hello World!"):
    # Create the producer once and reuse it for every message
    client = Producer({
        'bootstrap.servers': 'kafka:9092',
    })

    for _ in range(NUMBER_OF_MSGS):
        sleep_secs = random.randint(0, MAX_SLEEP_SECS)
        print("Sleeping for {}s".format(sleep_secs))
        time.sleep(sleep_secs)

        client.poll(0)
        client.produce(kafka_topic, msg.encode('utf-8'), callback=delivery_callback)

    client.flush()
```

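If you want reproducible runs while testing, the random sleeps can be generated up front from a seeded RNG (a purely illustrative helper, not part of the original script):

```python
import random

def make_sleep_schedule(number_of_msgs: int, max_sleep_secs: int, seed: int = 42) -> list:
    # A dedicated Random instance keeps the schedule independent of global RNG state
    rng = random.Random(seed)
    return [rng.randint(0, max_sleep_secs) for _ in range(number_of_msgs)]
```

With the same seed, every run publishes on the same schedule, which makes it easier to correlate the producer's logs with the listener's logs.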
<details>
<summary>Want to do it from the terminal?</summary>
Log into the Kafka container and run the `kafka-console-producer.sh` helper:

```bash
docker exec -it $KAFKA_CONTAINER_ID kafka-console-producer.sh --topic windmill-events --bootstrap-server localhost:9092
```

One line is one message sent to the topic.
</details>

For every message, the listener will trigger the consuming script with the message payload, and because the listener is a perpetual script, Windmill will restart it as soon as it exits!

## Learn more

To learn more about perpetual scripts, you can visit our dedicated docs page:

<div class="grid grid-cols-2 gap-6 mb-4">
  <DocCard
    title="Running Services with Perpetual Scripts"
    description="Perpetual scripts restart upon ending unless canceled."
    href="/docs/script_editor/perpetual_scripts"
  />
</div>