
Commit 922e6cc

docs: Update S3 snippets for Polars 0.20 (windmill-labs#451)
1 parent e81944b commit 922e6cc

File tree

2 files changed (+42 / -44 lines)
  • blog/2023-11-24-data-pipeline-orchestrator
  • docs/core_concepts/11_persistent_storage


blog/2023-11-24-data-pipeline-orchestrator/index.mdx

Lines changed: 24 additions & 23 deletions
````diff
@@ -159,7 +159,7 @@ In the end, a canonical pipeline step in Windmill will look something like this:
 
 ```python
 #requirements:
-#polars==0.19.19
+#polars==0.20.2
 #s3fs==2023.12.0
 #wmill>=1.229.0
 
@@ -172,35 +172,36 @@ import wmill
 def main(input_dataset: S3Object):
     # initialization: connect Polars to the workspace bucket
     s3_resource = wmill.get_resource("/path/to/resource")
-    s3fs_args = wmill.polars_connection_settings().s3fs_args
-    s3 = s3fs.S3FileSystem(**s3fs_args)
+    storage_options = wmill.polars_connection_settings().storage_options
 
     # reading data from s3:
     bucket = s3_resource["bucket"]
     input_dataset_uri = "s3://{}/{}".format(bucket, input_dataset["s3"])
-    output_dataset_uri = "s3://{}/output.parquet".format(bucket)
-    with s3.open(input_dataset_uri, mode="rb") as input_dataset, s3.open(output_dataset_uri, mode="rb") as output_dataset:
-        input = pl.read_parquet(input_dataset)
-
-        # transforming the data
-        output = (
-            input.filter(pl.col("L_SHIPDATE") >= datetime.datetime(1994, 1, 1))
-            .filter(
-                pl.col("L_SHIPDATE")
-                < datetime.datetime(1994, 1, 1) + datetime.timedelta(days=365)
-            )
-            .filter((pl.col("L_DISCOUNT").is_between(0.06 - 0.01, 0.06 + 0.01)))
-            .filter(pl.col("L_QUANTITY") < 24)
-            .select([(pl.col("L_EXTENDEDPRICE") * pl.col("L_DISCOUNT")).alias("REVENUE")])
-            .sum()
-            .collect()
-        )
-
-        # writing the output back to S3
+    input = pl.read_parquet(input_dataset_uri, storage_options=storage_options)
+
+    # transforming the data
+    output = (
+        input.filter(pl.col("L_SHIPDATE") >= datetime.datetime(1994, 1, 1))
+        .filter(
+            pl.col("L_SHIPDATE")
+            < datetime.datetime(1994, 1, 1) + datetime.timedelta(days=365)
+        )
+        .filter((pl.col("L_DISCOUNT").is_between(0.06 - 0.01, 0.06 + 0.01)))
+        .filter(pl.col("L_QUANTITY") < 24)
+        .select([(pl.col("L_EXTENDEDPRICE") * pl.col("L_DISCOUNT")).alias("REVENUE")])
+        .sum()
+        .collect()
+    )
+
+    # writing the output back to S3
+    s3 = s3fs.S3FileSystem(**wmill.polars_connection_settings().s3fs_args)
+    output_dataset_filename = "output.parquet"
+    output_dataset_uri = "s3://{}/{}".format(bucket, output_dataset_filename)
+    with s3.open(output_dataset_uri, mode="rb") as output_dataset:
         output.write_parquet(output_dataset)
 
     # returning the URI of the output for next steps to process it
-    return S3Object(s3=output_dataset_uri)
+    return S3Object(s3=output_dataset_filename)
 ```
 
 The example uses Polars. If you're more into SQL you can use DuckDB, but the code will have the same structure: initialization, reading from S3, transforming, writing back to S3.
````

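For readers skimming the diff, the structure called out in the context line above (initialization, reading from S3, transforming, writing back to S3) condenses with Polars 0.20 to the pattern below: reads go through `storage_options`, while writes still go through an `s3fs` file handle. This is a minimal sketch rather than the exact snippet from the docs; the resource path, the `value` column, and the aggregation are placeholders.

```python
import polars as pl
import s3fs
import wmill
from typing import TypedDict


class S3Object(TypedDict):
    # shape of the object reference passed between Windmill pipeline steps
    s3: str


def main(input_dataset: S3Object):
    # initialization: resolve the bucket and the Polars connection settings
    bucket = wmill.get_resource("/path/to/resource")["bucket"]  # placeholder resource path
    conn = wmill.polars_connection_settings()

    # reading from S3: Polars 0.20 reads straight from the URI via storage_options
    input_uri = "s3://{}/{}".format(bucket, input_dataset["s3"])
    df = pl.read_parquet(input_uri, storage_options=conn.storage_options)

    # transforming the data (placeholder aggregation)
    output = df.select(pl.col("value").sum().alias("total"))

    # writing back to S3: writes still go through an s3fs file handle
    s3 = s3fs.S3FileSystem(**conn.s3fs_args)
    output_file = "output.parquet"
    with s3.open("s3://{}/{}".format(bucket, output_file), mode="wb") as f:
        output.write_parquet(f)

    # return the object key so the next step can build its own URI
    return S3Object(s3=output_file)
```

Note the return value: after this commit the `s3` field carries the object key rather than a full `s3://` URI, which matches how the step above builds `input_dataset_uri` from the bucket and `input_dataset["s3"]`.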
docs/core_concepts/11_persistent_storage/index.mdx

Lines changed: 18 additions & 21 deletions
````diff
@@ -433,7 +433,7 @@ You can link a Windmill workspace to an S3 bucket and use it as source and/or ta
 
 ```python
 #requirements:
-#polars==0.19.19
+#polars==0.20.2
 #s3fs==2023.12.0
 #wmill>=1.229.0
 
@@ -444,33 +444,30 @@ import s3fs
 
 
 def main(input_file: S3Object):
-    bucket = wmill.get_resource("u/admin/windmill-cloud-demo")["bucket"]
+    bucket = wmill.get_resource("<PATH_TO_S3_RESOURCE>")["bucket"]
 
     # this will default to the workspace s3 resource
-    args = wmill.polars_connection_settings().s3fs_args
+    storage_options = wmill.polars_connection_settings().storage_options
     # this will use the designated resource
-    # args = wmill.polars_connection_settings("<PATH_TO_S3_RESOURCE>").s3fs_args
-    s3 = s3fs.S3FileSystem(**args)
+    # storage_options = wmill.polars_connection_settings("<PATH_TO_S3_RESOURCE>").storage_options
 
+    # input is a parquet file, we use read_parquet in lazy mode.
+    # Polars can read various file types, see
+    # https://pola-rs.github.io/polars/py-polars/html/reference/io.html
     input_uri = "s3://{}/{}".format(bucket, input_file["s3"])
-    output_file = "output/result.parquet"
-    output_uri = "s3://{}/{}".format(bucket, output_file)
+    input_df = pl.read_parquet(input_uri, storage_options=storage_options).lazy()
 
-    with (
-        s3.open(input_uri, mode="rb") as input_s3,
-        s3.open(output_uri, mode="wb") as output_s3,
-    ):
-        # input is a parquet file, we use read_parquet in lazy mode.
-        # Polars can read various file types, see
-        # https://pola-rs.github.io/polars/py-polars/html/reference/io.html
-        input_df = pl.read_parquet(input_s3).lazy()
-
-        # process the Polars dataframe. See Polars docs:
-        # for dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html
-        # for lazy dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html
-        output_df = input_df.collect()
-        print(output_df)
+    # process the Polars dataframe. See Polars docs:
+    # for dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/dataframe/index.html
+    # for lazy dataframe: https://pola-rs.github.io/polars/py-polars/html/reference/lazyframe/index.html
+    output_df = input_df.collect()
+    print(output_df)
 
+    # To write back the result to S3, Polars needs an s3fs connection
+    s3 = s3fs.S3FileSystem(**wmill.polars_connection_settings().s3fs_args)
+    output_file = "output/result.parquet"
+    output_uri = "s3://{}/{}".format(bucket, output_file)
+    with s3.open(output_uri, mode="wb") as output_s3:
         # persist the output dataframe back to S3 and return it
         output_df.write_parquet(output_s3)
 
````

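A side note on the second snippet: `read_parquet(...).lazy()` still reads the file eagerly before wrapping it in a LazyFrame. Polars 0.20 also exposes `pl.scan_parquet` with a `storage_options` parameter, which defers reading until `.collect()`. The sketch below is not part of this commit and assumes the `storage_options` returned by `wmill.polars_connection_settings()` are accepted by Polars' native cloud reader, as the updated snippet already assumes for `read_parquet`.

```python
import polars as pl
import wmill


def main(input_file: dict):  # an S3Object, i.e. {"s3": "<key in the bucket>"}
    bucket = wmill.get_resource("<PATH_TO_S3_RESOURCE>")["bucket"]
    storage_options = wmill.polars_connection_settings().storage_options

    input_uri = "s3://{}/{}".format(bucket, input_file["s3"])

    # scan_parquet builds a LazyFrame without materializing the data up front;
    # rows are only fetched when .collect() runs
    lazy_df = pl.scan_parquet(input_uri, storage_options=storage_options)

    # e.g. peek at the first rows without loading the whole dataset
    print(lazy_df.head(10).collect())
```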