Skip to content

Spike: Kinesis client native python implementation #10156

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

maxhoheiser
Copy link
Member

Motivation

What:

  • replace java kcl used to listen to kinesis stream in ) kinesis_connector with native python implementation

Why:

  • depending on jvm bloats the container
  • using kcl relies on external dependencies for core functionality

We are spinning up a mock Kinesis server (https://github.com/etspaceman/kinesis-mock) and use the java plication aws kinesis client library to connect to this mock server. For services queriing events from the mock kinesis server, we provide a connector function listen_to_kinesis in kinesis_connector

This method starts the kcl for each listener. Kcl starts one or more instances with a worker bound to one or more shards. Each worker processes messages from the shard(s) and sends received messages to a programmatically created Python script that sends the messages to a socket. A listener function running in a separate thread listens to incoming messages on this socket and handles further processing defined by the initiator e.g. Firehose

To track shard lease and assignment shards to worker processes as well as to track checkpoints (latest read message from the shard) kcl uses a dynamoDB “lease table”

used by:

  • community: Firehose
  • ext: Kinesisanalytics, Kinesisanalytics-v2, Quantum Ledger Database

Issues:

Currently, the system does not handle multiple instantiations of the listen_to_kinesis and its child threads subscribing to the same Kinesis event stream. Also, event pipes require more control over

Changes

KinesisConnector class that starts a thread that monitors existing shards and creates shard iterators (recreates after lease time ran out 5min), as well as creates new shard iterators on resharding.

KinesisWorker class, for each shard a separate worker is instantiated in a separate thread that uses the native boto method kinesis_client.get_records to receive events from the specific shard it listens to, it further processes these events by calling the specified callback function.

Data for synchronization between KinesisConnector and each worker is for now only kept in memory (this might need to change for dealing with restoring states if persistence in localstack is enabled. For each shard, a ShardData object is created, and all shards are kept in a dictionary.

Testing

Adds test to kinesis_test for recharging.
Testing for shard timeout and persistence testing is still required

TODO

What's left to do:

  • further abstract KinesisClient to be usable by all services e.g. event pipes
  • deal with data persistence and not re-reading already processed events by callback function

@maxhoheiser maxhoheiser self-assigned this Feb 1, 2024
@maxhoheiser maxhoheiser added this to the Playground milestone Feb 19, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant