
Commit 19cc3d8

docs: Add code snippets to persistent storage page (windmill-labs#440)

1 parent 257cb8b

File tree (2 files changed: +233, -46)

  • blog/2023-11-24-data-pipeline-orchestrator
  • docs/core_concepts/11_persistent_storage

blog/2023-11-24-data-pipeline-orchestrator/index.mdx (5 additions, 5 deletions)
````diff
@@ -119,8 +119,8 @@ In Windmill, you can just do:
 
 ```
 conn = duckdb.connect()
-s3_resource = wmill.get_resource("/path/to/resource")
-conn.execute(wmill.duckdb_connection_settings(s3_resource)["connection_settings_str"])
+# path/to/resource arg is optional and by default the workspace s3 resource will be used
+conn.execute(wmill.duckdb_connection_settings("/path/to/resource")["connection_settings_str"])
 
 conn.sql("SELECT * FROM read_parquet(s3://windmill_bucket/file.parquet)")
 ```
@@ -147,8 +147,8 @@ with s3.open("s3://windmill_bucket/file.parquet", mode="rb") as f:
 becomes in Windmill:
 
 ```python
-s3_resource = wmill.get_resource("/path/to/resource")
-s3 = s3fs.S3FileSystem(**wmill.polars_connection_settings(s3_resource))
+# /path/to/resource arg is optional and by default the workspace s3 resource will be used
+s3 = s3fs.S3FileSystem(**wmill.polars_connection_settings("/path/to/resource")["s3fs_args"])
 with s3.open("s3://windmill_bucket/file.parquet", mode="rb") as f:
     dataframe = pl.read_parquet(f)
 ```
@@ -167,7 +167,7 @@ s3object = dict
 def main(input_dataset: s3object):
     # initialization: connect Polars to the workspace bucket
     s3_resource = wmill.get_resource("/path/to/resource")
-    s3 = s3fs.S3FileSystem(wmill.duckdb_connection_settings(s3_resource))
+    s3 = s3fs.S3FileSystem(wmill.polars_connection_settings("/path/to/resource")["s3fs_args"])
 
     # reading data from s3:
     bucket = s3_resource["bucket"]
````
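This change drops the intermediate `wmill.get_resource` call: the connection-settings helpers now take a resource path directly, or no argument at all to fall back to the workspace S3 resource. A minimal Python sketch of the resulting pattern (bucket name and paths are illustrative placeholders, not part of the commit):

```python
import duckdb
import s3fs
import wmill

# DuckDB: with no argument the workspace S3 resource is used
conn = duckdb.connect()
conn.execute(wmill.duckdb_connection_settings()["connection_settings_str"])
conn.sql("SELECT * FROM read_parquet('s3://windmill_bucket/file.parquet')").show()

# Polars via s3fs: an explicit resource path can still be passed
s3 = s3fs.S3FileSystem(**wmill.polars_connection_settings("/path/to/resource")["s3fs_args"])
```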

docs/core_concepts/11_persistent_storage/index.mdx (228 additions, 41 deletions)
````diff
@@ -1,4 +1,6 @@
 import DocCard from '@site/src/components/DocCard';
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
 
 # Persistent Storage
 
@@ -10,13 +12,13 @@ In the context of Windmill, the stakes are: **where to effectively store and man
 
 When it comes to storing data manipulated by Windmil, it is recommended to only store Windmill-specific elements ([resources](../3_resources_and_types/index.mdx), [variables](../2_variables_and_secrets/index.mdx) etc.). To store data, it is recommended to use external storage service providers that can be accessed from Windmill.
 
-<br/>
+<br />
 
 This present document gives a list of trusted services to use alongside Windmill.
 
 :::
 
-<br/>
+<br />
 
 There are 4 kinds of persistent storage in Windmill:
 
@@ -98,7 +100,7 @@ States are what enable Flows to watch for changes in most event watching scenari
 
 The convenience functions do this are:
 
-*TypeScript*
+_TypeScript_
 
 - `getState()` which retrieves an object of any type (internally a simple
   Resource) at a path determined by `getStatePath`, which is unique to the user
@@ -108,9 +110,9 @@ The convenience functions do this are:
 
 > Please note it requires [importing](../../advanced/6_imports/index.md) the wmill client library from Deno/Bun.
 
-<br/>
+<br />
 
-*Python*
+_Python_
 
 - `get_state()` which retrieves an object of any type (internally a simple
   Resource) at a path determined by `get_state_path`, which is unique to the user
@@ -120,14 +122,14 @@ The convenience functions do this are:
 
 > Please note it requires [importing](../../advanced/6_imports/index.md) the wmill client library from Python.
 
-<br/>
+<br />
 
 <div class="grid grid-cols-2 gap-6 mb-4">
-  <DocCard
-    title="States"
-    description="A state is an object stored as a resource of the resource type `state` which is meant to persist across distinct executions of the same script."
-    href="/docs/core_concepts/resources_and_types#states"
-  />
+  <DocCard
+    title="States"
+    description="A state is an object stored as a resource of the resource type `state` which is meant to persist across distinct executions of the same script."
+    href="/docs/core_concepts/resources_and_types#states"
+  />
 </div>
 
 #### Resources
@@ -205,20 +207,20 @@ For Postgres databases (best for structured data storage and retrieval, where yo
 4. From Windmill, add your Supabase connection string as a [Postgresql resource](https://hub.windmill.dev/resource_types/114/postgresql) and [Execute queries](https://hub.windmill.dev/scripts/postgresql/1294/execute-query-and-return-results-postgresql). Tip: you might need to set the `sslmode` to "disable".
 
 <video
-  className="border-2 rounded-xl object-cover w-full h-full dark:border-gray-800"
-  controls
-  src="/videos/supabase_postgres_integration.mp4"
+  className="border-2 rounded-xl object-cover w-full h-full dark:border-gray-800"
+  controls
+  src="/videos/supabase_postgres_integration.mp4"
 />
 
-<br/>
+<br />
 
 You can also integrate Supabase [directly through its API](../../integrations/supabase.md#through-supabase-api).
 
 :::tip
 
 You can find examples and premade Supabase scripts on [Windmill Hub](https://hub.windmill.dev/integrations/supabase).
 
-<br/>
+<br />
 
 More tutorials on Supabase:
 
@@ -243,18 +245,18 @@ More tutorials on Supabase:
 4. From Windmill, add your Neon.tech connection string as a [Postgresql resource](https://hub.windmill.dev/resource_types/114/postgresql) and [Execute queries](https://hub.windmill.dev/scripts/postgresql/1294/execute-query-and-return-results-postgresql).
 
 <video
-  className="border-2 rounded-xl object-cover w-full h-full dark:border-gray-800"
-  controls
-  src="/videos/neon_integration.mp4"
+  className="border-2 rounded-xl object-cover w-full h-full dark:border-gray-800"
+  controls
+  src="/videos/neon_integration.mp4"
 />
 
-<br/>
+<br />
 
 :::tip
 
 Adding the connection string as a Postgres resource requires to parse it.
 
-<br/>
+<br />
 
 For example, for `psql postgres://daniel:<password>@ep-restless-rice.us-east-2.aws.neon.tech/neondb`, that would be:
 
@@ -277,22 +279,6 @@ Where the sslmode should be "require" and Neon uses the default PostgreSQL port,
 
 On heavier data objects & unstructured data storage, Amazon S3 (Simple Storage Service) and its alternatives Cloudflare R2 and MinIO are highly scalable and durable object storage service that provides secure, reliable, and cost-effective storage for a wide range of data types and use cases.
 
-### Windmill embedded integration with S3, Polars and DuckDB for data pipelines
-
-Run your ETLs on-prem up to 5x faster using Windmill compared to Spark while simplifying your infra.
-
-You can link a Windmill workspace to an S3 bucket and use it as source and/or target of your processing steps seamlessly, without any boilerplate.
-
-See our page dedicated to Data Pipelines in Windmill:
-
-<div class="grid grid-cols-2 gap-6 mb-4">
-  <DocCard
-    title="Data Pipelines"
-    description="We have integrated with Polars and DuckDB for in-memory data processing and S3 for external storage."
-    href="/docs/core_concepts/data_pipelines"
-  />
-</div>
-
 ### Use Amazon S3, R2 and MinIO directly
 
 Amazon S3, Cloudflare R2 and MinIO all follow the same API schema and therefore have a [common Windmill resource type](https://hub.windmill.dev/resource_types/42/).
@@ -302,12 +288,12 @@ Amazon S3, Cloudflare R2 and MinIO all follow the same API schema and therefore
 [Amazon S3](https://aws.amazon.com/s3/) (Simple Storage Service) is a scalable and durable object storage service offered by Amazon Web Services (AWS), designed to provide developers and businesses with an effective way to store and retrieve any amount of data from anywhere on the web.
 
 <video
-  className="border-2 rounded-xl object-cover w-full h-full dark:border-gray-800"
-  controls
-  src="/videos/s3_objects_in_bucket.mp4"
+  className="border-2 rounded-xl object-cover w-full h-full dark:border-gray-800"
+  controls
+  src="/videos/s3_objects_in_bucket.mp4"
 />
 
-<br/>
+<br />
 
 1. [Sign-up to AWS](https://aws.amazon.com/resources/create-account/).
 
@@ -341,6 +327,207 @@ For best performance, [install MinIO locally](https://min.io/docs/minio/kubernet
 
 Then from Windmill, just [fill the S3 resource type](../../integrations/s3.md).
 
+#### Windmill code snippets
+
+<Tabs className="unique-tabs">
+<TabItem value="deno" label="TypeScript (Deno)" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
+
+```ts
+import * as wmill from 'npm:windmill-client@1';
+import { S3Client } from 'https://deno.land/x/s3_lite_client@0.2.0/mod.ts';
+
+type s3object = object;
+
+export async function main(inputFile: s3object) {
+  const s3Resource = await wmill.getResource('<PATH_TO_S3_RESOURCE>');
+  const s3Client = new S3Client(s3Resource);
+  const outputFile = 'output/hello.txt';
+
+  // read object from S3
+  const getObjectResponse = await s3Client.getObject(inputFile['s3']);
+  const inputObjContent = await getObjectResponse.text();
+  console.log(inputObjContent);
+
+  // write object to S3
+  await s3Client.putObject(outputFile, 'Hello Windmill!');
+
+  // list objects from bucket
+  for await (const obj of s3Client.listObjects({ prefix: 'output/' })) {
+    console.log(obj.key);
+  }
+
+  return {
+    s3: outputFile
+  };
+}
+```
+
+</TabItem>
+<TabItem value="python" label="Python" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
+
+```python
+import wmill
+import boto3
+
+s3object = dict
+
+
+def main(input_file: s3object):
+    s3_resource = wmill.get_resource("<PATH_TO_S3_RESOURCE>")
+    bucket = s3_resource["bucket"]
+    s3client = boto3.client(
+        "s3",
+        region_name=s3_resource["region"],
+        aws_access_key_id=s3_resource["accessKey"],
+        aws_secret_access_key=s3_resource["secretKey"],
+    )
+    output_file = "output/hello.txt"
+
+    # read object from S3 and print its content
+    input_obj = s3client.get_object(Bucket=bucket, Key=input_file["s3"])["Body"].read()
+    print(input_obj)
+
+    # write object to s3
+    s3client.put_object(Bucket=bucket, Key=output_file, Body="Hello Windmill!")
+
+    # download file to the job temporary folder:
+    s3client.download_file(
+        Bucket=bucket, Key=input_file["s3"], Filename="./download.txt"
+    )
+    with open("./download.txt", mode="rb") as downloaded_file:
+        print(downloaded_file.read())
+
+    # upload file from temporary folder to S3
+    uploaded_file = "output/uploaded.txt"
+    with open("./upload.txt", mode="wb") as file_to_upload:
+        file_to_upload.write(str.encode("Hello Windmill!"))
+    s3client.upload_file(Bucket=bucket, Key=uploaded_file, Filename="./upload.txt")
+
+    # see https://boto3.amazonaws.com/v1/documentation/api/latest/guide/s3-examples.html
+    # and https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/s3.html
+    # for more code examples (listing object, deleting files, etc)
+
+    return [
+        s3object({"s3": output_file}),
+        s3object({"s3": uploaded_file}),
+    ]
+```
+
+</TabItem>
+</Tabs>
+
+### Windmill embedded integration with S3, Polars and DuckDB for data pipelines
+
+Run your ETLs on-prem up to 5x faster using Windmill compared to Spark while simplifying your infra.
+
+You can link a Windmill workspace to an S3 bucket and use it as source and/or target of your processing steps seamlessly, without any boilerplate.
+
+<Tabs className="unique-tabs">
+<TabItem value="polars" label="Polars" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
+
+```python
+import wmill
+import polars as pl
+import s3fs
+
+s3object = dict
+
+
+def main(input_file: s3object):
+    s3 = s3fs.S3FileSystem(
+        # this will default to the workspace s3 resource
+        **wmill.polars_connection_settings()["s3fs_args"]
+        # this will use the designated resource
+        # **wmill.polars_connection_settings("<PATH_TO_S3_RESOURCE>")["s3fs_args"]
+    )
+
+    bucket = "<S3_BUCKET_NAME>"
+    input_uri = "s3://{}/{}".format(bucket, input_file["s3"])
+    output_file = "output/result.parquet"
+    output_uri = "s3://{}/{}".format(bucket, output_file)
+
+    with (
+        s3.open(input_uri, mode="rb") as input_s3,
+        s3.open(output_uri, mode="wb") as output_s3,
+    ):
+        # input is a parquet file, we use read_parquet in lazy mode.
+        # Polars can read various file types, see
+        # https://pola-rs.github.io/polars/py-polars/html/reference/io.html
+        input_df = pl.read_parquet(input_s3).lazy()
+
+        # process the Polars dataframe. See Polars docs:
+        # for dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html
+        # for lazy dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html
+        output_df = input_df.collect()
+        print(output_df)
+
+        # persist the output dataframe back to S3 and return it
+        output_df.write_parquet(output_s3)
+        return s3object({"s3": output_file})
+```
+
+</TabItem>
+<TabItem value="duckdb" label="DuckDB" attributes={{className: "text-xs p-4 !mt-0 !ml-0"}}>
+
+```python
+import wmill
+import duckdb
+
+s3object = dict
+
+
+def main(input_file: s3object):
+    # create a DuckDB database in memory
+    # see https://duckdb.org/docs/api/python/dbapi
+    conn = duckdb.connect()
+    # connect duck db to the S3 bucket - this will default to the workspace s3 resource
+    conn.execute(wmill.duckdb_connection_settings()["connection_settings_str"])
+    # this will use the designated resource
+    # conn.execute(wmill.duckdb_connection_settings("<PATH_TO_S3_RESOURCE>")["connection_settings_str"])
+
+    bucket = "<S3_BUCKET_NAME>"
+    input_uri = "s3://{}/{}".format(bucket, input_file["s3"])
+    output_file = "output/result.parquet"
+    output_uri = "s3://{}/{}".format(bucket, output_file)
+
+    # Run queries directly on the parquet file
+    query_result = conn.sql(
+        """
+        SELECT * FROM read_parquet('{}')
+        """.format(
+            input_uri
+        )
+    )
+    query_result.show()
+
+    # Write the result of a query to a different parquet file on S3
+    conn.execute(
+        """
+        COPY (
+            SELECT COUNT(*) FROM read_parquet('{input_uri}')
+        ) TO '{output_uri}' (FORMAT 'parquet');
+        """.format(
+            input_uri=input_uri, output_uri=output_uri
+        )
+    )
+
+    conn.close()
+    return s3object({"s3": output_file})
+```
+
+</TabItem>
+</Tabs>
+
+For more info, see our page dedicated to Data Pipelines in Windmill:
+
+<div class="grid grid-cols-2 gap-6 mb-4">
+  <DocCard
+    title="Data Pipelines"
+    description="We have integrated with Polars and DuckDB for in-memory data processing and S3 for external storage."
+    href="/docs/core_concepts/data_pipelines"
+  />
+</div>
+
 ## Key-Value Stores: MongoDB Atlas, Redis, Upstash
 
 Key-value stores are a popular choice for managing non-structured data, providing a flexible and scalable solution for various data types and use cases. In the context of Windmill, you can use MongoDB Atlas, Redis, and Upstash to store and manipulate non-structured data effectively.
````
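The States hunks above describe the state helpers only in prose. A minimal Python sketch of the usual pattern (illustrative only, not part of this commit; it assumes the `get_state()`/`set_state()` pair named in the docs, imported from the wmill client):

```python
from datetime import datetime, timezone

import wmill


def main():
    # state persisted by the previous run of this script (empty on the very first run)
    previous = wmill.get_state()

    # ... compare incoming data against `previous` to detect changes ...

    # persist a value for the next run, stored at a path unique to this script
    wmill.set_state({"last_seen": datetime.now(timezone.utc).isoformat()})
    return previous
```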
