diff --git a/src/activitypub/Server.ts b/src/activitypub/Server.ts
index 84f842e0..97deb137 100644
--- a/src/activitypub/Server.ts
+++ b/src/activitypub/Server.ts
@@ -9,6 +9,7 @@ import bodyParser from "body-parser";
import { Request, Response, Router } from "express";
import { Server, ServerOptions } from "lambert-server";
import path from "path";
+import { setupListener } from "./listener";
import hostMeta from "./well-known/host-meta";
import webfinger from "./well-known/webfinger";
@@ -24,6 +25,7 @@ export class APServer extends Server {
async start() {
await initDatabase();
await Config.init();
+ setupListener();
this.app.set("json replacer", JSONReplacer);
diff --git a/src/activitypub/listener/index.ts b/src/activitypub/listener/index.ts
new file mode 100644
index 00000000..26c3e3e5
--- /dev/null
+++ b/src/activitypub/listener/index.ts
@@ -0,0 +1,136 @@
+import {
+ APError,
+ APObjectIsPerson,
+ fetchOpts,
+ resolveWebfinger,
+} from "@spacebar/ap";
+import {
+ Channel,
+ Config,
+ EVENTEnum,
+ Event,
+ Message,
+ MessageCreateEvent,
+ OrmUtils,
+ RabbitMQ,
+ User,
+ events,
+} from "@spacebar/util";
+import crypto from "crypto";
+import fetch from "node-fetch";
+
+const sendSignedMessage = async (
+ inbox: string,
+ sender: `${"user" | "channel"}/${string}`,
+ message: object,
+ privateKey: string,
+) => {
+ const digest = crypto
+ .createHash("sha256")
+ .update(JSON.stringify(message))
+ .digest("base64");
+ const signer = crypto.createSign("sha256");
+ const now = new Date();
+
+ const url = new URL(inbox);
+ const inboxFrag = url.pathname;
+ const toSign =
+ `(request-target): post ${inboxFrag}\n` +
+ `host: ${url.hostname}\n` +
+ `date: ${now.toUTCString()}\n` +
+ `digest: SHA-256=${digest}`;
+
+ signer.update(toSign);
+ signer.end();
+
+ const signature = signer.sign(privateKey);
+ const sig_b64 = signature.toString("base64");
+
+ const { webDomain } = Config.get().federation;
+ const header =
+ `keyId="https://${webDomain}/fed/${sender}",` +
+ `headers="(request-target) host date digest",` +
+ `signature=${sig_b64}`;
+
+ return await fetch(
+ inbox,
+ OrmUtils.mergeDeep(fetchOpts, {
+ method: "POST",
+ body: message,
+ headers: {
+ Host: url.hostname,
+ Date: now.toUTCString(),
+ Digest: `SHA-256=${digest}`,
+ Signature: header,
+ },
+ }),
+ );
+};
+
+const onMessage = async (event: MessageCreateEvent) => {
+ const channel_id = event.channel_id;
+ const channel = await Channel.findOneOrFail({
+ where: { id: channel_id },
+ relations: {
+ recipients: {
+ user: true,
+ },
+ },
+ });
+ if (channel.isDm()) {
+ const message = await Message.findOneOrFail({
+ where: { id: event.data.id },
+ });
+ const apMessage = message.toCreateAP();
+
+ for (const recipient of channel.recipients || []) {
+ if (recipient.user.federatedId) {
+ const user = await resolveWebfinger(recipient.user.federatedId);
+ if (!APObjectIsPerson(user))
+ throw new APError("Cannot deliver message");
+
+ if (!user.id) throw new APError("Receiver ID is null?");
+
+ apMessage.to = [user.id];
+
+ const sender = await User.findOneOrFail({
+ where: { id: event.data.author_id },
+ select: ["privateKey"],
+ });
+
+ if (typeof user.inbox != "string")
+ throw new APError("inbox must be URL");
+
+ console.log(
+ await sendSignedMessage(
+ user.inbox,
+ `user/${event.data.author_id}`,
+ message,
+ sender.privateKey,
+ ).then((x) => x.text()),
+ );
+ }
+ }
+ }
+};
+
+type ListenerFunc = (event: Event) => Promise<void>;
+
+const listeners = {
+ MESSAGE_CREATE: onMessage,
+} as Record<EVENTEnum, ListenerFunc>;
+
+export const setupListener = () => {
+ if (RabbitMQ.connection)
+ throw new APError("Activitypub module has not implemented RabbitMQ");
+
+ // for (const event in listeners) {
+ // // process.setMaxListeners(process.getMaxListeners() + 1);
+ // // process.addListener("message", (msg) =>
+ // // listener(msg as ProcessEvent, event, listeners[event as EVENTEnum]),
+ // // );
+
+ events.setMaxListeners(events.getMaxListeners() + 1);
+ events.onAny((event, msg) => listeners[msg.event as EVENTEnum]?.(msg));
+ // }
+};
diff --git a/src/activitypub/routes/channel/#channel_id/inbox.ts b/src/activitypub/routes/channel/#channel_id/inbox.ts
index 2dd36143..89a990dc 100644
--- a/src/activitypub/routes/channel/#channel_id/inbox.ts
+++ b/src/activitypub/routes/channel/#channel_id/inbox.ts
@@ -14,7 +14,11 @@ router.post("/", route({}), async (req, res) => {
const message = await messageFromAP(body.object);
- if ((await Message.count({ where: { id: message.id } })) != 0)
+ if (
+ (await Message.count({
+ where: { federatedId: message.federatedId },
+ })) != 0
+ )
return res.status(200);
await message.save();
diff --git a/src/activitypub/routes/user/#user_id/inbox.ts b/src/activitypub/routes/user/#user_id/inbox.ts
new file mode 100644
index 00000000..89a990dc
--- /dev/null
+++ b/src/activitypub/routes/user/#user_id/inbox.ts
@@ -0,0 +1,33 @@
+import { messageFromAP } from "@spacebar/ap";
+import { route } from "@spacebar/api";
+import { Message, emitEvent } from "@spacebar/util";
+import { Router } from "express";
+import { HTTPError } from "lambert-server";
+
+const router = Router();
+export default router;
+
+router.post("/", route({}), async (req, res) => {
+ const body = req.body;
+
+ if (body.type != "Create") throw new HTTPError("not implemented");
+
+ const message = await messageFromAP(body.object);
+
+ if (
+ (await Message.count({
+ where: { federatedId: message.federatedId },
+ })) != 0
+ )
+ return res.status(200);
+
+ await message.save();
+
+ await emitEvent({
+ event: "MESSAGE_CREATE",
+ channel_id: message.channel_id,
+ data: message.toJSON(),
+ });
+
+ return res.status(200);
+});
diff --git a/src/activitypub/routes/user/index.ts b/src/activitypub/routes/user/#user_id/index.ts
index 67664dea..d92663ff 100644
--- a/src/activitypub/routes/user/index.ts
+++ b/src/activitypub/routes/user/#user_id/index.ts
@@ -5,8 +5,8 @@ import { Request, Response, Router } from "express";
const router = Router();
export default router;
-router.get("/:id", route({}), async (req: Request, res: Response) => {
- const id = req.params.id;
+router.get("/", route({}), async (req: Request, res: Response) => {
+ const id = req.params.user_id;
const user = await User.findOneOrFail({ where: { id } });
diff --git a/src/activitypub/routes/user/inbox.ts b/src/activitypub/routes/user/inbox.ts
deleted file mode 100644
index 2065d7f2..00000000
--- a/src/activitypub/routes/user/inbox.ts
+++ /dev/null
@@ -1,12 +0,0 @@
-import { route } from "@spacebar/api";
-import { Router } from "express";
-import { HTTPError } from "lambert-server";
-
-const router = Router();
-export default router;
-
-router.post("/", route({}), async (req, res) => {
- const body = req.body;
-
- if (body.type != "Create") throw new HTTPError("not implemented");
-});
diff --git a/src/activitypub/util/fetch.ts b/src/activitypub/util/fetch.ts
new file mode 100644
index 00000000..07acdc45
--- /dev/null
+++ b/src/activitypub/util/fetch.ts
@@ -0,0 +1,8 @@
+import { DEFAULT_FETCH_OPTIONS } from "@spacebar/api";
+import { OrmUtils } from "@spacebar/util";
+
+export const fetchOpts = OrmUtils.mergeDeep(DEFAULT_FETCH_OPTIONS, {
+ headers: {
+ Accept: "application/activity+json",
+ },
+});
diff --git a/src/activitypub/util/index.ts b/src/activitypub/util/index.ts
index 767e75d9..fe142a64 100644
--- a/src/activitypub/util/index.ts
+++ b/src/activitypub/util/index.ts
@@ -1,3 +1,4 @@
export * from "./APError";
export * from "./OrderedCollection";
+export * from "./fetch";
export * from "./transforms/index";
diff --git a/src/activitypub/util/transforms/Message.ts b/src/activitypub/util/transforms/Message.ts
deleted file mode 100644
index 3669121c..00000000
--- a/src/activitypub/util/transforms/Message.ts
+++ /dev/null
@@ -1,141 +0,0 @@
-import { APError } from "@spacebar/ap";
-import { DEFAULT_FETCH_OPTIONS } from "@spacebar/api";
-import {
- Channel,
- Config,
- Member,
- Message,
- OrmUtils,
- Snowflake,
- User,
- UserSettings,
-} from "@spacebar/util";
-import { APNote, APPerson, AnyAPObject } from "activitypub-types";
-import fetch from "node-fetch";
-import { ProxyAgent } from "proxy-agent";
-import TurndownService from "turndown";
-
-const fetchOpts = OrmUtils.mergeDeep(DEFAULT_FETCH_OPTIONS, {
- headers: {
- Accept: "application/activity+json",
- },
-});
-
-const hasAPContext = (data: object) => {
- if (!("@context" in data)) return false;
- const context = data["@context"];
- const activitystreams = "https://www.w3.org/ns/activitystreams";
- if (Array.isArray(context))
- return context.find((x) => x == activitystreams);
- return context == activitystreams;
-};
-
-export const resolveAPObject = async <T>(data: string | T): Promise<T> => {
- // we were already given an AP object
- if (typeof data != "string") return data;
-
- const agent = new ProxyAgent();
- const ret = await fetch(data, {
- ...fetchOpts,
- agent,
- });
-
- const json = await ret.json();
-
- if (!hasAPContext(json)) throw new APError("Object is not APObject");
-
- return json;
-};
-
-export const messageFromAP = async (data: APNote): Promise<Message> => {
- if (!data.id) throw new APError("Message must have ID");
- if (data.type != "Note") throw new APError("Message must be Note");
-
- const to = Array.isArray(data.to)
- ? data.to.filter((x) =>
- typeof x == "string" ? x.includes("channel") : false,
- )[0]
- : data.to;
- if (!to || typeof to != "string")
- throw new APError("Message not deliverable");
-
- // TODO: use a regex
- const channel_id = to.split("/").reverse()[0];
- const channel = await Channel.findOneOrFail({
- where: { id: channel_id },
- relations: { guild: true },
- });
-
- if (!data.attributedTo)
- throw new APError("Message must have author (attributedTo)");
- const attrib = await resolveAPObject(
- Array.isArray(data.attributedTo)
- ? data.attributedTo[0] // hmm
- : data.attributedTo,
- );
-
- if (!APObjectIsPerson(attrib))
- throw new APError("Message attributedTo must be Person");
-
- const user = await userFromAP(attrib);
- const member = channel.guild
- ? await Member.findOneOrFail({
- where: { id: user.id, guild_id: channel.guild.id },
- })
- : undefined;
-
- return Message.create({
- id: data.id,
- content: new TurndownService().turndown(data.content),
- timestamp: data.published,
- author: user,
- guild: channel.guild,
- member,
- channel,
-
- type: 0,
- sticker_items: [],
- attachments: [],
- embeds: [],
- reactions: [],
- mentions: [],
- mention_roles: [],
- mention_channels: [],
- });
-};
-
-export const APObjectIsPerson = (object: AnyAPObject): object is APPerson => {
- return object.type == "Person";
-};
-
-export const userFromAP = async (data: APPerson): Promise<User> => {
- if (!data.id) throw new APError("User must have ID");
-
- const url = new URL(data.id);
- const email = `${url.pathname.split("/").reverse()[0]}@${url.hostname}`;
-
- return User.create({
- id: Snowflake.generate(),
- username: data.preferredUsername,
- discriminator: url.hostname,
- bio: new TurndownService().turndown(data.summary),
- email,
- data: {
- hash: "#",
- valid_tokens_since: new Date(),
- },
- extended_settings: "{}",
- settings: UserSettings.create(),
- publicKey: "",
- privateKey: "",
- premium: false,
-
- premium_since: Config.get().defaults.user.premium
- ? new Date()
- : undefined,
- rights: Config.get().register.defaultRights,
- premium_type: Config.get().defaults.user.premiumType ?? 0,
- verified: Config.get().defaults.user.verified ?? true,
- created_at: new Date(),
- });
-};
diff --git a/src/activitypub/util/transforms/index.ts b/src/activitypub/util/transforms/index.ts
index 6596816a..e8107ca0 100644
--- a/src/activitypub/util/transforms/index.ts
+++ b/src/activitypub/util/transforms/index.ts
@@ -1 +1,196 @@
-export * from "./Message";
+import { APError, fetchOpts } from "@spacebar/ap";
+import {
+ Channel,
+ Config,
+ DmChannelDTO,
+ Member,
+ Message,
+ Snowflake,
+ User,
+ UserSettings,
+ WebfingerResponse,
+} from "@spacebar/util";
+import { APNote, APPerson, AnyAPObject } from "activitypub-types";
+import fetch from "node-fetch";
+import { ProxyAgent } from "proxy-agent";
+import TurndownService from "turndown";
+
+const hasAPContext = (data: object) => {
+ if (!("@context" in data)) return false;
+ const context = data["@context"];
+ const activitystreams = "https://www.w3.org/ns/activitystreams";
+ if (Array.isArray(context))
+ return context.find((x) => x == activitystreams);
+ return context == activitystreams;
+};
+
+export const resolveAPObject = async <T>(data: string | T): Promise<T> => {
+ // we were already given an AP object
+ if (typeof data != "string") return data;
+
+ const agent = new ProxyAgent();
+ const ret = await fetch(data, {
+ ...fetchOpts,
+ agent,
+ });
+
+ const json = await ret.json();
+
+ if (!hasAPContext(json)) throw new APError("Object is not APObject");
+
+ return json;
+};
+
+export const resolveWebfinger = async (
+ lookup: string,
+): Promise<AnyAPObject> => {
+ let domain: string, user: string;
+ if (lookup.includes("@")) {
+ // lookup a @handle
+
+ if (lookup[0] == "@") lookup = lookup.slice(1);
+ [domain, user] = lookup.split("@");
+ } else {
+ // lookup was a URL ( hopefully )
+ const url = new URL(lookup);
+ domain = url.hostname;
+ user = url.pathname.split("/").reverse()[0];
+ }
+
+ const agent = new ProxyAgent();
+ const wellknown = (await fetch(
+ `https://${domain}/.well-known/webfinger?resource=${lookup}`,
+ {
+ agent,
+ ...fetchOpts,
+ },
+ ).then((x) => x.json())) as WebfingerResponse;
+
+ const link = wellknown.links.find((x) => x.rel == "self");
+ if (!link) throw new APError(".well-known did not contain rel=self link");
+
+ return await resolveAPObject<AnyAPObject>(link.href);
+};
+
+export const messageFromAP = async (data: APNote): Promise<Message> => {
+ if (!data.id) throw new APError("Message must have ID");
+ if (data.type != "Note") throw new APError("Message must be Note");
+
+ if (!data.attributedTo)
+ throw new APError("Message must have author (attributedTo)");
+ const attrib = await resolveAPObject(
+ Array.isArray(data.attributedTo)
+ ? data.attributedTo[0] // hmm
+ : data.attributedTo,
+ );
+
+ if (!APObjectIsPerson(attrib))
+ throw new APError("Message attributedTo must be Person");
+
+ const user = await userFromAP(attrib);
+
+ const to = Array.isArray(data.to)
+ ? data.to.filter((x) =>
+ typeof x == "string"
+ ? x.includes("channel") || x.includes("user")
+ : false,
+ )[0]
+ : data.to;
+ if (!to || typeof to != "string")
+ throw new APError("Message not deliverable");
+
+ // TODO: use a regex
+
+ let channel: Channel | DmChannelDTO;
+ const to_id = to.split("/").reverse()[0];
+ if (to.includes("user")) {
+ // this is a DM channel
+ const toUser = await User.findOneOrFail({ where: { id: to_id } });
+
+ // Channel.createDMCHannel does a .save() so the author must be present
+ await user.save();
+
+ // const cache = await Channel.findOne({ where: { recipients: []}})
+
+ channel = await Channel.createDMChannel(
+ [toUser.id, user.id],
+ toUser.id,
+ );
+ } else {
+ channel = await Channel.findOneOrFail({
+ where: { id: to_id },
+ relations: { guild: true },
+ });
+ }
+
+ const member =
+ channel instanceof Channel
+ ? await Member.findOneOrFail({
+ where: { id: user.id, guild_id: channel.guild.id },
+ })
+ : undefined;
+
+ return Message.create({
+ id: Snowflake.generate(),
+ federatedId: data.id,
+ content: new TurndownService().turndown(data.content),
+ timestamp: data.published,
+ author: user,
+ guild: channel instanceof Channel ? channel.guild : undefined,
+ member,
+ channel_id: channel.id,
+
+ type: 0,
+ sticker_items: [],
+ attachments: [],
+ embeds: [],
+ reactions: [],
+ mentions: [],
+ mention_roles: [],
+ mention_channels: [],
+ });
+};
+
+export const APObjectIsPerson = (object: AnyAPObject): object is APPerson => {
+ return object.type == "Person";
+};
+
+export const userFromAP = async (data: APPerson): Promise<User> => {
+ if (!data.id) throw new APError("User must have ID");
+
+ const url = new URL(data.id);
+ const email = `${url.pathname.split("/").reverse()[0]}@${url.hostname}`;
+
+ // don't like this
+ // the caching should probably be done elsewhere
+ // this function should only be for converting AP to SB (ideally)
+ const cache = await User.findOne({
+ where: { federatedId: url.toString() },
+ });
+ if (cache) return cache;
+
+ return User.create({
+ federatedId: url.toString(),
+ username: data.preferredUsername,
+ discriminator: url.hostname,
+ bio: new TurndownService().turndown(data.summary),
+ email,
+ data: {
+ hash: "#",
+ valid_tokens_since: new Date(),
+ },
+ extended_settings: "{}",
+ settings: UserSettings.create(),
+ publicKey: "",
+ privateKey: "",
+ premium: false,
+
+ premium_since: Config.get().defaults.user.premium
+ ? new Date()
+ : undefined,
+ rights: Config.get().register.defaultRights,
+ premium_type: Config.get().defaults.user.premiumType ?? 0,
+ verified: Config.get().defaults.user.verified ?? true,
+ created_at: new Date(),
+ });
+};
diff --git a/src/bundle/index.ts b/src/bundle/index.ts
index c6af4f00..8b1c9429 100644
--- a/src/bundle/index.ts
+++ b/src/bundle/index.ts
@@ -16,7 +16,8 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
+export * from "@spacebar/ap";
export * from "@spacebar/api";
-export * from "@spacebar/util";
-export * from "@spacebar/gateway";
export * from "@spacebar/cdn";
+export * from "@spacebar/gateway";
+export * from "@spacebar/util";
diff --git a/src/util/entities/Channel.ts b/src/util/entities/Channel.ts
index a5191dc3..8b692ac7 100644
--- a/src/util/entities/Channel.ts
+++ b/src/util/entities/Channel.ts
@@ -387,6 +387,18 @@ export class Channel extends BaseClass {
if (channel == null) {
name = trimSpecial(name);
+ const { publicKey, privateKey } = await generateKeyPair("rsa", {
+ modulusLength: 4096,
+ publicKeyEncoding: {
+ type: "spki",
+ format: "pem",
+ },
+ privateKeyEncoding: {
+ type: "pkcs8",
+ format: "pem",
+ },
+ });
+
channel = await Channel.create({
name,
type,
@@ -403,6 +415,8 @@ export class Channel extends BaseClass {
}),
),
nsfw: false,
+ publicKey,
+ privateKey,
}).save();
}
diff --git a/src/util/entities/Message.ts b/src/util/entities/Message.ts
index bbbb2ac1..3bf3b9d0 100644
--- a/src/util/entities/Message.ts
+++ b/src/util/entities/Message.ts
@@ -16,7 +16,7 @@
along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
-import type { APAnnounce, APNote } from "activitypub-types";
+import type { APAnnounce, APCreate, APNote } from "activitypub-types";
import {
Column,
CreateDateColumn,
@@ -220,6 +220,9 @@ export class Message extends BaseClass {
@Column({ type: "simple-json", nullable: true })
components?: MessageComponent[];
+ @Column({ nullable: true })
+ federatedId: string;
+
toJSON(): Message {
return {
...this,
@@ -227,6 +230,7 @@ export class Message extends BaseClass {
member_id: undefined,
webhook_id: undefined,
application_id: undefined,
+ federatedId: undefined,
nonce: this.nonce ?? undefined,
tts: this.tts ?? false,
@@ -256,6 +260,19 @@ export class Message extends BaseClass {
};
}
+ toCreateAP(): APCreate {
+ const { webDomain } = Config.get().federation;
+
+ return {
+ "@context": "https://www.w3.org/ns/activitystreams",
+ type: "Create",
+ id: `https://${webDomain}/fed/channel/${this.channel_id}/messages/${this.id}`,
+ to: [],
+ actor: `https://${webDomain}/fed/user/${this.author_id}`,
+ object: this.toAP(),
+ };
+ }
+
// TODO: move to AP module
toAP(): APNote {
const { webDomain } = Config.get().federation;
diff --git a/src/util/entities/User.ts b/src/util/entities/User.ts
index f9213693..1594093f 100644
--- a/src/util/entities/User.ts
+++ b/src/util/entities/User.ts
@@ -252,6 +252,9 @@ export class User extends BaseClass {
@Column({ select: false })
privateKey: string;
+ @Column({ nullable: true })
+ federatedId: string;
+
// TODO: I don't like this method?
validate() {
if (this.discriminator) {
diff --git a/src/util/schemas/MessageAcknowledgeSchema.ts b/src/util/schemas/MessageAcknowledgeSchema.ts
index 28cd9c79..726dc21b 100644
--- a/src/util/schemas/MessageAcknowledgeSchema.ts
+++ b/src/util/schemas/MessageAcknowledgeSchema.ts
@@ -19,4 +19,7 @@
export interface MessageAcknowledgeSchema {
manual?: boolean;
mention_count?: number;
+ flags?: number;
+ last_viewed?: number;
+ token?: unknown; // was null
}
diff --git a/src/util/util/Event.ts b/src/util/util/Event.ts
index 01f4911a..76a529ed 100644
--- a/src/util/util/Event.ts
+++ b/src/util/util/Event.ts
@@ -17,9 +17,9 @@
*/
import { Channel } from "amqplib";
-import { RabbitMQ } from "./RabbitMQ";
-import EventEmitter from "events";
+import EventEmitter from "eventemitter2";
import { EVENT, Event } from "../interfaces";
+import { RabbitMQ } from "./RabbitMQ";
export const events = new EventEmitter();
export async function emitEvent(payload: Omit<Event, "created_at">) {
|