Spike: Kinesis client native python implementation #10156
Draft
Motivation
What:
Why:
We are spinning up a mock Kinesis server (https://github.com/etspaceman/kinesis-mock) and use the Java AWS Kinesis Client Library (KCL) to connect to this mock server. For services querying events from the mock Kinesis server, we provide a connector function listen_to_kinesis in kinesis_connector.
This method starts the KCL for each listener. The KCL starts one or more instances with a worker bound to one or more shards. Each worker processes messages from its shard(s) and sends the received messages to a programmatically created Python script, which forwards them to a socket. A listener function running in a separate thread listens for incoming messages on this socket and handles further processing defined by the initiator, e.g. Firehose.
To track shard leases and the assignment of shards to worker processes, as well as checkpoints (the latest message read from each shard), the KCL uses a DynamoDB “lease table”.
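For illustration, a minimal sketch of the socket bridge described above; the names (listen_on_socket, port 4545) are hypothetical and not part of the actual kinesis_connector code:

```python
import json
import socket
import threading


def listen_on_socket(port, callback):
    # Accept a single connection and forward newline-delimited JSON records
    # (as written by the KCL-side Python script) to the given callback.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(("127.0.0.1", port))
    server.listen(1)
    conn, _ = server.accept()
    with conn, conn.makefile("r") as stream:
        for line in stream:
            if line.strip():
                callback(json.loads(line))


# The listener runs in its own thread so the initiator (e.g. Firehose) is not blocked.
threading.Thread(target=listen_on_socket, args=(4545, print), daemon=True).start()
```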
used by:
Issues:
Currently, the system does not handle multiple instantiations of listen_to_kinesis and its child threads subscribing to the same Kinesis event stream. Also, event pipes require more control over
Changes
KinesisConnector class that starts a thread monitoring existing shards and creating shard iterators (recreating them after the 5-minute lease time has run out), as well as creating new shard iterators on resharding.
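A rough sketch of such a monitoring loop, assuming boto3 against an assumed local endpoint; monitor_shards and its parameters are illustrative, not the actual class structure:

```python
import time

import boto3

kinesis = boto3.client("kinesis", endpoint_url="http://localhost:4566", region_name="us-east-1")


def monitor_shards(stream_name, shard_iterators, poll_interval=10):
    # Keep shard_iterators (shard_id -> iterator) up to date; newly discovered
    # shards (initial start or resharding) get a fresh iterator. An expired
    # iterator would be recreated the same way via get_shard_iterator.
    while True:
        for shard in kinesis.list_shards(StreamName=stream_name)["Shards"]:
            shard_id = shard["ShardId"]
            if shard_id not in shard_iterators:
                shard_iterators[shard_id] = kinesis.get_shard_iterator(
                    StreamName=stream_name,
                    ShardId=shard_id,
                    ShardIteratorType="TRIM_HORIZON",
                )["ShardIterator"]
        time.sleep(poll_interval)
```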
KinesisWorker class: for each shard, a separate worker is instantiated in its own thread. It uses the native boto method kinesis_client.get_records to receive events from the specific shard it listens to, and processes these events by calling the specified callback function.
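A minimal sketch of the per-shard polling loop; the class name, fields, and callback signature are assumptions and may differ from the actual implementation:

```python
import threading
import time

import boto3

kinesis = boto3.client("kinesis", endpoint_url="http://localhost:4566", region_name="us-east-1")


class ShardWorker(threading.Thread):
    # One worker per shard: poll get_records and hand each batch to the callback.
    def __init__(self, shard_iterator, callback):
        super().__init__(daemon=True)
        self.shard_iterator = shard_iterator
        self.callback = callback
        self.running = True

    def run(self):
        while self.running and self.shard_iterator:
            result = kinesis.get_records(ShardIterator=self.shard_iterator, Limit=100)
            if result["Records"]:
                self.callback(result["Records"])
            # NextShardIterator points at the next batch; it is absent once the shard is closed
            self.shard_iterator = result.get("NextShardIterator")
            time.sleep(1)  # stay under the per-shard read limits
```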
Data for synchronization between KinesisConnector and each worker is for now only kept in memory (this might need to change to deal with restoring state if persistence in LocalStack is enabled). For each shard, a ShardData object is created, and all shards are kept in a dictionary.
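The per-shard bookkeeping could look roughly like this; the field names are assumptions, not the actual ShardData definition:

```python
import threading
from dataclasses import dataclass
from typing import Optional


@dataclass
class ShardData:
    shard_id: str
    shard_iterator: Optional[str] = None        # current iterator, recreated on expiry
    last_sequence_number: Optional[str] = None  # in-memory checkpoint of the latest read record
    worker: Optional[threading.Thread] = None   # the worker thread bound to this shard


# all known shards, keyed by shard ID; kept in memory only for now
shards: dict[str, ShardData] = {}
```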
Testing
Adds a test to kinesis_test for resharding.
Testing for shard timeout and persistence is still required.
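As a hypothetical starting point (not the actual kinesis_test code), a resharding test can trigger the reshard via the Kinesis API; assertions on delivered records depend on the connector fixture and are omitted here:

```python
import boto3

kinesis = boto3.client("kinesis", endpoint_url="http://localhost:4566", region_name="us-east-1")


def trigger_resharding(stream_name, target_shard_count=2):
    # UNIFORM_SCALING splits/merges the existing shards to reach the target count;
    # the connector's monitoring thread should then pick up the new shards.
    kinesis.update_shard_count(
        StreamName=stream_name,
        TargetShardCount=target_shard_count,
        ScalingType="UNIFORM_SCALING",
    )
```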
TODO
What's left to do: