Datalake loads and Kinesis

See the previous tutorial.

A new use case is the initialization of data lakes. In those cases, systems, applications and microservices are extended with emitters that put event messages on buses like Kafka and Kinesis. Sometimes one is better off controlling the initial load of such systems by emitting the current database as messages. A first use case for data-migrator was implemented with AWS Kinesis.

Controlling Kinesis

For Kinesis, two extensions are added: a JSONEmitter and a new transformer. The KinesisTransformer uses Boto3, so see the Boto documentation for how to handle credentials and profiles (you need to set up the ~/.aws/credentials and ~/.aws/config files).
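Before pointing the transformer at a real stream, it can help to confirm that the profile you plan to use is actually defined. The sketch below is not part of data-migrator or Boto3; `has_aws_profile` is a hypothetical helper built on the standard library only. It assumes the usual AWS file layout, in which a profile appears as `[name]` in the credentials file and as `[profile name]` in the config file.

```python
import configparser
from pathlib import Path


def has_aws_profile(name, credentials_path=None, config_path=None):
    """Return True if the named profile is defined in either AWS file.

    Hypothetical helper for illustration; paths default to ~/.aws/.
    """
    credentials_path = Path(credentials_path or Path.home() / ".aws" / "credentials")
    config_path = Path(config_path or Path.home() / ".aws" / "config")

    credentials = configparser.ConfigParser()
    credentials.read(credentials_path)  # a missing file is silently skipped
    config = configparser.ConfigParser()
    config.read(config_path)

    # The credentials file uses [prd]; the config file uses [profile prd],
    # except for the default profile, which is plain [default] in both.
    config_section = name if name == "default" else "profile " + name
    return credentials.has_section(name) or config.has_section(config_section)
```

Calling `has_aws_profile("prd")` before starting a load gives an early, readable failure instead of an error from deep inside the upload.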

from datetime import datetime

from data_migrator.contrib import KinesisTransformer
from data_migrator import models

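def Meta(row):
    # Build the metadata dict stored in the model's _meta field.
    return {
        "eventType": "model.create",
        # UTC timestamp in ISO 8601 format with a trailing 'Z'
        "timestamp": datetime.utcnow().isoformat() + 'Z',
        "producedBy": "data-migrator"
    }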

class TrialModel(models.Model):
    _meta           = models.DictField(parse=Meta)
    uuid            = models.UUIDField(pos=3)
    value           = models.IntField(pos=8)
    created_by      = models.StringField(pos=10)
    created_at      = models.StringField(pos=9)
    updated_by      = models.StringField(pos=12)
    updated_at      = models.StringField(pos=11)

if __name__ == "__main__":
    KinesisTransformer(models=[TrialModel]).process(
        stream_name='my-stream',
        profile_name='prd',
    )

It does not get more complex than this, but obviously there is a little more to it when you want to check whether the model is OK. The transformer therefore has two options: trial (-t, --trial), to do a trial run that does not actually send anything to Kinesis, and output (-o, --outdir), to set the folder to export the model to (the name of the file is based on the model_name).
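The idea behind a trial run can be sketched without data-migrator or Boto3: always write the messages to a file in the output folder for inspection, and only hand them to a sender when the run is not a trial. Everything in this sketch is hypothetical, including the `emit` function, the `send` callback, and the `<model_name>.json` file name; it does not describe how KinesisTransformer is implemented.

```python
import json
from pathlib import Path


def emit(records, model_name, outdir, trial=True, send=None):
    """Write records as JSON lines; call send() only when not a trial.

    Hypothetical illustration of a trial-run switch, not data-migrator code.
    """
    out_path = Path(outdir) / (model_name + ".json")
    out_path.parent.mkdir(parents=True, exist_ok=True)
    sent = 0
    with out_path.open("w") as handle:
        for record in records:
            line = json.dumps(record, sort_keys=True)
            handle.write(line + "\n")  # always keep a local copy to inspect
            if not trial and send is not None:
                send(line)  # e.g. a function that puts the record on a stream
                sent += 1
    return out_path, sent
```

With `trial=True` the function returns the path of the written file and a sent count of zero, so the output can be reviewed before anything leaves the machine.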

$ python transform_trialmodel.py --help
usage: transform_trialmodel.py [-h] [-o OUTDIR] [-i INPUT] [--debug]
                                    [-q] [-p ROWS] [-t] [--stream STREAM]
                                    [--profile PROFILE]

Basic Transformer parser

optional arguments:
  -h, --help            show this help message and exit
  -o OUTDIR, --outdir OUTDIR
                        output directory
  -i INPUT, --input INPUT
                        input file
  --debug               enter debug mode
  -q, --quiet           quiet mode, no output
  -p ROWS, --rows ROWS  input rows to print
  -t, --trial           trial run only, no actual upload to Kinesis
  --stream STREAM       name of the stream
  --profile PROFILE     name of the profile