Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
ARG BASE=node
ARG BASE_VERSION=20-bookworm
ARG BASE_VERSION=22-bookworm
FROM ${BASE}:${BASE_VERSION} AS build

LABEL org.opencontainers.image.source="https://github.com/constructive-io/constructive"
Expand Down
2 changes: 1 addition & 1 deletion jobs/job-scheduler/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@
"@constructive-io/job-pg": "workspace:^",
"@constructive-io/job-utils": "workspace:^",
"@pgpmjs/logger": "workspace:^",
"node-schedule": "1.3.2"
"node-schedule": "^2.1.1"
}
}
90 changes: 43 additions & 47 deletions jobs/job-scheduler/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,59 +199,55 @@ export default class Scheduler {
}
}
}
listen() {
async listen(): Promise<void> {
if (this.stopped) return;
const listenForChanges = (
err: Error | null,
client: PoolClient,
release: () => void
) => {
if (err) {
log.error('Error connecting with notify listener', err);
if (err instanceof Error && err.stack) {
log.debug(err.stack);
}
// Try again in 5 seconds
// should this really be done in the node process?
if (!this.stopped) {
setTimeout(this.listen, 5000);
}
return;
let client: PoolClient;
let release: () => void;
try {
client = await this.pgPool.connect();
release = () => client.release();
} catch (err) {
log.error('Error connecting with notify listener', err);
if (err instanceof Error && err.stack) {
log.debug(err.stack);
}
if (!this.stopped) {
setTimeout(() => this.listen(), 5000);
}
return;
}
if (this.stopped) {
release();
return;
}
this.listenClient = client;
this.listenRelease = release;
client.on('notification', () => {
log.info('a NEW scheduled JOB!');
if (this.doNextTimer) {
// Must be idle, do something!
this.doNext(client);
}
});
client.query('LISTEN "scheduled_jobs:insert"');
client.on('error', (e: unknown) => {
if (this.stopped) {
release();
return;
}
this.listenClient = client;
this.listenRelease = release;
client.on('notification', () => {
log.info('a NEW scheduled JOB!');
if (this.doNextTimer) {
// Must be idle, do something!
this.doNext(client);
}
});
client.query('LISTEN "scheduled_jobs:insert"');
client.on('error', (e: unknown) => {
if (this.stopped) {
release();
return;
}
log.error('Error with database notify listener', e);
if (e instanceof Error && e.stack) {
log.debug(e.stack);
}
release();
if (!this.stopped) {
this.listen();
}
});
log.info(
`${this.workerId} connected and looking for scheduled jobs...`
);
this.doNext(client);
};
this.pgPool.connect(listenForChanges);
log.error('Error with database notify listener', e);
if (e instanceof Error && e.stack) {
log.debug(e.stack);
}
release();
if (!this.stopped) {
this.listen();
}
});
log.info(
`${this.workerId} connected and looking for scheduled jobs...`
);
this.doNext(client);
}

async stop(): Promise<void> {
Expand Down
1 change: 0 additions & 1 deletion jobs/knative-job-fn/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
},
"dependencies": {
"@pgpmjs/logger": "workspace:^",
"body-parser": "1.19.0",
"express": "5.2.1"
}
}
3 changes: 1 addition & 2 deletions jobs/knative-job-fn/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import express from 'express';
import bodyParser from 'body-parser';
import http from 'node:http';
import https from 'node:https';
import { URL } from 'node:url';
Expand Down Expand Up @@ -131,7 +130,7 @@ const logger = createLogger('knative-job-fn');
const createJobApp = () => {
const app: any = express();

app.use(bodyParser.json());
app.use(express.json());

// Basic request logging for all incoming job invocations.
app.use((req: any, res: any, next: any) => {
Expand Down
1 change: 0 additions & 1 deletion jobs/knative-job-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
"@constructive-io/job-pg": "workspace:^",
"@constructive-io/job-utils": "workspace:^",
"@pgpmjs/logger": "workspace:^",
"body-parser": "1.19.0",
"express": "5.2.1",
"pg": "8.17.1"
}
Expand Down
3 changes: 1 addition & 2 deletions jobs/knative-job-server/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import express from 'express';
import bodyParser from 'body-parser';
import type { Pool, PoolClient } from 'pg';
import * as jobs from '@constructive-io/job-utils';
import poolManager from '@constructive-io/job-pg';
Expand Down Expand Up @@ -39,7 +38,7 @@ const logger = createLogger('knative-job-server');

export default (pgPool: Pool = poolManager.getPool()) => {
const app = express();
app.use(bodyParser.json());
app.use(express.json());

const withClient =
(cb: WithClientHandler) =>
Expand Down
3 changes: 1 addition & 2 deletions jobs/knative-job-worker/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,7 @@
"@constructive-io/job-pg": "workspace:^",
"@constructive-io/job-utils": "workspace:^",
"@pgpmjs/logger": "workspace:^",
"pg": "8.17.1",
"request": "2.88.2"
"pg": "8.17.1"
},
"devDependencies": {
"@pgpm/database-jobs": "^0.16.0",
Expand Down
84 changes: 40 additions & 44 deletions jobs/knative-job-worker/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,56 +183,52 @@ export default class Worker {
}
}
}
listen() {
async listen(): Promise<void> {
if (this.stopped) return;
const listenForChanges = (
err: Error | null,
client: PoolClient,
release: () => void
) => {
if (err) {
log.error('Error connecting with notify listener', err);
if (err instanceof Error && err.stack) {
log.debug(err.stack);
}
// Try again in 5 seconds
// should this really be done in the node process?
if (!this.stopped) {
setTimeout(this.listen, 5000);
}
return;
let client: PoolClient;
let release: () => void;
try {
client = await this.pgPool.connect();
release = () => client.release();
} catch (err) {
log.error('Error connecting with notify listener', err);
if (err instanceof Error && err.stack) {
log.debug(err.stack);
}
if (!this.stopped) {
setTimeout(() => this.listen(), 5000);
}
return;
}
if (this.stopped) {
release();
return;
}
this.listenClient = client;
this.listenRelease = release;
client.on('notification', () => {
if (this.doNextTimer) {
// Must be idle, do something!
this.doNext(client);
}
});
client.query('LISTEN "jobs:insert"');
client.on('error', (e: unknown) => {
if (this.stopped) {
release();
return;
}
this.listenClient = client;
this.listenRelease = release;
client.on('notification', () => {
if (this.doNextTimer) {
// Must be idle, do something!
this.doNext(client);
}
});
client.query('LISTEN "jobs:insert"');
client.on('error', (e: unknown) => {
if (this.stopped) {
release();
return;
}
log.error('Error with database notify listener', e);
if (e instanceof Error && e.stack) {
log.debug(e.stack);
}
release();
if (!this.stopped) {
this.listen();
}
});
log.info(`${this.workerId} connected and looking for jobs...`);
this.doNext(client);
};
this.pgPool.connect(listenForChanges);
log.error('Error with database notify listener', e);
if (e instanceof Error && e.stack) {
log.debug(e.stack);
}
release();
if (!this.stopped) {
this.listen();
}
});
log.info(`${this.workerId} connected and looking for jobs...`);
this.doNext(client);
}

async stop(): Promise<void> {
Expand Down
56 changes: 39 additions & 17 deletions jobs/knative-job-worker/src/req.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import requestLib from 'request';
import http from 'node:http';
import https from 'node:https';
import { URL } from 'node:url';
import {
getCallbackBaseUrl,
getJobGatewayConfig,
Expand Down Expand Up @@ -47,35 +49,55 @@ const request = (
databaseId
});
return new Promise<boolean>((resolve, reject) => {
requestLib.post(
let parsed: URL;
try {
parsed = new URL(url);
} catch (e) {
return reject(e);
}

const isHttps = parsed.protocol === 'https:';
const client = isHttps ? https : http;
const payload = JSON.stringify(body);

const req = client.request(
{
hostname: parsed.hostname,
port: parsed.port || (isHttps ? 443 : 80),
path: parsed.pathname + parsed.search,
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Content-Length': Buffer.byteLength(payload),

// these are used by job-worker/job-fn
'X-Worker-Id': workerId,
'X-Job-Id': jobId,
'X-Job-Id': String(jobId),
'X-Database-Id': databaseId,

// async HTTP completion callback
'X-Callback-Url': completeUrl
},
url,
json: true,
body
},
function (error: unknown) {
if (error) {
log.error(`request error for job[${jobId}] fn[${fn}]`, error);
if (error instanceof Error && error.stack) {
log.debug(error.stack);
}
return reject(error);
}
log.debug(`request success for job[${jobId}] fn[${fn}]`);
return resolve(true);
},
(res) => {
res.on('data', () => {});
res.on('end', () => {
log.debug(`request success for job[${jobId}] fn[${fn}]`);
resolve(true);
});
}
);

req.on('error', (error) => {
log.error(`request error for job[${jobId}] fn[${fn}]`, error);
if (error.stack) {
log.debug(error.stack);
}
reject(error);
});

req.write(payload);
req.end();
});
};

Expand Down
Loading
Loading