8 changes: 8 additions & 0 deletions .changeset/fix-infinite-query-peek-ahead.md
@@ -0,0 +1,8 @@
---
'@tanstack/react-db': patch
'@tanstack/db': patch
---

Fix `useLiveInfiniteQuery` peek-ahead detection for `hasNextPage`. The initial query now correctly requests `pageSize + 1` items to detect whether additional pages exist, matching the behavior of subsequent page loads.

Fix async on-demand pagination by ensuring the graph callback fires at least once even when there is no pending graph work, so that `loadMoreIfNeeded` is triggered after `setWindow()` increases the limit.
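
As an illustration of the peek-ahead arithmetic described above (a minimal sketch; `splitPeekAhead` is a hypothetical helper, not part of `@tanstack/db`): the query over-fetches one extra row, and that row is only used to derive `hasNextPage`, never shown to the user.

```ts
// Hypothetical helper illustrating peek-ahead pagination: fetch `pageSize + 1`
// rows, surface at most `pageSize` of them, and use the extra row purely as a
// signal that another page exists.
type PeekAheadResult<T> = {
  page: Array<T> // at most pageSize user-visible rows
  hasNextPage: boolean
}

function splitPeekAhead<T>(
  rows: Array<T>,
  pageSize: number,
): PeekAheadResult<T> {
  return {
    page: rows.slice(0, pageSize), // drop the peek-ahead row from visible data
    hasNextPage: rows.length > pageSize, // extra row present => more pages
  }
}

// With pageSize = 10 the query requests limit 11:
//   10 rows returned -> { page: 10 items, hasNextPage: false }
//   11 rows returned -> { page: 10 items, hasNextPage: true }
```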
9 changes: 9 additions & 0 deletions packages/db/src/query/live/collection-config-builder.ts
@@ -336,13 +336,22 @@ export class CollectionConfigBuilder<

// Always run the graph if subscribed (eager execution)
if (syncState.subscribedToAllCollections) {
let callbackCalled = false
while (syncState.graph.pendingWork()) {
syncState.graph.run()
// Flush accumulated changes after each graph step to commit them as one transaction.
// This ensures intermediate join states (like null on one side) don't cause
// duplicate key errors when the full join result arrives in the same step.
syncState.flushPendingChanges?.()
callback?.()
callbackCalled = true
}

// Ensure the callback runs at least once even when the graph has no pending work.
// This handles lazy loading scenarios where setWindow() increases the limit or
// an async loadSubset completes and we need to re-check if more data is needed.
if (!callbackCalled) {
callback?.()
}

// On the initial run, we may need to do an empty commit to ensure that
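
The second part of the fix is easier to see in isolation: the drain loop must give the callback an at-least-once guarantee. The sketch below is a simplified illustration of the pattern from the hunk above, not the actual `CollectionConfigBuilder` code; `graph` and `callback` stand in for the real sync-state objects.

```ts
// Simplified sketch of the at-least-once callback pattern (assumed shape, not
// the real library types). Without the trailing check, an empty work queue
// means the callback never fires, so a setWindow() call that only raised the
// limit would never trigger a follow-up data load.
type Graph = {
  pendingWork: () => boolean
  run: () => void
}

function drainGraph(graph: Graph, callback?: () => void): void {
  let callbackCalled = false
  while (graph.pendingWork()) {
    graph.run()
    callback?.() // re-check after each step while work is being processed
    callbackCalled = true
  }
  if (!callbackCalled) {
    callback?.() // no pending work: still let listeners re-evaluate once
  }
}
```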
6 changes: 5 additions & 1 deletion packages/react-db/src/useLiveInfiniteQuery.ts
@@ -184,10 +184,14 @@ export function useLiveInfiniteQuery<TContext extends Context>(

// Create a live query with initial limit and offset
// Either pass collection directly or wrap query function
// Use pageSize + 1 for peek-ahead detection (to know if there are more pages)
const queryResult = isCollection
? useLiveQuery(queryFnOrCollection)
: useLiveQuery(
(q) => queryFnOrCollection(q).limit(pageSize).offset(0),
(q) =>
queryFnOrCollection(q)
.limit(pageSize + 1)
.offset(0),
deps,
)

300 changes: 295 additions & 5 deletions packages/react-db/tests/useLiveInfiniteQuery.test.tsx
@@ -14,7 +14,7 @@ type Post = {
category: string
}

const createMockPosts = (count: number): Array<Post> => {
function createMockPosts(count: number): Array<Post> {
const posts: Array<Post> = []
for (let i = 1; i <= count; i++) {
posts.push({
@@ -28,6 +28,79 @@ const createMockPosts = (count: number): Array<Post> => {
return posts
}

type OnDemandCollectionOptions = {
id: string
allPosts: Array<Post>
autoIndex?: `off` | `eager`
asyncDelay?: number
}

/**
* Creates an on-demand collection with a loadSubset handler that supports
* sorting, cursor-based pagination, and limit. Returns the collection and
* a reference to recorded loadSubset calls for test assertions.
*/
function createOnDemandCollection(opts: OnDemandCollectionOptions) {
const loadSubsetCalls: Array<LoadSubsetOptions> = []
const { id, allPosts, autoIndex, asyncDelay } = opts

const collection = createCollection<Post>({
id,
getKey: (post: Post) => post.id,
syncMode: `on-demand`,
startSync: true,
...(autoIndex ? { autoIndex } : {}),
sync: {
sync: ({ markReady, begin, write, commit }) => {
markReady()

return {
loadSubset: (subsetOpts: LoadSubsetOptions) => {
loadSubsetCalls.push({ ...subsetOpts })

let filtered = [...allPosts].sort(
(a, b) => b.createdAt - a.createdAt,
)

if (subsetOpts.cursor) {
const whereFromFn = createFilterFunctionFromExpression(
subsetOpts.cursor.whereFrom,
)
filtered = filtered.filter(whereFromFn)
}

if (subsetOpts.limit !== undefined) {
filtered = filtered.slice(0, subsetOpts.limit)
}

function writeAll(): void {
begin()
for (const post of filtered) {
write({ type: `insert`, value: post })
}
commit()
}

if (asyncDelay !== undefined) {
return new Promise<void>((resolve) => {
setTimeout(() => {
writeAll()
resolve()
}, asyncDelay)
})
}

writeAll()
return true
},
}
},
},
})

return { collection, loadSubsetCalls }
}

describe(`useLiveInfiniteQuery`, () => {
it(`should fetch initial page of data`, async () => {
const posts = createMockPosts(50)
@@ -629,7 +702,7 @@ describe(`useLiveInfiniteQuery`, () => {
{
pageSize: 10,
initialPageParam: 0,
getNextPageParam: (lastPage, allPages, lastPageParam) =>
getNextPageParam: (lastPage, _allPages, lastPageParam) =>
lastPage.length === 10 ? lastPageParam + 1 : undefined,
},
)
@@ -838,7 +911,7 @@ describe(`useLiveInfiniteQuery`, () => {
{
pageSize: 10,
initialPageParam: 100,
getNextPageParam: (lastPage, allPages, lastPageParam) =>
getNextPageParam: (lastPage, _allPages, lastPageParam) =>
lastPage.length === 10 ? lastPageParam + 1 : undefined,
},
)
@@ -987,6 +1060,221 @@ describe(`useLiveInfiniteQuery`, () => {
expect(result.current.isFetchingNextPage).toBe(false)
})

it(`should request limit+1 (peek-ahead) from loadSubset for hasNextPage detection`, async () => {
// Verifies that useLiveInfiniteQuery requests pageSize+1 items from loadSubset
// to detect whether there are more pages available (peek-ahead strategy)
const PAGE_SIZE = 10
const { collection, loadSubsetCalls } = createOnDemandCollection({
id: `peek-ahead-limit-test`,
allPosts: createMockPosts(PAGE_SIZE), // Exactly PAGE_SIZE posts
})

const { result } = renderHook(() => {
return useLiveInfiniteQuery(
(q) =>
q
.from({ posts: collection })
.orderBy(({ posts: p }) => p.createdAt, `desc`),
{
pageSize: PAGE_SIZE,
getNextPageParam: (lastPage) =>
lastPage.length === PAGE_SIZE ? lastPage.length : undefined,
},
)
})

await waitFor(() => {
expect(result.current.isReady).toBe(true)
})

const callWithLimit = loadSubsetCalls.find(
(call) => call.limit !== undefined,
)
expect(callWithLimit).toBeDefined()
expect(callWithLimit!.limit).toBe(PAGE_SIZE + 1)

// With exactly PAGE_SIZE posts, hasNextPage should be false (no peek-ahead item returned)
expect(result.current.hasNextPage).toBe(false)
expect(result.current.data).toHaveLength(PAGE_SIZE)
})

it(`should detect hasNextPage via peek-ahead with exactly pageSize+1 items in on-demand collection`, async () => {
// Boundary test: with exactly pageSize+1 items, the peek-ahead item should
// signal hasNextPage=true but NOT appear in user-visible data
const PAGE_SIZE = 10
const { collection } = createOnDemandCollection({
id: `peek-ahead-boundary-test`,
allPosts: createMockPosts(PAGE_SIZE + 1),
})

const { result } = renderHook(() => {
return useLiveInfiniteQuery(
(q) =>
q
.from({ posts: collection })
.orderBy(({ posts: p }) => p.createdAt, `desc`),
{
pageSize: PAGE_SIZE,
getNextPageParam: (lastPage) =>
lastPage.length === PAGE_SIZE ? lastPage.length : undefined,
},
)
})

await waitFor(() => {
expect(result.current.isReady).toBe(true)
})

// Peek-ahead item detected: hasNextPage should be true
expect(result.current.hasNextPage).toBe(true)
// But user-visible data should be exactly pageSize (peek-ahead excluded)
expect(result.current.data).toHaveLength(PAGE_SIZE)
expect(result.current.pages).toHaveLength(1)
expect(result.current.pages[0]).toHaveLength(PAGE_SIZE)
})

it(`should work with on-demand collection and fetch multiple pages`, async () => {
// End-to-end test: on-demand collection where ALL data comes from loadSubset
// (no initial data). Simulates the real Electric on-demand scenario.
const PAGE_SIZE = 10
const { collection, loadSubsetCalls } = createOnDemandCollection({
id: `on-demand-e2e-test`,
allPosts: createMockPosts(25), // 2 full pages + 5 items
autoIndex: `eager`,
})

const { result } = renderHook(() => {
return useLiveInfiniteQuery(
(q) =>
q
.from({ posts: collection })
.orderBy(({ posts: p }) => p.createdAt, `desc`),
{
pageSize: PAGE_SIZE,
getNextPageParam: (lastPage) =>
lastPage.length === PAGE_SIZE ? lastPage.length : undefined,
},
)
})

await waitFor(() => {
expect(result.current.isReady).toBe(true)
})

// Page 1: 10 items
expect(result.current.pages).toHaveLength(1)
expect(result.current.data).toHaveLength(PAGE_SIZE)
expect(result.current.hasNextPage).toBe(true)
expect(result.current.data[0]!.id).toBe(`1`)
expect(result.current.data[9]!.id).toBe(`10`)

// Fetch page 2
act(() => {
result.current.fetchNextPage()
})

await waitFor(() => {
expect(result.current.pages).toHaveLength(2)
})

expect(loadSubsetCalls.length).toBeGreaterThan(1)
expect(result.current.data).toHaveLength(20)
expect(result.current.hasNextPage).toBe(true)
expect(result.current.pages[1]![0]!.id).toBe(`11`)
expect(result.current.pages[1]![9]!.id).toBe(`20`)

// Fetch page 3 (partial page)
act(() => {
result.current.fetchNextPage()
})

await waitFor(() => {
expect(result.current.pages).toHaveLength(3)
})

expect(result.current.data).toHaveLength(25)
expect(result.current.pages[2]).toHaveLength(5)
expect(result.current.hasNextPage).toBe(false)
expect(result.current.pages[2]![0]!.id).toBe(`21`)
expect(result.current.pages[2]![4]!.id).toBe(`25`)
})

it(`should work with on-demand collection with async loadSubset`, async () => {
// Same as the sync on-demand test, but loadSubset returns a Promise
// to simulate async network requests (the real Electric scenario).
const PAGE_SIZE = 10
const { collection, loadSubsetCalls } = createOnDemandCollection({
id: `on-demand-async-test`,
allPosts: createMockPosts(25),
autoIndex: `eager`,
asyncDelay: 10,
})

const { result } = renderHook(() => {
return useLiveInfiniteQuery(
(q) =>
q
.from({ posts: collection })
.orderBy(({ posts: p }) => p.createdAt, `desc`),
{
pageSize: PAGE_SIZE,
getNextPageParam: (lastPage) =>
lastPage.length === PAGE_SIZE ? lastPage.length : undefined,
},
)
})

await waitFor(() => {
expect(result.current.isReady).toBe(true)
})

await waitFor(() => {
expect(result.current.data).toHaveLength(PAGE_SIZE)
})

expect(result.current.pages).toHaveLength(1)
expect(result.current.hasNextPage).toBe(true)

const initialCallCount = loadSubsetCalls.length

// Fetch page 2
act(() => {
result.current.fetchNextPage()
})

expect(result.current.isFetchingNextPage).toBe(true)

await waitFor(
() => {
expect(result.current.data).toHaveLength(20)
},
{ timeout: 500 },
)

expect(result.current.pages).toHaveLength(2)
expect(loadSubsetCalls.length).toBeGreaterThan(initialCallCount)
expect(result.current.hasNextPage).toBe(true)

// Fetch page 3 (partial page) to verify async path handles end-of-data
const callCountBeforePage3 = loadSubsetCalls.length

act(() => {
result.current.fetchNextPage()
})

await waitFor(
() => {
expect(result.current.data).toHaveLength(25)
},
{ timeout: 500 },
)

expect(result.current.pages).toHaveLength(3)
expect(result.current.pages[2]).toHaveLength(5)
expect(loadSubsetCalls.length).toBeGreaterThan(callCountBeforePage3)
expect(result.current.hasNextPage).toBe(false)
})

it(`should track isFetchingNextPage when async loading is triggered`, async () => {
// Define all data upfront
const allPosts = createMockPosts(30)
@@ -1062,8 +1350,10 @@ describe(`useLiveInfiniteQuery`, () => {
}
// Re-sort after combining
filtered.sort((a, b) => b.createdAt - a.createdAt)
} catch {
// Fallback to original filtered if cursor parsing fails
} catch (e) {
throw new Error(`Test loadSubset: cursor parsing failed`, {
cause: e,
})
}
} else if (opts.limit !== undefined) {
// Apply limit only if no cursor (cursor handles limit internally)