Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 228 additions & 16 deletions graphile/graphile-test/src/context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import type {
import { parse, print, Kind } from 'graphql';
import type { GraphileConfig } from 'graphile-config';
import { execute } from 'grafast';
import { withPgClientFromPgService } from 'graphile-build-pg';
import { makePgService } from 'postgraphile/adaptors/pg';
import type { Client, Pool } from 'pg';
import type { Client, Pool, QueryConfig } from 'pg';
import type { GetConnectionOpts, GetConnectionResult } from 'pgsql-test';

import type { GetConnectionsInput } from './types';

const EMPTY_ARRAY: readonly PgNotice[] = Object.freeze([]);
const $$queue = Symbol('withPgClientQueue');

/**
* V4-compatible error format (plain object, not GraphQLError class)
*/
Expand Down Expand Up @@ -176,6 +178,204 @@ function normalizeResult<T>(result: ExecutionResult, document?: DocumentNode): T
return normalized as T;
}

interface PgNotice {
severity: string;
message: string;
code?: string;
detail?: string;
hint?: string;
position?: number;
internalPosition?: number;
internalQuery?: string;
where?: string;
schema?: string;
table?: string;
column?: string;
dataType?: string;
constraint?: string;
file?: string;
line?: number;
routine?: string;
}

interface PgQueryResult {
rows: unknown[];
rowCount: number | null;
notices: readonly PgNotice[];
}

interface AdaptedPgClient {
rawClient: Client;
withTransaction(callback: (client: AdaptedPgClient) => Promise<unknown>): Promise<unknown>;
query(opts: { text: string; values?: unknown[]; arrayMode?: boolean }): Promise<PgQueryResult>;
}

function convertNotice(n: Record<string, unknown>): PgNotice {
return {
severity: (n.severity as string) ?? 'NOTICE',
message: (n.message as string) ?? '',
code: n.code as string | undefined,
detail: n.detail as string | undefined,
hint: n.hint as string | undefined,
position: n.position != null ? parseInt(String(n.position), 10) : undefined,
internalPosition: n.internalPosition != null ? parseInt(String(n.internalPosition), 10) : undefined,
internalQuery: n.internalQuery as string | undefined,
where: n.where as string | undefined,
schema: n.schema as string | undefined,
table: n.table as string | undefined,
column: n.column as string | undefined,
dataType: n.dataType as string | undefined,
constraint: n.constraint as string | undefined,
file: n.file as string | undefined,
line: n.line != null ? parseInt(String(n.line), 10) : undefined,
routine: n.routine as string | undefined,
};
}

async function pgClientQuery(pgClient: Client, queryObj: QueryConfig): Promise<PgQueryResult> {
let notices: PgNotice[] | null = null;
const addNotice = (n: Record<string, unknown>) => {
const converted = convertNotice(n);
if (notices === null) {
notices = [converted];
} else {
notices.push(converted);
}
};
pgClient.addListener('notice', addNotice);
try {
const { rows, rowCount } = await pgClient.query(queryObj);
return { rows, rowCount, notices: notices ?? EMPTY_ARRAY };
} finally {
pgClient.removeListener('notice', addNotice);
}
}

function newNodePostgresPgClient(
pgClient: Client,
txLevel: number,
alwaysQueue: boolean,
alreadyInTransaction: boolean
): AdaptedPgClient {
let queue: Promise<unknown> | null = null;
const addToQueue = <T>(callback: () => Promise<T>): Promise<T> => {
const result = queue ? queue.then(callback) : callback();
const clearIfSame = () => {
if (queue === newQueue) {
queue = null;
}
};
const newQueue = result.then(clearIfSame, clearIfSame);
queue = newQueue;
return result;
};
return {
rawClient: pgClient,
withTransaction(callback) {
return addToQueue(async () => {
if (txLevel === 0 && !alreadyInTransaction) {
await pgClient.query({ text: 'begin' });
} else {
await pgClient.query({
text: `savepoint tx${txLevel === 0 ? '' : txLevel}`,
});
}
try {
const newClient = newNodePostgresPgClient(pgClient, txLevel + 1, alwaysQueue, alreadyInTransaction);
const innerResult = await callback(newClient);
if (txLevel === 0 && !alreadyInTransaction) {
await pgClient.query({ text: 'commit' });
} else {
await pgClient.query({
text: `release savepoint tx${txLevel === 0 ? '' : txLevel}`,
});
}
return innerResult;
} catch (e) {
try {
if (txLevel === 0 && !alreadyInTransaction) {
await pgClient.query({ text: 'rollback' });
} else {
await pgClient.query({
text: `rollback to savepoint tx${txLevel === 0 ? '' : txLevel}`,
});
}
} catch (_e2) {
console.error(`Error occurred whilst rolling back: ${e}`);
Comment on lines +303 to +304
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Rollback error handler logs original error instead of the rollback failure error

When a withTransaction callback throws error e and the subsequent rollback query also fails with _e2, the console.error on line 304 logs the original error e rather than the rollback error _e2. The rollback failure reason is silently discarded.

Root Cause

In the catch block at graphile/graphile-test/src/context.ts:302-305:

} catch (_e2) {
    console.error(`Error occurred whilst rolling back: ${e}`);
}

The message "Error occurred whilst rolling back" implies the logged value explains why the rollback failed, but it actually logs e (the original callback error), not _e2 (the rollback failure). The original error e is already re-thrown at line 306 and will be caught/logged by callers, so logging it here adds no new information. Meanwhile, the actual rollback error _e2 — the useful diagnostic — is completely lost.

Impact: When debugging transaction rollback failures (e.g., connection dropped during rollback), the operator sees the original error repeated rather than the rollback-specific error, making it harder to diagnose the root cause of the rollback failure.

Suggested change
} catch (_e2) {
console.error(`Error occurred whilst rolling back: ${e}`);
} catch (_e2) {
console.error(`Error occurred whilst rolling back (original error: ${e}): ${_e2}`);
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

}
throw e;
}
});
},
query(opts) {
if (queue || alwaysQueue) {
return addToQueue(doIt);
} else {
return doIt();
}
function doIt() {
const { text, values, arrayMode } = opts;
const queryObj: QueryConfig = arrayMode
? { text, values, rowMode: 'array' as const } as QueryConfig
: { text, values };
return pgClientQuery(pgClient, queryObj);
}
},
};
}

