
Error While Importing WriteToDatastore (Apache Beam/Google DataFlow)

I'm trying to use an Apache Beam pipeline to write entities to Google Cloud Datastore. For testing, I'm running it in a local Python 2.7 virtual environment set up according to the Apache Beam instructions, and I'm writing the code in a local Jupyter notebook. Here's a simplified version of the code I'm running:

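import apache_beam as beam
# Importing WriteToDatastore in the same session as google.cloud.datastore raises the TypeError below
#from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud import datastore
from google.cloud.datastore.entity import Entity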

# projectId will be taken from the environment
storage = datastore.Client()

# The kind for the new entity
gds_entity_kind = 'test_entity'


class PrintFn(beam.DoFn):
    def process(self, element):
        print (element)
        return None


def create_entity(entity_id, name):
    key = storage.key(gds_entity_kind, int(entity_id))

    entity = Entity(key=key)
    entity.update({
        'name': name
    })

    return entity


lines = [
    "'0815';'entity A'",
    "'4711';'entity B'"
]

with beam.Pipeline() as p:
    (p
     | 'read lines' >> beam.Create(lines)
     | 'rows to columns' >> beam.Map(lambda v: v.split(';'))
     | 'remove quotes' >> beam.Map(lambda words: [word.strip('\'') for word in words])
     | 'create entities' >> beam.Map(lambda fields: create_entity(*fields))
#     | 'write to datastore' >> WriteToDatastore()
     | 'debug print' >> beam.ParDo(PrintFn())
    )

I found a post describing a similar problem, but its answer seems unrelated to my situation. I figured out that the problem seems to be related to the import statement

from google.cloud import datastore

in relation to

from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore

If I restart the kernel and import only WriteToDatastore, I don't get any error. If I import both, I get the error below. Any help is appreciated!

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-10-45d84b2c60ba> in <module>()
      1 import apache_beam as beam
----> 2 from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
      3 from google.cloud import datastore
      4 from google.cloud.datastore.entity import Entity
      5 

/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/datastoreio.py in <module>()
     21 import time
     22 
---> 23 from apache_beam.io.gcp.datastore.v1 import helper
     24 from apache_beam.io.gcp.datastore.v1 import query_splitter
     25 from apache_beam.io.gcp.datastore.v1 import util

/usr/local/lib/python2.7/site-packages/apache_beam/io/gcp/datastore/v1/helper.py in <module>()
     34 # pylint: disable=wrong-import-order, wrong-import-position
     35 try:
---> 36   from google.cloud.proto.datastore.v1 import datastore_pb2
     37   from google.cloud.proto.datastore.v1 import entity_pb2
     38   from google.cloud.proto.datastore.v1 import query_pb2

/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/datastore_pb2.py in <module>()
     15 
     16 from google.api import annotations_pb2 as google_dot_api_dot_annotations__pb2
---> 17 from google.cloud.proto.datastore.v1 import entity_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_entity__pb2
     18 from google.cloud.proto.datastore.v1 import query_pb2 as google_dot_cloud_dot_proto_dot_datastore_dot_v1_dot_query__pb2
     19 

/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/cloud/proto/datastore/v1/entity_pb2.py in <module>()
     26   serialized_pb=_b('\n,google/cloud/proto/datastore/v1/entity.proto\x12\x13google.datastore.v1\x1a\x1cgoogle/api/annotations.proto\x1a\x1cgoogle/protobuf/struct.proto\x1a\x1fgoogle/protobuf/timestamp.proto\x1a\x18google/type/latlng.proto\"7\n\x0bPartitionId\x12\x12\n\nproject_id\x18\x02 \x01(\t\x12\x14\n\x0cnamespace_id\x18\x04 \x01(\t\"\xb7\x01\n\x03Key\x12\x36\n\x0cpartition_id\x18\x01 \x01(\x0b\x32 .google.datastore.v1.PartitionId\x12\x32\n\x04path\x18\x02 \x03(\x0b\x32$.google.datastore.v1.Key.PathElement\x1a\x44\n\x0bPathElement\x12\x0c\n\x04kind\x18\x01 \x01(\t\x12\x0c\n\x02id\x18\x02 \x01(\x03H\x00\x12\x0e\n\x04name\x18\x03 \x01(\tH\x00\x42\t\n\x07id_type\"8\n\nArrayValue\x12*\n\x06values\x18\x01 \x03(\x0b\x32\x1a.google.datastore.v1.Value\"\xf1\x03\n\x05Value\x12\x30\n\nnull_value\x18\x0b \x01(\x0e\x32\x1a.google.protobuf.NullValueH\x00\x12\x17\n\rboolean_value\x18\x01 \x01(\x08H\x00\x12\x17\n\rinteger_value\x18\x02 \x01(\x03H\x00\x12\x16\n\x0c\x64ouble_value\x18\x03 \x01(\x01H\x00\x12\x35\n\x0ftimestamp_value\x18\n \x01(\x0b\x32\x1a.google.protobuf.TimestampH\x00\x12-\n\tkey_value\x18\x05 \x01(\x0b\x32\x18.google.datastore.v1.KeyH\x00\x12\x16\n\x0cstring_value\x18\x11 \x01(\tH\x00\x12\x14\n\nblob_value\x18\x12 \x01(\x0cH\x00\x12.\n\x0fgeo_point_value\x18\x08 \x01(\x0b\x32\x13.google.type.LatLngH\x00\x12\x33\n\x0c\x65ntity_value\x18\x06 \x01(\x0b\x32\x1b.google.datastore.v1.EntityH\x00\x12\x36\n\x0b\x61rray_value\x18\t \x01(\x0b\x32\x1f.google.datastore.v1.ArrayValueH\x00\x12\x0f\n\x07meaning\x18\x0e \x01(\x05\x12\x1c\n\x14\x65xclude_from_indexes\x18\x13 \x01(\x08\x42\x0c\n\nvalue_type\"\xbf\x01\n\x06\x45ntity\x12%\n\x03key\x18\x01 \x01(\x0b\x32\x18.google.datastore.v1.Key\x12?\n\nproperties\x18\x03 \x03(\x0b\x32+.google.datastore.v1.Entity.PropertiesEntry\x1aM\n\x0fPropertiesEntry\x12\x0b\n\x03key\x18\x01 \x01(\t\x12)\n\x05value\x18\x02 
\x01(\x0b\x32\x1a.google.datastore.v1.Value:\x02\x38\x01\x42\x82\x01\n\x17\x63om.google.datastore.v1B\x0b\x45ntityProtoP\x01Z<google.golang.org/genproto/googleapis/datastore/v1;datastore\xaa\x02\x19Google.Cloud.Datastore.V1b\x06proto3')
     27   ,
---> 28   dependencies=[google_dot_api_dot_annotations__pb2.DESCRIPTOR,google_dot_protobuf_dot_struct__pb2.DESCRIPTOR,google_dot_protobuf_dot_timestamp__pb2.DESCRIPTOR,google_dot_type_dot_latlng__pb2.DESCRIPTOR,])
     29 _sym_db.RegisterFileDescriptor(DESCRIPTOR)
     30 

/usr/local/Cellar/python/2.7.11/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/google/protobuf/descriptor.pyc in __new__(cls, name, package, options, serialized_pb, dependencies, public_dependencies, syntax, pool)
    827         # TODO(amauryfa): use the pool passed as argument. This will work only
    828         # for C++-implemented DescriptorPools.
--> 829         return _message.default_pool.AddSerializedFile(serialized_pb)
    830       else:
    831         return super(FileDescriptor, cls).__new__(cls)

TypeError: Couldn't build proto file into descriptor pool!
Invalid proto descriptor for file "google/cloud/proto/datastore/v1/entity.proto":
  google.datastore.v1.PartitionId.project_id: "google.datastore.v1.PartitionId.project_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.PartitionId.namespace_id: "google.datastore.v1.PartitionId.namespace_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.PartitionId: "google.datastore.v1.PartitionId" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.Key.partition_id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.path: "google.datastore.v1.Key.path" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.id_type: "google.datastore.v1.Key.PathElement.id_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.kind: "google.datastore.v1.Key.PathElement.kind" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.id: "google.datastore.v1.Key.PathElement.id" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement.name: "google.datastore.v1.Key.PathElement.name" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.PathElement: "google.datastore.v1.Key.PathElement" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key: "google.datastore.v1.Key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.ArrayValue.values" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.ArrayValue: "google.datastore.v1.ArrayValue" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.value_type: "google.datastore.v1.Value.value_type" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.null_value: "google.datastore.v1.Value.null_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.boolean_value: "google.datastore.v1.Value.boolean_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.integer_value: "google.datastore.v1.Value.integer_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.double_value: "google.datastore.v1.Value.double_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.timestamp_value: "google.datastore.v1.Value.timestamp_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.key_value: "google.datastore.v1.Value.key_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.string_value: "google.datastore.v1.Value.string_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.blob_value: "google.datastore.v1.Value.blob_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.geo_point_value: "google.datastore.v1.Value.geo_point_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Value.entity_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.array_value: "google.datastore.v1.Value.array_value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.meaning: "google.datastore.v1.Value.meaning" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value.exclude_from_indexes: "google.datastore.v1.Value.exclude_from_indexes" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Value: "google.datastore.v1.Value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.key: "google.datastore.v1.Entity.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.properties" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.key: "google.datastore.v1.Entity.PropertiesEntry.key" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Entity.PropertiesEntry.value" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity.PropertiesEntry: "google.datastore.v1.Entity.PropertiesEntry" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Entity: "google.datastore.v1.Entity" is already defined in file "google/cloud/datastore_v1/proto/entity.proto".
  google.datastore.v1.Key.partition_id: "google.datastore.v1.PartitionId" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Key.path: "google.datastore.v1.Key.PathElement" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.ArrayValue.values: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.key_value: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.entity_value: "google.datastore.v1.Entity" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Value.array_value: "google.datastore.v1.ArrayValue" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.PropertiesEntry.value: "google.datastore.v1.Value" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.key: "google.datastore.v1.Key" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
  google.datastore.v1.Entity.properties: "google.datastore.v1.Entity.PropertiesEntry" seems to be defined in "google/cloud/datastore_v1/proto/entity.proto", which is not imported by "google/cloud/proto/datastore/v1/entity.proto".  To use it here, please add the necessary import.
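Every message above says the same thing: each google.datastore.v1 symbol is already registered by google/cloud/datastore_v1/proto/entity.proto (the file that `from google.cloud import datastore` loads), so Beam's import of the same messages from google/cloud/proto/datastore/v1/entity.proto is rejected by the process-wide default descriptor pool (`_message.default_pool` in the traceback). Since the traceback fails on line 2, before line 3 runs, the google.cloud.datastore import presumably happened in an earlier run of the cell; that would also explain why a fresh kernel works. The toy model below is plain Python, not the protobuf API; `ToyDescriptorPool` and `add_file` are made-up names that only illustrate the "one owner file per fully qualified name" rule.

```python
class ToyDescriptorPool(object):
    """Toy stand-in for a protobuf descriptor pool (NOT the real API)."""

    def __init__(self):
        # Maps each fully qualified symbol name to the .proto file that defined it.
        self._owner = {}

    def add_file(self, filename, symbols):
        # Reject the whole file if any symbol is already owned by another file.
        conflicts = [
            '"%s" is already defined in file "%s".' % (name, self._owner[name])
            for name in symbols
            if self._owner.get(name, filename) != filename
        ]
        if conflicts:
            raise TypeError("Couldn't build proto file into descriptor pool!\n"
                            + "\n".join(conflicts))
        for name in symbols:
            self._owner[name] = filename


# A few of the symbols named in the traceback.
SYMBOLS = [
    "google.datastore.v1.PartitionId",
    "google.datastore.v1.Key",
    "google.datastore.v1.Entity",
]

pool = ToyDescriptorPool()

# The google.cloud.datastore import registers the symbols first ...
pool.add_file("google/cloud/datastore_v1/proto/entity.proto", SYMBOLS)

# ... so registering the same names from Beam's proto module fails.
try:
    pool.add_file("google/cloud/proto/datastore/v1/entity.proto", SYMBOLS)
except TypeError as err:
    print(err)
```

Because the registry lives as long as the process, the only ways out are to restart the kernel or to avoid importing both modules in the same process.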

Answer

I figured out that writing to Google Cloud Datastore from Apache Beam (Google Cloud Dataflow) works differently from the regular google-cloud-datastore client API.

You need to use the Datastore helper as given in this example. With it, I was able to change my code so that it now runs successfully. Notice the different import for the entity and the different way the entity is created.

In summary, the helper builds the entity as a Datastore v1 protocol buffer (entity_pb2.Entity), which is what WriteToDatastore expects. This is the fully qualified notation you also see when exploring Datastore in the Google Cloud Console. My original code created the entity in a much simpler form that the client library usually understands when writing to Datastore. Above all, the new code no longer combines the two imports whose conflicting google.datastore.v1 proto definitions caused the error.
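To make the difference concrete, here's a sketch of the fully qualified shape next to the flat properties the original code passed to `Entity.update()`. It follows the JSON representation of an entity in the Datastore v1 API (a key with `partitionId` and `path`, and typed property values). `qualified_entity` is a hypothetical helper, not part of any library, and it only handles a single integer id and string properties.

```python
import json


def qualified_entity(project_id, kind, entity_id, properties):
    """Hypothetical helper: fully qualified v1-style JSON for one entity.

    Only supports an integer id and string-valued properties.
    """
    return {
        "key": {
            "partitionId": {"projectId": project_id},
            # int64 ids are encoded as strings in the proto3 JSON mapping.
            "path": [{"kind": kind, "id": str(entity_id)}],
        },
        "properties": {
            # Every value is wrapped in an object that names its type.
            prop: {"stringValue": value} for prop, value in properties.items()
        },
    }


# Flat properties, as passed to Entity.update() in the original code.
simple = {"name": "entity A"}

qualified = qualified_entity("your-gcp-project-id", "test_entity", 815, simple)
print(json.dumps(qualified, indent=2, sort_keys=True))
```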

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper

# Don't import these in the same process as WriteToDatastore: they register
# conflicting google.datastore.v1 proto definitions (see the traceback above)
#from google.cloud import datastore
#from google.cloud.datastore.entity import Entity


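def create_entity(_id, name):
    entity = entity_pb2.Entity()
    kind = "test_entity"

    # add_key_path stores an int as a numeric key id and a string as a key name,
    # so convert '0815' to 815 to keep the numeric ids of the original code
    datastore_helper.add_key_path(entity.key, kind, int(_id))

    # unicode() so the property is written as a string value (a Python 2 str is bytes)
    datastore_helper.add_properties(entity, {
        "name": unicode(name)
    })

    return entity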

class PrintFn(beam.DoFn):
    def process(self, element):
        print (element)
        return None

project_id = 'your-gcp-project-id'

lines = [
    "'0815';'entity A'",
    "'4711';'entity B'"
]

with beam.Pipeline() as p:
    (p
     | 'read lines' >> beam.Create(lines)
     | 'rows to columns' >> beam.Map(lambda v: v.split(';'))
     | 'remove quotes' >> beam.Map(lambda words: [word.strip('\'') for word in words])
     | 'create entities' >> beam.Map(lambda fields: create_entity(*fields))
     | 'write to datastore' >> WriteToDatastore(project_id)
#     | 'debug print' >> beam.ParDo(PrintFn())
    )
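Before pointing the pipeline at Datastore, the element-wise steps ('rows to columns' and 'remove quotes') can be checked as plain functions, with no Beam or Google Cloud packages involved. This is a minimal sketch; `rows_to_columns`, `remove_quotes` and `parse_line` are illustrative names, not part of the original code. The `int()` call mirrors the question's `create_entity`, which converted the id with `int(entity_id)`.

```python
def rows_to_columns(line):
    # Same as the 'rows to columns' step: split one line into its fields.
    return line.split(';')


def remove_quotes(words):
    # Same as the 'remove quotes' step: strip the surrounding single quotes.
    return [word.strip("'") for word in words]


def parse_line(line):
    entity_id, name = remove_quotes(rows_to_columns(line))
    # int() turns '0815' into the numeric id 815 (the leading zero is dropped).
    return int(entity_id), name


lines = [
    "'0815';'entity A'",
    "'4711';'entity B'",
]

for line in lines:
    print(parse_line(line))  # (815, 'entity A'), then (4711, 'entity B')
```

Inside the pipeline, the named functions can replace the lambdas, e.g. `beam.Map(rows_to_columns)` and `beam.Map(remove_quotes)`, which keeps each step unit-testable on its own.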
source: stackoverflow.com