Broken Connection after context cancellation
When cancelling a running `QueryRowContext`, the driver returns `interrupted (9)`.
The same does NOT happen when using the `github.com/mattn/go-sqlite3` package.
I assume there is a race condition somewhere in the driver that doesn't properly "clean up" an interrupted connection before it is returned to the pool.
EDIT: While experimenting, it seems that a workaround for interrupted connections MIGHT be to return false from `(*sqlite.conn).IsValid()` while the connection is interrupted:
```go
func (c *conn) IsValid() bool {
	// return c.db != 0 // original code
	return c.db != 0 && sqlite3.Xsqlite3_is_interrupted(c.tls, c.db) == 0
}
```
EDIT2: After reading the SQLite docs, it seems that some pending statement might not be finalized correctly when cancellation occurs. From the `sqlite3_interrupt()` documentation:
> The sqlite3_interrupt(D) call is in effect until all currently running SQL statements on database connection D complete. Any new SQL statements that are started after the sqlite3_interrupt() call and before the running statement count reaches zero are interrupted as if they had been running prior to the sqlite3_interrupt() call. New SQL statements that are started after the running statement count reaches zero are not effected by the sqlite3_interrupt(). A call to sqlite3_interrupt(D) that occurs when there are no running SQL statements is a no-op and has no effect on SQL statements that are started after the sqlite3_interrupt() call returns.
Original Comments
```go
func (s *stmt) query(ctx context.Context, args []driver.NamedValue) (r driver.Rows, err error) {
	// ...
	if pstmt, err = s.c.prepareV2(&psql); err != nil {
		return nil, err // this returns interrupted (9)
		// Returning ErrBadConn here "drops" the dead connection,
		// allowing a retry with a clean one, but it still drops a request.
		// return nil, driver.ErrBadConn
	}
	// ...
}
```
Once a connection is in the "interrupted" state, it can no longer handle any new calls.
Affected version: v1.34.1
Tested OS: Windows 11 (amd64)
Repro:
- Seed the database once via http://127.0.0.1:8082/seed.
- Run more requests than can be handled concurrently, e.g. by calling http://127.0.0.1:8082/items/2 repeatedly.
- Cancel requests that time out.
- At some point, a running SQLite operation gets "interrupted" and leaves the connection in a broken state.
- Any further call that fetches the same pooled connection fails, because the connection is still "interrupted".
Repro example (Server):
```go
package main

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"runtime"
	"time"

	_ "github.com/mattn/go-sqlite3"
	_ "modernc.org/sqlite"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	absDb, err := filepath.Abs("simple-web.db")
	if err != nil {
		panic(err)
	}
	dbSlash := filepath.ToSlash(absDb)
	connStr := "file:///" + dbSlash + "?_pragma=foreign_keys(1)&_pragma=journal_mode(WAL)&_pragma=synchronous(NORMAL)&_pragma=busy_timeout(10000)"
	// mattn/go-sqlite3 variant (driver name "sqlite3"):
	// connStr = "file:///" + dbSlash + "?_foreign_keys=1&_journal_mode=WAL&_synchronous=NORMAL&_busy_timeout=10000&_mutex=no"
	fmt.Printf("connecting to %s\n", connStr)
	db, err := sql.Open("sqlite", connStr)
	if err != nil {
		panic(err)
	}
	db.SetMaxOpenConns(2 * runtime.NumCPU())
	db.SetMaxIdleConns(2 * runtime.NumCPU())
	db.SetConnMaxLifetime(5 * time.Minute)
	db.SetConnMaxIdleTime(1 * time.Minute)
	defer db.Close()

	if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS data (
		id INTEGER PRIMARY KEY AUTOINCREMENT,
		name TEXT NOT NULL DEFAULT ''
	)`); err != nil {
		panic(err)
	}

	http.HandleFunc("/seed", func(w http.ResponseWriter, r *http.Request) {
		if _, err := db.Exec(`INSERT INTO data (Name) VALUES ('A'),('B'),('C'),('D')`); err != nil {
			http.Error(w, err.Error(), http.StatusInternalServerError)
		}
	})

	http.HandleFunc("/items/{id}", func(w http.ResponseWriter, r *http.Request) {
		id := r.PathValue("id")
		type entry struct {
			Id   int64  `sql:"id"`
			Name string `sql:"name"`
		}
		scanEntry := func(ctx context.Context, id string) (entry, error) {
			// This might return SQLITE_BUSY even though busy_timeout is set, and
			// per its documentation db.Conn(..) should BLOCK until a connection
			// is obtained (or ctx is cancelled).
			dbConn, err := db.Conn(ctx)
			if err != nil {
				return entry{}, fmt.Errorf("obtaining db conn: %w", err)
			}
			defer dbConn.Close()

			// modernc sqlite breaks connections when the context gets cancelled
			// ctx = context.Background()
			row := dbConn.QueryRowContext(ctx, "SELECT Id,Name FROM data WHERE id = ?", id)
			if err := row.Err(); err != nil {
				return entry{}, fmt.Errorf("retrieving row: %w", err)
			}
			e := entry{}
			if err := row.Scan(&e.Id, &e.Name); err != nil {
				return entry{}, fmt.Errorf("scanning entry: %w", err)
			}
			return e, nil
		}

		const HttpClientClosedRequest = 499 // non-standard (nginx) "client closed request"
		e, err := scanEntry(r.Context(), id)
		if err != nil {
			if errors.Is(err, context.Canceled) {
				w.WriteHeader(HttpClientClosedRequest)
				return
			}
			w.WriteHeader(http.StatusNotFound)
			fmt.Fprintf(os.Stderr, "%v - failed to retrieve row %v\n", time.Now().Format(time.RFC3339), err)
			return
		}
		w.Header().Set("Content-Type", "text/plain")
		fmt.Fprintf(w, "%#v", e)
	})

	server := http.Server{
		Addr:    ":8082",
		Handler: http.DefaultServeMux,
	}
	go func() {
		if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			fmt.Fprintf(os.Stderr, "server: %v\n", err)
		}
	}()
	<-ctx.Done()
	server.Shutdown(context.Background())
	db.Close()
}
```
Here's some sample code in C# that sends requests for ~1000 ms (each with a 500 ms timeout) before cancelling any pending requests:
```csharp
async Task Main()
{
    var cts = CancellationTokenSource.CreateLinkedTokenSource(QueryCancelToken);
    var token = cts.Token;
    var numRequests = Environment.ProcessorCount;
    numRequests *= 20;
    cts.CancelAfter(1000);
    var tasks = Enumerable.Range(0, numRequests).Select(j => Task.Run(async () =>
    {
        var avg = j;
        using var hc = new HttpClient();
        try
        {
            while (!token.IsCancellationRequested)
            {
                using var after500ms = CancellationTokenSource.CreateLinkedTokenSource(token);
                after500ms.CancelAfter(500);
                try
                {
                    using var resp = await hc.GetAsync("http://127.0.0.1:8082/items/2", after500ms.Token);
                    resp.EnsureSuccessStatusCode();
                }
                catch
                {
                }
            }
        }
        catch (OperationCanceledException ocex) when (ocex.CancellationToken == token)
        {
        }
        catch
        {
        }
    }));
    await Task.WhenAll(tasks);
}
```