worker: Queues for webhooks, minor refactoring/cleanup

This commit is contained in:
Gabriel Simmer 2024-08-19 11:39:22 +01:00
parent 47c11893ea
commit d0bd30d401
Signed by: arch
SSH key fingerprint: SHA256:m3OEcdtrnBpMX+2BDGh/byv3hrCekCLzDYMdvGEKPPQ
2 changed files with 92 additions and 18 deletions

View file

@ -1,8 +1,13 @@
import index from "./index.html"; import index from "./index.html";
import { IttyRouter, json, error, withParams } from 'itty-router' import { IttyRouter, json, error, withParams } from 'itty-router'
const router = IttyRouter(); const ROUTER = IttyRouter();
const cache = caches.default; const CACHE = caches.default;
const SNAPSHOT_PREFIX = "snapshot:";
const SNAPSHOT_KEYS = { prefix: SNAPSHOT_PREFIX };
const WEBHOOK_PREFIX = "webhook:";
const WEBHOOK_KEYS = { prefix: WEBHOOK_PREFIX };
async function withCache(request, env) { async function withCache(request, env) {
const url = new URL(request.url); const url = new URL(request.url);
@ -11,7 +16,7 @@ async function withCache(request, env) {
// Construct the cache key from the cache URL // Construct the cache key from the cache URL
const cacheKey = new Request(cacheUrl.toString(), request); const cacheKey = new Request(cacheUrl.toString(), request);
let response = await cache.match(cacheKey); let response = await CACHE.match(cacheKey);
if (response) { if (response) {
return response; return response;
@ -24,7 +29,7 @@ async function snapshot(request, env, ctx, date = "LATEST") {
const kv = env.KV_STORE; const kv = env.KV_STORE;
const bucket = env.R2_BUCKET; const bucket = env.R2_BUCKET;
const { value, metadata } = await kv.getWithMetadata(date); const { value, metadata } = await kv.getWithMetadata(`${SNAPSHOT_PREFIX}${date}`);
if (value == null) { if (value == null) {
return {error: `${date} key not found`}; return {error: `${date} key not found`};
} }
@ -66,7 +71,7 @@ async function renderIndex(request, env, ctx) {
new Response(index, { headers: { "Content-Type": "text/html", new Response(index, { headers: { "Content-Type": "text/html",
"Cache-Control": "s-maxage=3600" } }), "Cache-Control": "s-maxage=3600" } }),
); );
ctx.waitUntil(cache.put(request.cacheKey, response.clone())); ctx.waitUntil(CACHE.put(request.cacheKey, response.clone()));
return response return response
} }
@ -76,25 +81,31 @@ async function latest(request, env) {
headers: { "Content-Type": "application/json", headers: { "Content-Type": "application/json",
"Cache-Control": "s-maxage=3600"} "Cache-Control": "s-maxage=3600"}
}); });
ctx.waitUntil(cache.put(request.cacheKey, response.clone())); ctx.waitUntil(CACHE.put(request.cacheKey, response.clone()));
return response; return response;
} }
async function snapshotList(request, env, ctx) { async function snapshotList(request, env, ctx) {
const kv = env.KV_STORE; const kv = env.KV_STORE;
let list = await kv.list(); let list = await kv.list(SNAPSHOT_KEYS);
let filtered = await Promise.all( let filtered = await Promise.all(
list.keys.filter((value) => value.name != "LATEST") list.keys.filter((value) => value.name != `${SNAPSHOT_PREFIX}LATEST`)
.map(async (value) => { const v = await kv.get(value.name); return { date: value.name, hash: v } } )); .map(async (value) => {
const v = await kv.get(value.name);
return {
date: value.name.replace(`${SNAPSHOT_PREFIX}`, ""), hash: v
} } ));
console.log(filtered)
const response = new Response(JSON.stringify({ const response = new Response(JSON.stringify({
latest: await kv.get("LATEST"), latest: await kv.get(`${SNAPSHOT_PREFIX}LATEST`),
list: filtered, list: filtered,
}), { }), {
headers: { "Content-Type": "application/json", headers: { "Content-Type": "application/json",
"Cache-Control": "s-maxage=3600"} "Cache-Control": "s-maxage=3600"}
}); });
ctx.waitUntil(cache.put(request.cacheKey, response.clone())); ctx.waitUntil(CACHE.put(request.cacheKey, response.clone()));
return response; return response;
} }
@ -116,16 +127,28 @@ async function newSnapshot(request, env, ctx) {
} = Object.fromEntries(body) } = Object.fromEntries(body)
// Don't bother uploading to R2 if the hashes match. // Don't bother uploading to R2 if the hashes match.
let latest = kv.get("LATEST"); let latest = kv.get(`${SNAPSHOT_PREFIX}LATEST`);
if (latest != hash) { if (latest != hash) {
await bucket.put(`${hash}.png`, file); await bucket.put(`${hash}.png`, file);
let webhooks = kv.list(WEBHOOK_KEYS);
await Promise.all(webhooks.keys.map(async key => {
let full = await kv.get(key.name);
let webhook = JSON.parse(full.value);
await env.WEBHOOKS_QUEUE.send({
type: webhook.type,
url: webhook.url,
hash: hash,
date: date,
file: `https://daily-servo-r2.gmem.ca/${hash}.png`
});
}));
} }
await kv.put("LATEST", hash, { await kv.put("${SNAPSHOT_PREFIX}LATEST", hash, {
metadata: { date: date } metadata: { date: date }
}); });
// Keep for 1 year. // Keep for 1 year.
await kv.put(`${date}`, hash, { expirationTtl: 31_536_000 }); await kv.put(`${SNAPSHOT_PREFIX}${date}`, hash, { expirationTtl: 31_536_000 });
return new Response("Uploaded", { status: 201}); return new Response("Uploaded", { status: 201});
} }
@ -139,18 +162,59 @@ async function specificSnapshot(request, env, ctx) {
return response return response
} }
router async function migrateKeys(request, env, ctx) {
const kv = env.KV_STORE;
let list = await kv.list();
await Promise.all(list.keys.map(async key => {
let full = await kv.getWithMetadata(key.name);
await kv.put(`${SNAPSHOT_PREFIX}${key.name}`, full.value, { expiration: key.expiration, metadata: key.metadata });
await kv.delete(key.name);
}));
return new Response(JSON.stringify({}), { status: 200, headers: {"Content-Type": "application/json"} });
}
ROUTER
.get("/", withCache, renderIndex) .get("/", withCache, renderIndex)
.get("/latest", withCache, latest) .get("/latest", withCache, latest)
.get("/latest.json", withCache, snapshot) .get("/latest.json", withCache, snapshot)
.get("/list.json", withCache, snapshotList) .get("/list.json", withCache, snapshotList)
.post("/new", withAuth, newSnapshot) .post("/new", withAuth, newSnapshot)
// .get("/migrate", withAuth, migrateKeys)
.get("*", withCache, specificSnapshot) .get("*", withCache, specificSnapshot)
export default { export default {
fetch: (request, ...args) => fetch: (request, ...args) =>
router ROUTER
.fetch(request, ...args) .fetch(request, ...args)
.then(json) .then(json)
.catch(error) .catch(error),
async queue(batch, env) {
for (const msg of batch.messages) {
let content = msg.body;
let payload = "Daily Servo image hash changed!";
let content_type = "text/plain";
if (content.type == "discord") {
payload = {
content: `[Daily Servo](<https://servo.gmem.ca>) update ${content.date} (${content.hash})`,
embeds: [ {title: `${content.date} snapshot`, image: { url: content.file }, type: "image" } ],
};
content_type = "application/json";
}
let response = await fetch(`${content.url}`, {
method: "POST",
body: JSON.stringify(payload),
headers: {
"X-Source": "Cloudflare-Workers",
"Content-Type": content_type,
},
});
if (response.ok) {
msg.ack();
} else {
msg.retry({delaySeconds: 600});
}
}
}
} }

View file

@ -10,4 +10,14 @@ id = "eddd851b2439407a877c0c2c6e503da2"
[[r2_buckets]] [[r2_buckets]]
binding = "R2_BUCKET" binding = "R2_BUCKET"
bucket_name = "daily-servo" bucket_name = "daily-servo"
[[queues.producers]]
binding = "WEBHOOKS_QUEUE"
queue = "daily-servo-webhooks"
[[queues.consumers]]
queue = "daily-servo-webhooks"
max_batch_size = 10
max_batch_timeout = 5
max_retries = 2