Python crate client: using multiple cursors in parallel

Hello guys

The documentation says to initialize a connection, create cursors, and then use them to execute queries and fetch data. Can multiple cursors be created and used to do jobs in parallel? Or is there another approach to parallelizing query execution?

Thanks for your help!
Regards Schabi

1 Like

Hi @SchabiDesigns

The crate-python library is thread-safe only at the module level, not at the connection or cursor level. Additionally, since the DB API lacks an asynchronous specification, it does not yet support executing queries asynchronously. To use crate-python in parallel, you must open multiple connections.

If you need async execution, you might want to look into asyncpg or psycopg3, which speak the PostgreSQL wire protocol (port 5432).
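
For illustration, a minimal sketch of the multiple-connections approach, assuming CrateDB listens on localhost:4200 with the default crate user: one dedicated connection per worker thread, since connections and cursors must not be shared.

from concurrent.futures import ThreadPoolExecutor

from crate import client

def run_query(sql):
    # Each worker opens its own connection; only the module may be shared
    # between threads, not connections or cursors.
    connection = client.connect("localhost:4200", username="crate")
    try:
        cursor = connection.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        connection.close()

queries = ["SELECT 1", "SELECT 2", "SELECT 3"]
with ThreadPoolExecutor(max_workers=3) as executor:
    results = list(executor.map(run_query, queries))
print(results)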

2 Likes

Thanks for your help @proddata

The documentation helped me to fetch some data from the database.

How would you set up a connection pool using the asyncpg package?

Some background: I use Streamlit in combination with CrateDB. Streamlit provides drivers for different database solutions. I would like to implement one for CrateDB, but I am not sure how it should be done properly.

1 Like

Dear Schabi,

Excellent, thanks. Please let us know whenever details can be improved.

Sorry for not providing a verbatim example demonstrating this use case. Were you able to make it work by combining the relevant details, or did you run into any errors when doing so?

We would also like to see CrateDB work well together with Streamlit, so we appreciate you sharing more details about your use case. Have you already tried to make it work using Streamlit’s PostgreSQL database adapter?

With kind regards,
Andreas.

Thanks @amotl

  • Documentation is good as it is :slight_smile:
  • I also managed to use the pool feature from the asyncpg package (see the sketch after this list); it wasn’t a big deal.
  • Nice to hear that you would like to have CrateDB working with Streamlit. :star_struck:
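
For reference, roughly what the pool setup looks like (a minimal sketch, assuming CrateDB’s PostgreSQL wire protocol port 5432 and the default crate user):

import asyncio

import asyncpg

async def main():
    # Pool of connections against CrateDB's PostgreSQL wire protocol port.
    pool = await asyncpg.create_pool(
        host="localhost", port=5432, user="crate", min_size=2, max_size=10
    )
    # Borrow a connection from the pool; coroutines can do this concurrently.
    async with pool.acquire() as connection:
        rows = await connection.fetch("SELECT 1")
        print(rows)
    await pool.close()

asyncio.run(main())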

I tried the PostgreSQL database adapter you mentioned before, but faced some errors.

sqlalchemy.exc.InternalError: (psycopg2.errors.InternalError_) line 1:1: mismatched input 'ROLLBACK' expecting {'SELECT', 'DEALLOCATE', 'FETCH', 'END', 'WITH', 'CREATE', 'ALTER', 'KILL', 'CLOSE', 'BEGIN', 'START', 'COMMIT', 'ANALYZE', 'DISCARD', 'EXPLAIN', 'SHOW', 'OPTIMIZE', 'REFRESH', 'RESTORE', 'DROP', 'INSERT', 'VALUES', 'DELETE', 'UPDATE', 'SET', 'RESET', 'COPY', 'GRANT', 'DENY', 'REVOKE', 'DECLARE'}
CONTEXT:  io.crate.exceptions.SQLExceptions.esToCrateException(SQLExceptions.java:211)
io.crate.exceptions.SQLExceptions.prepareForClientTransmission(SQLExceptions.java:200)
io.crate.protocols.postgres.Messages.sendErrorResponse(Messages.java:189)
io.crate.protocols.postgres.PostgresWireProtocol.handleSimpleQuery(PostgresWireProtocol.java:788)
io.crate.protocols.postgres.PostgresWireProtocol$MessageHandler.dispatchMessage(PostgresWireProtocol.java:340)
io.crate.protocols.postgres.PostgresWireProtocol$MessageHandler.dispatchState(PostgresWireProtocol.java:330)
io.crate.protocols.postgres.PostgresWireProtocol$MessageHandler.channelRead0(PostgresWireProtocol.java:298)
io.crate.protocols.postgres.PostgresWireProtocol$MessageHandler.channelRead0(PostgresWireProtocol.java:282)
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)


(Background on this error at: https://sqlalche.me/e/20/2j85)

I also took a look at st.connection, and there is a YouTube video on how to add a connection to any database, but I am not sure how it should be done :face_with_peeking_eye:

1 Like

Hi again,

Thank you for sharing your outcome. This error message usually indicates that some other error happened where CrateDB does not behave exactly like PostgreSQL: the error is propagated to the application, which then issues a ROLLBACK. That statement does not work on CrateDB either, and it is this failure that finally bubbles up the stack and is presented to the user.

In other words, another query fails first, and its error report is masked by the exception you’ve shared.

With kind regards,
Andreas.

@proddata was so kind as to whip up a quick example at GitHub - proddata/streamlit-cratedb. It validates that CrateDB is able to work successfully with Streamlit. You may want to exercise this application and start from there where applicable.

However, I admit it does not yet fully address your specific questions about database connection pooling, about support for efficient and safe parallel access, or about support for Python’s asyncio. For the Python DB API driver, which uses the HTTP transport, we are adjusting the thread-safety advertisement with Configure DB API interface attribute `threadsafety = 1`: Threads may share the module, but not connections by amotl · Pull Request #635 · crate/crate-python · GitHub. So, when following the relevant constraints, i.e. maintaining a dedicated connection per thread or process, parallel access should work well.

Regarding asyncio support: because Streamlit uses SQLAlchemy to connect to SQL databases, another patch may well make a difference, provided Streamlit supports asynchronous operations at the database level: Dialect: Add support for `asyncpg` and `psycopg3` drivers by amotl · Pull Request #11 · crate/sqlalchemy-cratedb · GitHub. It hasn’t been mainlined yet, but the PR description includes instructions on how to install the improvement directly from the feature branch. Documentation is still missing, but the changelog item conveys the ingredients of the patch:

Added support for psycopg and asyncpg drivers, by introducing the crate+psycopg://, crate+asyncpg://, and crate+urllib3:// dialect identifiers. The asynchronous variant of psycopg is also supported.
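
For illustration, a sketch of what using the asynchronous dialect could look like, assuming the feature branch from PR #11 is installed; the crate+asyncpg:// URL follows the changelog above, while host and port are assumptions:

import asyncio

import sqlalchemy as sa
from sqlalchemy.ext.asyncio import create_async_engine

async def main():
    # Async engine speaking the PostgreSQL wire protocol via asyncpg.
    engine = create_async_engine("crate+asyncpg://crate@localhost:5432/")
    async with engine.connect() as connection:
        result = await connection.execute(sa.text("SELECT 1"))
        print(result.fetchall())
    await engine.dispose()

asyncio.run(main())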

Actually it “simply works” when adding sqlalchemy-cratedb to your requirements.txt and specifying secrets like this:

# requirements.txt
sqlalchemy
sqlalchemy-cratedb
streamlit

# .streamlit/secrets.toml

[connections.cratedb]
dialect = "crate"
host = "localhost"
port = "4200"
username = "crate"
password = ""

then you can just:

conn = st.connection("cratedb", type="sql")
query_result = conn.query(query, ttl="10m")
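
Putting it together, a minimal complete app can look like this (a sketch assuming the secrets file above; sys.summits is just an example table, substitute your own):

# app.py
import streamlit as st

conn = st.connection("cratedb", type="sql")

# conn.query returns a pandas DataFrame, cached for the given TTL.
df = conn.query("SELECT * FROM sys.summits LIMIT 5", ttl="10m")
st.dataframe(df)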

I updated GitHub - proddata/streamlit-cratedb … did not know it was that simple :wink:

2 Likes

Thanks for your help Georg and Andreas!
Not sure if I should split this topic, and if so, how it should be done… There are kind of two problems and two solutions :slight_smile:

1 Like

Hi again.

You are welcome. Please ask further questions any time. Based on your suggestions and @proddata’s investigations, we added those two resources to our documentation and code snippets.

No worries about splitting this. While I admit the conversation mixes different topics, I think it would be too difficult to split in this case. If you like, feel free to rephrase the title and fragments of your original post.

Cheers,
Andreas.

1 Like

Hi Georg,

do you know how to add multiple hosts so that a kind of load balancing is in place?
Or do I need to set up a load balancer manually in front of the cluster and use that as the access point?

Regards Pascal

I think connection() takes **kwargs, and sqlalchemy-cratedb supports a servers argument, but I am not sure how they need to be provided. Maybe @amotl knows :grimacing:

Thanks Georg :slight_smile:

I just added the servers list to the TOML file, but host has to be included as well. Because of that, I have no information on whether any load balancing is happening or not.
@amotl Andreas, is there any way to get that information? I found the method active_servers() within the source code of the HTTP client, but I think I have no access to it from within Streamlit.

Thanks for your help guys!

Dear Pascal,

Based on the modified synopsis example code at Access list of active server connections to CrateDB when using SQLAlchemy · GitHub, you can see that it is there, but the requested attribute is a bit nested.

So, effectively, just use connection.connection.dbapi_connection.client.active_servers, where the first connection symbol refers to an sa.engine.Connection object, as outlined in the example above.

With kind regards,
Andreas.

1 Like

Thanks Andreas @amotl
That helped a lot!
I used following to check the active servers:

conn = st.connection("crateDB", type="sql")
conn_alchemy_engine = conn.connect()
conn_alchemy_engine.connection.dbapi_connection.client.active_servers()

This results in only one element, which I specified in the TOML file under host.
The servers property is ignored, and if I try to add multiple hosts, this also fails on the Streamlit side. I also tried a naive approach, appending servers with the append method:

conn_alchemy_engine.connection.dbapi_connection.client.active_servers.append([list_of_servers])

which had no effect.

Thanks for your help
Regards

1 Like

Hi Pascal,

I’ve created a corresponding issue at Using multiple servers/endpoints · Issue #146 · crate/sqlalchemy-cratedb · GitHub in order to investigate the problem. Please let us know about the urgency of this matter on your side [1], so we can plan accordingly.

With kind regards,
Andreas.


  1. NB/JFYI: Because those servers would be used in a round-robin fashion, simply configuring multiple of them definitely does not increase concurrency.

    If you are currently mostly looking into getting efficient communication right, this issue may not be a blocker for you right away.

    In a cluster setup, with multiple nodes available, adding all or some of them may increase resiliency on the connectivity side, that’s true. But it will not increase performance. ↩︎

1 Like

Hi Andreas
There is a workaround, so it is not that important. :wink:
→ I think I will go for a load balancer and use that as the entry point for the cluster. Do you have a recommendation?
Regards