I did get a solution working with the crate Python client using executemany(). I'm still curious about the loader feature in the web UI, though; it says new users can't load attachments (a COPY FROM alternative is sketched just after the sample data below). Here's a sample of the CSV data:
t_stamp,entity,float_value,quality
1654064489856,site_a/equipment_01,512.000,196
1654064939895,site_a/equipment_01,463.500,196
1654066289969,site_a/equipment_01,586.500,196
1654066740002,site_a/equipment_01,491.167,196
1654068090078,site_a/equipment_01,622.714,196
1654068540111,site_a/equipment_01,701.667,196
1654069890183,site_a/equipment_01,329.200,196
1654070340222,site_a/equipment_01,342.833,196
1654071690296,site_a/equipment_02,309.769,196
1654072140345,site_a/equipment_02,1357.333,196
1654073490413,site_a/equipment_02,359.000,196
1654073940438,site_a/equipment_02,317.091,196
1654075290520,site_a/equipment_02,141.769,196
1654075740547,site_a/equipment_02,408.300,196
1654077090628,site_a/equipment_02,163.250,196
1654077540664,site_a/equipment_02,122.500,196
1654078890748,site_a/equipment_02,209.000,196
1654079340773,site_a/equipment_02,540.167,196
1654080690847,site_a/equipment_02,303.545,196
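On the loader question: as an alternative to the Admin UI import, CrateDB's COPY FROM statement can bulk-load CSVs. It runs server-side, so the files have to be readable by the CrateDB nodes; with the Docker setup below that means mounting them into each container first. A minimal sketch, assuming the CSVs are mounted at /data/import inside the nodes (the path and mount are assumptions):

from crate import client

connection = client.connect("http://localhost:4201")
cursor = connection.cursor()
# COPY FROM executes on the server; the URI must point at files the nodes can read.
# With format = 'csv', the header row maps values to columns by name.
cursor.execute(
    "COPY spot_data_history FROM 'file:///data/import/*.csv' "
    "WITH (format = 'csv')"
)
cursor.close()
connection.close()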
This is my CREATE TABLE DDL:
CREATE TABLE spot_data_history (
    "t_stamp" TIMESTAMP,
    "entity" TEXT,
    "float_value" DOUBLE,
    "quality" INT,
    PRIMARY KEY (t_stamp, entity)
);
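A side note on the composite primary key: because (t_stamp, entity) uniquely identifies a row, re-running the ingest can be made idempotent by skipping duplicate keys. CrateDB supports ON CONFLICT DO NOTHING on inserts, so the statement in the script below could be swapped for something like:

sql = """
    INSERT INTO spot_data_history (t_stamp, entity, float_value, quality)
    VALUES (?, ?, ?, ?)
    ON CONFLICT DO NOTHING
"""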
The Pipfile:
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
sqlalchemy = "*"
pyodbc = "*"
pandas = "*"
dynaconf = "*"
loguru = "*"
crate = {extras = ["sqlalchemy"], version = "*"}
pendulum = "*"

[dev-packages]
black = "*"

[requires]
python_version = "3.10"
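Since the Pipfile already pulls in pandas and crate[sqlalchemy], pandas' to_sql() through the crate SQLAlchemy dialect would be another loading path. A rough sketch (the file name is a placeholder, and note that to_sql() issues plain inserts, so re-runs will hit primary-key conflicts):

import pandas as pd
import sqlalchemy as sa

# The crate[sqlalchemy] extra registers the "crate://" dialect.
engine = sa.create_engine("crate://localhost:4201")

df = pd.read_csv("data/example.csv")  # placeholder file name
df.to_sql(
    "spot_data_history",
    engine,
    index=False,
    if_exists="append",
    chunksize=10_000,  # batch the inserts, similar to executemany()
)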
The ingest script:
from pathlib import Path
import csv

from crate import client
from loguru import logger  # loguru for logging

connection = client.connect("http://localhost:4201")
cursor = connection.cursor()

sql = "INSERT INTO spot_data_history VALUES (?, ?, ?, ?)"

data_dir = Path(__file__).parent / "data"
csv_files = data_dir.glob("*.csv")

for csv_file in csv_files:
    with open(csv_file, "r") as file:
        reader = csv.reader(file)
        next(reader)  # skip the header row

        chunk_size = 10_000  # rows per executemany() batch
        chunk = []
        for row in reader:
            # the csv may contain empty lines
            if len(row) == 0:
                continue
            t_stamp, entity, float_value, quality = row
            chunk.append([t_stamp, entity, float_value, quality])
            if len(chunk) >= chunk_size:
                logger.info("inserting...")
                cursor.executemany(sql, chunk)
                chunk = []  # clear the chunk after insertion
        if chunk:  # insert any remaining rows
            cursor.executemany(sql, chunk)

cursor.close()
connection.close()
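One gotcha after a bulk insert: CrateDB makes new rows visible to queries only after a periodic table refresh, so an immediate count can look low. To verify right away, a refresh can be forced before closing the cursor:

cursor.execute("REFRESH TABLE spot_data_history")
cursor.execute("SELECT count(*) FROM spot_data_history")
logger.info(f"rows visible: {cursor.fetchone()[0]}")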
The Docker Compose file used to spin up the local CrateDB cluster:
version: '3.8'
services:
  cratedb01:
    image: crate:latest
    ports:
      - "4201:4200"
      - "5432:5432"
    volumes:
      - ./data/crate/01:/data
    command: ["crate",
              "-Ccluster.name=crate-docker-cluster",
              "-Cnode.name=cratedb01",
              "-Cnode.data=true",
              "-Cnetwork.host=_site_",
              "-Cdiscovery.seed_hosts=cratedb02,cratedb03",
              "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03",
              "-Cgateway.expected_data_nodes=3",
              "-Cgateway.recover_after_data_nodes=2"]
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    environment:
      - CRATE_HEAP_SIZE=2g
  cratedb02:
    image: crate:latest
    ports:
      - "4202:4200"
    volumes:
      - ./data/crate/02:/data
    command: ["crate",
              "-Ccluster.name=crate-docker-cluster",
              "-Cnode.name=cratedb02",
              "-Cnode.data=true",
              "-Cnetwork.host=_site_",
              "-Cdiscovery.seed_hosts=cratedb01,cratedb03",
              "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03",
              "-Cgateway.expected_data_nodes=3",
              "-Cgateway.recover_after_data_nodes=2"]
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    environment:
      - CRATE_HEAP_SIZE=2g
  cratedb03:
    image: crate:latest
    ports:
      - "4203:4200"
    volumes:
      - ./data/crate/03:/data
    command: ["crate",
              "-Ccluster.name=crate-docker-cluster",
              "-Cnode.name=cratedb03",
              "-Cnode.data=true",
              "-Cnetwork.host=_site_",
              "-Cdiscovery.seed_hosts=cratedb01,cratedb02",
              "-Ccluster.initial_master_nodes=cratedb01,cratedb02,cratedb03",
              "-Cgateway.expected_data_nodes=3",
              "-Cgateway.recover_after_data_nodes=2"]
    deploy:
      replicas: 1
      restart_policy:
        condition: on-failure
    environment:
      - CRATE_HEAP_SIZE=2g
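To confirm all three nodes actually joined the cluster, the sys.nodes table can be queried through any node's HTTP port:

from crate import client

connection = client.connect("http://localhost:4201")
cursor = connection.cursor()
cursor.execute("SELECT name FROM sys.nodes ORDER BY name")
print(cursor.fetchall())  # expect cratedb01, cratedb02, cratedb03
cursor.close()
connection.close()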