This article describes how to connect to CrateDB from Go using the pgx PostgreSQL driver.
Prerequisites
To connect to CrateDB, we first install the main pgx package as well as its connection pooling package:
$ go get github.com/jackc/pgx/v4
$ go get github.com/jackc/pgx/v4/pgxpool
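With Go modules (the default in current Go releases), go get should be run inside an initialized module. If you are starting from an empty directory, initialize one first; the module path below is only an example:
$ go mod init example.com/cratedb-pgx-demo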
Connecting to CrateDB
Below you will find a snippet that uses pgx to create a pool of 10 connections to CrateDB. A simple test query is then run against a system table to verify that data can be retrieved:
package main

import (
    "context"
    "fmt"
    "os"

    "github.com/jackc/pgx/v4/pgxpool"
)

func main() {
    ctx := context.Background()

    // a regular PostgreSQL-style connection URL with a pool of 10 connections
    config, err := pgxpool.ParseConfig("postgresql://crate@localhost:5432/doc?pool_max_conns=10")
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to parse database configuration: %v\n", err)
        os.Exit(1)
    }

    dbpool, err := pgxpool.ConnectConfig(ctx, config)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
        os.Exit(1)
    }
    defer dbpool.Close()

    var id string
    var hostname string
    err = dbpool.QueryRow(ctx, "SELECT id, hostname FROM sys.nodes LIMIT 1").Scan(&id, &hostname)
    if err != nil {
        fmt.Fprintf(os.Stderr, "QueryRow failed: %v\n", err)
        os.Exit(1)
    }

    fmt.Println(id, hostname)
}
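The same pool also serves multi-row queries through pgx's Query and Rows interfaces. The following is a minimal sketch, reusing the ctx and dbpool variables from the example above:
// list every node in the cluster instead of just one
rows, err := dbpool.Query(ctx, "SELECT id, hostname FROM sys.nodes")
if err != nil {
    fmt.Fprintf(os.Stderr, "Query failed: %v\n", err)
    os.Exit(1)
}
defer rows.Close()

for rows.Next() {
    var id string
    var hostname string
    if err := rows.Scan(&id, &hostname); err != nil {
        fmt.Fprintf(os.Stderr, "Scan failed: %v\n", err)
        os.Exit(1)
    }
    fmt.Println(id, hostname)
}
if rows.Err() != nil {
    fmt.Fprintf(os.Stderr, "Reading rows failed: %v\n", rows.Err())
    os.Exit(1)
}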
Ingesting into CrateDB
Several strategies are available for ingesting data into CrateDB with high throughput.
Multi-Value Insert
pgx supports prepared statements and query batching for bulk ingests, which the following example makes use of:
package main

import (
    "context"
    "fmt"
    "os"

    "github.com/jackc/pgx/v4"
    "github.com/jackc/pgx/v4/pgxpool"
)

func main() {
    ctx := context.Background()

    // a regular PostgreSQL-style connection URL with a pool of 10 connections
    config, err := pgxpool.ParseConfig("postgresql://crate@localhost:5432/doc?pool_max_conns=10")
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to parse database configuration: %v\n", err)
        os.Exit(1)
    }

    dbpool, err := pgxpool.ConnectConfig(ctx, config)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
        os.Exit(1)
    }
    defer dbpool.Close()

    // simple test table, created via CREATE TABLE doc.my_test_table(numeric_value INTEGER);
    stmt := `INSERT INTO doc.my_test_table (numeric_value) VALUES ($1);`

    // create a new batch
    batch := &pgx.Batch{}

    // queue the insert with 200 sample values on the batch
    for i := 0; i < 200; i++ {
        batch.Queue(stmt, i)
    }

    // send the batch to the server in a single round trip
    br := dbpool.SendBatch(ctx, batch)

    // read the result of the first queued statement
    _, err = br.Exec()
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to execute batch: %v\n", err)
        os.Exit(1)
    }

    // close the batch, checking for any remaining errors
    err = br.Close()
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to close batch: %v\n", err)
        os.Exit(1)
    }
}
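Note that a single br.Exec() only reads the result of the first statement queued on the batch; errors from later statements only show up once their results are read or the batch is closed. If you want to check every statement individually, a sketch along these lines should work, using Batch.Len() to know how many results to read:
br := dbpool.SendBatch(ctx, batch)
for i := 0; i < batch.Len(); i++ {
    // one Exec per queued statement, in the order they were queued
    if _, err := br.Exec(); err != nil {
        fmt.Fprintf(os.Stderr, "Statement %d of the batch failed: %v\n", i, err)
        os.Exit(1)
    }
}
if err := br.Close(); err != nil {
    fmt.Fprintf(os.Stderr, "Unable to close batch: %v\n", err)
    os.Exit(1)
}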
UNNEST
Alternatively, we can ingest data by passing arrays of values and expanding them server-side with UNNEST:
package main

import (
    "context"
    "fmt"
    "os"

    "github.com/jackc/pgx/v4/pgxpool"
)

func main() {
    ctx := context.Background()

    // a regular PostgreSQL-style connection URL with a pool of 10 connections
    config, err := pgxpool.ParseConfig("postgresql://crate@localhost:5432/doc?pool_max_conns=10")
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to parse database configuration: %v\n", err)
        os.Exit(1)
    }

    dbpool, err := pgxpool.ConnectConfig(ctx, config)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
        os.Exit(1)
    }
    defer dbpool.Close()

    // fill an array with 200 sample values
    var values [200]int
    for i := 0; i < 200; i++ {
        values[i] = i
    }

    // simple test table, created via CREATE TABLE doc.my_test_table(numeric_value INTEGER);
    stmt := `INSERT INTO doc.my_test_table (numeric_value) SELECT * FROM UNNEST($1::INTEGER[]);`

    // execute the insert, passing the whole array as a single parameter
    result, err := dbpool.Exec(ctx, stmt, values)
    if err != nil {
        fmt.Fprintf(os.Stderr, "Unable to execute query: %v\n", err)
        os.Exit(1)
    }

    fmt.Println(result)
}
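The value returned by Exec is a pgconn.CommandTag. Assuming CrateDB reports the inserted row count in the command tag, it can double as a quick sanity check at the end of main, reusing the result variable from the example above:
// RowsAffected should report 200 for the ingest above
if result.RowsAffected() != 200 {
    fmt.Fprintf(os.Stderr, "Unexpected row count: %d\n", result.RowsAffected())
    os.Exit(1)
}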