Ingest with Python asyncpg

Hi guys

I have some issues ingesting data with Python and the asyncpg package. I have a dataframe with the following datatypes:
int32, datetime64[ns, UTC], object, float64, int32

…and want to ingest with:

await connection.copy_records_to_table(my_table_name, records=df.itertuples(index=False), columns=df.columns.to_list(), timeout=10)

I receive the following error:

---------------------------------------------------------------------------
InternalServerError                       Traceback (most recent call last)
Cell In[229], line 5
      1 connection = await pool.acquire(timeout=60)
      3 table = "dataset"
----> 5 s = await connection.copy_records_to_table(table, records=df.itertuples(index=False), columns=df.columns.to_list(), timeout=10)

File c:\Users\SCPA\Azure_DevOps\ALD_Project\venv_app\Lib\site-packages\asyncpg\connection.py:1081, in Connection.copy_records_to_table(self, table_name, records, columns, schema_name, timeout, where)
   1076 opts = '(FORMAT binary)'
   1078 copy_stmt = 'COPY {tab}{cols} FROM STDIN {opts} {cond}'.format(
   1079     tab=tabname, cols=cols, opts=opts, cond=cond)
-> 1081 return await self._protocol.copy_in(
   1082     copy_stmt, None, None, records, intro_ps._state, timeout)

File c:\Users\SCPA\Azure_DevOps\ALD_Project\venv_app\Lib\site-packages\asyncpg\protocol\protocol.pyx:565, in copy_in()

InternalServerError: line 1:114: extraneous input 'binary' expecting {',', ')'}

I use the following versions:
asyncpg 0.29.0
sqlalchemy-cratedb 0.37.0

BTW: ingestion with your crate package works fine, but I would like to switch to asyncpg because it is async, so I can use parallel connections and a connection pool. What I have also seen with the crate package: when you define multiple hosts in the connection and one of them gets disconnected, it is removed from the list and not automatically included again once the node becomes available.

Thanks for your help in advance 🙂
Regards Schabi

Dear @SchabiDesigns,

thanks for your report. We will need to have a look into both issues you are observing: asyncpg's copy_records_to_table failing, and the DB API HTTP driver not bringing connections back to nodes that dropped out of the registry.

The most important information for you is that CrateDB does not support COPY ... FROM STDIN yet, see COPY FROM STDIN · Issue #12952 · crate/crate · GitHub. Because copy_records_to_table apparently uses that statement, it will not work until CrateDB provides corresponding support.

The most efficient way to insert many records in bulk/batch mode, if you are looking in that direction, is to use CrateDB's bulk endpoint.
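For illustration, here is a minimal sketch of what that could look like from Python with the requests package. Host, port, and the table/column names are placeholders, and the to_json() round-trip is just one convenient way to serialize datetime64 columns to epoch milliseconds:

import json
import requests

# Placeholder host and table/column names; adjust to your setup.
url = "http://localhost:4200/_sql"
stmt = "INSERT INTO my_table (id, ts, name, value, flag) VALUES (?, ?, ?, ?, ?)"

# orient="values" yields row arrays; datetime64 columns are serialized
# to epoch milliseconds, which CrateDB accepts for TIMESTAMP columns.
bulk_args = json.loads(df.to_json(orient="values"))

response = requests.post(url, json={"stmt": stmt, "bulk_args": bulk_args})
response.raise_for_status()

# One result entry per row; a rowcount of -2 marks a failed row.
print(response.json()["results"][:3])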

With kind regards,
Andreas.

Based on your request and questions, I see that this code path, as supported by the Python DB API driver and the SQLAlchemy dialect, with corresponding support for pandas/Dask as outlined above, does not support async styles yet. Nevertheless, it is still the most efficient way to run high-performance ingests into CrateDB, and it supports parallel operations well.

Thanks for sharing this important bit of information. Based on that, and depending on whether your environment absolutely needs async support or not, I would like to encourage you to try out this interface, i.e. to use pandas/Dask together with the SQLAlchemy insert_bulk performance support method.

https://cratedb.com/docs/sqlalchemy-cratedb/support.html#support-insert-bulk
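A minimal sketch, following the support documentation linked above; the connection string, table name, and chunk size are placeholders to adapt to your environment:

import sqlalchemy as sa
from sqlalchemy_cratedb import insert_bulk

# Placeholder connection string; point it at your CrateDB HTTP endpoint.
engine = sa.create_engine("crate://localhost:4200")

df.to_sql(
    name="my_table",       # placeholder table name
    con=engine,
    if_exists="append",
    index=False,
    chunksize=5_000,       # batch size per bulk request; tune to your workload
    method=insert_bulk,    # route pandas' inserts through CrateDB's bulk interface
)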

If you are looking further into parallel processing of dataframes, the information on this page might also be relevant for you.

DataFrame operations — SQLAlchemy Dialect
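As a rough sketch with placeholder names, the parallel variant with Dask could look like this; it partitions the dataframe and inserts the partitions concurrently:

import dask.dataframe as dd
from sqlalchemy_cratedb import insert_bulk

# Split the pandas dataframe into partitions; Dask inserts them in parallel.
ddf = dd.from_pandas(df, npartitions=4)
ddf.to_sql(
    "my_table",                    # placeholder table name
    uri="crate://localhost:4200",  # Dask takes a connection URI, not an engine
    if_exists="append",
    index=False,
    parallel=True,                 # insert partitions concurrently
    method=insert_bulk,
)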

Would that code fragment as a whole even support asynchronous I/O operations well? Does df.itertuples() support them, given that it is actually a generator? Which dataframe library are you using?

connection.copy_records_to_table(
  my_table_name, records=df.itertuples(index=False), ...)

Thanks Andreas 🙂

Well, then I'll stay with that for ingesting data. 👍

I use pandas and I am not sure how asyncpg will handle that. I did some performance tests fetching a lot of data from CrateDB, and asyncpg was much faster (about 200%) compared to crate for my test case, BUT only because I can split a big query into multiple smaller ones (roughly like the sketch below). So I thought I would test it for ingesting as well. Anyway, ingesting works fine with the crate package so far.
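For reference, a simplified sketch of that query-splitting pattern; the table, column, and ranges are made up for illustration:

import asyncio
import asyncpg

async def fetch_range(pool, lo, hi):
    # Each range query runs on its own pooled connection.
    async with pool.acquire() as conn:
        return await conn.fetch(
            "SELECT * FROM dataset WHERE id >= $1 AND id < $2", lo, hi)

async def main():
    # CrateDB speaks the PostgreSQL wire protocol on port 5432.
    pool = await asyncpg.create_pool(host="localhost", port=5432, user="crate")
    # Split one big query into ten smaller range queries and run them concurrently.
    bounds = [(i, i + 100_000) for i in range(0, 1_000_000, 100_000)]
    chunks = await asyncio.gather(
        *(fetch_range(pool, lo, hi) for lo, hi in bounds))
    await pool.close()
    return [row for chunk in chunks for row in chunk]

rows = asyncio.run(main())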

Yes, it would be great if you could take a look into that. Sure, I could write a routine that takes care of it myself, but it would be great to have this included as standard behavior. 😉
