Source code for data_migrator.models.fields

#!/usr/bin/env python
# -*- coding: UTF-8 -*-

import uuid
import json
import datetime
from dateutil import parser as p
from functools import partial

from data_migrator.exceptions import ValidationException, DataException
from data_migrator.exceptions import DefinitionException
from data_migrator.utils import isstr

def new_exception(field, exc_class, msg, *args):
    msg = "%s[%s]: " + msg
    return exc_class(msg % ((field.__class__.__name__, + args))

def _replace(format_str, x):
    return format_str.format(*x)

[docs]class BaseField(object): '''Base column definition for the transformation DSL The following arguments are available to all field types. All are optional. Arguments: pos (int): If positive or zero this denotes the column in the source data to select and store in this field. If not set (or negative) the fields is interpreted as not selecting just a column from the source but to take the full row in the parse function name (str): The name of this field. By default this is the name provided in the model declaration. This attribute is to replace that name by the final column name. default: The default value to use if the source column is found to be a ``null`` field or if the parse function returns None. This attribute has default values for Fields that are not Null<xxx>Fields. For example NullStringField has both NULL and empty string as empty value. :class:`~.StringField` only has empty string as empty value. With this field it can be changed to some other standard value. Consider a Country field as string and setting it to the home country by default. key (boolean): If set, this indicates the field is a key field for identification of the object. nullable (str): If set it will match the source column value and consider this a ``None`` value. By default this attribute is set to ``None``. Note that for none Null fields ``None`` will be translated to :attr:`~.default`. replacement: If set, this is a pre-emit replacement function. This could be used to insert dynamic replacement lookup select queries, adding more indirection into the data generation. Value could be either function or a string. required (boolean): If set, this indicates the field is required to be set. parse: If set this is the parsing function to replace the read value into something to use further down the data migration. Use this for example to clean phone numbers, translate country definitions into alpha3 codes, or to translate ID's into values based on a separately loaded lookup table. validate: Expects a function that returns a boolean, and used to validate the input data. Expecting data within a range or a specific format, add a column validator here. Raises :exc:`~.ValidationException` if set and false. max_length (int): In case of :class:`~.StringField` use this to trim string values to maximum length. unique (boolean): If ``True``, *data-migrator* will check uniqueness of intermediate values (after parsing). Default is ``False``. In relationship with the default manager this will keep track of values for this field. The manager can raise exceptions if uniqueness is violated. Note that it is up to the manager to either fail or drop the record if the exception is raised. anonymizer: Add an additional function that will be called at emit to anonymize the data validate_output: A pre-emit validator used to scan the bare output and raise exceptions if output is not as expected. creation_order: An automatically generated attribute used to determine order of specification, and used in the emitting of dataset. ''' creation_order = 0 schema_type = 'object' def __init__(self, pos=-1, name="", default=None, nullable="NULL", key=False, required=False, replacement=None, parse=None, validate=None, anonymize=None, max_length=None, unique=False, validate_output=None): # default value if null self.default = default if default is not None else getattr(self.__class__, 'default', default) # key indicated key field self.key = key # fixed position in the row to read if max_length and self.schema_type != "string": raise DefinitionException("Cannot set max_length on on string") self.max_length = max_length if isinstance(max_length, int) else None # name of this field (will be set in Model class construction) = name # input string that defines null -> None self.nullable = nullable # some function to apply to value self.parse = parse or getattr(self.__class__, 'parse', None) self.pos = int(pos) # replace string to use in output if isstr(replacement): replacement = partial(_replace, replacement) self.replace = getattr(self.__class__, 'replace', replacement) # required indicates must be filled in self.required = required # unique indicates a unique field self.unique = unique # anonymize is the anonymization function self.anonymize = anonymize() if isinstance(anonymize, type) \ else anonymize # some function to apply to value self.validate = validate or getattr(self.__class__, 'validate', None) # output validator self.validate_output = validate_output # creation_order is required for orderdict to retain order of fields self.creation_order = BaseField.creation_order BaseField.creation_order += 1
[docs] def scan(self, row): '''scan row and harvest distinct value. Takes a row of data and parses the required fields out of this. Args: row (list): array of source data Returns: parsed and processed value. Raises: :class:`~.ValidationException`: raised if explicit validation fails. ''' # see if we want to read a column in the row v = None if self.pos >= 0: try: _v = row[self.pos] except: raise DataException('parsing %r, row len %d, index %d not found',, len(row), self.pos) # do null check if enabled if self.nullable is not None and _v == self.nullable: return v v = _v if self.validate and not self.validate(v): raise ValidationException('field %r input data did not validate' % # apply intermediate function on output, default is stripping if self.parse: v = self.parse(v) elif self.parse: v = self.parse(row) or v # delegate to inner function, to reuse this logic return self._value(v)
[docs] def emit(self, v, escaper=None): '''helper function to export this field. Expects a value from the model to be emitted Args: v: value to emit escaper: escaper function to apply on value Returns: emitted value. Raises: :class:`~.ValidationException`: raised if explicit validation fails.''' if self.max_length and isstr(v): v = v[:self.max_length] if v is None: v = self.default if self.default is not None else v if self.validate_output and not self.validate_output(v): raise ValidationException("not able to validate %s=%s" % (, v)) # allow external function (e.g. SQL escape) # anonymize this data if self.anonymize: v = self.anonymize(v) # check if we have a replacement string to take into account if self.replace: if not isinstance(v, tuple): v = (v,) v = self.replace(v) # pylint: disable=not-callable elif escaper: v = escaper(v) return v
[docs] def json_schema(self, name=None): '''generate json_schema representation of this field Args: name: name if not taken from this field Returns: python representation of json schema ''' t = self.schema_type if 'Null' in self.__class__.__name__: t = [t, "null"] t = {'type': t} if self.key: t['key'] = True if self.max_length and self.schema_type == "string": t['maxLength'] = self.max_length if hasattr(self, 'schema_format'): t['format'] = self.schema_format return {name or t}
def _value(self, v): # pylint: disable=R0201 return v
[docs]class HiddenField(BaseField): '''Non emitting Field for validation and checking. a field that accepts, but does not emit. It is useful for uniqueness checked and more. Combine this with a row parse and check the complete row. ''' pass
[docs]class IntField(BaseField): '''Basic integer field handler''' default = 0 schema_type = 'integer' def _value(self, v): return int(v) if isstr(v) else v
IntegerField = IntField
[docs]class DateTimeField(BaseField): '''Basic datetime field handler''' schema_type = 'string' schema_format = 'date-time'
[docs] def __init__(self, f=None, **kwargs): """ Args: f: format of the datetime Default is ``%Y-%m-%dT%H:%M:%SZ`` (RFC3999) """ self.format = f or getattr(self.__class__, 'format', "%Y-%m-%dT%H:%M:%SZ") super(DateTimeField, self).__init__(**kwargs)
def _value(self, v): if isstr(v): if v == "": return None try: v = p.parse(v) except ValueError: raise DataException("%s could not parse date %s",, v) return v def emit(self, v, escaper=None): if v is not None and isinstance(v, datetime.datetime): v = v.strftime(self.format) return super(DateTimeField, self).emit(v, escaper)
[docs]class UTCNowField(DateTimeField): '''UTCNow generating field. a field that generates a ``UTCNow`` ''' def _value(self, v): '''override and automatically set''' return datetime.datetime.utcnow()
[docs]class NullIntField(BaseField): '''Null integer field handler. a field that accepts the column to be integer and can also be None, which is not the same as 0 (zero). ''' schema_type = 'integer' def _value(self, v): return int(v) if isstr(v) else v
[docs]class StringField(BaseField): '''String field handler, a field that accepts the column to be string.''' default = "" schema_type = 'string' def _value(self, v): return v.strip() if isstr(v) else v
[docs]class NullStringField(BaseField): '''Null String field handler. a field that accepts the column to be string and can also be None, which is not the same as empty string (""). ''' schema_type = 'string' def _value(self, v): return v.strip() if isstr(v) else v
[docs]class BooleanField(BaseField): '''Boolean field handler. a bool that takes any cased permutation of true, yes, 1 and translates this into ``True`` or ``False`` otherwise. ''' default = False schema_type = 'boolean' def _value(self, v): try: return v.lower()[0] in ['y', 't', '1'] except (AttributeError, IndexError): return False
class DefaultField(BaseField): '''DefaultField always returns the default value''' def emit(self, v, escaper=None): """Emit is overwritten return default always""" return super(DefaultField, self).emit(self.default, escaper) def _value(self, v): '''override so we can never set''' return self.default
[docs]class NullField(DefaultField): '''NULL returning field by generating None'''
[docs] def json_schema(self, name=None): '''generate json_schema representation of this field''' return {name or {'type': 'null'}}
[docs]class UUIDField(BaseField): '''UUID generating field. a field that generates a ``str(uuid.uuid4())`` ''' schema_type = 'string' def __init__(self, *args, **kwargs): kwargs['default'] = None super(UUIDField, self).__init__(*args, **kwargs) def _value(self, v): '''override and automatically set''' return str(uuid.uuid4())
[docs]class ObjectField(BaseField): '''JSON object field''' default = {} schema_type = 'object'
DictField = ObjectField
[docs]class ArrayField(BaseField): '''JSON array field''' default = [] schema_type = 'array'
ListField = ArrayField
[docs]class JSONField(BaseField): '''a field that takes the values and spits out a JSON encoding string. Great for maps and lists to be stored in a string like db field. '''
[docs] def emit(self, v, escaper=None): """Emit is overwritten to add the to_json option.""" if v is None: v = self.default if self.default is not None else v v = json.dumps(v) return super(JSONField, self).emit(v, escaper)
[docs]class MappingField(BaseField): '''Map based field translator. a field that takes the values translates these according to a map. Great for identity column replacements. If needed output can be translated as ``json``, for example if the map returns lists. '''
[docs] def __init__(self, data_map, as_json=False, strict=False, **kwargs): """ Args: data_map: The data_map needed to translate. Note the fields returns :attr:`~Field.default` if it is not able to map the key. as_json: If ``True``, the field will be output as json encoded. Default is ``False`` strict: If ``True``, the value must by found in the map. Default is ``False`` """ super(MappingField, self).__init__(**kwargs) if strict and self.default: data_map[self.default] = self.default self.data_map = data_map self.as_json = as_json self.strict = strict
[docs] def emit(self, v, escaper=None): """Emit is overwritten to add the to_json option""" if v is None: v = self.default if self.default is not None else v if self.strict: try: v = self.data_map[v] except KeyError: raise DataException("%s - %s not in map" % (, v)) else: v = self.data_map.get(v, self.default if self.default is not None else v) if self.as_json: v = json.dumps(v) return super(MappingField, self).emit(v, escaper)
[docs]class ModelField(BaseField): '''Model relation for hierarchical structures. a field that takes another model to build hierarchical structures. '''
[docs] def __init__(self, fields, strict=None, **kwargs): """ Args: fields: relationship to another model. strict (boolean): model is considered strict. """ super(ModelField, self).__init__(**kwargs) self.strict = strict self.fields = fields
def json_schema(self, name=None): name = name or _res = super(ModelField, self).json_schema()[name] _p = {} if isinstance(self.fields, list): for i in self.fields: _p.update(i.json_schema()) elif isinstance(self.fields, dict): for k, v in self.fields.items(): _p.update(v.json_schema(name=k)) else: _p.update(self.fields.json_schema( _res['properties'] = _p if self.strict is not None: _res['additionalProperties'] = not self.strict return {name: _res}
[docs] def emit(self, v, escaper=None): """Emit is overwritten to add the to_json option""" if v is None: v = self.default if self.default is not None else v else: v = model.emit(v, escaper) ###FIXME: not sure this is correct # anonymize this data if self.anonymize: v = self.anonymize(v) return v