diff --git a/worker/src/index.js b/worker/src/index.js index c2d209c..2e19d42 100644 --- a/worker/src/index.js +++ b/worker/src/index.js @@ -1,8 +1,13 @@ import index from "./index.html"; import { IttyRouter, json, error, withParams } from 'itty-router' -const router = IttyRouter(); -const cache = caches.default; +const ROUTER = IttyRouter(); +const CACHE = caches.default; + +const SNAPSHOT_PREFIX = "snapshot:"; +const SNAPSHOT_KEYS = { prefix: SNAPSHOT_PREFIX }; +const WEBHOOK_PREFIX = "webhook:"; +const WEBHOOK_KEYS = { prefix: WEBHOOK_PREFIX }; async function withCache(request, env) { const url = new URL(request.url); @@ -11,7 +16,7 @@ async function withCache(request, env) { // Construct the cache key from the cache URL const cacheKey = new Request(cacheUrl.toString(), request); - let response = await cache.match(cacheKey); + let response = await CACHE.match(cacheKey); if (response) { return response; @@ -24,7 +29,7 @@ async function snapshot(request, env, ctx, date = "LATEST") { const kv = env.KV_STORE; const bucket = env.R2_BUCKET; - const { value, metadata } = await kv.getWithMetadata(date); + const { value, metadata } = await kv.getWithMetadata(`${SNAPSHOT_PREFIX}${date}`); if (value == null) { return {error: `${date} key not found`}; } @@ -66,7 +71,7 @@ async function renderIndex(request, env, ctx) { new Response(index, { headers: { "Content-Type": "text/html", "Cache-Control": "s-maxage=3600" } }), ); - ctx.waitUntil(cache.put(request.cacheKey, response.clone())); + ctx.waitUntil(CACHE.put(request.cacheKey, response.clone())); return response } @@ -76,25 +81,31 @@ async function latest(request, env) { headers: { "Content-Type": "application/json", "Cache-Control": "s-maxage=3600"} }); - ctx.waitUntil(cache.put(request.cacheKey, response.clone())); + ctx.waitUntil(CACHE.put(request.cacheKey, response.clone())); return response; } async function snapshotList(request, env, ctx) { const kv = env.KV_STORE; - let list = await kv.list(); + let list = await kv.list(SNAPSHOT_KEYS); + let filtered = await Promise.all( - list.keys.filter((value) => value.name != "LATEST") - .map(async (value) => { const v = await kv.get(value.name); return { date: value.name, hash: v } } )); + list.keys.filter((value) => value.name != `${SNAPSHOT_PREFIX}LATEST`) + .map(async (value) => { + const v = await kv.get(value.name); + return { + date: value.name.replace(`${SNAPSHOT_PREFIX}`, ""), hash: v + } } )); + console.log(filtered) const response = new Response(JSON.stringify({ - latest: await kv.get("LATEST"), + latest: await kv.get(`${SNAPSHOT_PREFIX}LATEST`), list: filtered, }), { headers: { "Content-Type": "application/json", "Cache-Control": "s-maxage=3600"} }); - ctx.waitUntil(cache.put(request.cacheKey, response.clone())); + ctx.waitUntil(CACHE.put(request.cacheKey, response.clone())); return response; } @@ -116,16 +127,28 @@ async function newSnapshot(request, env, ctx) { } = Object.fromEntries(body) // Don't bother uploading to R2 if the hashes match. - let latest = kv.get("LATEST"); + let latest = kv.get(`${SNAPSHOT_PREFIX}LATEST`); if (latest != hash) { await bucket.put(`${hash}.png`, file); + let webhooks = kv.list(WEBHOOK_KEYS); + await Promise.all(webhooks.keys.map(async key => { + let full = await kv.get(key.name); + let webhook = JSON.parse(full.value); + await env.WEBHOOKS_QUEUE.send({ + type: webhook.type, + url: webhook.url, + hash: hash, + date: date, + file: `https://daily-servo-r2.gmem.ca/${hash}.png` + }); + })); } - await kv.put("LATEST", hash, { + await kv.put("${SNAPSHOT_PREFIX}LATEST", hash, { metadata: { date: date } }); // Keep for 1 year. - await kv.put(`${date}`, hash, { expirationTtl: 31_536_000 }); + await kv.put(`${SNAPSHOT_PREFIX}${date}`, hash, { expirationTtl: 31_536_000 }); return new Response("Uploaded", { status: 201}); } @@ -139,18 +162,59 @@ async function specificSnapshot(request, env, ctx) { return response } -router +async function migrateKeys(request, env, ctx) { + const kv = env.KV_STORE; + let list = await kv.list(); + await Promise.all(list.keys.map(async key => { + let full = await kv.getWithMetadata(key.name); + await kv.put(`${SNAPSHOT_PREFIX}${key.name}`, full.value, { expiration: key.expiration, metadata: key.metadata }); + await kv.delete(key.name); + })); + + return new Response(JSON.stringify({}), { status: 200, headers: {"Content-Type": "application/json"} }); +} + +ROUTER .get("/", withCache, renderIndex) .get("/latest", withCache, latest) .get("/latest.json", withCache, snapshot) .get("/list.json", withCache, snapshotList) .post("/new", withAuth, newSnapshot) + // .get("/migrate", withAuth, migrateKeys) .get("*", withCache, specificSnapshot) export default { fetch: (request, ...args) => - router + ROUTER .fetch(request, ...args) .then(json) - .catch(error) + .catch(error), + async queue(batch, env) { + for (const msg of batch.messages) { + let content = msg.body; + let payload = "Daily Servo image hash changed!"; + let content_type = "text/plain"; + + if (content.type == "discord") { + payload = { + content: `[Daily Servo]() update ${content.date} (${content.hash})`, + embeds: [ {title: `${content.date} snapshot`, image: { url: content.file }, type: "image" } ], + }; + content_type = "application/json"; + } + let response = await fetch(`${content.url}`, { + method: "POST", + body: JSON.stringify(payload), + headers: { + "X-Source": "Cloudflare-Workers", + "Content-Type": content_type, + }, + }); + if (response.ok) { + msg.ack(); + } else { + msg.retry({delaySeconds: 600}); + } + } + } } diff --git a/worker/wrangler.toml b/worker/wrangler.toml index 7b57b30..13d9669 100644 --- a/worker/wrangler.toml +++ b/worker/wrangler.toml @@ -10,4 +10,14 @@ id = "eddd851b2439407a877c0c2c6e503da2" [[r2_buckets]] binding = "R2_BUCKET" -bucket_name = "daily-servo" \ No newline at end of file +bucket_name = "daily-servo" + +[[queues.producers]] +binding = "WEBHOOKS_QUEUE" +queue = "daily-servo-webhooks" + +[[queues.consumers]] +queue = "daily-servo-webhooks" +max_batch_size = 10 +max_batch_timeout = 5 +max_retries = 2 \ No newline at end of file