Commit c63e9e0

gbouv and hcourdent authored
BP: Service pattern in Windmill (windmill-labs#442)
* BP: Service pattern in Windmill
* Update links & form

Co-authored-by: hcourdent <henri@windmill.dev>
1 parent 1448e42 commit c63e9e0

File tree

4 files changed: +201, -1 lines

Lines changed: 186 additions & 0 deletions
@@ -0,0 +1,186 @@
---
slug: service-script-kafka
title: Service script pattern in Windmill using Kafka
authors: [guillaumebouv]
tags: ['Windmill', 'Kafka', 'Developer tools']
image: ./wm_kafka.png
description: "This example shows how to use a perpetual script to implement a service in Windmill leveraging Kafka."
---

import DocCard from '@site/src/components/DocCard';

This example shows how to use a [perpetual script](/docs/script_editor/perpetual_scripts) to implement a service in Windmill leveraging [Apache Kafka](https://kafka.apache.org/). Services are processes that listen for certain events and trigger actions based on the events they receive, and this is now easily achievable in Windmill.

First, we need a messaging service to listen to. Here we will use Kafka, but this can easily be adapted to other systems. In Windmill, we are going to implement a perpetual script that listens to events coming from a Kafka topic. On every event received, the perpetual script will spin off a Windmill job, with the content of the event passed as an argument to the job.

For this blog post, the consumer of the event will only print the event content, but you can make it do whatever you want (ping a Slack channel, update a database table, etc.).

## Setup

First, we're going to set up a stack with the following:

- [Kafka](https://kafka.apache.org/) + [Zookeeper](https://zookeeper.apache.org/) to have a working Kafka instance to play with.
- A Windmill cluster composed of one server and 2 [workers](/docs/core_concepts/worker_groups). We need 2 workers here to be able to run multiple jobs in parallel (the listener and the producer). If you are fine sending messages to Kafka using the [CLI](/docs/advanced/cli), then one worker will be enough.

We wrote a [docker-compose.yml](https://github.com/windmill-labs/windmill/blob/main/examples/deploy/kafka-job-adapter/docker-compose.yml) to easily build this stack:

```bash
docker compose up -d
```

## Create a Kafka topic

The easiest way is to do it via Windmill, but you can also do it with the Kafka CLI. Go to [your local Windmill](http://localhost:8000) and create a [Python script](/docs/getting_started/scripts_quickstart/python) with the following content. It simply creates the topic in Kafka and returns.

```python
from confluent_kafka.admin import AdminClient, NewTopic

def main(topic_name: str = "windmill-events"):
    admin_client = AdminClient({'bootstrap.servers': 'kafka:9092'})

    new_topic = NewTopic(topic_name)
    topic_created = admin_client.create_topics([new_topic])

    for topic, response in topic_created.items():
        try:
            response.result()
            print("Topic {} created".format(topic))
        except Exception as e:
            raise Exception("Failed to create topic {}: {}".format(topic, e))
```

You can then run this script with a topic name of your choice. For the rest of this post, we will use the topic `windmill-events`.

<details>
<summary>Want to do it from the terminal?</summary>

Run the following command to create the topic from within the Kafka container:

```bash
docker exec -it $KAFKA_CONTAINER_ID kafka-topics.sh --create --topic windmill-events --bootstrap-server localhost:9092
```

</details>

## Create a topic listener in Windmill

As mentioned in the intro, the purpose of this perpetual script is to listen to the `windmill-events` topic and trigger a new Windmill job whenever a message is received. The content is quite simple:

```python
from confluent_kafka import Consumer
import wmill

MSG_CONSUMING_JOB_PATH = "u/admin/consume_message"

def main(kafka_topic: str = "windmill-events"):
    client = Consumer({
        'bootstrap.servers': 'kafka:9092',
        'group.id': 'windmill',
        'auto.offset.reset': 'earliest'
    })

    client.subscribe([kafka_topic])

    # The counter i is here to force the perpetual script to exit (and be auto-restarted
    # by Windmill) after some time, no matter how many messages it has processed. It's
    # good practice to time-bound jobs in general, and in this particular case it avoids
    # hitting the maximum log size
    i = 0
    while i < 10000:
        i += 1
        msg = client.poll(timeout=30)  # timeout of 30 seconds

        if msg is None:
            # print("No message after timeout. Looping")
            continue
        if msg.error():
            raise Exception("Consumer error: {}".format(msg.error()))

        payload = msg.value().decode('utf-8')
        print('Message received ({}). Scheduling consuming job'.format(payload))
        wmill.run_script_async(hash_or_path=MSG_CONSUMING_JOB_PATH, args={"msg": payload})

    client.close()
    return
```
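
The counter above bounds the listener by number of iterations. If you would rather bound it by wall-clock time, the same idea can be sketched with a deadline. This is our own variant, not part of the original setup; `poll_stub` stands in for `client.poll` so the snippet runs without a broker:

```python
import time

MAX_RUNTIME_SECS = 15 * 60  # restart the perpetual script after ~15 minutes

def poll_stub(timeout):
    # Stand-in for client.poll(timeout=...): always reports "no message" here.
    return None

def run_until_deadline(poll=poll_stub, max_runtime_secs=MAX_RUNTIME_SECS,
                       clock=time.monotonic):
    # Loop until the wall-clock deadline passes, then return so Windmill
    # can restart the perpetual script with fresh logs.
    deadline = clock() + max_runtime_secs
    polls = 0
    while clock() < deadline:
        polls += 1
        msg = poll(timeout=1)
        if msg is None:
            continue
        # ...process msg and schedule the consuming job, as above...
    return polls
```

Injecting `clock` as a parameter keeps the deadline logic easy to test without actually waiting.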

Before [deploying the script](/docs/core_concepts/draft_and_deploy), don't forget to enable the "Perpetual Script" toggle in the [script settings](/docs/script_editor/settings). When a script is flagged as perpetual, Windmill will make sure to start a new job every time one finishes. Here is a short tutorial on how to enable "Perpetual Scripts":

<iframe
	style={{ aspectRatio: '16/9' }}
	src="https://www.youtube.com/embed/5uw3JWiIFp0?vq=hd1080"
	title="Perpetual Scripts"
	frameBorder="0"
	allow="accelerometer; autoplay; clipboard-write; encrypted-media; gyroscope; picture-in-picture; web-share"
	allowFullScreen
	className="border-2 rounded-xl object-cover w-full dark:border-gray-800"
></iframe>

<br/>

Lastly, we need to create the `u/admin/consume_message` script. As mentioned previously, it only prints the message content:

```python
def main(msg: str):
    print("A message has been received: {}".format(msg))
```
The listener script can now be started. It will run perpetually.
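
The scripts here pass the message around as a raw string. If your events carry structure, one option (a sketch of ours, not from the original setup; the `event_type` and `payload` field names are invented) is to JSON-encode events on the producer side and decode them in the consuming job:

```python
import json

def encode_event(event_type, payload):
    # Producer side: serialize before client.produce(topic, encode_event(...))
    return json.dumps({"event_type": event_type, "payload": payload}).encode("utf-8")

def decode_event(raw_bytes):
    # Consumer side: replaces the bare msg.value().decode('utf-8')
    event = json.loads(raw_bytes.decode("utf-8"))
    return event["event_type"], event["payload"]

event_type, payload = decode_event(encode_event("user_signup", {"user": "alice"}))
print(event_type, payload)  # → user_signup {'user': 'alice'}
```

This way the consuming job can branch on `event_type` instead of parsing ad hoc strings.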

## Publish messages to the Kafka topic

Finally, to prove that the above works, we need to publish messages to the Kafka topic. This can be done with the Kafka CLI, but why not do it in Windmill? Here is a script that publishes 10 messages with a random sleep between each:

```python
from confluent_kafka import Producer
import random
import time

NUMBER_OF_MSGS = 10
MAX_SLEEP_SECS = 10

def main(kafka_topic: str = "windmill-events", msg: str = "Hello World!"):
    for i in range(NUMBER_OF_MSGS):
        sleep_secs = random.randint(0, MAX_SLEEP_SECS)
        print("Sleeping for {}s".format(sleep_secs))
        time.sleep(sleep_secs)

        client = Producer({
            'bootstrap.servers': 'kafka:9092',
        })

        client.poll(0)
        client.produce(kafka_topic, msg.encode('utf-8'), callback=delivery_callback)
        client.flush()
    return

def delivery_callback(err, msg):
    if err is not None:
        raise Exception('Publishing message failed: {}'.format(err))
    else:
        print('Message delivered')
```
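
Since `delivery_callback` raises on failure, a single bad delivery fails the whole producer job. If you want retries instead, a generic backoff wrapper can be sketched like this (our own helper, not a confluent-kafka API; `send` stands in for the produce-and-flush step):

```python
import time

def send_with_retry(send, attempts=3, base_delay_secs=0.5, sleep=time.sleep):
    # Call `send` until it succeeds, backing off exponentially: 0.5s, 1s, 2s, ...
    for attempt in range(attempts):
        try:
            return send()
        except Exception:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the failure to Windmill
            sleep(base_delay_secs * (2 ** attempt))
```

In the producer above, `send` would wrap the `client.produce(...)` / `client.flush()` pair.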

<details>
<summary>Want to do it from the terminal?</summary>

Log into the Kafka container and run the `kafka-console-producer.sh` helper:

```bash
docker exec -it $KAFKA_CONTAINER_ID kafka-console-producer.sh --topic windmill-events --bootstrap-server localhost:9092
```

Each line you type is one message sent to the topic.

</details>

On every message, the listener will trigger the consuming script with the message payload, and Windmill will restart the listener immediately!

## Learn more

To learn more about Perpetual Scripts, you can visit our dedicated docs page:

<div class="grid grid-cols-2 gap-6 mb-4">
	<DocCard
		title="Running Services with Perpetual Scripts"
		description="Perpetual scripts restart upon ending unless canceled."
		href="/docs/script_editor/perpetual_scripts"
	/>
</div>

docs/assets/script_editor/cancel.png (-23 KB)

docs/script_editor/perpetual_scripts.md renamed to docs/script_editor/perpetual_scripts.mdx

Lines changed: 15 additions & 1 deletion
```diff
@@ -1,3 +1,5 @@
+import DocCard from '@site/src/components/DocCard';
+
 # Running Services with Perpetual Scripts
 
 Perpetual scripts restart upon ending unless canceled.
@@ -24,4 +26,16 @@ Canceling one [job](../core_concepts/20_jobs/index.mdx) from a perpetual script
 
 You can also click on "Scale down to zero" in the "Current runs" tab.
 
-![Scale down to zero](../assets/script_editor/scale_down_to_zero.png "Scale down to zero")
+![Scale down to zero](../assets/script_editor/scale_down_to_zero.png "Scale down to zero")
+
+## Tutorial
+
+To learn more about Perpetual Scripts, you can visit our tutorial on how to use a perpetual script to implement a service in Windmill leveraging [Apache Kafka](https://kafka.apache.org/):
+
+<div class="grid grid-cols-2 gap-6 mb-4">
+	<DocCard
+		title="Service script pattern in Windmill using Kafka"
+		description="This example shows how to use a perpetual script to implement a service in Windmill leveraging Kafka."
+		href="/blog/service-script-kafka"
+	/>
+</div>
```
