Just wondering if anyone had a good solution for dealing with highly volatile data before storing the final results into CrateDB.
Up until now I’ve just been storing every record in CrateDB and then manually deleting old data, but this seems non-optimal due to deleted records just being tombstoned.
I’m looking for something that could sustain many updates (180-ish) over a few minutes and then push the final result into Crate. Currently evaluating Redis, but I’m a little concerned about it purging data before it’s written to CrateDB if we get a large spike in utilization.
The redis solution looks something like this:
import asyncio
import json

import asyncpg
from redis import asyncio as aioredis


async def main():
    redis = aioredis.from_url("redis://localhost:6379", encoding="utf-8", decode_responses=True)
    # CrateDB speaks the PostgreSQL wire protocol, so asyncpg can connect to it directly
    crate_pool = await asyncpg.create_pool(host="localhost", user="crate", max_size=100)
    while True:
        # KEYS is fine for a sketch; for a large keyspace SCAN (redis.scan_iter) would be preferable
        keys = await redis.keys("event:*")
        for key in keys:
            # OBJECT IDLETIME returns the seconds since the key was last touched
            # (note: reads reset it as well, not only writes)
            idle_seconds = await redis.object("IDLETIME", key)
            if idle_seconds >= 300:  # key hasn't been touched for 5 minutes
                event_json = json.loads(await redis.get(key))
                async with crate_pool.acquire() as conn:
                    # parameterized query instead of interpolating values into the SQL string
                    await conn.execute(
                        "INSERT INTO test.events (id, timestamp, data) VALUES ($1, $2, $3)",
                        event_json["id"],
                        event_json["timestamp"],
                        event_json["data"],
                    )
                await redis.delete(key)
        await asyncio.sleep(60)


if __name__ == "__main__":
    asyncio.run(main())
I think Redis or a message queue à la Kafka / RabbitMQ is a standard solution for this.
non-optimal due to deleted records just being tombstoned.
They get deleted eventually with the next segment merge. However, if you are not writing too much data, updates might just happen in in-memory segments in CrateDB anyway.
Generally speaking, I would avoid single inserts and use bulk requests instead.
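If it helps, here is a minimal sketch of what a bulk request could look like with asyncpg’s executemany(), reusing the test.events table from the snippet above (the column types, and whether your CrateDB version handles these parameters over the PostgreSQL protocol, are assumptions on my side, not something I have verified against your setup):

import asyncpg


async def bulk_insert(events):
    # events: list of dicts like {"id": ..., "timestamp": ..., "data": ...}
    conn = await asyncpg.connect(host="localhost", user="crate")
    try:
        # send the whole batch as one bulk request instead of one INSERT per record;
        # asyncpg expects Python values that match the actual column types
        await conn.executemany(
            "INSERT INTO test.events (id, timestamp, data) VALUES ($1, $2, $3)",
            [(e["id"], e["timestamp"], e["data"]) for e in events],
        )
    finally:
        await conn.close()

Called with the accumulated “final” events (e.g. the ones drained from Redis in your loop above), this turns N single inserts into one batched request.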
Do you happen to have any example code (Python or Go would be great!) showing how to do bulk inserts of data from Kafka where you only include “final” events rather than the whole stream of events?
Does tuning the table’s refresh_interval increase the amount of time that data exists in the memory pool?
If I’m getting updates every 5 seconds for somewhere between 3 and 10 minutes and I really only want the final update, would using something like INSERT … ON CONFLICT DO UPDATE be ideal?
Does ON CONFLICT DO UPDATE work with bulk inserts if there’s duplicate data in the values list?
Do you happen to have any example code (Python or Go would be great!) showing how to do bulk inserts of data from Kafka where you only include “final” events rather than the whole stream of events?
Nothing to share publicly, unfortunately. Maybe something the team will look into for a blog post, but I can’t promise anything.
Does tuning the table’s refresh_interval increase the amount of time that data exists in the memory pool?
No, not necessarily. That depends more on "translog.flush_threshold_size", your cluster's overall available memory, and some other factors. Generally speaking, it can take some minutes until segments are flushed to disk and the translogs are cleared.
Increasing the refresh_interval, however, can help with overall indexing performance, but that also depends on many factors.
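For reference, refresh_interval can be adjusted per table with ALTER TABLE; a quick sketch against the test.events table used earlier (the 5000 ms value is just an arbitrary example, not a recommendation):

import asyncio

import asyncpg


async def tune_refresh_interval():
    conn = await asyncpg.connect(host="localhost", user="crate")
    try:
        # refresh_interval is given in milliseconds; 5000 is an arbitrary example value
        await conn.execute('ALTER TABLE test.events SET ("refresh_interval" = 5000)')
    finally:
        await conn.close()


if __name__ == "__main__":
    asyncio.run(tune_refresh_interval())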
INSERT … ON CONFLICT DO UPDATE
Yes, it can be used when a primary key is set.
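A sketch of what that could look like for the test.events table from above, assuming id is defined as its primary key (that constraint is an assumption on my part, it isn’t stated earlier in the thread):

import asyncpg


async def upsert(event):
    # event: dict like {"id": ..., "timestamp": ..., "data": ...}
    conn = await asyncpg.connect(host="localhost", user="crate")
    try:
        # assumes test.events has PRIMARY KEY (id); a later update for the same id
        # overwrites the previously stored row instead of inserting a new one
        await conn.execute(
            """
            INSERT INTO test.events (id, timestamp, data)
            VALUES ($1, $2, $3)
            ON CONFLICT (id) DO UPDATE
            SET timestamp = excluded.timestamp, data = excluded.data
            """,
            event["id"],
            event["timestamp"],
            event["data"],
        )
    finally:
        await conn.close()

Each incoming update then just overwrites the previous state for that id, so only the latest version is kept per key.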
Does ON CONFLICT DO UPDATE work with bulk inserts if there’s duplicate data in the values list?
You could also use a sub-select on insert to deduplicate the values first, e.g.:
cr> CREATE TABLE t01 (val int);
CREATE OK, 1 row affected (1.684 sec)
cr> INSERT INTO t01
SELECT DISTINCT val
FROM (VALUES (1),(2),(1),(2),(1)) a(val);
INSERT OK, 2 rows affected (0.139 sec)
cr> SELECT * FROM t01;
+-----+
| val |
+-----+
| 1 |
| 2 |
+-----+
SELECT 2 rows in set (0.191 sec)