Snapstream provides a data-flow model to simplify development of stateful streaming applications.
```sh
pip install snapstream
```
We `snap` iterables to user functions, and process them in parallel when we call `stream`:
We pass the callable `print` as a sink to print out the returned values. Multiple iterables and sinks can be passed.
```python
from snapstream import snap, stream

@snap(range(5), sink=[print])
def handler(msg):
    yield f'Hello {msg}'

stream()
```
```
Hello 0
Hello 1
Hello 2
Hello 3
Hello 4
```
To try it out for yourself, spin up a local Kafka broker with `docker-compose.yml`, using `localhost:29091` to connect:
```sh
docker compose up broker -d
```
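With the broker running, a `Topic` can serve both as a source (iterable) and as a sink (callable). The snippet below is a minimal sketch in the style of the `snap` example above, producing a few emoji to an `emoji` topic and consuming them back; the consumer settings (`group.id`, `auto.offset.reset`) are ordinary confluent-kafka options chosen for illustration, and the handler shapes are assumptions rather than the only way to wire things up.

```python
from snapstream import Topic, snap, stream

# Connects to the broker from docker-compose.yml (localhost:29091);
# the consumer settings below are illustrative confluent-kafka options.
topic = Topic('emoji', {
    'bootstrap.servers': 'localhost:29091',
    'auto.offset.reset': 'earliest',
    'group.id': 'demo',
})

@snap(['🏆', '📞', '🐟', '👌'], sink=[topic])
def produce(emoji):
    # values yielded here are produced to the 'emoji' topic
    yield emoji

@snap(topic, sink=[print])
def consume(msg):
    # consumed messages are confluent-kafka Message objects (assumption)
    yield msg.value()

stream()
```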
Use the CLI tool to inspect `Topic` or `Cache`:
```sh
snapstream topic emoji --offset -2

>>> timestamp: 2023-04-28T17:31:51.775000+00:00
>>> offset: 0
>>> key:
🏆
```
- `snapstream.snap`: bind streams (iterables) and sinks (callables) to user-defined handler functions
- `snapstream.stream`: start streaming
- `snapstream.Topic`: consume from (iterable), and produce to (callable) kafka using confluent-kafka
- `snapstream.Cache`: store data to disk using rocksdict
- `snapstream.Conf`: set global kafka configuration (can be overridden per topic)
- `snapstream.codecs.AvroCodec`: serialize and deserialize avro messages
- `snapstream.codecs.JsonCodec`: serialize and deserialize json messages
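As a rough illustration of how `Conf` and `Cache` fit together, the sketch below sets global kafka defaults and persists handler output to disk. The `'db'` path, the configuration values, and the `(key, value)` tuple convention for a `Cache` sink are assumptions made for the example, not prescribed usage.

```python
from snapstream import Cache, Conf, snap, stream

# Global kafka defaults shared by all topics (values are illustrative).
Conf({'bootstrap.servers': 'localhost:29091', 'group.id': 'demo'})

# Cache persists key/value pairs to disk via rocksdict;
# 'db' is a hypothetical directory for this example.
cache = Cache('db')

@snap(range(3), sink=[cache])
def handler(i):
    # assumption: a Cache sink stores yielded (key, value) tuples
    yield f'number-{i}', i

stream()
```

A cache written this way is the kind of state the CLI tool shown earlier is meant to inspect.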