
Integrating BigQuery and Datastore in GCP

Cloud technology is accelerating the rate of business. Set your business up for success with the dynamic tools the Google Cloud Platform (GCP) offers. Promevo helps analyze your business needs and pairs you with the right tools and resources to modernize your business and accomplish goals. 


Capitalize on Google's Business Intelligence (BI) Tools 

Datastore and BigQuery are specifically designed to optimize data storage and help implement BI strategies. They allow app developers and business analysts the flexibility to manipulate data to optimize business functions and performance.

Leveraging Google's business intelligence tools requires migrating data from the data source to the Google Cloud Platform. Promevo guides organizations through uploading data from on-premises warehouses to GCP. Uploading data to the cloud provides scalability; it also significantly reduces the costs of data storage. 


Business Intelligence with Datastore and BigQuery

Datastore for AppDevs

Datastore supports application developers by offering a NoSQL solution. App developers can store and read data without building a relational database. Since Datastore is fully-managed, developers can divert more focus to application building. 

As new data is introduced, Datastore scales automatically and increases efficiency. Google's data centers use redundancy for a highly available database with minimized points of failure.

Use Datastore for:

  • Storing product catalogs and providing real-time details for a retailer 
  • Offering customers a personalized experience with capabilities to store customer information like preferences and past activities
  • Executing transactions based on ACID properties to ensure the safe completion of tasks like bank transfers

BigQuery for BI Analysts

BigQuery is an analytical database built for analysts. Many BI teams rely on relational databases, which are not designed for data warehouse workloads. With BigQuery, analysts use the cloud to optimize datasets for quicker processing and lower data storage costs.

BigQuery allows analysts:

  • On-demand access to the latest data
  • Data storage optimization using AI systems
  • Faster queries with a flexible architecture

Integrating BigQuery and Datastore using Python

Integrating BigQuery and Datastore combines the functionality of both. Your teams will be better equipped to manipulate and store data to optimize business functions. 

Promevo takes on the role of implementing integration using Python. Follow along to complete the steps yourself or reach out to a Promevo engineering expert for help.

Creating The Pipeline

This process has four steps:

  • Create a Pub/Sub topic
  • Create a Cloud Scheduler job for the Pub/Sub topic
  • Write a Cloud Function with a trigger on the Pub/Sub topic
  • Verify successful write to BigQuery

To begin, create a Pub/Sub topic. Pub/Sub is a powerful messaging system, and we are going to use it as a facilitator for our project.
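If you would rather create the topic from code than from the console, the step above can be sketched in Python. This is a minimal sketch, assuming the google-cloud-pubsub package is installed and credentials are configured. The project ID and topic name in the comment are hypothetical placeholders, not values from this walkthrough.

```python
def topic_path(project_id: str, topic_id: str) -> str:
    """Build the fully qualified Pub/Sub topic name."""
    return f"projects/{project_id}/topics/{topic_id}"


def create_topic(project_id: str, topic_id: str):
    """Create the topic. Requires google-cloud-pubsub and valid credentials."""
    # Imported here so topic_path() works without the library installed.
    from google.cloud import pubsub_v1

    publisher = pubsub_v1.PublisherClient()
    return publisher.create_topic(request={"name": topic_path(project_id, topic_id)})


# Example call (makes a real API request; the names are placeholders):
# create_topic("your-project", "datastore-to-bq")
```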


Next is the main section of the project: the custom Python code that writes from your Datastore kind to BigQuery. To do this, we will create a Cloud Function. In this example, the function is named DatastoreToBigQuery, and the settings are left at their defaults.


Once you complete the initial configuration, the next step is to write the code. There are a few choices to make in the code window.

There are two required files that Cloud Functions creates when you select this option. First is the main.py file. This is where you will place your Python script. 


The second is the requirements.txt file. It lists the Python packages your Cloud Function needs that are not included in the default configuration.
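For the script in this walkthrough, a requirements.txt along the following lines would likely be needed. This is an assumption based on the script's imports, not the original file. The pyarrow entry is included because the BigQuery client relies on it when loading a pandas DataFrame, and versions are deliberately left unpinned.

```text
google-cloud-bigquery
google-cloud-datastore
pandas
pyarrow
```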

What’s In The Script?

Let us further explore the script:

def listen_for_pubsub(event, context):
    import datetime
    import pandas as pd
    from google.cloud import bigquery
    from google.cloud import datastore

    # Find the most recent timestamp currently in BigQuery.
    bq_client = bigquery.Client()
    query = """select max(timestamp_in_ms) max_ts
      from `your_project.your_dataset.DatastoreInBigQuery`"""
    query_job = bq_client.query(query)  # Make an API request.
    max_ts = 0
    for row in query_job:
        max_ts = row["max_ts"] if row["max_ts"] is not None else 0
    print('Most recent in BQ: ' + str(datetime.datetime.fromtimestamp(max_ts / 1000.0)))

    # Find all records in Datastore that are newer than max_ts.
    ds_client = datastore.Client()
    query = ds_client.query(kind="MyDatastoreData")
    query.add_filter("timestamp_in_ms", ">", max_ts)
    results = list(query.fetch())
    df = pd.DataFrame(results)
    print(str(len(df.index)) + ' new records found in Datastore!')

    # Add new records to BigQuery only if there are new records.
    if len(df.index) > 0:
        table = 'your_dataset.DatastoreInBigQuery'
        job_config = bigquery.LoadJobConfig(
            schema=[
                bigquery.SchemaField("random_data", "STRING"),
                bigquery.SchemaField("timestamp_in_ms", "INTEGER"),
            ]
        )
        job = bq_client.load_table_from_dataframe(df, table, job_config=job_config)
        job.result()  # Wait for the load job to finish.
        print(str(len(df.index)) + ' rows written to BQ!')

The above script is a simple function definition. After importing the required Python libraries, it creates a BigQuery client to query the data. This step has one purpose: finding the most recent timestamp in the BigQuery data. Feel free to use a different key or Datastore metadata.

The next section creates a Datastore client and simply finds all records that exist after the end point we discovered in the first section. 

The final section runs only if there are new rows. If there is new data, it loads the rows into your BigQuery table.

Caveats: when developing, examine the Pandas dataframe and take note of the order of the fields. When you specify the BigQuery schema, make sure you add the data in the correct order.  Also, these instructions assume you have already created the destination BigQuery table. 
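One way to avoid depending on field order is to rebuild each row with its keys in schema order before creating the dataframe. The following is a minimal sketch, not part of the original script. The helper name order_rows and the SCHEMA_FIELDS list are hypothetical, and plain dicts stand in for Datastore entities, which behave like dicts.

```python
# Field order assumed to match the destination BigQuery table schema.
SCHEMA_FIELDS = ["random_data", "timestamp_in_ms"]


def order_rows(entities, field_names=SCHEMA_FIELDS):
    """Return one dict per entity with keys in schema order.

    Fields missing from an entity become None; extra fields are dropped.
    """
    return [{name: entity.get(name) for name in field_names} for entity in entities]


# Plain dicts stand in for Datastore entities in this example.
sample = [{"timestamp_in_ms": 1700000000000, "random_data": "abc", "extra": 1}]
rows = order_rows(sample)
print(list(rows[0].keys()))  # ['random_data', 'timestamp_in_ms']
```

In the script, the result could then be passed to pd.DataFrame(rows, columns=SCHEMA_FIELDS) so the column order always matches the schema.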


One other tip: when developing the code, run the script without the function definition. Also, include two extra lines of code which allow you to authenticate the script locally via a service account JSON private key.

import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = r"C:\your-credential-file.json"
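Once the function definition is back in place, it can also be exercised locally by passing it a stand-in event. The sketch below assumes the event shape that Pub/Sub-triggered background functions receive, a dict whose "data" value is base64-encoded. The helper names are hypothetical.

```python
import base64


def make_pubsub_event(message: str) -> dict:
    """Build a dict shaped like the event a Pub/Sub-triggered function receives."""
    return {"data": base64.b64encode(message.encode("utf-8")).decode("utf-8")}


def decode_pubsub_event(event: dict) -> str:
    """Decode the base64 payload; return an empty string if there is none."""
    data = event.get("data")
    return base64.b64decode(data).decode("utf-8") if data else ""


event = make_pubsub_event("run")
print(decode_pubsub_event(event))  # run

# Locally, the function from main.py could then be called as:
# listen_for_pubsub(event, None)
```

The script in this article ignores the message contents, so any payload works for a local run.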

The final step is to create the scheduler. The configuration is fairly straightforward, where the only required variables are the name of the Pub/Sub topic and your scheduled frequency. The format for the frequency is in unix-cron notation. Use crontab.guru/examples.html for some easy-to-understand options.
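The same job can be described in code. This is a rough sketch, assuming the google-cloud-scheduler package and configured credentials. The function names, the "Etc/UTC" time zone, and the "run" payload are illustrative assumptions. The default schedule "*/5 * * * *" is the unix-cron expression for every five minutes.

```python
def build_scheduler_job(project_id, location, job_id, topic_id, schedule="*/5 * * * *"):
    """Describe a Cloud Scheduler job that publishes to a Pub/Sub topic."""
    parent = f"projects/{project_id}/locations/{location}"
    job = {
        "name": f"{parent}/jobs/{job_id}",
        "schedule": schedule,  # unix-cron; the default means every five minutes
        "time_zone": "Etc/UTC",  # assumed; pick the zone that suits you
        "pubsub_target": {
            "topic_name": f"projects/{project_id}/topics/{topic_id}",
            "data": b"run",  # placeholder payload; the function ignores it
        },
    }
    return parent, job


def create_scheduler_job(project_id, location, job_id, topic_id):
    """Create the job. Requires google-cloud-scheduler and valid credentials."""
    # Imported here so build_scheduler_job() works without the library installed.
    from google.cloud import scheduler_v1

    parent, job = build_scheduler_job(project_id, location, job_id, topic_id)
    client = scheduler_v1.CloudSchedulerClient()
    return client.create_job(request={"parent": parent, "job": job})
```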


The Cloud Function

In summary, Cloud Scheduler sends a Pub/Sub message every five minutes. The message does not carry any meaningful payload; it only serves to trigger the Cloud Function. The Cloud Function is where all of the magic happens via a simple Python script. The function finds the time boundary and uses it to select only the records added since the last run. Finally, the script adds the new data to BigQuery.

Integration Completed

The job will now run every five minutes to capture new records from the Datastore kind. Test the process by adding a few entities to the kind. If the process fails, check the Cloud Function logs. Also, ensure you have the correct roles configured for each step.
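Test entities can be added with a few lines of Python. This is a minimal sketch, assuming the google-cloud-datastore package and configured credentials. The helper names are hypothetical, and the property names mirror the fields used in the script above.

```python
import time
import uuid


def build_test_payload():
    """Build properties matching the fields the pipeline script expects."""
    return {
        "random_data": uuid.uuid4().hex,
        "timestamp_in_ms": int(time.time() * 1000),
    }


def add_test_entities(count=3, kind="MyDatastoreData"):
    """Write test entities. Requires google-cloud-datastore and valid credentials."""
    # Imported here so build_test_payload() works without the library installed.
    from google.cloud import datastore

    client = datastore.Client()
    for _ in range(count):
        entity = datastore.Entity(key=client.key(kind))
        entity.update(build_test_payload())
        client.put(entity)
```

After the next scheduled run, the new rows should appear in the destination BigQuery table.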


Promevo can help define a scope for this process for your business. Our Google experts and engineering consultants offer technical support for every phase of the business intelligence journey. 


Why Promevo

Promevo is your trusted service partner. We work with you to understand your unique needs and develop a custom solution designed to scale with your business as it grows. 

Advantages include:

  • End-to-end Solutions Specific to Your Needs - We provide our clients with everything they need for their Google Cloud journey. You can expect solutions customized to your business needs for Cloud Account Management, App Modernization, Data Engineering & Analytics, and Infrastructure Modernization.

  • Advisory Workshops - We leverage our Google expertise to guide our clients and maximize success in Google Cloud. From strategy assessment to internal advocacy and thought leadership, Promevo's advisory workshops are designed to produce the best outcomes.

  • Certifications and Google Expertise - Our Google Certified Engineer team supports our clients with unparalleled technical support and Google expertise.

  • Packaged Solutions - Promevo offers a 4-week engagement to develop a business intelligence dashboard and connect our client’s real data with up to 4 data sources. This data analytics packaged solution includes assessment, proof of concept, and a transition plan.

Our certified professionals can design solutions around data warehousing, analytics, and business intelligence. Promevo can help you develop your data strategy and then select and implement the right data technologies for your business needs.

