Datalake loads and Kinesis
See the previous tutorial.
A new use case is the initialization of data lakes. In those cases, systems, applications and microservices are extended with event message emitters that put messages on buses like Kafka and Kinesis. Sometimes one is better off controlling the initial load of such systems by emitting the current database contents as messages. A first use case for data-migrator was implemented with AWS Kinesis.
Controlling Kinesis
For Kinesis, two extensions were added: a JSONEmitter
and a new transformer.
The KinesisTransformer uses Boto3, so see the Boto3 documentation for
how to handle credentials and profiles (you need to set up the
~/.aws/credentials and ~/.aws/config files).
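As a rough illustration of what those two files contain, the sketch below parses example file contents with the standard library and checks that a named profile is defined in both. The key values are placeholders, the region is an arbitrary example, and `has_profile` is a hypothetical helper, not part of data-migrator or Boto3. The one detail worth noting is that the config file prefixes named profiles with `profile `, while the credentials file does not.

```python
import configparser

# Example contents of ~/.aws/credentials (placeholder values, not real keys).
CREDENTIALS = """
[default]
aws_access_key_id = EXAMPLEKEYDEFAULT
aws_secret_access_key = exampleSecretDefault

[prd]
aws_access_key_id = EXAMPLEKEYPRD
aws_secret_access_key = exampleSecretPrd
"""

# Example contents of ~/.aws/config (the region is an arbitrary example).
CONFIG = """
[default]
region = eu-west-1

[profile prd]
region = eu-west-1
"""


def has_profile(credentials_text, config_text, profile):
    """Return True if `profile` is defined in both files (hypothetical helper)."""
    creds = configparser.ConfigParser()
    creds.read_string(credentials_text)
    conf = configparser.ConfigParser()
    conf.read_string(config_text)
    # Named profiles are written as "[profile NAME]" in the config file;
    # the default profile is written as plain "[default]" in both files.
    config_section = profile if profile == "default" else "profile " + profile
    return creds.has_section(profile) and conf.has_section(config_section)


prd_ok = has_profile(CREDENTIALS, CONFIG, "prd")
dev_ok = has_profile(CREDENTIALS, CONFIG, "dev")
```

With the example contents above, the `prd` profile is found and a `dev` profile is not, which is the kind of check worth doing before pointing a load at a production stream.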
from datetime import datetime

from data_migrator.contrib import KinesisTransformer
from data_migrator import models


def Meta(row):
    # Event metadata attached to every emitted message.
    return {
        "eventType": "model.create",
        "timestamp": datetime.utcnow().isoformat() + 'Z',
        "producedBy": "data-migrator"
    }


class TrialModel(models.Model):
    _meta = models.DictField(parse=Meta)
    # pos is the column position in the input row.
    uuid = models.UUIDField(pos=3)
    value = models.IntField(pos=8)
    created_by = models.StringField(pos=10)
    created_at = models.StringField(pos=9)
    updated_by = models.StringField(pos=12)
    updated_at = models.StringField(pos=11)


if __name__ == "__main__":
    KinesisTransformer(models=[TrialModel]).process(
        stream_name='my-stream',
        profile_name='prd',
    )
It does not get more complex than this, but there is a little more to it when you want to check that the model is correct. The transformer therefore has two options: trial, which does a trial run without actually sending anything to Kinesis, and output, which sets the folder to export the model to (the file name is based on the model_name).
$ python transform_trialmodel.py --help
usage: transform_trialmodel.py [-h] [-o OUTDIR] [-i INPUT] [--debug]
                               [-q] [-p ROWS] [-t] [--stream STREAM]
                               [--profile PROFILE]

Basic Transformer parser

optional arguments:
  -h, --help            show this help message and exit
  -o OUTDIR, --outdir OUTDIR
                        output directory
  -i INPUT, --input INPUT
                        input file
  --debug               enter debug mode
  -q, --quiet           quiet mode, no output
  -p ROWS, --rows ROWS  input rows to print
  -t, --trial           trial run only, no actual upload to Kinesis
  --stream STREAM       name of the stream
  --profile PROFILE     name of the profile
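
The difference between a trial run and a real run can be pictured with the minimal sketch below. It is not the KinesisTransformer implementation: `emit`, `FakeKinesisClient`, the `.json` file name and the use of `uuid` as partition key are all assumptions made for illustration. The fake client only mimics the keyword arguments of the Boto3 Kinesis client's `put_record` call, so the sketch runs without AWS access.

```python
import json
import os
import tempfile


class FakeKinesisClient:
    """Stand-in for a Boto3 Kinesis client (hypothetical, for illustration)."""

    def __init__(self):
        self.sent = []

    def put_record(self, StreamName, Data, PartitionKey):
        # Record the call instead of talking to AWS.
        self.sent.append((StreamName, Data, PartitionKey))


def emit(messages, client, stream_name, trial=False, outdir=None,
         model_name="trialmodel"):
    """Write messages to outdir if given; send them unless this is a trial run."""
    lines = [json.dumps(message) for message in messages]
    if outdir is not None:
        # Assumed naming scheme: one file per model, named after the model.
        path = os.path.join(outdir, model_name + ".json")
        with open(path, "w") as handle:
            handle.write("\n".join(lines) + "\n")
    if trial:
        return 0  # trial run: nothing is sent to the stream
    for message, line in zip(messages, lines):
        client.put_record(
            StreamName=stream_name,
            Data=line.encode("utf-8"),
            PartitionKey=str(message["uuid"]),  # assumed partition key
        )
    return len(lines)


# A trial run writes the export file but sends nothing.
client = FakeKinesisClient()
messages = [{"uuid": "a1", "value": 1}, {"uuid": "b2", "value": 2}]
with tempfile.TemporaryDirectory() as outdir:
    sent_trial = emit(messages, client, "my-stream", trial=True, outdir=outdir)
    written = os.listdir(outdir)

# A real run sends every message to the stream.
sent_real = emit(messages, client, "my-stream")
```

Running the trial first and inspecting the exported file is a cheap way to validate the model before any message reaches the stream.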