Dlp Dataflow Deidentification

Multi Cloud Data Tokenization Solution By Using Dataflow and Cloud DLP


Inspect, de-identify, and re-identify sensitive data using the Cloud DLP API and Dataflow

This repo contains a reference implementation for an end-to-end data tokenization solution. This solution does the following:

  • Passes data through Dataflow pipelines that perform inspection, de-identification, and re-identification through the Cloud Data Loss Prevention API (DLP API).
  • Migrates the processed data to BigQuery.


Architecture

This solution comprises the following pipelines. To view the job graphs of these pipelines, see Dataflow DAG.

  1. Inspection and de-identification
  2. Re-identification

Inspection and de-identification

Reference architecture

You can use this pipeline for Avro, CSV, JSONL, ORC, Parquet, and TSV files that are stored in or ingested into Cloud Storage or an Amazon S3 bucket. The pipeline uses the Apache Beam State and Timer APIs to batch the files and process them efficiently. The results of the inspection and de-identification processes are written to BigQuery tables.

Re-identification

Reference architecture

The re-identification pipeline reads data from a BigQuery table and publishes the re-identified data to a secure Pub/Sub topic.

Concepts

The following resources describe key concepts related to the DLP API and Dataflow. The DLP API is part of Sensitive Data Protection.

  1. Using Sensitive Data Protection with BigQuery
  2. De-identification and re-identification of PII in large-scale datasets using Sensitive Data Protection
  3. Sensitive Data Protection templates
  4. Data masking/tokenization from Cloud Storage to BigQuery using a template
  5. Inspect Google Cloud Storage and databases for sensitive data
  6. Dataflow pipeline options
  7. Sensitive Data Protection quotas and limits

Costs

This tutorial uses billable components of Google Cloud, including Dataflow, Cloud Storage, BigQuery, Sensitive Data Protection (the DLP API), Pub/Sub, and Cloud KMS.

Use the pricing calculator to generate a cost estimate based on your projected usage.

Tutorial

Prerequisites

  1. Create a Google Cloud project.

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Use the link below to open Cloud Shell.

    Open in Cloud Shell

  4. Run the following commands to set up the data tokenization solution in your Google Cloud project:

    gcloud config set project <project_id>
    sh setup-data-tokeninzation-solution-v2.sh
    

    The setup-data-tokeninzation-solution-v2.sh script performs the following tasks:

    • Creates a bucket ({project-id}-demo-data) in the us-central1 region and uploads a sample dataset with mock PII data.

    • Creates a BigQuery dataset (demo_dataset) in the US multi-region to store the tokenized data.

    • Creates a key encryption key (KEK) in Cloud KMS and uses it to wrap an automatically generated token encryption key (TEK).

    • Creates inspection, de-identification, and re-identification templates with the KEK and crypto-based transformations.

    • Creates a service account (with a custom role) for running the DLP API pipeline.

    • Enables Pub/Sub and creates a Pub/Sub topic that receives a notification whenever a new file is added to the Cloud Storage bucket.

    • Generates a set_env.sh script that sets the temporary environment variables used when triggering the DLP API pipelines.

  5. Run set_env.sh:

    source set_env.sh
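
Before you run the pipelines, you might want to confirm that set_env.sh exported the variables that the commands in this tutorial rely on. The following is a minimal sketch: the require_env helper is hypothetical, and the variable list in the usage comment is taken from the pipeline commands in this README, not from the contents of set_env.sh, which this README does not show.

```shell
# Hypothetical helper: report any of the named environment variables
# that are unset or empty. Returns 0 if all are set, 1 otherwise.
require_env() {
  missing=""
  for name in "$@"; do
    # Indirect lookup of the variable's value (POSIX-compatible via eval).
    eval "value=\${$name:-}"
    if [ -z "$value" ]; then
      missing="$missing $name"
    fi
  done
  if [ -n "$missing" ]; then
    echo "Missing environment variables:$missing" >&2
    return 1
  fi
  return 0
}

# Usage (variable names as they appear in the pipeline commands):
# require_env PROJECT_ID REGION DATA_STORAGE_BUCKET BQ_DATASET_NAME \
#   INSPECT_TEMPLATE_NAME DEID_TEMPLATE_NAME SERVICE_ACCOUNT_EMAIL \
#   GCS_NOTIFICATION_TOPIC
```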
    

Compile the code

./gradlew build

Run the samples

Inspection

You can run the inspection pipeline to process existing files, new files, or both:

  • Existing files: files that are already in the Cloud Storage bucket when the pipeline is triggered.
  • New files: files that are added to the Cloud Storage bucket after the pipeline is triggered.

Inspect new and existing files

To trigger a streaming inspection Dataflow pipeline that processes all the CSV files in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME}   \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC}"

Inspect only existing files

To trigger a batch inspection Dataflow pipeline that processes only the CSV files that are currently present in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME}   \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL}"

Files that are added to the bucket after the pipeline is triggered are not inspected.

Inspect only new files

To trigger a streaming inspection Dataflow pipeline that processes new CSV files that are added to the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME}   \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=INSPECT \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC} \
--processExistingFiles=false"

Any files that were added to the bucket before the pipeline was triggered are not inspected.

Inspection results

You can find the inspection results in the BigQuery dataset specified in the dataset parameter.

If --processExistingFiles is set to false and --gcsNotificationTopic is not provided, then the pipeline fails with an error similar to the following:

Exception in thread "main" java.lang.IllegalArgumentException: Either --processExistingFiles should be set to true or --gcsNotificationTopic should be provided
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.runInspectAndDeidPipeline(DLPTextToBigQueryStreamingV2.java:113)
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.run(DLPTextToBigQueryStreamingV2.java:93)
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.main(DLPTextToBigQueryStreamingV2.java:84)
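
If you script your pipeline launches, you can catch this misconfiguration before submitting the job. The following sketch mirrors the rule stated in the error message; check_file_sources is a hypothetical helper that is not part of the repository, and it treats an empty first argument as the documented default for processExistingFiles (true).

```shell
# Hypothetical pre-flight check mirroring the rule in the error message:
# either processExistingFiles is true (the default) or a
# gcsNotificationTopic is provided.
# $1: value of --processExistingFiles ("true", "false", or empty for the default)
# $2: value of --gcsNotificationTopic (may be empty)
check_file_sources() {
  process_existing="${1:-true}"
  notification_topic="$2"
  if [ "$process_existing" = "false" ] && [ -z "$notification_topic" ]; then
    echo "Either --processExistingFiles should be set to true or --gcsNotificationTopic should be provided" >&2
    return 1
  fi
  return 0
}

# Usage:
# check_file_sources false "projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC}"
```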

For information about the default values of the pipeline parameters, see Pipeline parameters on this page.

De-identification

You can run the de-identification pipeline to process existing files, new files, or both:

  • Existing files: files that are already in the Cloud Storage bucket when the pipeline is triggered.
  • New files: files that are added to the Cloud Storage bucket after the pipeline is triggered.

De-identify new and existing files

To trigger a streaming de-identification Dataflow pipeline that processes all the CSV files in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=2 --maxNumWorkers=3 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME}   \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC}"

De-identify only existing files

To trigger a batch de-identification Dataflow pipeline that processes only the CSV files that are currently present in the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME}   \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL}"

Files that are added to the bucket after the pipeline is triggered are not de-identified.

De-identify only new files

To trigger a streaming de-identification Dataflow pipeline that processes new CSV files that are added to the DATA_STORAGE_BUCKET bucket (specified in the filePattern parameter), run the following command:

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--streaming --enableStreamingEngine \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--dataset=${BQ_DATASET_NAME}   \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL} \
--gcsNotificationTopic=projects/${PROJECT_ID}/topics/${GCS_NOTIFICATION_TOPIC} \
--processExistingFiles=false" 

Any files that were added to the bucket before the pipeline was triggered are not de-identified.

De-identification results

You can find the de-identified data in the BigQuery dataset specified in the dataset parameter, in tables named after the respective input files.

If --processExistingFiles is set to false and --gcsNotificationTopic is not provided, then the pipeline fails with an error similar to the following:

Exception in thread "main" java.lang.IllegalArgumentException: Either --processExistingFiles should be set to true or --gcsNotificationTopic should be provided
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.runInspectAndDeidPipeline(DLPTextToBigQueryStreamingV2.java:113)
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.run(DLPTextToBigQueryStreamingV2.java:93)
        at com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2.main(DLPTextToBigQueryStreamingV2.java:84)

For information about the default values of the pipeline parameters, see Pipeline parameters on this page.

Validation

You can run a few quick validations on a BigQuery table to check the tokenized data. The following steps use the sample dataset to validate the de-identified results:

  1. In Cloud Shell, display the header row of the CSV file that you used to create the schema:

    gsutil cp gs://${PROJECT_ID}-demo-data/CCRecords_1564602825.csv - | head -1
    
  2. In the query editor of the BigQuery SQL workspace, run the following query to compare the schema with the header row of the CSV file:

    SELECT table_name, column_name
    FROM `<dataset_id>.INFORMATION_SCHEMA.COLUMNS`
    WHERE table_name="CCRecords_1564602825"
    

    There are no spaces in the column names because the pipeline ensures that column and table names contain only characters that are valid under the BigQuery naming rules.

  3. Validate that the number of rows in the input file and the output table are equal:

    Input records:

    echo $(($(gcloud storage cat gs://${PROJECT_ID}-demo-data/CCRecords_1564602825.csv | wc -l) - 1))
    

    Output records:

    SELECT COUNT(*) as number_of_rows
    FROM `<dataset_id>.CCRecords_1564602825` LIMIT 1
    
  4. Validate that the bucketing transformation is successfully applied to the JobTitle column:

    SELECT JobTitle, COUNT(*) AS number_of_records_found
    FROM `<dataset_id>.CCRecords_1564602825`
    GROUP BY JobTitle
    

    In the output, the JobTitle values should be grouped into three generalized buckets: Executive, Engineer, and Manager.

  5. Validate that values in the Age column are grouped into six different buckets from 60 to 20:

    SELECT Age, COUNT(*) AS number_of_records_found
    FROM `<dataset_id>.CCRecords_1564602825`
    GROUP BY Age
    ORDER BY Age DESC
    
  6. Validate the masking transformation for the SSN:

    SELECT SSN
    FROM `<dataset_id>.CCRecords_*`
    WHERE REGEXP_CONTAINS(SSN, r"@")
    

    In the output, the first five digits for all SSN entries should be masked.

  7. Validate that sensitive values in the Additional_Details column are replaced with their infoType names:

    SELECT Additional_Details
    FROM `<dataset_id>.CCRecords_*`
    WHERE REGEXP_CONTAINS(Additional_Details, r'IBAN_CODE|EMAIL_ADDRESS|PHONE_NUMBER|ONLINE_USER_ID')
    

    In the output, sensitive values should be replaced with placeholder values such as [IBAN_CODE], [EMAIL_ADDRESS], [PHONE_NUMBER], and [ONLINE_USER_ID].

  8. Query the de-identified table for ID 76901:

    SELECT * FROM `<dataset_id>.CCRecords_1564602825` WHERE ID='76901'
    
  9. In Cloud Shell, compare the output from the previous step with the original dataset in the CSV file for the ID 76901:

    gsutil cp gs://${PROJECT_ID}-demo-data/CCRecords_1564602825.csv - | awk -F "," '$1 == 76901'
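
Step 2 relies on the pipeline converting CSV header fields and file names into valid BigQuery names. This README does not document the exact conversion, so the following sketch assumes a simple rule for illustration only: replace every character that is not a letter, digit, or underscore with an underscore, and prefix an underscore when a name starts with a digit. The to_bq_name and header_to_bq_columns helpers are hypothetical, and the pipeline's actual rule may differ.

```shell
# Hypothetical sanitizer using an ASSUMED rule (not necessarily the
# pipeline's): invalid characters become "_", and a leading digit gets
# a "_" prefix.
to_bq_name() {
  printf '%s\n' "$1" |
    sed -e 's/[^A-Za-z0-9_]/_/g' -e 's/^\([0-9]\)/_\1/'
}

# Apply to_bq_name to every field of a comma-separated header row,
# printing one column name per line. Carriage returns are stripped first
# so that Windows line endings do not become trailing underscores.
header_to_bq_columns() {
  printf '%s\n' "$1" | tr -d '\r' | tr ',' '\n' | while IFS= read -r col; do
    to_bq_name "$col"
  done
}

# Usage: preview the column names for the sample file.
# header_to_bq_columns "$(gsutil cp gs://${PROJECT_ID}-demo-data/CCRecords_1564602825.csv - | head -1)"
```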
    

Re-identification from BigQuery

  1. Export, as an environment variable, the SQL query that selects the BigQuery data to re-identify. The following sample selects 10 records that match the query conditions.

    export QUERY="select ID,Card_Number,Card_Holders_Name from \`${PROJECT_ID}.${BQ_DATASET_NAME}.CCRecords_1564602828\` where safe_cast(Credit_Limit as int64)>100000 and safe_cast (Age as int64)>50 group by ID,Card_Number,Card_Holders_Name limit 10"
    
  2. Save the query to a file in Cloud Storage:

    export REID_QUERY_BUCKET=<name>
    cat << EOF | gsutil cp - gs://${REID_QUERY_BUCKET}/reid_query.sql
    ${QUERY}
    EOF
    
  3. Create a Pub/Sub topic. For more information, see create a topic.

  4. Run the pipeline by passing the required parameters:

    ./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
    -Pargs="--region=${REGION} \
    --project=${PROJECT_ID} \
    --tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
    --numWorkers=1 --maxNumWorkers=2 \
    --runner=DataflowRunner \
    --tableRef=${PROJECT_ID}:${BQ_DATASET_NAME}.<table> \
    --dataset=${BQ_DATASET_NAME} \
    --topic=projects/${PROJECT_ID}/topics/<topic_name> \
    --autoscalingAlgorithm=THROUGHPUT_BASED \
    --workerMachineType=n1-highmem-4 \
    --deidentifyTemplateName=${REID_TEMPLATE_NAME} \
    --DLPMethod=REID \
    --keyRange=1024 \
    --queryPath=gs://${REID_QUERY_BUCKET}/reid_query.sql \
    --serviceAccount=${SERVICE_ACCOUNT_EMAIL}"
    

    This command triggers a batch re-identification Dataflow pipeline that processes all the records returned by the query stored in reid_query.sql. The re-identified results are written to the BigQuery dataset specified in the dataset parameter, in a table whose name has the input table name as its suffix.
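
When you replace <table> and <topic_name> with your own values, a quick shape check can catch a malformed --tableRef or --topic value before the job is submitted. The is_table_ref and is_topic_path helpers below are hypothetical; they use the formats shown in this README and only check the pattern of each value, not whether the table or topic exists.

```shell
# Hypothetical format check for --tableRef: <project>:<dataset>.<table>
is_table_ref() {
  case "$1" in
    ?*:?*.?*) return 0 ;;
    *) return 1 ;;
  esac
}

# Hypothetical format check for --topic: projects/<project_id>/topics/<topic_name>
is_topic_path() {
  case "$1" in
    projects/?*/topics/?*) return 0 ;;
    *) return 1 ;;
  esac
}

# Usage:
# is_table_ref "${PROJECT_ID}:${BQ_DATASET_NAME}.CCRecords_1564602825" || echo "bad --tableRef"
# is_topic_path "projects/${PROJECT_ID}/topics/<topic_name>" || echo "bad --topic"
```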

Pipeline Parameters

| Pipeline option | Description | Used in operations |
|---|---|---|
| region | Specifies a regional endpoint for deploying your Dataflow jobs. | All |
| project | The project ID for your Google Cloud project. | All |
| streaming | Set to true to run a streaming pipeline. The commands in this README pass it as the flag --streaming; omit it to run a batch pipeline. | INSPECT/DEID |
| enableStreamingEngine | Specifies whether Dataflow Streaming Engine is enabled or disabled. | INSPECT/DEID |
| tempLocation | Cloud Storage path for temporary files. Must be a valid Cloud Storage URL. | All |
| numWorkers | (Optional) The initial number of Compute Engine instances to use when executing your pipeline. This option determines how many workers the Dataflow service starts when your job begins. | All |
| maxNumWorkers | (Optional) The maximum number of Compute Engine instances available to your pipeline during execution. This value can be higher than the initial number of workers (numWorkers) so that your job can scale up, automatically or otherwise. | All |
| runner | The pipeline runner. Use DataflowRunner. | All |
| inspectTemplateName | DLP API inspect template name. | INSPECT/DEID |
| filePattern | The file pattern used to select the files that the job processes. | INSPECT/DEID |
| deidentifyTemplateName | DLP de-identify template name. For REID, pass the re-identification template. | All |
| DLPMethod | Type of DLP operation to perform. Valid values are INSPECT, DEID, and REID. | All |
| processExistingFiles | When set to false, files that already exist in the Cloud Storage bucket before the pipeline starts are not processed. The default value is true. | INSPECT/DEID |
| gcsNotificationTopic | Pub/Sub topic for notifications of new files added to the Cloud Storage bucket in filePattern. Format: projects/$PROJECT_ID/topics/$GCS_TOPIC_NAME. | INSPECT/DEID |
| batchSize | (Optional) Batch size for the DLP API. The default is 500,000. | All |
| dataset | BigQuery dataset that inspection or de-identification results are written to, or that re-identification reads from. | All |
| recordDelimiter | (Optional) Record delimiter. | INSPECT/DEID |
| columnDelimiter | Column delimiter. Required only for a custom delimiter. | INSPECT/DEID |
| tableRef | BigQuery table to export from, in the form <project>:<dataset>.<table>. | REID |
| queryPath | Query file for re-identification. | REID |
| headers | DLP table headers. Required for the JSONL file type. | INSPECT/DEID |
| numShardsPerDLPRequestBatching | (Optional) Number of shards for DLP request batches. Can be used to control the parallelism of DLP requests. The default value is 100. | All |
| numberOfWorkerHarnessThreads | (Optional) The number of threads per worker harness process. | All |
| dlpApiRetryCount | (Optional) Number of retries for transient DLP API errors. The default value is 10. | All |
| initialBackoff | (Optional) Initial backoff, in seconds, for retries with exponential backoff. The default is 5 seconds. | All |
| outputBucket | Cloud Storage path for storing the de-identified files. | DEID |
| DLPParent | (Optional) The resource location for DLP templates. Format: projects/<project-id>/locations/<region>. By default, the location is global. | All |

For more details, see Dataflow Pipeline Options.
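
The dlpApiRetryCount and initialBackoff parameters together bound how long the pipeline keeps retrying a failing DLP API call. The following sketch prints a retry schedule under an assumption that this README does not confirm: the delay doubles after each retry, with no jitter and no cap. backoff_schedule is a hypothetical helper; the pipeline's actual multiplier and limits may differ.

```shell
# Hypothetical illustration of exponential backoff: print the delay (in
# seconds) before each retry, ASSUMING the delay doubles every attempt.
# $1: number of retries (compare dlpApiRetryCount, default 10)
# $2: initial backoff in seconds (compare initialBackoff, default 5)
backoff_schedule() {
  retries="$1"
  delay="$2"
  total=0
  attempt=1
  while [ "$attempt" -le "$retries" ]; do
    echo "retry $attempt: wait ${delay}s"
    total=$((total + delay))
    delay=$((delay * 2))
    attempt=$((attempt + 1))
  done
  echo "total wait: ${total}s"
}

# Usage: backoff_schedule 10 5
```

Under the doubling assumption, the defaults (10 retries, 5 seconds) add up to 5 + 10 + ... + 2,560 = 5,115 seconds, or about 85 minutes, of waiting across all retries.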

Supported file formats

1. CSV

For sample commands for processing CSV files, see Run the samples.

2. TSV

The pipeline supports the TSV file format, which uses tabs as column delimiters. The pipeline options are similar to those for CSV files, but the file name must have the .tsv extension.

./gradlew run ... -Pargs="... --filePattern=gs://<bucket_name>/small_file.tsv"

3. JSONL

The pipeline supports the JSONL file format, where each line is a valid JSON object and newline characters separate JSON objects. For a sample file, see the test resources. To run the pipeline for JSONL files, you must also pass a comma-separated list of headers in the --headers pipeline option.

./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/CCRecords_sample.jsonl --headers=<comma_separated_list_of_headers>"

The original schema of the input file will not be preserved. This solution simplifies the data when it converts the data to a DLP API request object.

4. Avro

The pipeline handles Avro files similarly to how it handles CSV files. No additional changes are required to run the pipeline except updating the --filePattern parameter. For example:

./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.avro"

The original schema of the input file will not be preserved. This solution simplifies the data when it converts the data to a DLP API request object.

5. CSV files with custom delimiters

The pipeline supports CSV files with a custom delimiter. Pass the delimiter in the --columnDelimiter pipeline option.

./gradlew run ... -Pargs="... --columnDelimiter=|"
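
Before passing --columnDelimiter, it can help to confirm that a file really uses that delimiter. The following sketch counts the fields on each line of a local copy of the file and reports each distinct field count with the number of lines that have it; a consistently delimited file should report a single count. field_counts is a hypothetical helper, and its naive split does not handle delimiters that appear inside quoted values.

```shell
# Hypothetical pre-flight check: print "<field count> <number of lines>"
# for each distinct field count in a delimited file, sorted by field count.
# $1: path to a local file
# $2: single-character delimiter (defaults to ",")
field_counts() {
  awk -F "${2:-,}" '{ counts[NF]++ } END { for (n in counts) print n, counts[n] }' "$1" |
    sort -n
}

# Usage:
# gsutil cp gs://<bucket_name>/<file>.csv - | head -1000 > sample.csv
# field_counts sample.csv '|'
```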

6. Parquet

The inspection and de-identification pipelines support the Parquet file format: data can be read from a Cloud Storage bucket, and the results of the DLP Dataflow pipeline are written to BigQuery tables. For sample data in Parquet file format, refer to mock-data.

No additional changes are required to run the pipeline except updating the --filePattern parameter. For example:

./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.parquet"

The original schema of the input file will not be preserved. This solution simplifies the data when it converts the data to a DLP API request object.

7. ORC

Inspection of ORC files

The inspection pipeline reads ORC files from an input Cloud Storage bucket and writes the results to BigQuery tables.

./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.orc"

De-identification of ORC files

The de-identification pipeline reads data from an input Cloud Storage bucket. The de-identified results can be written to a BigQuery dataset as tables or to an output Cloud Storage bucket as ORC files.

  1. To write the de-identified files to BigQuery tables:

    ./gradlew run ... -Pargs="... --filePattern=gs://${PROJECT_ID}-demo-data/*.orc"

  2. To write the de-identified files to a Cloud Storage bucket:

    ./gradlew run ... -Pargs="... \
    --filePattern=gs://${PROJECT_ID}-demo-data/*.orc \
    --outputBucket=<output_storage_bucket> ..."

In the above command, replace output_storage_bucket with the URI of the Cloud Storage bucket where you want to store the de-identified ORC files. The de-identification pipeline allows input files with varying schemas to be processed in the same pipeline.

Currently, this solution can process only primitive data types available in ORC format when the results are stored in an output Cloud Storage bucket.

For sample data in ORC file format, refer to mock-data.

Amazon S3 Scanner

To use Amazon S3 as a source of input files, use AWS credentials as instructed below.

  1. Create an AWS access key.

  2. Export the AWS access key, secret key, and credentials provider as environment variables.

    export AWS_ACCESS_KEY="<access_key>"
    export AWS_SECRET_KEY="<secret_key>"
    export AWS_CRED="{\"@type\":\"AWSStaticCredentialsProvider\",\"awsAccessKeyId\":\"${AWS_ACCESS_KEY}\",\"awsSecretKey\":\"${AWS_SECRET_KEY}\"}"
    
  3. Use Gradle to build and run the job to perform DLP operations on a CSV file stored in Amazon S3. The results will be written to BigQuery.

    Update the filePattern and awsRegion parameters with appropriate values in the following command.

    ./gradlew build
    
    # INSPECT is the default DLP method; for de-identification, add --DLPMethod=DEID
    ./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 -Pargs="
    --region=${REGION}  
    --project=${PROJECT_ID}
    --streaming --enableStreamingEngine 
    --tempLocation=gs://${PROJECT_ID}-demo-data/temp 
    --numWorkers=1 --maxNumWorkers=2 
    --runner=DataflowRunner 
    --filePattern=s3://<bucket>/file.csv 
    --dataset=${BQ_DATASET_NAME}  
    --inspectTemplateName=${INSPECT_TEMPLATE_NAME}  
    --deidentifyTemplateName=${DEID_TEMPLATE_NAME}
    --awsRegion=<aws_region> 
    --awsCredentialsProvider=${AWS_CRED} 
    --serviceAccount=$SERVICE_ACCOUNT_EMAIL"
    

Parameters

  • --awsRegion: The region where the AWS resources reside.

  • --awsCredentialsProvider: The AWS credentials provider.

GCS as sink for DEID

The pipeline offers GCS as a sink for the DEID workflow. Currently, it supports de-identification of CSV files and writes the output files in CSV format.

The following command de-identifies existing CSV files. Set the outputBucket parameter to the GCS path where you want to store the de-identified files.

./gradlew run -DmainClass=com.google.swarm.tokenization.DLPTextToBigQueryStreamingV2 \
-Pargs=" --region=${REGION} \
--project=${PROJECT_ID} \
--tempLocation=gs://${DATA_STORAGE_BUCKET}/temp \
--numWorkers=1 --maxNumWorkers=2 \
--runner=DataflowRunner \
--filePattern=gs://${DATA_STORAGE_BUCKET}/*.csv \
--inspectTemplateName=${INSPECT_TEMPLATE_NAME} \
--deidentifyTemplateName=${DEID_TEMPLATE_NAME} \
--batchSize=200000 \
--DLPMethod=DEID \
--outputBucket=gs://${DATA_STORAGE_BUCKET}/output \
--serviceAccount=${SERVICE_ACCOUNT_EMAIL}"

Parameters

  • --outputBucket: The GCS path for storing the de-identified files.

Adapt this pipeline for your use cases

The DLP templates used in this tutorial are tailored specifically for inspecting, de-identifying, and re-identifying sample data that contains simulated personally identifiable information (PII). When you work with your own data, create custom DLP templates that match the characteristics of the data being processed. For information about creating your templates, see the following:

  1. Create your inspection templates and run the inspection on sample data
  2. Create de-identification templates and run de-identification on sample data

Run this pipeline outside Cloud Shell

Cloud Shell comes with a pre-configured development environment that includes many of the most popular tools and libraries, such as the Google Cloud CLI (gcloud CLI), Gradle, Java, Git, Vim, and more. This means that you don't have to spend time setting up your own environment on your local machine. If you want to run this solution from your own machine, follow these steps:

  1. Create a new Google Cloud project.
  2. Set up required tools and libraries to run the DLP Dataflow pipelines.
  3. Set up Application Default Credentials.
  4. Set the project ID:

    gcloud config set project <project_id>

  5. Create your credential file:

    gcloud auth application-default login

  6. Set the project as a quota project:

    gcloud auth application-default set-quota-project <project_id>

  7. Run the setup script:

    sh setup-data-tokeninzation-solution-v2.sh

  8. Set the environment variables:

    source set_env.sh

  9. Run the INSPECT, DEID, or REID pipeline commands as described in the tutorial.

Required tools and libraries

This section provides a minimal list of tools and libraries, with their recommended versions, that you can use to run this pipeline outside Cloud Shell.

  1. Install Java using OpenJDK (recommended version: 19.0.1 and above).
  2. Use Gradle Wrapper to build the project:
    ./gradlew build
    
  3. Set up Git.
  4. Install the Google Cloud CLI.
  5. Download jq.
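
To check which of these tools are already installed, you could use a small helper such as the following. missing_tools is hypothetical; it only checks whether each command is on your PATH, not whether its version meets the recommendations above.

```shell
# Hypothetical helper: print each given command that is not on PATH.
# Prints nothing and returns 0 when every command is found.
missing_tools() {
  status=0
  for tool in "$@"; do
    if ! command -v "$tool" >/dev/null 2>&1; then
      echo "$tool"
      status=1
    fi
  done
  return "$status"
}

# Usage with the tools listed above (Gradle runs through ./gradlew):
# missing_tools java git gcloud jq
```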

Troubleshooting

The following are issues you might encounter while running the pipeline, and the ways you can avoid or recover from them.

  • Duplicate rows: When writing data to a BigQuery table, the pipeline might write duplicate rows.

    The project uses the BigQuery Streaming Inserts API, which by default enables a best-effort deduplication mechanism. For a possible solution, see High number of duplicates in Dataflow pipeline streaming inserts to BigQuery.

  • Errors in the transformation process: You can view the detailed errors in the worker logs for the PTransform.

    1. INVALID_ARGUMENT: Too many findings to de-identify. Retry with a smaller request.

      The DLP API allows a maximum of 3,000 findings per request. Run the pipeline again with a smaller batch size.

    2. RESOURCE_EXHAUSTED: Quota exceeded for quota metric 'Number of requests' and limit 'Number of requests per minute' of service 'dlp.googleapis.com'

      This can happen if the Dataflow pipeline is being run with a small batch size. Re-run the pipeline with a larger batch size.

      If increasing the batch size is not possible or if the issue persists even after reaching the maximum batch size, consider trying one of the following options:

      • The pipeline retries DLP API calls with exponential backoff. To allow more retries, increase the value of the dlpApiRetryCount parameter. For more information, see the entries for dlpApiRetryCount and initialBackoff in Pipeline parameters.

      • The pipeline includes a parameter called numShardsPerDLPRequestBatching. Decreasing this value below the default (100) will result in a lower number of concurrent requests sent to DLP.

      • Check whether any other pipelines or clients are generating DLP API requests.

      • Consider submitting a request to increase the quota limit for Sensitive Data Protection.

Dataflow DAG

Inspection and de-identification

[Figure: Dataflow job graph of the inspection and de-identification pipeline]

Re-identification

[Figure: Dataflow job graph of the re-identification pipeline]

Advanced topics

  • Dataflow templates allow you to package a Dataflow pipeline for deployment. Instead of having to build the pipeline every time, you can create Flex Templates and deploy the template by using the Google Cloud console, the Google Cloud CLI, or REST API calls.
  • For more details, see Dataflow templates and Flex Templates.

Some considerations

  • The behavior of the pipeline depends on factors such as the length of the records, the number of findings per record, the DLP API quota for the project, and other applications or pipelines that generate DLP API traffic.
  • You may need to adjust the parameters mentioned above. If you encounter errors, see Troubleshooting on this page.
  • Most errors observed in the pipeline indicate that the parameters need to be adjusted. However, there may be error scenarios that the pipeline doesn't currently handle and these may require code changes.
  • Due to the reasons mentioned above, this solution should not be considered production-ready.

README source: GoogleCloudPlatform/dlp-dataflow-deidentification