diff --git a/packages/kernel-test/src/io.test.ts b/packages/kernel-test/src/io.test.ts new file mode 100644 index 000000000..80a99f51d --- /dev/null +++ b/packages/kernel-test/src/io.test.ts @@ -0,0 +1,255 @@ +import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; +import { waitUntilQuiescent } from '@metamask/kernel-utils'; +import { Kernel } from '@metamask/ocap-kernel'; +import type { IOChannel, IOConfig } from '@metamask/ocap-kernel'; +import * as net from 'node:net'; +import * as os from 'node:os'; +import * as path from 'node:path'; +import { describe, it, expect, afterEach } from 'vitest'; + +import { getBundleSpec, makeTestLogger } from './utils.ts'; + +function tempSocketPath(): string { + return path.join( + os.tmpdir(), + `io-int-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`, + ); +} + +async function connectToSocket(socketPath: string): Promise { + return new Promise((resolve, reject) => { + const client = net.createConnection(socketPath, () => { + client.removeListener('error', reject); + resolve(client); + }); + client.on('error', reject); + }); +} + +async function writeLine(socket: net.Socket, line: string): Promise { + return new Promise((resolve, reject) => { + socket.write(`${line}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +async function readLine(socket: net.Socket): Promise { + return new Promise((resolve) => { + let buffer = ''; + const onData = (data: Buffer): void => { + buffer += data.toString(); + const idx = buffer.indexOf('\n'); + if (idx !== -1) { + socket.removeListener('data', onData); + resolve(buffer.slice(0, idx)); + } + }; + socket.on('data', onData); + }); +} + +async function makeTestSocketChannel( + _name: string, + socketPath: string, +): Promise { + const fsPromises = await import('node:fs/promises'); + const lineQueue: string[] = []; + const readerQueue: { resolve: (value: string | null) => void }[] = []; + let currentSocket: net.Socket | null = null; + let lineBuffer = ''; + let closed = false; + + function deliverLine(line: string): void { + const reader = readerQueue.shift(); + if (reader) { + reader.resolve(line); + } else { + lineQueue.push(line); + } + } + + function deliverEOF(): void { + while (readerQueue.length > 0) { + readerQueue.shift()?.resolve(null); + } + } + + const server = net.createServer((socket) => { + if (currentSocket) { + socket.destroy(); + return; + } + currentSocket = socket; + lineBuffer = ''; + socket.on('data', (data: Buffer) => { + lineBuffer += data.toString(); + let idx = lineBuffer.indexOf('\n'); + while (idx !== -1) { + deliverLine(lineBuffer.slice(0, idx)); + lineBuffer = lineBuffer.slice(idx + 1); + idx = lineBuffer.indexOf('\n'); + } + }); + socket.on('end', () => { + if (lineBuffer.length > 0) { + deliverLine(lineBuffer); + lineBuffer = ''; + } + currentSocket = null; + deliverEOF(); + }); + socket.on('error', () => { + currentSocket = null; + deliverEOF(); + }); + }); + + try { + await fsPromises.unlink(socketPath); + } catch { + // ignore + } + + await new Promise((resolve, reject) => { + server.on('error', reject); + server.listen(socketPath, () => { + server.removeListener('error', reject); + resolve(); + }); + }); + + return { + async read() { + if (closed) { + return null; + } + const queued = lineQueue.shift(); + if (queued !== undefined) { + return queued; + } + if (!currentSocket) { + return null; + } + return new Promise((resolve) => { + readerQueue.push({ resolve }); + }); + }, + async write(data: string) { + if (!currentSocket) { + throw new Error('no connected client'); + } + const socket = currentSocket; + return new Promise((resolve, reject) => { + socket.write(`${data}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); + }, + async close() { + if (closed) { + return; + } + closed = true; + deliverEOF(); + currentSocket?.destroy(); + currentSocket = null; + await new Promise((resolve) => { + server.close(() => resolve()); + }); + try { + await fsPromises.unlink(socketPath); + } catch { + // ignore + } + }, + }; +} + +describe('IO kernel service', () => { + const clients: net.Socket[] = []; + + afterEach(async () => { + for (const client of clients) { + client.destroy(); + } + clients.length = 0; + }); + + it('reads and writes through an IO channel', async () => { + const socketPath = tempSocketPath(); + const kernelDatabase = await makeSQLKernelDatabase({ + dbFilename: ':memory:', + }); + const { logger } = makeTestLogger(); + + const { NodejsPlatformServices } = await import('@ocap/nodejs'); + const kernel = await Kernel.make( + new NodejsPlatformServices({ + logger: logger.subLogger({ tags: ['platform'] }), + }), + kernelDatabase, + { + resetStorage: true, + logger, + ioChannelFactory: async (name: string, config: IOConfig) => { + if (config.type !== 'socket') { + throw new Error(`unsupported: ${config.type}`); + } + return makeTestSocketChannel(name, config.path); + }, + }, + ); + + const config = { + bootstrap: 'io', + forceReset: true, + io: { + repl: { + type: 'socket' as const, + path: socketPath, + }, + }, + services: ['repl'], + vats: { + io: { + bundleSpec: getBundleSpec('io-vat'), + parameters: { name: 'io' }, + }, + }, + }; + + const { rootKref } = await kernel.launchSubcluster(config); + await waitUntilQuiescent(); + + // Connect to the socket + const client = await connectToSocket(socketPath); + clients.push(client); + + // Small delay for connection setup + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Send a line from the test to the vat + await writeLine(client, 'hello from test'); + + // Trigger the vat to read + await kernel.queueMessage(rootKref, 'doRead', []); + await waitUntilQuiescent(100); + + // Trigger the vat to write + const linePromise = readLine(client); + await kernel.queueMessage(rootKref, 'doWrite', ['hello from vat']); + await waitUntilQuiescent(100); + + const received = await linePromise; + expect(received).toBe('hello from vat'); + }); +}); diff --git a/packages/kernel-test/src/vats/io-vat.ts b/packages/kernel-test/src/vats/io-vat.ts new file mode 100644 index 000000000..e66b6decb --- /dev/null +++ b/packages/kernel-test/src/vats/io-vat.ts @@ -0,0 +1,39 @@ +import { E } from '@endo/eventual-send'; +import { makeDefaultExo } from '@metamask/kernel-utils/exo'; + +import { unwrapTestLogger } from '../test-powers.ts'; +import type { TestPowers } from '../test-powers.ts'; + +/** + * Build function for testing IO kernel services. + * + * @param vatPowers - Special powers granted to this vat. + * @param parameters - Initialization parameters from the vat's config object. + * @param parameters.name - The name of the vat. + * @returns The root object for the new vat. + */ +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +export function buildRootObject( + vatPowers: TestPowers, + parameters: { name?: string } = {}, +) { + const name = parameters?.name ?? 'io-vat'; + const tlog = unwrapTestLogger(vatPowers, name); + let ioService: unknown; + + return makeDefaultExo('root', { + async bootstrap(_vats: unknown, services: { repl: unknown }) { + tlog('bootstrap'); + ioService = services.repl; + }, + async doRead() { + const line = await E(ioService).read(); + tlog(`read: ${line}`); + return line; + }, + async doWrite(data: string) { + await E(ioService).write(data); + tlog(`wrote: ${data}`); + }, + }); +} diff --git a/packages/nodejs/src/index.ts b/packages/nodejs/src/index.ts index 6af1ec51b..49c133fdf 100644 --- a/packages/nodejs/src/index.ts +++ b/packages/nodejs/src/index.ts @@ -1,3 +1,4 @@ export { NodejsPlatformServices } from './kernel/PlatformServices.ts'; export { makeKernel } from './kernel/make-kernel.ts'; export { makeNodeJsVatSupervisor } from './vat/make-supervisor.ts'; +export { makeIOChannelFactory, makeSocketIOChannel } from './io/index.ts'; diff --git a/packages/nodejs/src/io/index.ts b/packages/nodejs/src/io/index.ts new file mode 100644 index 000000000..f736baf96 --- /dev/null +++ b/packages/nodejs/src/io/index.ts @@ -0,0 +1,25 @@ +import type { IOChannelFactory } from '@metamask/ocap-kernel'; +import type { IOConfig } from '@metamask/ocap-kernel'; + +import { makeSocketIOChannel } from './socket-channel.ts'; + +export { makeSocketIOChannel } from './socket-channel.ts'; + +/** + * Create an IOChannelFactory for the Node.js environment. + * Dispatches on `config.type` to the appropriate channel implementation. + * + * @returns An IOChannelFactory. + */ +export function makeIOChannelFactory(): IOChannelFactory { + return async (name: string, config: IOConfig) => { + switch (config.type) { + case 'socket': + return makeSocketIOChannel(name, config.path); + default: + throw new Error( + `Unsupported IO channel type "${config.type}" for channel "${name}"`, + ); + } + }; +} diff --git a/packages/nodejs/src/io/socket-channel.test.ts b/packages/nodejs/src/io/socket-channel.test.ts new file mode 100644 index 000000000..ea7b49611 --- /dev/null +++ b/packages/nodejs/src/io/socket-channel.test.ts @@ -0,0 +1,238 @@ +import type { IOChannel } from '@metamask/ocap-kernel'; +import fs from 'node:fs/promises'; +import * as net from 'node:net'; +import * as os from 'node:os'; +import * as path from 'node:path'; +import { describe, it, expect, afterEach } from 'vitest'; + +import { makeSocketIOChannel } from './socket-channel.ts'; + +function tempSocketPath(): string { + return path.join( + os.tmpdir(), + `io-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`, + ); +} + +async function connectToSocket(socketPath: string): Promise { + return new Promise((resolve, reject) => { + const client = net.createConnection(socketPath, () => { + client.removeListener('error', reject); + resolve(client); + }); + client.on('error', reject); + }); +} + +async function writeLine(socket: net.Socket, line: string): Promise { + return new Promise((resolve, reject) => { + socket.write(`${line}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +async function readLine(socket: net.Socket): Promise { + return new Promise((resolve) => { + let buffer = ''; + const onData = (data: Buffer): void => { + buffer += data.toString(); + const idx = buffer.indexOf('\n'); + if (idx !== -1) { + socket.removeListener('data', onData); + resolve(buffer.slice(0, idx)); + } + }; + socket.on('data', onData); + }); +} + +async function fileExists(filePath: string): Promise { + try { + await fs.access(filePath); + return true; + } catch { + return false; + } +} + +describe('makeSocketIOChannel', () => { + const channels: IOChannel[] = []; + const clients: net.Socket[] = []; + + afterEach(async () => { + for (const client of clients) { + client.destroy(); + } + clients.length = 0; + for (const channel of channels) { + await channel.close(); + } + channels.length = 0; + }); + + it('creates a listening socket', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + expect(await fileExists(socketPath)).toBe(true); + }); + + it('reads lines from a connected client', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const client = await connectToSocket(socketPath); + clients.push(client); + + await writeLine(client, 'hello'); + await writeLine(client, 'world'); + + const line1 = await channel.read(); + const line2 = await channel.read(); + + expect(line1).toBe('hello'); + expect(line2).toBe('world'); + }); + + it('writes lines to a connected client', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const client = await connectToSocket(socketPath); + clients.push(client); + + // Small delay for connection to be established + await new Promise((resolve) => setTimeout(resolve, 10)); + + const linePromise = readLine(client); + await channel.write('output'); + const received = await linePromise; + + expect(received).toBe('output'); + }); + + it('returns null on client disconnect', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const client = await connectToSocket(socketPath); + + // Start a read that will block + const readPromise = channel.read(); + client.destroy(); + + const result = await readPromise; + expect(result).toBeNull(); + }); + + it('returns null when no client is connected', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const result = await channel.read(); + expect(result).toBeNull(); + }); + + it('throws on write when no client is connected', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + await expect(channel.write('data')).rejects.toThrow( + 'has no connected client', + ); + }); + + it('queues lines before read is called', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const client = await connectToSocket(socketPath); + clients.push(client); + + // Send lines before any reads + await writeLine(client, 'a'); + await writeLine(client, 'b'); + + // Small delay for data to arrive + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(await channel.read()).toBe('a'); + expect(await channel.read()).toBe('b'); + }); + + it('rejects second connection', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const client1 = await connectToSocket(socketPath); + clients.push(client1); + + const client2 = await connectToSocket(socketPath); + + // Second client should be destroyed + await new Promise((resolve) => { + client2.on('close', () => resolve()); + }); + expect(client2.destroyed).toBe(true); + }); + + it('cleans up socket file on close', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + + expect(await fileExists(socketPath)).toBe(true); + await channel.close(); + expect(await fileExists(socketPath)).toBe(false); + }); + + it('returns null after close', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + + await channel.close(); + + const result = await channel.read(); + expect(result).toBeNull(); + }); + + it('throws on write after close', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + const client = await connectToSocket(socketPath); + clients.push(client); + + await channel.close(); + + await expect(channel.write('data')).rejects.toThrow('is closed'); + }); + + it('removes stale socket file on creation', async () => { + const socketPath = tempSocketPath(); + + // Create the first channel + const channel1 = await makeSocketIOChannel('test', socketPath); + await channel1.close(); + + // Recreate a stale file + await fs.writeFile(socketPath, ''); + + // Should succeed despite the stale file + const channel2 = await makeSocketIOChannel('test', socketPath); + channels.push(channel2); + + expect(await fileExists(socketPath)).toBe(true); + }); +}); diff --git a/packages/nodejs/src/io/socket-channel.ts b/packages/nodejs/src/io/socket-channel.ts new file mode 100644 index 000000000..354d767e3 --- /dev/null +++ b/packages/nodejs/src/io/socket-channel.ts @@ -0,0 +1,175 @@ +import type { IOChannel } from '@metamask/ocap-kernel'; +import fs from 'node:fs/promises'; +import * as net from 'node:net'; + +type PendingReader = { + resolve: (value: string | null) => void; +}; + +/** + * Create an IOChannel backed by a Unix domain socket. + * + * Creates a `net.Server` listening on the configured socket path. + * Accepts one connection at a time. Lines are `\n`-delimited. + * + * @param name - The channel name (for diagnostics). + * @param socketPath - The file path for the Unix domain socket. + * @returns A promise for the IOChannel, resolved once the server is listening. + */ +export async function makeSocketIOChannel( + name: string, + socketPath: string, +): Promise { + const lineQueue: string[] = []; + const readerQueue: PendingReader[] = []; + let currentSocket: net.Socket | null = null; + let buffer = ''; + let closed = false; + + /** + * Deliver a line to a pending reader or enqueue it. + * + * @param line - The line to deliver. + */ + function deliverLine(line: string): void { + const reader = readerQueue.shift(); + if (reader) { + reader.resolve(line); + } else { + lineQueue.push(line); + } + } + + /** + * + */ + function deliverEOF(): void { + while (readerQueue.length > 0) { + const reader = readerQueue.shift(); + reader?.resolve(null); + } + } + + /** + * Handle incoming data by splitting on newlines. + * + * @param data - The raw data buffer from the socket. + */ + function handleData(data: Buffer): void { + buffer += data.toString(); + let newlineIndex = buffer.indexOf('\n'); + while (newlineIndex !== -1) { + const line = buffer.slice(0, newlineIndex); + buffer = buffer.slice(newlineIndex + 1); + deliverLine(line); + newlineIndex = buffer.indexOf('\n'); + } + } + + /** + * + */ + function handleDisconnect(): void { + // Deliver any remaining buffered data as a final line + if (buffer.length > 0) { + deliverLine(buffer); + buffer = ''; + } + currentSocket = null; + deliverEOF(); + } + + const server = net.createServer((socket) => { + if (currentSocket) { + // Only one connection at a time + socket.destroy(); + return; + } + currentSocket = socket; + buffer = ''; + + socket.on('data', handleData); + socket.on('end', handleDisconnect); + socket.on('error', handleDisconnect); + socket.on('close', () => { + if (currentSocket === socket) { + handleDisconnect(); + } + }); + }); + + // Remove stale socket file if it exists + try { + await fs.unlink(socketPath); + } catch { + // Ignore if it doesn't exist + } + + await new Promise((resolve, reject) => { + server.on('error', reject); + server.listen(socketPath, () => { + server.removeListener('error', reject); + resolve(); + }); + }); + + const channel: IOChannel = { + async read(): Promise { + if (closed) { + return null; + } + const queued = lineQueue.shift(); + if (queued !== undefined) { + return queued; + } + if (!currentSocket) { + return null; + } + return new Promise((resolve) => { + readerQueue.push({ resolve }); + }); + }, + + async write(data: string): Promise { + if (closed) { + throw new Error(`IO channel "${name}" is closed`); + } + if (!currentSocket) { + throw new Error(`IO channel "${name}" has no connected client`); + } + const socket = currentSocket; + return new Promise((resolve, reject) => { + socket.write(`${data}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); + }, + + async close(): Promise { + if (closed) { + return; + } + closed = true; + deliverEOF(); + if (currentSocket) { + currentSocket.destroy(); + currentSocket = null; + } + await new Promise((resolve) => { + server.close(() => resolve()); + }); + // Clean up socket file + try { + await fs.unlink(socketPath); + } catch { + // Ignore + } + }, + }; + + return channel; +} diff --git a/packages/nodejs/src/kernel/make-kernel.ts b/packages/nodejs/src/kernel/make-kernel.ts index a359c35a9..00f6353b4 100644 --- a/packages/nodejs/src/kernel/make-kernel.ts +++ b/packages/nodejs/src/kernel/make-kernel.ts @@ -1,8 +1,10 @@ import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; import { Logger } from '@metamask/logger'; import { Kernel } from '@metamask/ocap-kernel'; +import type { IOChannelFactory } from '@metamask/ocap-kernel'; import { NodejsPlatformServices } from './PlatformServices.ts'; +import { makeIOChannelFactory } from '../io/index.ts'; /** * The main function for the kernel worker. @@ -13,6 +15,7 @@ import { NodejsPlatformServices } from './PlatformServices.ts'; * @param options.dbFilename - The filename of the SQLite database file. * @param options.logger - The logger to use for the kernel. * @param options.keySeed - Optional seed for libp2p key generation. + * @param options.ioChannelFactory - Optional factory for creating IO channels. * @returns The kernel, initialized. */ export async function makeKernel({ @@ -21,12 +24,14 @@ export async function makeKernel({ dbFilename, logger, keySeed, + ioChannelFactory, }: { workerFilePath?: string; resetStorage?: boolean; dbFilename?: string; logger?: Logger; keySeed?: string | undefined; + ioChannelFactory?: IOChannelFactory; }): Promise { const rootLogger = logger ?? new Logger('kernel-worker'); const platformServicesClient = new NodejsPlatformServices({ @@ -42,6 +47,7 @@ export async function makeKernel({ resetStorage, logger: rootLogger.subLogger({ tags: ['kernel'] }), keySeed, + ioChannelFactory: ioChannelFactory ?? makeIOChannelFactory(), }); return kernel; diff --git a/packages/ocap-kernel/src/Kernel.ts b/packages/ocap-kernel/src/Kernel.ts index e66e535a2..5851533dc 100644 --- a/packages/ocap-kernel/src/Kernel.ts +++ b/packages/ocap-kernel/src/Kernel.ts @@ -2,6 +2,8 @@ import type { CapData } from '@endo/marshal'; import type { KernelDatabase } from '@metamask/kernel-store'; import { Logger } from '@metamask/logger'; +import { IOManager } from './io/IOManager.ts'; +import type { IOChannelFactory } from './io/types.ts'; import { makeKernelFacet } from './kernel-facet.ts'; import type { KernelFacet } from './kernel-facet.ts'; import { KernelQueue } from './KernelQueue.ts'; @@ -82,6 +84,9 @@ export class Kernel { /** The kernel's router */ readonly #kernelRouter: KernelRouter; + /** Manages IO channel lifecycle (optional, requires factory injection) */ + readonly #ioManager: IOManager | undefined; + /** * Construct a new kernel instance. * @@ -92,6 +97,7 @@ export class Kernel { * @param options.logger - Optional logger for error and diagnostic output. * @param options.keySeed - Optional seed for libp2p key generation. * @param options.mnemonic - Optional BIP39 mnemonic for deriving the kernel identity. + * @param options.ioChannelFactory - Optional factory for creating IO channels. */ // eslint-disable-next-line no-restricted-syntax private constructor( @@ -102,6 +108,7 @@ export class Kernel { logger?: Logger; keySeed?: string | undefined; mnemonic?: string | undefined; + ioChannelFactory?: IOChannelFactory; } = {}, ) { this.#platformServices = platformServices; @@ -148,6 +155,21 @@ export class Kernel { logger: this.#logger.subLogger({ tags: ['KernelServiceManager'] }), }); + if (options.ioChannelFactory) { + this.#ioManager = new IOManager({ + factory: options.ioChannelFactory, + registerService: + this.#kernelServiceManager.registerKernelServiceObject.bind( + this.#kernelServiceManager, + ), + unregisterService: + this.#kernelServiceManager.unregisterKernelServiceObject.bind( + this.#kernelServiceManager, + ), + logger: this.#logger.subLogger({ tags: ['IOManager'] }), + }); + } + this.#subclusterManager = new SubclusterManager({ kernelStore: this.#kernelStore, kernelQueue: this.#kernelQueue, @@ -155,6 +177,7 @@ export class Kernel { getKernelService: (name) => this.#kernelServiceManager.getKernelService(name), queueMessage: this.queueMessage.bind(this), + ...(this.#ioManager ? { ioManager: this.#ioManager } : {}), logger: this.#logger.subLogger({ tags: ['SubclusterManager'] }), }); @@ -193,6 +216,7 @@ export class Kernel { * @param options.logger - Optional logger for error and diagnostic output. * @param options.keySeed - Optional seed for libp2p key generation. * @param options.mnemonic - Optional BIP39 mnemonic for deriving the kernel identity. + * @param options.ioChannelFactory - Optional factory for creating IO channels. * @param options.systemSubclusters - Optional array of system subcluster configurations. * @returns A promise for the new kernel instance. */ @@ -204,6 +228,7 @@ export class Kernel { logger?: Logger; keySeed?: string | undefined; mnemonic?: string | undefined; + ioChannelFactory?: IOChannelFactory; systemSubclusters?: SystemSubclusterConfig[]; } = {}, ): Promise { diff --git a/packages/ocap-kernel/src/KernelServiceManager.test.ts b/packages/ocap-kernel/src/KernelServiceManager.test.ts index cec435293..af375ba76 100644 --- a/packages/ocap-kernel/src/KernelServiceManager.test.ts +++ b/packages/ocap-kernel/src/KernelServiceManager.test.ts @@ -132,6 +132,71 @@ describe('KernelServiceManager', () => { }); }); + describe('unregisterKernelServiceObject', () => { + it('removes a registered service', () => { + const testService = { testMethod: () => 'test' }; + serviceManager.registerKernelServiceObject('myService', testService); + + serviceManager.unregisterKernelServiceObject('myService'); + + expect(serviceManager.getKernelService('myService')).toBeUndefined(); + }); + + it('removes from kref lookup', () => { + const testService = { testMethod: () => 'test' }; + const registered = serviceManager.registerKernelServiceObject( + 'myService', + testService, + ); + + serviceManager.unregisterKernelServiceObject('myService'); + + expect( + serviceManager.getKernelServiceByKref(registered.kref), + ).toBeUndefined(); + expect(serviceManager.isKernelService(registered.kref)).toBe(false); + }); + + it('unpins the object and deletes the KV key', () => { + const testService = { testMethod: () => 'test' }; + const registered = serviceManager.registerKernelServiceObject( + 'myService', + testService, + ); + + serviceManager.unregisterKernelServiceObject('myService'); + + expect(kernelStore.kv.get('kernelService.myService')).toBeUndefined(); + // Verify it was unpinned by trying to re-register with the same name + const reregistered = serviceManager.registerKernelServiceObject( + 'myService', + testService, + ); + expect(reregistered.kref).not.toBe(registered.kref); + }); + + it('is a no-op for non-existent service', () => { + expect(() => + serviceManager.unregisterKernelServiceObject('nonexistent'), + ).not.toThrow(); + }); + + it('allows re-registration after unregister', () => { + const service1 = { method: () => 'v1' }; + const service2 = { method: () => 'v2' }; + + serviceManager.registerKernelServiceObject('svc', service1); + serviceManager.unregisterKernelServiceObject('svc'); + const registered = serviceManager.registerKernelServiceObject( + 'svc', + service2, + ); + + expect(registered.service).toBe(service2); + expect(serviceManager.getKernelService('svc')?.service).toBe(service2); + }); + }); + describe('getKernelService', () => { it('retrieves registered service by name', () => { const testService = { diff --git a/packages/ocap-kernel/src/KernelServiceManager.ts b/packages/ocap-kernel/src/KernelServiceManager.ts index 3208984ab..f3c1dc001 100644 --- a/packages/ocap-kernel/src/KernelServiceManager.ts +++ b/packages/ocap-kernel/src/KernelServiceManager.ts @@ -87,6 +87,22 @@ export class KernelServiceManager { return kernelService; } + /** + * Unregister a kernel service object by name. + * + * @param name - The name of the service to unregister. + */ + unregisterKernelServiceObject(name: string): void { + const service = this.#kernelServicesByName.get(name); + if (!service) { + return; + } + this.#kernelServicesByName.delete(name); + this.#kernelServicesByObject.delete(service.kref); + this.#kernelStore.unpinObject(service.kref); + this.#kernelStore.kv.delete(`kernelService.${name}`); + } + /** * Get a kernel service by name. * diff --git a/packages/ocap-kernel/src/index.ts b/packages/ocap-kernel/src/index.ts index c9e1b804b..938019f5a 100644 --- a/packages/ocap-kernel/src/index.ts +++ b/packages/ocap-kernel/src/index.ts @@ -2,9 +2,12 @@ export { Kernel } from './Kernel.ts'; export { VatHandle } from './vats/VatHandle.ts'; export { VatSupervisor } from './vats/VatSupervisor.ts'; export { initTransport } from './remotes/platform/transport.ts'; +export type { IOChannel, IOChannelFactory } from './io/types.ts'; export type { Baggage, ClusterConfig, + IOConfig, + IOSpec, KRef, Message, VatId, diff --git a/packages/ocap-kernel/src/io/IOManager.test.ts b/packages/ocap-kernel/src/io/IOManager.test.ts new file mode 100644 index 000000000..62f6fafd8 --- /dev/null +++ b/packages/ocap-kernel/src/io/IOManager.test.ts @@ -0,0 +1,152 @@ +import { Logger } from '@metamask/logger'; +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +import { IOManager } from './IOManager.ts'; +import type { IOChannel, IOChannelFactory } from './types.ts'; +import type { KernelService } from '../KernelServiceManager.ts'; +import type { IOConfig } from '../types.ts'; + +const makeChannel = (): IOChannel => ({ + read: vi.fn().mockResolvedValue('data'), + write: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), +}); + +describe('IOManager', () => { + let factory: IOChannelFactory; + let registerService: ReturnType; + let unregisterService: ReturnType; + let logger: Logger; + let manager: IOManager; + let channels: IOChannel[]; + + beforeEach(() => { + channels = []; + factory = vi.fn(async () => { + const ch = makeChannel(); + channels.push(ch); + return ch; + }) as unknown as IOChannelFactory; + + registerService = vi.fn( + (name: string): KernelService => ({ + name, + kref: `ko${name}`, + service: {}, + systemOnly: false, + }), + ); + unregisterService = vi.fn(); + logger = new Logger('test'); + + manager = new IOManager({ + factory, + registerService, + unregisterService, + logger, + }); + }); + + describe('createChannels', () => { + it('creates channels and registers services', async () => { + const ioConfig: Record = { + repl: { type: 'socket', path: '/tmp/repl.sock' } as IOConfig, + }; + + await manager.createChannels('s1', ioConfig); + + expect(factory).toHaveBeenCalledWith('repl', ioConfig.repl); + expect(registerService).toHaveBeenCalledWith('repl', expect.any(Object)); + }); + + it('creates multiple channels', async () => { + const ioConfig: Record = { + input: { type: 'socket', path: '/tmp/in.sock' } as IOConfig, + output: { type: 'socket', path: '/tmp/out.sock' } as IOConfig, + }; + + await manager.createChannels('s1', ioConfig); + + expect(factory).toHaveBeenCalledTimes(2); + expect(registerService).toHaveBeenCalledTimes(2); + }); + + it('cleans up on factory failure', async () => { + const successChannel = makeChannel(); + let callCount = 0; + const failingFactory = vi.fn(async () => { + callCount += 1; + if (callCount === 2) { + throw new Error('factory error'); + } + return successChannel; + }) as unknown as IOChannelFactory; + + const mgr = new IOManager({ + factory: failingFactory, + registerService, + unregisterService, + logger, + }); + + const ioConfig: Record = { + first: { type: 'socket', path: '/tmp/a.sock' } as IOConfig, + second: { type: 'socket', path: '/tmp/b.sock' } as IOConfig, + }; + + await expect(mgr.createChannels('s1', ioConfig)).rejects.toThrow( + 'factory error', + ); + + expect(successChannel.close).toHaveBeenCalledOnce(); + expect(unregisterService).toHaveBeenCalledWith('first'); + }); + }); + + describe('destroyChannels', () => { + it('closes channels and unregisters services', async () => { + const ioConfig: Record = { + repl: { type: 'socket', path: '/tmp/repl.sock' } as IOConfig, + }; + + await manager.createChannels('s1', ioConfig); + await manager.destroyChannels('s1'); + + expect(channels[0]?.close).toHaveBeenCalledOnce(); + expect(unregisterService).toHaveBeenCalledWith('repl'); + }); + + it('is idempotent for unknown subcluster', async () => { + expect(await manager.destroyChannels('nonexistent')).toBeUndefined(); + }); + + it('handles close errors gracefully', async () => { + const errorChannel = makeChannel(); + (errorChannel.close as ReturnType).mockRejectedValue( + new Error('close failed'), + ); + + const errorFactory = vi.fn( + async () => errorChannel, + ) as unknown as IOChannelFactory; + const errorSpy = vi.spyOn(logger, 'error'); + + const mgr = new IOManager({ + factory: errorFactory, + registerService, + unregisterService, + logger, + }); + + await mgr.createChannels('s1', { + ch: { type: 'socket', path: '/tmp/ch.sock' } as IOConfig, + }); + await mgr.destroyChannels('s1'); + + expect(errorSpy).toHaveBeenCalledWith( + 'Error closing IO channel "ch":', + expect.any(Error), + ); + }); + }); +}); diff --git a/packages/ocap-kernel/src/io/IOManager.ts b/packages/ocap-kernel/src/io/IOManager.ts new file mode 100644 index 000000000..abba56eaa --- /dev/null +++ b/packages/ocap-kernel/src/io/IOManager.ts @@ -0,0 +1,139 @@ +import type { Logger } from '@metamask/logger'; + +import { makeIOService } from './io-service.ts'; +import type { IOChannel, IOChannelFactory } from './types.ts'; +import type { KernelService } from '../KernelServiceManager.ts'; +import type { IOConfig } from '../types.ts'; + +type RegisterService = ( + name: string, + service: object, + options?: { systemOnly?: boolean }, +) => KernelService; +type UnregisterService = (name: string) => void; + +type IOManagerOptions = { + factory: IOChannelFactory; + registerService: RegisterService; + unregisterService: UnregisterService; + logger?: Logger; +}; + +type SubclusterIOState = { + channels: Map; + serviceNames: string[]; +}; + +/** + * Manages IO channel lifecycle, creating channels at subcluster launch + * and destroying them at termination. + */ +export class IOManager { + readonly #factory: IOChannelFactory; + + readonly #registerService: RegisterService; + + readonly #unregisterService: UnregisterService; + + readonly #logger: Logger | undefined; + + /** IO state indexed by subcluster ID */ + readonly #subclusters: Map = new Map(); + + /** + * Creates a new IOManager instance. + * + * @param options - Constructor options. + * @param options.factory - Factory for creating IO channels. + * @param options.registerService - Function to register a kernel service. + * @param options.unregisterService - Function to unregister a kernel service. + * @param options.logger - Optional logger for diagnostics. + */ + constructor({ + factory, + registerService, + unregisterService, + logger, + }: IOManagerOptions) { + this.#factory = factory; + this.#registerService = registerService; + this.#unregisterService = unregisterService; + this.#logger = logger; + harden(this); + } + + /** + * Create IO channels for a subcluster and register them as kernel services. + * + * @param subclusterId - The ID of the subcluster. + * @param ioConfig - The IO configuration map from channel names to configs. + */ + async createChannels( + subclusterId: string, + ioConfig: Record, + ): Promise { + const channels = new Map(); + const serviceNames: string[] = []; + + for (const [name, config] of Object.entries(ioConfig)) { + try { + const channel = await this.#factory(name, config); + channels.set(name, channel); + + const service = makeIOService(name, channel, config); + this.#registerService(name, service); + serviceNames.push(name); + + this.#logger?.info( + `Created IO channel "${name}" for subcluster ${subclusterId}`, + ); + } catch (error) { + // Clean up any channels we already created before re-throwing + await this.#closeChannels(channels); + for (const registeredName of serviceNames) { + this.#unregisterService(registeredName); + } + throw error; + } + } + + this.#subclusters.set(subclusterId, { channels, serviceNames }); + } + + /** + * Destroy IO channels for a subcluster and unregister their services. + * + * @param subclusterId - The ID of the subcluster. + */ + async destroyChannels(subclusterId: string): Promise { + const state = this.#subclusters.get(subclusterId); + if (!state) { + return; + } + + for (const name of state.serviceNames) { + this.#unregisterService(name); + } + + await this.#closeChannels(state.channels); + this.#subclusters.delete(subclusterId); + + this.#logger?.info(`Destroyed IO channels for subcluster ${subclusterId}`); + } + + /** + * Close all channels in a map, logging errors. + * + * @param channels - The channels to close. + */ + async #closeChannels(channels: Map): Promise { + for (const [name, channel] of channels) { + try { + await channel.close(); + } catch (error) { + this.#logger?.error(`Error closing IO channel "${name}":`, error); + } + } + } +} +harden(IOManager); diff --git a/packages/ocap-kernel/src/io/index.ts b/packages/ocap-kernel/src/io/index.ts new file mode 100644 index 000000000..a132c8a31 --- /dev/null +++ b/packages/ocap-kernel/src/io/index.ts @@ -0,0 +1,2 @@ +export { IOManager } from './IOManager.ts'; +export type { IOChannel, IOChannelFactory } from './types.ts'; diff --git a/packages/ocap-kernel/src/io/io-service.test.ts b/packages/ocap-kernel/src/io/io-service.test.ts new file mode 100644 index 000000000..de799bde1 --- /dev/null +++ b/packages/ocap-kernel/src/io/io-service.test.ts @@ -0,0 +1,116 @@ +import { describe, it, expect, vi } from 'vitest'; + +import { makeIOService } from './io-service.ts'; +import type { IOChannel } from './types.ts'; +import type { IOConfig } from '../types.ts'; + +const makeChannel = (): IOChannel => ({ + read: vi.fn().mockResolvedValue('hello'), + write: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), +}); + +const makeConfig = (overrides: Partial = {}): IOConfig => + ({ + type: 'socket', + path: '/tmp/test.sock', + ...overrides, + }) as IOConfig; + +describe('makeIOService', () => { + describe('read()', () => { + it('delegates to the channel', async () => { + const channel = makeChannel(); + const service = makeIOService('test', channel, makeConfig()) as { + read: () => Promise; + }; + + const result = await service.read(); + + expect(result).toBe('hello'); + expect(channel.read).toHaveBeenCalledOnce(); + }); + + it('throws on write-only channel', async () => { + const channel = makeChannel(); + const service = makeIOService( + 'test', + channel, + makeConfig({ direction: 'out' }), + ) as { read: () => Promise }; + + await expect(service.read()).rejects.toThrow( + 'IO channel "test" is write-only', + ); + expect(channel.read).not.toHaveBeenCalled(); + }); + + it.each(['in', 'inout'] as const)( + 'allows read on direction=%s', + async (direction) => { + const channel = makeChannel(); + const service = makeIOService( + 'test', + channel, + makeConfig({ direction }), + ) as { read: () => Promise }; + + expect(await service.read()).toBe('hello'); + }, + ); + }); + + describe('write()', () => { + it('delegates to the channel', async () => { + const channel = makeChannel(); + const service = makeIOService('test', channel, makeConfig()) as { + write: (data: string) => Promise; + }; + + await service.write('world'); + + expect(channel.write).toHaveBeenCalledWith('world'); + }); + + it('throws on read-only channel', async () => { + const channel = makeChannel(); + const service = makeIOService( + 'test', + channel, + makeConfig({ direction: 'in' }), + ) as { write: (data: string) => Promise }; + + await expect(service.write('data')).rejects.toThrow( + 'IO channel "test" is read-only', + ); + expect(channel.write).not.toHaveBeenCalled(); + }); + + it.each(['out', 'inout'] as const)( + 'allows write on direction=%s', + async (direction) => { + const channel = makeChannel(); + const service = makeIOService( + 'test', + channel, + makeConfig({ direction }), + ) as { write: (data: string) => Promise }; + + expect(await service.write('data')).toBeUndefined(); + }, + ); + }); + + describe('direction defaults', () => { + it('defaults to inout when direction is not specified', async () => { + const channel = makeChannel(); + const service = makeIOService('test', channel, makeConfig()) as { + read: () => Promise; + write: (data: string) => Promise; + }; + + expect(await service.read()).toBe('hello'); + expect(await service.write('data')).toBeUndefined(); + }); + }); +}); diff --git a/packages/ocap-kernel/src/io/io-service.ts b/packages/ocap-kernel/src/io/io-service.ts new file mode 100644 index 000000000..ba45cbdc0 --- /dev/null +++ b/packages/ocap-kernel/src/io/io-service.ts @@ -0,0 +1,37 @@ +import { makeDefaultExo } from '@metamask/kernel-utils/exo'; + +import type { IOChannel } from './types.ts'; +import type { IOConfig } from '../types.ts'; + +/** + * Create a kernel service exo that wraps an IOChannel. + * + * @param name - The name of the IO channel. + * @param channel - The underlying IOChannel to delegate to. + * @param config - The IO configuration for this channel. + * @returns A remotable service object with `read()` and `write()` methods. + */ +export function makeIOService( + name: string, + channel: IOChannel, + config: IOConfig, +): object { + const direction = config.direction ?? 'inout'; + + return makeDefaultExo(`io:${name}`, { + async read(): Promise { + if (direction === 'out') { + throw new Error(`IO channel "${name}" is write-only`); + } + return channel.read(); + }, + + async write(data: string): Promise { + if (direction === 'in') { + throw new Error(`IO channel "${name}" is read-only`); + } + return channel.write(data); + }, + }); +} +harden(makeIOService); diff --git a/packages/ocap-kernel/src/io/types.ts b/packages/ocap-kernel/src/io/types.ts new file mode 100644 index 000000000..f08f00dbd --- /dev/null +++ b/packages/ocap-kernel/src/io/types.ts @@ -0,0 +1,27 @@ +import type { IOConfig } from '../types.ts'; + +/** + * A platform-agnostic IO channel that supports reading and writing data. + * Implementations are platform-specific (e.g., Unix domain sockets in Node.js). + */ +export type IOChannel = { + /** Read the next unit of data, or `null` on EOF/disconnect. */ + read(): Promise; + /** Write a unit of data to the channel. */ + write(data: string): Promise; + /** Close the channel and release resources. */ + close(): Promise; +}; + +/** + * Factory function that creates an IOChannel for a given configuration. + * Injected from the host environment (e.g., Node.js) into the kernel. + * + * @param name - The name of the IO channel (from the cluster config key). + * @param config - The IO configuration describing the channel type and options. + * @returns A promise for the created IOChannel. + */ +export type IOChannelFactory = ( + name: string, + config: IOConfig, +) => Promise; diff --git a/packages/ocap-kernel/src/types.ts b/packages/ocap-kernel/src/types.ts index c77d6ff89..3fb68f0b6 100644 --- a/packages/ocap-kernel/src/types.ts +++ b/packages/ocap-kernel/src/types.ts @@ -11,6 +11,7 @@ import type { VatCheckpoint } from '@metamask/kernel-store'; import type { JsonRpcMessage } from '@metamask/kernel-utils'; import type { DuplexStream } from '@metamask/streams'; import { + assign, define, is, object, @@ -432,10 +433,59 @@ export const isVatConfig = (value: unknown): value is VatConfig => export type VatConfigTable = Record; +// IO configuration types + +const ConsoleIOSpecStruct = object({ type: literal('console') }); +const ListenIOSpecStruct = object({ + type: literal('listen'), + hostport: string(), +}); +const NetworkIOSpecStruct = object({ + type: literal('network'), + hostport: string(), +}); +const FileIOSpecStruct = object({ type: literal('file'), path: string() }); +const SocketIOSpecStruct = object({ type: literal('socket'), path: string() }); + +export type IOSpec = + | Infer + | Infer + | Infer + | Infer + | Infer; + +const IODirectionStruct = union([ + literal('in'), + literal('out'), + literal('inout'), +]); +const IOUnitStruct = union([ + literal('line'), + literal('string'), + literal('chars'), + literal('bytes'), +]); + +const IOExtraStruct = object({ + direction: exactOptional(IODirectionStruct), + unit: exactOptional(IOUnitStruct), +}); + +const IOConfigStruct = union([ + assign(ConsoleIOSpecStruct, IOExtraStruct), + assign(ListenIOSpecStruct, IOExtraStruct), + assign(NetworkIOSpecStruct, IOExtraStruct), + assign(FileIOSpecStruct, IOExtraStruct), + assign(SocketIOSpecStruct, IOExtraStruct), +]); + +export type IOConfig = Infer; + export const ClusterConfigStruct = object({ bootstrap: string(), forceReset: exactOptional(boolean()), services: exactOptional(array(string())), + io: exactOptional(record(string(), IOConfigStruct)), vats: record(string(), VatConfigStruct), bundles: exactOptional(record(string(), VatConfigStruct)), }); diff --git a/packages/ocap-kernel/src/vats/SubclusterManager.ts b/packages/ocap-kernel/src/vats/SubclusterManager.ts index 8d00ba3b7..b5f236970 100644 --- a/packages/ocap-kernel/src/vats/SubclusterManager.ts +++ b/packages/ocap-kernel/src/vats/SubclusterManager.ts @@ -2,6 +2,7 @@ import type { CapData } from '@endo/marshal'; import { SubclusterNotFoundError } from '@metamask/kernel-errors'; import { Logger } from '@metamask/logger'; +import type { IOManager } from '../io/IOManager.ts'; import type { KernelQueue } from '../KernelQueue.ts'; import type { VatManager } from './VatManager.ts'; import { kslot, kunser } from '../liveslots/kernel-marshal.ts'; @@ -30,6 +31,7 @@ type SubclusterManagerOptions = { method: string, args: unknown[], ) => Promise>; + ioManager?: IOManager; logger?: Logger; }; @@ -61,6 +63,9 @@ export class SubclusterManager { /** Logger for diagnostic output */ readonly #logger: Logger; + /** Optional IO manager for creating/destroying IO channels */ + readonly #ioManager: IOManager | undefined; + /** Stores bootstrap root krefs of launched system subclusters */ readonly #systemSubclusterRoots: Map = new Map(); @@ -73,6 +78,7 @@ export class SubclusterManager { * @param options.vatManager - Manager for creating and managing vat instances. * @param options.getKernelService - Function to retrieve a kernel service by its kref. * @param options.queueMessage - Function to queue messages for delivery to targets. + * @param options.ioManager - Optional IO manager for IO channel lifecycle. * @param options.logger - Optional logger for diagnostic output. */ constructor({ @@ -81,6 +87,7 @@ export class SubclusterManager { vatManager, getKernelService, queueMessage, + ioManager, logger, }: SubclusterManagerOptions) { this.#kernelStore = kernelStore; @@ -88,6 +95,7 @@ export class SubclusterManager { this.#vatManager = vatManager; this.#getKernelService = getKernelService; this.#queueMessage = queueMessage; + this.#ioManager = ioManager; this.#logger = logger ?? new Logger('SubclusterManager'); harden(this); } @@ -111,8 +119,15 @@ export class SubclusterManager { if (!config.vats[config.bootstrap]) { Fail`invalid bootstrap vat name ${config.bootstrap}`; } - this.#validateServices(config, isSystem); const subclusterId = this.#kernelStore.addSubcluster(config); + + // Create IO channels before validating services so that IO service + // names are registered and discoverable by #validateServices. + if (config.io && this.#ioManager) { + await this.#ioManager.createChannels(subclusterId, config.io); + } + + this.#validateServices(config, isSystem); const { rootKref, bootstrapResult } = await this.#launchVatsForSubcluster( subclusterId, config, @@ -143,6 +158,11 @@ export class SubclusterManager { } } + // Destroy IO channels before terminating vats + if (this.#ioManager) { + await this.#ioManager.destroyChannels(subclusterId); + } + const vatIdsToTerminate = this.#kernelStore.getSubclusterVats(subclusterId); for (const vatId of vatIdsToTerminate.reverse()) { await this.#vatManager.terminateVat(vatId);