async function makeNodePostgresWithPgClient_inner<T>(
pgClient: Client & { [key: symbol]: Promise<unknown> | null },
pgSettings: Record<string, string> | null,
callback: (client: AdaptedPgClient) => T | Promise<T>,
alwaysQueue: boolean,
alreadyInTransaction: boolean
): Promise<T> {
const pgSettingsEntries: [string, string][] = [];
if (pgSettings != null) {
for (const [key, value] of Object.entries(pgSettings)) {
if (value == null) continue;
pgSettingsEntries.push([key, '' + value]);
}
}
while (pgClient[$$queue]) {
await pgClient[$$queue];
}
return (pgClient[$$queue] = (async () => {
try {
if (pgSettingsEntries.length > 0) {
await pgClient.query({
text: alreadyInTransaction ? 'savepoint tx' : 'begin',
});
try {
await pgClient.query({
text: 'select set_config(el->>0, el->>1, true) from json_array_elements($1::json) el',
values: [JSON.stringify(pgSettingsEntries)],
});
const client = newNodePostgresPgClient(pgClient, 1, alwaysQueue, alreadyInTransaction);
const result = await callback(client);
await pgClient.query({
text: alreadyInTransaction ? 'release savepoint tx' : 'commit',
});
return result;
} catch (e) {
await pgClient.query({
text: alreadyInTransaction
? 'rollback to savepoint tx'
: 'rollback',
});
throw e;
}
} else {
const client = newNodePostgresPgClient(pgClient, 0, alwaysQueue, alreadyInTransaction);
return await callback(client);
}
} finally {
pgClient[$$queue] = null;
}
})()) as Promise<T>;
}

interface PgSettings {
[key: string]: string;
}
Expand Down Expand Up @@ -262,23 +462,34 @@ export const runGraphQLInContext = async <T = ExecutionResult>({
),
};

// Provide a custom withPgClient function that uses the test client
// Provide a custom withPgClient function that uses the test client.
// This ensures GraphQL operations run within the test transaction
// instead of getting a new connection from the pool
// instead of getting a new connection from the pool.
//
// Uses the full adapted client from @dataplan/pg with:
// - Query queuing to serialize concurrent calls
// - Proper pgSettings handling with savepoint wrapping
// - arrayMode → rowMode translation for pg client compatibility
const isInTransaction = !input.useRoot;
const withPgClientKey = pgService.withPgClientKey ?? 'withPgClient';
contextValue[withPgClientKey] = async <T>(
_pgSettings: Record<string, string> | null,
callback: (client: Client) => T | Promise<T>
): Promise<T> => {
// Simply use the test client - it's already in a transaction
// The pgSettings have already been applied above via setContextOnClient
return callback(pgClient);
const adaptedWithPgClient = async (
callerPgSettings: Record<string, string> | null,
callback: (client: AdaptedPgClient) => unknown
) => {
return makeNodePostgresWithPgClient_inner(
pgClient as Client & { [key: symbol]: Promise<unknown> | null },
callerPgSettings,
callback,
true,
isInTransaction
);
};

// Check if we're in a transaction by looking at the test client's transaction state
// When useRoot is true, we might not be in a transaction
// pgsql-test's `db` client is in a transaction, but `pg` (root) client may not be
const isInTransaction = !input.useRoot;
let released = false;
(adaptedWithPgClient as unknown as { release: () => void }).release = () => {
if (released) return;
released = true;
};
contextValue[withPgClientKey] = adaptedWithPgClient;

// Wrap the entire query execution in a savepoint if we're in a transaction
// This matches v4 PostGraphile behavior where each mutation is wrapped in a savepoint
Expand All @@ -297,6 +508,7 @@ export const runGraphQLInContext = async <T = ExecutionResult>({
rawResult = await execute({
schema,
document,
middleware: null,
variableValues: variables ?? undefined,
contextValue,
resolvedPreset,
Expand Down
Loading