Dativa Pipeline API on AWS

The Dativa Pipeline API can be implemented through the AWS marketplace using a simple RESTful API that processes files on S3.

Prerequisites

  • The data to be processed should be in CSV format
  • The data to be processed should be stored in an S3 bucket
  • The S3 bucket should be accessible by Dativa's AWS account

To give Dativa permission to access your bucket, you will need to grant cross-account S3 access by applying a bucket policy similar to the one below. The policy can be applied through the S3 console (under the bucket's Permissions tab) or with the AWS CLI:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Example permissions",
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::538486622005:root"
            },
            "Action": "s3:*",
            "Resource": "arn:aws:s3:::your-bucket-name-here/*"
        }
    ]
}

If you use separate source and destination buckets then both will require this policy to be added.
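
If you prefer to apply the policy from the command line, a minimal sketch using the AWS CLI is shown below. It assumes the policy above has been saved locally as bucket-policy.json and that your AWS credentials are allowed to modify the bucket's policy; the bucket name is a placeholder.

# Apply the cross-account bucket policy from a local JSON file
aws s3api put-bucket-policy --bucket your-bucket-name-here --policy file://bucket-policy.json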

Basic Usage

The API can be tested on our Developer Portal (registration required).

https://pipeline-api.dativa.com/clean

Processing

The parameters that locate the data and describe the processing to be carried out are posted as a JSON object in the body of the 'clean' API call. At a minimum, the source of the data and the rules that define the processing are required; optionally, a different destination S3 location can be defined.

For the sake of this example we will use the following predefined parameters (edit to use your own bucket and file names):

{
    "source": {
        "s3_url":"https://s3-us-west-2.amazonaws.com/your-bucket-name-here/your-data-file.csv",
        "encoding": "UTF-8"
    },
    "rules": [
        {
            "append_results": false,
            "params": {
                "attempt_closest_match": true,
                "default_value": "N/A",
                "fallback_mode": "use_default",
                "is_unique": false,
                "lookalike_match": false,
                "maximum_length": 1000,
                "minimum_length": 0,
                "regex": ".+",
                "skip_blank": false,
                "string_distance_threshold": 0.7
            },
            "rule_type": "String",
            "field": "0"
        }
    ]
}
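
As a quick sketch, this body could be posted with curl as follows, assuming it has been saved locally as clean_request.json and that your API key is passed in the x-api-key header (the full inline form of this command is shown in the Full Usage section below):

curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' --header 'x-api-key: your-api-key' -d @clean_request.json 'https://pipeline-api.dativa.com/clean'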

Posting the above JSON body to the 'clean' API function, with your API key in the x-api-key header, should result in a response code of 202 (the request has been accepted for processing, but the processing has not yet been completed) and a response body similar to the following:

{
    "job_id": "2e2a6186-32a9-4613-938b-8393f09beb3e",
    "status": "PREPARING",
    "reason": "Method called"
}

You can monitor the progress of the asynchronous task using the 'status' call, providing the 'job_id' returned above:

https://pipeline-api.dativa.com/status/2e2a6186-32a9-4613-938b-8393f09beb3e

This should result in a response body similar to the following:

{
  "job_id": "2e2a6186-32a9-4613-938b-8393f09beb3e",
  "status": "COMPLETED",
  "reason": "Processing successful",
  "report": [
    {
      "date": "2018-05-25 14:53:43.596488",
      "source_file": "https://s3-us-west-2.amazonaws.com/pipeline-api-deploy/test-data/generic/names_blank.csv",
      "field": "Name",
      "rule": "String",
      "category": "replaced",
      "description": "Replaced with default value",
      "modified_file": "https://s3-us-west-2.amazonaws.com/pipeline-api-deploy/test-data/generic/names_blank.csv.report.replaced.name"
    }
  ],
  "source": {
    "s3_url": "https://s3-us-west-2.amazonaws.com/pipeline-api-deploy/test-data/generic/names_blank.csv",
    "delimiter": ",",
    "encoding": "UTF-8",
    "header": 0,
    "skiprows": 0,
    "quotechar": "\""
  },
  "destination": {
    "s3_url": "https://s3-us-west-2.amazonaws.com/pipeline-api-deploy/test-data/generic/names_blank.csv.cleaned",
    "delimiter": ",",
    "encoding": "UTF-8"
  },
  "seconds_taken": "0.1261005401611328"
}

The 'status' and 'reason' fields may differ depending on the current state of the processing. The 'status' method can be polled until the 'COMPLETED' or 'ERROR' state is reached, signifying the end of processing.
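
As an illustrative sketch, the polling could be scripted as follows. The job ID and API key are placeholders, and the jq tool is assumed to be installed to extract the 'status' field from the JSON response.

JOB_ID=2e2a6186-32a9-4613-938b-8393f09beb3e
while true; do
    # Fetch the current job status and extract the 'status' field
    STATUS=$(curl -s --header 'Accept: application/json' --header 'x-api-key: your-api-key' \
        "https://pipeline-api.dativa.com/status/$JOB_ID" | jq -r '.status')
    echo "Current status: $STATUS"
    # Stop polling once processing has finished, successfully or otherwise
    if [ "$STATUS" = "COMPLETED" ] || [ "$STATUS" = "ERROR" ]; then
        break
    fi
    sleep 5
done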

Note that the responses from both API calls above can contain the following items:

  • job_id - A unique ID allowing the status of the job to be retrieved for up to 30 days after processing
  • status - The current status of the processing, e.g. PREPARING, LOADING, RUNNING, SAVING, COMPLETED, or ERROR.
  • reason - An explanation of the status code
  • report - A detailed list of the changes made to the data according to the rules supplied. This report is also placed in the destination location, named after the original data file with a '.report' suffix appended.
  • source - Confirmation of the source of the data, including any assumptions made about the formatting.
  • destination - Destination of the processed data. If a destination location was not specified the processed data is put in the source directory with a '.cleaned' suffix appended.

Full Usage

Clean

  • URL: https://pipeline-api.dativa.com/clean
  • Method: POST
  • Parameters: None
  • Data: Clean Request Schema (see below)
  • Response: Clean Response Schema (see below)

Example:

curl -X POST --header 'Content-Type: application/json' --header 'Accept: application/json' --header 'x-api-key: your-api-key' -d '{
    "source": {
        "s3_url": "https://s3-us-west-2.amazonaws.com/your-bucket-name/your-file-name.csv",
        "delimiter": ",",
        "encoding": "UTF-8",
        "header": 0,
        "skiprows": 0,
        "quotechar": "\""
    },
    "destination": {
        "s3_url": "https://s3-us-west-2.amazonaws.com/different-bucket-name/different-file-name.csv",
        "delimiter": ",",
        "encoding": "UTF-8"
    },
    "rules": [
        {
            "append_results": false,
            "params": {
                "attempt_closest_match": true,
                "default_value": "N/A",
                "fallback_mode": "use_default",
                "is_unique": false,
                "lookalike_match": false,
                "maximum_length": 1000,
                "minimum_length": 0,
                "regex": ".+",
                "skip_blank": false,
                "string_distance_threshold": 0.7
            },
            "rule_type": "String",
            "field": "0"
        }
    ]
}' 'https://pipeline-api.dativa.com/clean'

Status

  • URL: https://pipeline-api.dativa.com/status/job_id
  • Method: GET
  • Parameters: Job ID
  • Data: None
  • Response: Status Response Schema (see below)
  • Notes: Status information for a job is available for 30 days after processing

Example:

curl -X GET --header 'Accept: application/json' --header 'x-api-key: your-api-key' 'https://pipeline-api.dativa.com/status/your-job-id'

Clean Request Schema

The request schema for the 'clean' method is as follows.

{
    "$schema": "http://json-schema.org/draft-06/schema#",
    "title": "Clean Request Schema",
    "type": "object",
    "properties": {
        "source": {
            "type": "object",
            "properties": {
                "s3_url": {
                    "type": "string"
                },
                "encoding": {
                    "type": "string"
                },
                "delimiter": {
                    "type": "string"
                },
                "header": {
                    "type": "integer"
                },
                "skiprows": {
                    "type": "integer"
                },
                "quotechar": {
                    "type": "string"
                }
            },
            "required": [
                "s3_url"
            ]
        },
        "destination": {
            "type": "object",
            "properties": {
                "s3_url": {
                    "type": "string"
                },
                "encoding": {
                    "type": "string"
                },
                "delimiter": {
                    "type": "string"
                }
            },
            "required": [
                "s3_url"
            ]
        },
        "rules": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "rule_type": {
                        "type": "string"
                    },
                    "field": {
                        "type": "string"
                    },
                    "params": {
                        "type": "object",
                        "properties": {}
                    }
                },
                "required": [
                    "rule_type",
                    "params"
                ]
            }
        }
    },
    "required": [
        "source",
        "rules"
    ]
}
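
The schema can be used to validate a request body before posting it. As a minimal sketch, assuming the schema above has been saved as clean-request-schema.json, the request body as clean_request.json, and that the check-jsonschema Python package is installed:

# Validate the request body against the Clean Request Schema
check-jsonschema --schemafile clean-request-schema.json clean_request.json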

Clean Response Schema

The response schema for the 'clean' method is as follows:

{
    "$schema": "http://json-schema.org/draft-06/schema#",
    "title": "Clean Response Schema",
    "type": "object",
    "properties": {
        "job_id": {
            "type": "string"
        },
        "report": {
            "type": "array",
            "items": {
                "type": "string"
            }
        },
        "source": {
            "type": "object",
            "properties": {
                "s3_url": {
                    "type": "string"
                },
                "encoding": {
                    "type": "string"
                },
                "delimiter": {
                    "type": "string"
                },
                "header": {
                    "type": "integer"
                },
                "skiprows": {
                    "type": "integer"
                },
                "quotechar": {
                    "type": "string"
                }
            },
            "required": [
                "s3_url"
            ]
        },
        "destination": {
            "type": "object",
            "properties": {
                "s3_url": {
                    "type": "string"
                },
                "encoding": {
                    "type": "string"
                },
                "delimiter": {
                    "type": "string"
                }
            },
            "required": [
                "s3_url"
            ]
        }
    },
    "required": [
        "job_id",
        "source",
        "destination"
    ]
}

Status Response Schema

The response schema for the 'status' method is as follows:

{
    "$schema": "http://json-schema.org/draft-06/schema#",
    "title": "Status Schema",
    "type": "object",
    "properties": {
        "job_id": {
            "type": "string"
        },
        "status": {
            "type": "string"
        },
        "reason": {
            "type": "string"
        },
        "report": {
            "type": "array",
            "items": {
                "type": "string"
            }
        },
        "source": {
            "type": "object",
            "properties": {
                "s3_url": {
                    "type": "string"
                },
                "encoding": {
                    "type": "string"
                },
                "delimiter": {
                    "type": "string"
                },
                "header": {
                    "type": "integer"
                },
                "skiprows": {
                    "type": "integer"
                },
                "quotechar": {
                    "type": "string"
                },
                "strip_whitespace": {
                    "type": "boolean"
                }
            },
            "required": [
                "s3_url"
            ]
        },
        "destination": {
            "type": "object",
            "properties": {
                "s3_url": {
                    "type": "string"
                },
                "encoding": {
                    "type": "string"
                },
                "delimiter": {
                    "type": "string"
                }
            },
            "required": [
                "s3_url"
            ]
        }
    },
    "required": [
        "job_id",
        "status"
    ]
}
  • job_id - A unique ID assigned by the 'clean' call to identify a particular processing run on a particular file.
  • report - Line-by-line description of the changes made to the source file during processing. Includes the date, source file, field changed, rule used, category of change, description of the change, and the location of the modified file.
  • source - Definition of the source file to process including the S3 location and optional details about the data such as its encoding, delimiter character, quote character etc.
  • destination - Definition of the destination file of the processed data including the S3 location and optional details about the data encoding and delimiter character used.
  • s3_url - Fully qualified S3 location of the data file e.g. https://s3-<region>.amazonaws.com/<bucket-name>/<prefix>/<file-name>.csv
  • encoding - A string representing the encoding to use in the output file, defaults to 'UTF-8'.
  • delimiter - Field delimiter character for the data file, defaults to ','.
  • header - Row number of the data file containing the column names, defaults to zero.
  • skiprows - Number of lines to skip at the start of the file, defaults to zero.
  • quotechar - The character used to denote the start and end of a quoted item, defaults to '"'.
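
For example, a hypothetical source definition for a tab-delimited file with one leading line to skip before the header row might look like the following (the bucket and file names are placeholders):

{
    "source": {
        "s3_url": "https://s3-us-west-2.amazonaws.com/your-bucket-name-here/your-data-file.tsv",
        "encoding": "UTF-8",
        "delimiter": "\t",
        "header": 0,
        "skiprows": 1,
        "quotechar": "\""
    }
}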

Frequently Asked Questions

  • Why do I have to add a bucket policy to my S3 bucket? The Dativa Pipeline API needs access to read new data from your source S3 bucket and write processed data to your destination bucket. An easy way to do this is to add a bucket policy that allows Dativa's AWS account access to your bucket(s). However, if you have alternative requirements then please get in touch to discuss.

  • How much data can I process in one file? Processing on the API is limited by time, approximately 30 seconds maximum per 'clean' API call. The amount of data you can process in this time depends on the complexity of the rules being applied. For complex processing you may need to split data files into smaller chunks (a sketch of one way to do this is shown after this list).

  • Can I process multiple files using one API call? At the moment the API only supports processing one file at a time.

  • Why does the 'clean' API call not return the processed data? The 'clean' call is asynchronous: it returns a job ID once the request has been accepted for processing (HTTP 202), and the processing itself continues in the background. Use the 'status' call to monitor progress and retrieve the processing report.

  • I'm getting a timeout error. What's the problem? Processing on the API is limited by time, approximately 30 seconds maximum per 'clean' API call. For complex processing you may need to split data files into smaller chunks.

  • Can I process anything other than CSV files (e.g. JSON or XML)? Currently the API only supports CSV format files.

  • How much does the service cost? Please see our listing in the AWS Marketplace for details on pricing.

  • Do you have any libraries to help me use the API? We currently don't provide any libraries to support the use of the API.

  • I have a problem. How do I get support? Please email api-support@dativa.com to create a support ticket.
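
As a rough sketch of one way to split a large CSV file into smaller chunks before submission, the standard split utility can be used while keeping the header row on every chunk. The file name and chunk size are placeholders, and this assumes no records span multiple lines (i.e. no embedded newlines in quoted fields).

# Write the header row once, then split the remaining rows into 500,000-line chunks
head -n 1 your-large-file.csv > header.csv
tail -n +2 your-large-file.csv | split -l 500000 - chunk_
# Prepend the header to each chunk so that every file is a valid CSV
for f in chunk_*; do
    cat header.csv "$f" > "$f.csv" && rm "$f"
done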


Related documentation

  • Dativa Pipeline API: Sample Data - Sample files to demonstrate usage of the Dativa Pipeline API
  • Dativa Pipeline API: Validating basic data types - Validating incoming datasets for basic string, number, and date type formatting and range checks using the Dativa Data Pipeline API
  • Dativa Pipeline API: Anonymizing data - The Dativa Pipeline API supports tokenization, hashing, and encryption of incoming datasets for anonymization and pseudonymization
  • Dativa Pipeline API: Referential Integrity - Using the Dativa Pipeline API to validate data against other known good datasets to ensure referential integrity
  • Dativa Pipeline API: Handling invalid data - Invalid data can be quarantined or automatically fixed by the Dativa Data Pipeline API
  • Dativa Pipeline API: Working with session data - The Dativa Pipeline API can check for gaps and overlaps in session data and automatically fix them
  • Dativa Pipeline API: Reporting and monitoring data quality - The Dativa Pipeline API logs data that does not meet the defined rules and quarantines bad data
  • Dativa Pipeline API: Full API reference - A field-by-field breakdown of the full functionality of the Dativa Data Pipeline API
