diff --git a/graphile/graphile-test/src/context.ts b/graphile/graphile-test/src/context.ts index be26f3c32..f16e7c8ab 100644 --- a/graphile/graphile-test/src/context.ts +++ b/graphile/graphile-test/src/context.ts @@ -10,13 +10,15 @@ import type { import { parse, print, Kind } from 'graphql'; import type { GraphileConfig } from 'graphile-config'; import { execute } from 'grafast'; -import { withPgClientFromPgService } from 'graphile-build-pg'; import { makePgService } from 'postgraphile/adaptors/pg'; -import type { Client, Pool } from 'pg'; +import type { Client, Pool, QueryConfig } from 'pg'; import type { GetConnectionOpts, GetConnectionResult } from 'pgsql-test'; import type { GetConnectionsInput } from './types'; +const EMPTY_ARRAY: readonly PgNotice[] = Object.freeze([]); +const $$queue = Symbol('withPgClientQueue'); + /** * V4-compatible error format (plain object, not GraphQLError class) */ @@ -176,6 +178,204 @@ function normalizeResult(result: ExecutionResult, document?: DocumentNode): T return normalized as T; } +interface PgNotice { + severity: string; + message: string; + code?: string; + detail?: string; + hint?: string; + position?: number; + internalPosition?: number; + internalQuery?: string; + where?: string; + schema?: string; + table?: string; + column?: string; + dataType?: string; + constraint?: string; + file?: string; + line?: number; + routine?: string; +} + +interface PgQueryResult { + rows: unknown[]; + rowCount: number | null; + notices: readonly PgNotice[]; +} + +interface AdaptedPgClient { + rawClient: Client; + withTransaction(callback: (client: AdaptedPgClient) => Promise): Promise; + query(opts: { text: string; values?: unknown[]; arrayMode?: boolean }): Promise; +} + +function convertNotice(n: Record): PgNotice { + return { + severity: (n.severity as string) ?? 'NOTICE', + message: (n.message as string) ?? '', + code: n.code as string | undefined, + detail: n.detail as string | undefined, + hint: n.hint as string | undefined, + position: n.position != null ? parseInt(String(n.position), 10) : undefined, + internalPosition: n.internalPosition != null ? parseInt(String(n.internalPosition), 10) : undefined, + internalQuery: n.internalQuery as string | undefined, + where: n.where as string | undefined, + schema: n.schema as string | undefined, + table: n.table as string | undefined, + column: n.column as string | undefined, + dataType: n.dataType as string | undefined, + constraint: n.constraint as string | undefined, + file: n.file as string | undefined, + line: n.line != null ? parseInt(String(n.line), 10) : undefined, + routine: n.routine as string | undefined, + }; +} + +async function pgClientQuery(pgClient: Client, queryObj: QueryConfig): Promise { + let notices: PgNotice[] | null = null; + const addNotice = (n: Record) => { + const converted = convertNotice(n); + if (notices === null) { + notices = [converted]; + } else { + notices.push(converted); + } + }; + pgClient.addListener('notice', addNotice); + try { + const { rows, rowCount } = await pgClient.query(queryObj); + return { rows, rowCount, notices: notices ?? EMPTY_ARRAY }; + } finally { + pgClient.removeListener('notice', addNotice); + } +} + +function newNodePostgresPgClient( + pgClient: Client, + txLevel: number, + alwaysQueue: boolean, + alreadyInTransaction: boolean +): AdaptedPgClient { + let queue: Promise | null = null; + const addToQueue = (callback: () => Promise): Promise => { + const result = queue ? queue.then(callback) : callback(); + const clearIfSame = () => { + if (queue === newQueue) { + queue = null; + } + }; + const newQueue = result.then(clearIfSame, clearIfSame); + queue = newQueue; + return result; + }; + return { + rawClient: pgClient, + withTransaction(callback) { + return addToQueue(async () => { + if (txLevel === 0 && !alreadyInTransaction) { + await pgClient.query({ text: 'begin' }); + } else { + await pgClient.query({ + text: `savepoint tx${txLevel === 0 ? '' : txLevel}`, + }); + } + try { + const newClient = newNodePostgresPgClient(pgClient, txLevel + 1, alwaysQueue, alreadyInTransaction); + const innerResult = await callback(newClient); + if (txLevel === 0 && !alreadyInTransaction) { + await pgClient.query({ text: 'commit' }); + } else { + await pgClient.query({ + text: `release savepoint tx${txLevel === 0 ? '' : txLevel}`, + }); + } + return innerResult; + } catch (e) { + try { + if (txLevel === 0 && !alreadyInTransaction) { + await pgClient.query({ text: 'rollback' }); + } else { + await pgClient.query({ + text: `rollback to savepoint tx${txLevel === 0 ? '' : txLevel}`, + }); + } + } catch (_e2) { + console.error(`Error occurred whilst rolling back: ${e}`); + } + throw e; + } + }); + }, + query(opts) { + if (queue || alwaysQueue) { + return addToQueue(doIt); + } else { + return doIt(); + } + function doIt() { + const { text, values, arrayMode } = opts; + const queryObj: QueryConfig = arrayMode + ? { text, values, rowMode: 'array' as const } as QueryConfig + : { text, values }; + return pgClientQuery(pgClient, queryObj); + } + }, + }; +} + +async function makeNodePostgresWithPgClient_inner( + pgClient: Client & { [key: symbol]: Promise | null }, + pgSettings: Record | null, + callback: (client: AdaptedPgClient) => T | Promise, + alwaysQueue: boolean, + alreadyInTransaction: boolean +): Promise { + const pgSettingsEntries: [string, string][] = []; + if (pgSettings != null) { + for (const [key, value] of Object.entries(pgSettings)) { + if (value == null) continue; + pgSettingsEntries.push([key, '' + value]); + } + } + while (pgClient[$$queue]) { + await pgClient[$$queue]; + } + return (pgClient[$$queue] = (async () => { + try { + if (pgSettingsEntries.length > 0) { + await pgClient.query({ + text: alreadyInTransaction ? 'savepoint tx' : 'begin', + }); + try { + await pgClient.query({ + text: 'select set_config(el->>0, el->>1, true) from json_array_elements($1::json) el', + values: [JSON.stringify(pgSettingsEntries)], + }); + const client = newNodePostgresPgClient(pgClient, 1, alwaysQueue, alreadyInTransaction); + const result = await callback(client); + await pgClient.query({ + text: alreadyInTransaction ? 'release savepoint tx' : 'commit', + }); + return result; + } catch (e) { + await pgClient.query({ + text: alreadyInTransaction + ? 'rollback to savepoint tx' + : 'rollback', + }); + throw e; + } + } else { + const client = newNodePostgresPgClient(pgClient, 0, alwaysQueue, alreadyInTransaction); + return await callback(client); + } + } finally { + pgClient[$$queue] = null; + } + })()) as Promise; +} + interface PgSettings { [key: string]: string; } @@ -262,23 +462,34 @@ export const runGraphQLInContext = async ({ ), }; - // Provide a custom withPgClient function that uses the test client + // Provide a custom withPgClient function that uses the test client. // This ensures GraphQL operations run within the test transaction - // instead of getting a new connection from the pool + // instead of getting a new connection from the pool. + // + // Uses the full adapted client from @dataplan/pg with: + // - Query queuing to serialize concurrent calls + // - Proper pgSettings handling with savepoint wrapping + // - arrayMode → rowMode translation for pg client compatibility + const isInTransaction = !input.useRoot; const withPgClientKey = pgService.withPgClientKey ?? 'withPgClient'; - contextValue[withPgClientKey] = async ( - _pgSettings: Record | null, - callback: (client: Client) => T | Promise - ): Promise => { - // Simply use the test client - it's already in a transaction - // The pgSettings have already been applied above via setContextOnClient - return callback(pgClient); + const adaptedWithPgClient = async ( + callerPgSettings: Record | null, + callback: (client: AdaptedPgClient) => unknown + ) => { + return makeNodePostgresWithPgClient_inner( + pgClient as Client & { [key: symbol]: Promise | null }, + callerPgSettings, + callback, + true, + isInTransaction + ); }; - - // Check if we're in a transaction by looking at the test client's transaction state - // When useRoot is true, we might not be in a transaction - // pgsql-test's `db` client is in a transaction, but `pg` (root) client may not be - const isInTransaction = !input.useRoot; + let released = false; + (adaptedWithPgClient as unknown as { release: () => void }).release = () => { + if (released) return; + released = true; + }; + contextValue[withPgClientKey] = adaptedWithPgClient; // Wrap the entire query execution in a savepoint if we're in a transaction // This matches v4 PostGraphile behavior where each mutation is wrapped in a savepoint @@ -297,6 +508,7 @@ export const runGraphQLInContext = async ({ rawResult = await execute({ schema, document, + middleware: null, variableValues: variables ?? undefined, contextValue, resolvedPreset,