Source code for data_migrator.transform

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import sys
import os
import csv
import logging

from data_migrator import __version__
from data_migrator.exceptions import DataException, ValidationException
from data_migrator.utils import configure_logging
from data_migrator.utils import configure_parser
from data_migrator.emitters import MySQLEmitter


class Transformer(object):
    '''Main transformation engine

    Use this class as your main entry to build your Transformer

    >>> if __name__ == "__main__":
    ...     t = transform.Transformer(models=[Model])
    ...     t.process()
    '''
    def __init__(self, models=None, reader=None, dataset=None,
                 argparser=None, outdir=None, emitter=MySQLEmitter):
        '''
        Args:
            models (list): list of all models to be processed in this
                transformer
            reader: reference to an external reader, if not the default
            dataset: a tablib dataset to read from
            argparser: reference to another argument parser, if not the
                default parser
            outdir: output directory for results, otherwise taken from
                the argparser
            emitter: emitter to be used for this transformation

        Note that the order of models is relevant for the generation.
        '''
        self.outdir = outdir
        self.models = models or []
        self.emitter = emitter
        self.dataset = dataset
        self.print_rows = 0
        self.argparser = argparser
        self.reader = reader
        self.max_pos = max([x._meta.max_pos for x in self.models])
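
    # A minimal usage sketch for the ``dataset`` path (``Result`` is a
    # hypothetical model defined elsewhere; tablib datasets expose
    # ``headers`` and row iteration, which is what _get_header below
    # relies on):
    #
    #   import tablib
    #
    #   data = tablib.Dataset(headers=['id', 'name'])
    #   data.append(['1', 'hello'])
    #   t = Transformer(models=[Result], dataset=data)
    #   t.process()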

    def process(self):
        '''Main processing loop'''
        self.log = configure_logging()
        self.parser = self.argparser or configure_parser()
        self._specific_args()
        self._interpret_cmdline()
        self.log.info("data_migrator pipeline starting")
        self.log.debug("version: %s", __version__)
        self._get_header()
        self._open_input()
        self._read_input()
        self._write_output()
        self.log.info("data_migrator pipeline done")
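
    # A sketch of extending the pipeline: subclasses can override
    # _specific_args, which runs after the parser is built but before
    # the command line is parsed (``--table-prefix`` is a hypothetical
    # flag, not an option this package defines):
    #
    #   class MyTransformer(Transformer):
    #       def _specific_args(self):
    #           self.parser.add_argument('--table-prefix', default='',
    #                                    help='prefix for emitted tables')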

    def _specific_args(self):
        # hook for subclasses to add their own command line arguments
        pass

    def _interpret_cmdline(self):
        # parse the command line and select the input source: an explicit
        # dataset, an external reader, stdin, or a file (tab separated)
        self.args = self.parser.parse_args(sys.argv[1:])
        self.outdir = self.outdir or self.args.outdir
        if self.args.debug:
            self.log.setLevel(logging.DEBUG)
            self.print_rows = self.args.rows
        if self.args.quiet:
            self.log.setLevel(logging.CRITICAL)
        if self.dataset:
            self.log.debug("reading from dataset")
            self.reader = self.dataset
        elif self.reader:
            self.log.debug("reading from external reader")
            self.reader = self.reader(self.args)
        elif self.args.input == '<stdin>':
            self.log.debug("reading from <stdin>")
            self.reader = csv.reader(sys.stdin, delimiter='\t')
        else:
            self.log.debug("reading from file: %s", self.args.input)
            self.reader = csv.reader(open(self.args.input), delimiter='\t')

    def _get_header(self):
        # use the reader's own headers if it has them (e.g. a tablib
        # dataset), otherwise consume the first row as the header
        try:
            self.in_headers = self.reader.headers
        except AttributeError:
            self.in_headers = next(self.reader, [])

    def _open_input(self):
        # sanity check: the input must have at least max_pos + 1 columns
        if len(self.in_headers) <= self.max_pos:
            raise DataException(
                'Data in has %d columns, too few for max position %d' %
                (len(self.in_headers), self.max_pos)
            )
        self.log.debug("input has %d columns", len(self.in_headers))

    def _read_input(self):
        # scan every input row through all models, in order
        self.rows = 0
        self.log.info(
            "models: %s",
            ", ".join([x._meta.model_name for x in self.models])
        )
        for x in self.models:
            if x.objects.unique_values:
                self.log.info(
                    "%s: unique columns [%s]",
                    x._meta.model_name,
                    ",".join(x.objects.unique_values)
                )
        if self.print_rows:
            self.log.debug("printing first %d rows of input",
                           self.print_rows)
        for row in self.reader:
            if self.print_rows > 0:
                self.log.debug("%d: %s", self.print_rows, row)
                self.print_rows -= 1
            self.rows += 1
            res = []
            for o in self.models:
                try:
                    scanned = o.objects.scan_row(row=row, previous=res)
                    res.append(scanned)
                except DataException:
                    if o._meta.fail_on_data_exception:
                        self.log.critical("Error in data[%d]: %s",
                                          self.rows, row)
                        sys.exit(1)
                    self.log.warning("Error in data[%d]: %s",
                                     self.rows, row)
                except Exception:  # pylint: disable=W0703
                    self.log.critical("Uncaught exception in data: %s", row)
                    sys.exit(1)
        self.log.debug("headers of input: %s", ",".join(self.in_headers))

    def _write_output(self):
        # emit all models through their emitter, to file or stdout
        for m in self.models:
            _emitter = (
                getattr(m._meta, 'emitter', self.emitter) or self.emitter
            )(manager=m.objects)
            f, file_name = self._filehandle(_emitter)
            self.log.debug(
                '%s: stats %s', m._meta.model_name,
                ", ".join(
                    ["%s=%d" % (k, v) for k, v in m.objects.stats().items()]
                )
            )
            for l in _emitter.preamble(headers=self.in_headers):
                f.write(l + '\n')
            lineno = 0
            for l in m.objects.all():
                try:
                    for x in _emitter.emit(l):
                        f.write(x + '\n')
                except AssertionError as err:
                    raise ValidationException(
                        "object: %d, %s" % (lineno, err))
                lineno += 1
            for l in _emitter.postamble():
                f.write(l + '\n')
            if f != sys.stdout:
                self.log.info("Closing file %s", file_name)
                f.close()
            self.log.info(
                "%s: %d records emitted",
                m._meta.model_name, len(m.objects)
            )

    def _filehandle(self, e):
        # open a per-model output file under outdir, or fall back to stdout
        _filename = None
        if self.outdir:
            _filename = e.filename()
            _filename = os.path.normpath(self.outdir + "/" + _filename)
            self.log.debug('%s: opening %r', e.meta.model_name, _filename)
            f = open(_filename, "w")
        else:
            self.log.debug('%s: writing to stdout', e.meta.model_name)
            f = sys.stdout
        return f, _filename
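
# A minimal end-to-end sketch of a transformation script built on this
# module. Field names and types follow the data_migrator model API as
# shown in its documentation (adapt them to your own input); positions
# refer to tab-separated input columns read from stdin, per
# _interpret_cmdline above.
#
#   from data_migrator import models, transform
#
#   class Result(models.Model):
#       id = models.IntField(pos=0)
#       name = models.StringField(pos=1)
#
#   if __name__ == "__main__":
#       transform.Transformer(models=[Result]).process()
#
# Run it as a filter: ``cat input.tsv | python my_transform.py``; with no
# outdir set, the default MySQLEmitter writes its SQL to stdout.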