Cassandra's execute_concurrent Not Working As It Should

- 1 answer

I have written a Python script that reads a CSV file and inserts its rows into a Cassandra table, using both execute_async and execute_concurrent, but the concurrent version is slower than the async one. My aim in using execute_concurrent was to achieve parallel writes and thereby speed up indexing the CSV file in Cassandra.

Code using execute_async:

for df in chunks:
    futures = []
    df = df.to_dict(orient='records')
    chunk_counter += 1
    for row in df:
        key = str(row["0"])
        row = json.dumps(row, default=str)
        futures.append(
            self.session.execute_async(
                insert_sql, [key, "version_1", row]
            )
        )
        # batch.add(insert_sql, (key, "version_1", row))
    # self.session.execute(batch)
    for future in futures:
        self.log.debug(future)
        continue

Code using execute_concurrent:

for df in chunks:
    futures = []
    df = df.to_dict(orient='records')
    chunk_counter += 1
    for row in df:
        key = str(row["0"])
        row = json.dumps(row, default=str)
        # Parameters must match the async version: (key, version, value)
        params = (key, "version_1", row)
        futures.append(
            (
                insert_sql,
                params
            )
        )
    results = execute_concurrent(
        self.session, futures, raise_on_first_error=False)
    for (success, result) in results:
        if not success:
            self.handle_error(result)  # result will be an Exception
Answer

You're not setting the concurrency parameter of execute_concurrent, and by default it is 100.

From documentation:

The concurrency parameter controls how many statements will be executed concurrently. When Cluster.protocol_version is set to 1 or 2, it is recommended that this be kept below 100 times the number of core connections per host times the number of connected hosts (see Cluster.set_core_connections_per_host()). If that amount is exceeded, the event loop thread may attempt to block on new connection creation, substantially impacting throughput. If protocol_version is 3 or higher, you can safely experiment with higher levels of concurrency.
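A minimal sketch of passing an explicit concurrency level, assuming a connected session and an insert_sql statement taking (key, version, value) as in the question; the value 500 is just an example to experiment with, not a recommendation:

```python
import json


def build_statements(insert_sql, rows):
    """Pair the INSERT statement with per-row parameters,
    in the (statement, params) shape execute_concurrent expects."""
    statements = []
    for row in rows:
        key = str(row["0"])
        payload = json.dumps(row, default=str)
        statements.append((insert_sql, (key, "version_1", payload)))
    return statements


def insert_rows_concurrently(session, insert_sql, rows, concurrency=500):
    """Run the inserts with an explicit concurrency level
    instead of the default of 100."""
    # Imported here so the helpers can be defined without the driver installed.
    from cassandra.concurrent import execute_concurrent

    statements = build_statements(insert_sql, rows)
    return execute_concurrent(
        session,
        statements,
        concurrency=concurrency,        # default is 100; safe to raise with protocol v3+
        raise_on_first_error=False,
    )
```

With protocol version 3 or higher you can benchmark increasing values of concurrency (e.g. 200, 500, 1000) against your cluster and keep whichever gives the best throughput.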

source: stackoverflow.com