Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
255 changes: 255 additions & 0 deletions packages/kernel-test/src/io.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs';
import { waitUntilQuiescent } from '@metamask/kernel-utils';
import { Kernel } from '@metamask/ocap-kernel';
import type { IOChannel, IOConfig } from '@metamask/ocap-kernel';
import * as net from 'node:net';
import * as os from 'node:os';
import * as path from 'node:path';
import { describe, it, expect, afterEach } from 'vitest';

import { getBundleSpec, makeTestLogger } from './utils.ts';

function tempSocketPath(): string {
return path.join(
os.tmpdir(),
`io-int-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`,
);
}

async function connectToSocket(socketPath: string): Promise<net.Socket> {
return new Promise((resolve, reject) => {
const client = net.createConnection(socketPath, () => {
client.removeListener('error', reject);
resolve(client);
});
client.on('error', reject);
});
}

async function writeLine(socket: net.Socket, line: string): Promise<void> {
return new Promise((resolve, reject) => {
socket.write(`${line}\n`, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
}

async function readLine(socket: net.Socket): Promise<string> {
return new Promise((resolve) => {
let buffer = '';
const onData = (data: Buffer): void => {
buffer += data.toString();
const idx = buffer.indexOf('\n');
if (idx !== -1) {
socket.removeListener('data', onData);
resolve(buffer.slice(0, idx));
}
};
socket.on('data', onData);
});
}

async function makeTestSocketChannel(
_name: string,
socketPath: string,
): Promise<IOChannel> {
const fsPromises = await import('node:fs/promises');
const lineQueue: string[] = [];
const readerQueue: { resolve: (value: string | null) => void }[] = [];
let currentSocket: net.Socket | null = null;
let lineBuffer = '';
let closed = false;

function deliverLine(line: string): void {
const reader = readerQueue.shift();
if (reader) {
reader.resolve(line);
} else {
lineQueue.push(line);
}
}

function deliverEOF(): void {
while (readerQueue.length > 0) {
readerQueue.shift()?.resolve(null);
}
}

const server = net.createServer((socket) => {
if (currentSocket) {
socket.destroy();
return;
}
currentSocket = socket;
lineBuffer = '';
socket.on('data', (data: Buffer) => {
lineBuffer += data.toString();
let idx = lineBuffer.indexOf('\n');
while (idx !== -1) {
deliverLine(lineBuffer.slice(0, idx));
lineBuffer = lineBuffer.slice(idx + 1);
idx = lineBuffer.indexOf('\n');
}
});
socket.on('end', () => {
if (lineBuffer.length > 0) {
deliverLine(lineBuffer);
lineBuffer = '';
}
currentSocket = null;
deliverEOF();
});
socket.on('error', () => {
currentSocket = null;
deliverEOF();
});
});

try {
await fsPromises.unlink(socketPath);
} catch {
// ignore
}

await new Promise<void>((resolve, reject) => {
server.on('error', reject);
server.listen(socketPath, () => {
server.removeListener('error', reject);
resolve();
});
});

return {
async read() {
if (closed) {
return null;
}
const queued = lineQueue.shift();
if (queued !== undefined) {
return queued;
}
if (!currentSocket) {
return null;
}
return new Promise<string | null>((resolve) => {
readerQueue.push({ resolve });
});
},
async write(data: string) {
if (!currentSocket) {
throw new Error('no connected client');
}
const socket = currentSocket;
return new Promise<void>((resolve, reject) => {
socket.write(`${data}\n`, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
},
async close() {
if (closed) {
return;
}
closed = true;
deliverEOF();
currentSocket?.destroy();
currentSocket = null;
await new Promise<void>((resolve) => {
server.close(() => resolve());
});
try {
await fsPromises.unlink(socketPath);
} catch {
// ignore
}
},
};
}

describe('IO kernel service', () => {
const clients: net.Socket[] = [];

afterEach(async () => {
for (const client of clients) {
client.destroy();
}
clients.length = 0;
});

it('reads and writes through an IO channel', async () => {
const socketPath = tempSocketPath();
const kernelDatabase = await makeSQLKernelDatabase({
dbFilename: ':memory:',
});
const { logger } = makeTestLogger();

const { NodejsPlatformServices } = await import('@ocap/nodejs');
const kernel = await Kernel.make(
new NodejsPlatformServices({
logger: logger.subLogger({ tags: ['platform'] }),
}),
kernelDatabase,
{
resetStorage: true,
logger,
ioChannelFactory: async (name: string, config: IOConfig) => {
if (config.type !== 'socket') {
throw new Error(`unsupported: ${config.type}`);
}
return makeTestSocketChannel(name, config.path);
},
},
);

const config = {
bootstrap: 'io',
forceReset: true,
io: {
repl: {
type: 'socket' as const,
path: socketPath,
},
},
services: ['repl'],
vats: {
io: {
bundleSpec: getBundleSpec('io-vat'),
parameters: { name: 'io' },
},
},
};

const { rootKref } = await kernel.launchSubcluster(config);
await waitUntilQuiescent();

// Connect to the socket
const client = await connectToSocket(socketPath);
clients.push(client);

// Small delay for connection setup
await new Promise((resolve) => setTimeout(resolve, 20));

// Send a line from the test to the vat
await writeLine(client, 'hello from test');

// Trigger the vat to read
await kernel.queueMessage(rootKref, 'doRead', []);
await waitUntilQuiescent(100);

// Trigger the vat to write
const linePromise = readLine(client);
await kernel.queueMessage(rootKref, 'doWrite', ['hello from vat']);
await waitUntilQuiescent(100);

const received = await linePromise;
expect(received).toBe('hello from vat');
});
});
39 changes: 39 additions & 0 deletions packages/kernel-test/src/vats/io-vat.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { E } from '@endo/eventual-send';
import { makeDefaultExo } from '@metamask/kernel-utils/exo';

import { unwrapTestLogger } from '../test-powers.ts';
import type { TestPowers } from '../test-powers.ts';

/**
* Build function for testing IO kernel services.
*
* @param vatPowers - Special powers granted to this vat.
* @param parameters - Initialization parameters from the vat's config object.
* @param parameters.name - The name of the vat.
* @returns The root object for the new vat.
*/
// eslint-disable-next-line @typescript-eslint/explicit-function-return-type
export function buildRootObject(
vatPowers: TestPowers,
parameters: { name?: string } = {},
) {
const name = parameters?.name ?? 'io-vat';
const tlog = unwrapTestLogger(vatPowers, name);
let ioService: unknown;

return makeDefaultExo('root', {
async bootstrap(_vats: unknown, services: { repl: unknown }) {
tlog('bootstrap');
ioService = services.repl;
},
async doRead() {
const line = await E(ioService).read();
tlog(`read: ${line}`);
return line;
},
async doWrite(data: string) {
await E(ioService).write(data);
tlog(`wrote: ${data}`);
},
});
}
1 change: 1 addition & 0 deletions packages/nodejs/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export { NodejsPlatformServices } from './kernel/PlatformServices.ts';
export { makeKernel } from './kernel/make-kernel.ts';
export { makeNodeJsVatSupervisor } from './vat/make-supervisor.ts';
export { makeIOChannelFactory, makeSocketIOChannel } from './io/index.ts';
25 changes: 25 additions & 0 deletions packages/nodejs/src/io/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import type { IOChannelFactory } from '@metamask/ocap-kernel';
import type { IOConfig } from '@metamask/ocap-kernel';

import { makeSocketIOChannel } from './socket-channel.ts';

export { makeSocketIOChannel } from './socket-channel.ts';

/**
* Create an IOChannelFactory for the Node.js environment.
* Dispatches on `config.type` to the appropriate channel implementation.
*
* @returns An IOChannelFactory.
*/
export function makeIOChannelFactory(): IOChannelFactory {
return async (name: string, config: IOConfig) => {
switch (config.type) {
case 'socket':
return makeSocketIOChannel(name, config.path);
default:
throw new Error(
`Unsupported IO channel type "${config.type}" for channel "${name}"`,
);
}
};
}
Loading
Loading