From 11880103b13146aa8e700827f36c81a26fb8e09e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 14:11:17 +0100 Subject: Make federation send queue take the current position --- synapse/federation/send_queue.py | 40 +++++++++++++++++++++++++--------------- 1 file changed, 25 insertions(+), 15 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index bbb0195228..4bde66fbf8 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -220,10 +220,15 @@ class FederationRemoteSendQueue(object): def get_current_token(self): return self.pos - 1 - def get_replication_rows(self, token, limit, federation_ack=None): - """ + def federation_ack(self, token): + self._clear_queue_before_pos(token) + + def get_replication_rows(self, from_token, to_token, limit, federation_ack=None): + """Get rows to be sent over federation between the two tokens + Args: - token (int) + from_token (int) + to_token(int) limit (int) federation_ack (int): Optional. The position where the worker is explicitly acknowledged it has handled. Allows us to drop @@ -232,8 +237,8 @@ class FederationRemoteSendQueue(object): # TODO: Handle limit. # To handle restarts where we wrap around - if token > self.pos: - token = -1 + if from_token > self.pos: + from_token = -1 rows = [] @@ -244,10 +249,11 @@ class FederationRemoteSendQueue(object): # Fetch changed presence keys = self.presence_changed.keys() - i = keys.bisect_right(token) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 dest_user_ids = set( (pos, dest_user_id) - for pos in keys[i:] + for pos in keys[i:j] for dest_user_id in self.presence_changed[pos] ) @@ -259,8 +265,9 @@ class FederationRemoteSendQueue(object): # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() - i = keys.bisect_right(token) - keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) for (pos, (destination, edu_key)) in keyed_edus: rows.append( @@ -272,16 +279,18 @@ class FederationRemoteSendQueue(object): # Fetch changed edus keys = self.edus.keys() - i = keys.bisect_right(token) - edus = set((k, self.edus[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) # Fetch changed failures keys = self.failures.keys() - i = keys.bisect_right(token) - failures = set((k, self.failures[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: rows.append((pos, FAILURE_TYPE, ujson.dumps({ @@ -291,8 +300,9 @@ class FederationRemoteSendQueue(object): # Fetch changed device messages keys = self.device_messages.keys() - i = keys.bisect_right(token) - device_messages = set((k, self.device_messages[k]) for k in keys[i:]) + i = keys.bisect_right(from_token) + j = keys.bisect_right(to_token) + 1 + device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ -- cgit 1.5.1 From f10ce8944b8ef4c33694654c397bfcda44c6124a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Apr 2017 11:10:28 +0100 Subject: Don't double json encode federation replication data --- synapse/app/federation_sender.py | 4 +--- synapse/federation/send_queue.py | 19 +++++++++---------- 2 files changed, 10 insertions(+), 13 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index cbddc80ca9..145c01f3a3 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -51,7 +51,6 @@ from daemonize import Daemonize import sys import logging import gc -import ujson as json logger = logging.getLogger("synapse.app.appservice") @@ -290,8 +289,7 @@ class FederationSenderHandler(object): # Parse the rows in the stream for row in rows: typ = row.type - content_js = row.data - content = json.loads(content_js) + content = row.data if typ == send_queue.PRESENCE_TYPE: destination = content["destination"] diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 4bde66fbf8..78c852ed69 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -35,7 +35,6 @@ from synapse.util.metrics import Measure import synapse.metrics from blist import sorteddict -import ujson metrics = synapse.metrics.get_metrics_for(__name__) @@ -258,10 +257,10 @@ class FederationRemoteSendQueue(object): ) for (key, (dest, user_id)) in dest_user_ids: - rows.append((key, PRESENCE_TYPE, ujson.dumps({ + rows.append((key, PRESENCE_TYPE, { "destination": dest, "state": self.presence_map[user_id].as_dict(), - }))) + })) # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() @@ -271,10 +270,10 @@ class FederationRemoteSendQueue(object): for (pos, (destination, edu_key)) in keyed_edus: rows.append( - (pos, KEYED_EDU_TYPE, ujson.dumps({ + (pos, KEYED_EDU_TYPE, { "key": edu_key, "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), - })) + }) ) # Fetch changed edus @@ -284,7 +283,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) + rows.append((pos, EDU_TYPE, edu.get_internal_dict())) # Fetch changed failures keys = self.failures.keys() @@ -293,10 +292,10 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: - rows.append((pos, FAILURE_TYPE, ujson.dumps({ + rows.append((pos, FAILURE_TYPE, { "destination": destination, "failure": failure, - }))) + })) # Fetch changed device messages keys = self.device_messages.keys() @@ -305,9 +304,9 @@ class FederationRemoteSendQueue(object): device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: - rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ + rows.append((pos, DEVICE_MESSAGE_TYPE, { "destination": destination, - }))) + })) # Sort rows based on pos rows.sort() -- cgit 1.5.1 From 449d1297cae8bb05140b257f6e08e4cc036bf481 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Apr 2017 11:48:27 +0100 Subject: Fix up federation SendQueue and document types --- synapse/app/federation_sender.py | 66 +---------- synapse/federation/send_queue.py | 246 ++++++++++++++++++++++++++++++++++----- 2 files changed, 221 insertions(+), 91 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 145c01f3a3..477e16e0fa 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -23,7 +23,6 @@ from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory from synapse.http.site import SynapseSite from synapse.federation import send_queue -from synapse.federation.units import Edu from synapse.metrics.resource import MetricsResource, METRICS_PREFIX from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.events import SlavedEventStore @@ -33,7 +32,6 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine -from synapse.storage.presence import UserPresenceState from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -277,69 +275,7 @@ class FederationSenderHandler(object): # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": - # The federation stream containis a bunch of different types of - # rows that need to be handled differently. We parse the rows, put - # them into the appropriate collection and then send them off. - presence_to_send = {} - keyed_edus = {} - edus = {} - failures = {} - device_destinations = set() - - # Parse the rows in the stream - for row in rows: - typ = row.type - content = row.data - - if typ == send_queue.PRESENCE_TYPE: - destination = content["destination"] - state = UserPresenceState.from_dict(content["state"]) - - presence_to_send.setdefault(destination, []).append(state) - elif typ == send_queue.KEYED_EDU_TYPE: - key = content["key"] - edu = Edu(**content["edu"]) - - keyed_edus.setdefault( - edu.destination, {} - )[(edu.destination, tuple(key))] = edu - elif typ == send_queue.EDU_TYPE: - edu = Edu(**content) - - edus.setdefault(edu.destination, []).append(edu) - elif typ == send_queue.FAILURE_TYPE: - destination = content["destination"] - failure = content["failure"] - - failures.setdefault(destination, []).append(failure) - elif typ == send_queue.DEVICE_MESSAGE_TYPE: - device_destinations.add(content["destination"]) - else: - raise Exception("Unrecognised federation type: %r", typ) - - # We've finished collecting, send everything off - for destination, states in presence_to_send.items(): - self.federation_sender.send_presence(destination, states) - - for destination, edu_map in keyed_edus.items(): - for key, edu in edu_map.items(): - self.federation_sender.send_edu( - edu.destination, edu.edu_type, edu.content, key=key, - ) - - for destination, edu_list in edus.items(): - for edu in edu_list: - self.federation_sender.send_edu( - edu.destination, edu.edu_type, edu.content, key=None, - ) - - for destination, failure_list in failures.items(): - for failure in failure_list: - self.federation_sender.send_failure(destination, failure) - - for destination in device_destinations: - self.federation_sender.send_device_messages(destination) - + send_queue.process_rows_for_federation(self.federation_sender, rows) preserve_fn(self.update_token)(token) # We also need to poke the federation sender when new events happen diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 78c852ed69..8a6392c697 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -31,22 +31,17 @@ Events are replicated via a separate events stream. from .units import Edu +from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure import synapse.metrics from blist import sorteddict +from collections import namedtuple metrics = synapse.metrics.get_metrics_for(__name__) -PRESENCE_TYPE = "p" -KEYED_EDU_TYPE = "k" -EDU_TYPE = "e" -FAILURE_TYPE = "f" -DEVICE_MESSAGE_TYPE = "d" - - class FederationRemoteSendQueue(object): """A drop in replacement for TransactionQueue""" @@ -257,10 +252,10 @@ class FederationRemoteSendQueue(object): ) for (key, (dest, user_id)) in dest_user_ids: - rows.append((key, PRESENCE_TYPE, { - "destination": dest, - "state": self.presence_map[user_id].as_dict(), - })) + rows.append((key, PresenceRow( + destination=dest, + state=self.presence_map[user_id], + ))) # Fetch changes keyed edus keys = self.keyed_edu_changed.keys() @@ -269,12 +264,10 @@ class FederationRemoteSendQueue(object): keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) for (pos, (destination, edu_key)) in keyed_edus: - rows.append( - (pos, KEYED_EDU_TYPE, { - "key": edu_key, - "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), - }) - ) + rows.append((pos, KeyedEduRow( + key=edu_key, + edu=self.keyed_edu[(destination, edu_key)], + ))) # Fetch changed edus keys = self.edus.keys() @@ -283,7 +276,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, edu.get_internal_dict())) + rows.append((pos, EduRow(edu))) # Fetch changed failures keys = self.failures.keys() @@ -292,10 +285,10 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: - rows.append((pos, FAILURE_TYPE, { - "destination": destination, - "failure": failure, - })) + rows.append((pos, FailureRow( + destination=destination, + failure=failure, + ))) # Fetch changed device messages keys = self.device_messages.keys() @@ -304,11 +297,212 @@ class FederationRemoteSendQueue(object): device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) for (pos, destination) in device_messages: - rows.append((pos, DEVICE_MESSAGE_TYPE, { - "destination": destination, - })) + rows.append((pos, DeviceRow( + destination=destination, + ))) # Sort rows based on pos rows.sort() - return rows + return [(pos, row.TypeId, row.to_data()) for pos, row in rows] + + +class BaseFederationRow(object): + TypeId = None + + @staticmethod + def from_data(data): + """Parse the data from the federation stream into a row. + """ + raise NotImplementedError() + + def to_data(self): + """Serialize this row to be sent over the federation stream + """ + raise NotImplementedError() + + def add_to_buffer(self, buff): + """Add this row to the appropriate field in the buffer ready for this + to be sent over federation. + + We use a buffer so that we can batch up events that have come in at + the same time and send them all at once. + + Args: + buff (BufferedToSend) + """ + raise NotImplementedError() + + +class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( + "destination", # str + "state", # UserPresenceState +))): + TypeId = "p" + + @staticmethod + def from_data(data): + return PresenceRow( + destination=data["destination"], + state=UserPresenceState.from_dict(data["state"]) + ) + + def to_data(self): + return { + "destination": self.destination, + "state": self.state.as_dict() + } + + def add_to_buffer(self, buff): + buff.presence.setdefault(self.destination, []).append(self.state) + + +class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( + "key", # tuple(str) - the edu key passed to send_edu + "edu", # Edu +))): + TypeId = "k" + + @staticmethod + def from_data(data): + return KeyedEduRow( + key=tuple(data["key"]), + edu=Edu(**data["edu"]), + ) + + def to_data(self): + return { + "key": self.key, + "edu": self.edu.get_internal_dict(), + } + + def add_to_buffer(self, buff): + buff.keyed_edus.setdefault( + self.edu.destination, {} + )[self.key] = self.edu + + +class EduRow(BaseFederationRow, namedtuple("EduRow", ( + "edu", # Edu +))): + TypeId = "e" + + @staticmethod + def from_data(data): + return EduRow(Edu(**data)) + + def to_data(self): + return self.edu.get_internal_dict() + + def add_to_buffer(self, buff): + buff.edus.setdefault(self.edu.destination, []).append(self.edu) + + +class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( + "destination", # str + "failure", +))): + TypeId = "f" + + @staticmethod + def from_data(data): + return FailureRow( + destination=data["destination"], + failure=data["failure"], + ) + + def to_data(self): + return { + "destination": self.destination, + "failure": self.failure, + } + + def add_to_buffer(self, buff): + buff.failures.setdefault(self.destination, []).append(self.failure) + + +class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( + "destination", # str +))): + TypeId = "d" + + @staticmethod + def from_data(data): + return DeviceRow(destination=data) + + def to_data(self): + return self.destination + + def add_to_buffer(self, buff): + buff.device_destinations.add(self.destination) + + +TypeToRow = { + Row.TypeId: Row + for Row in ( + PresenceRow, + KeyedEduRow, + EduRow, + FailureRow, + DeviceRow, + ) +} + + +BufferedToSend = namedtuple("BufferedToSend", ( + "presence", # dict of destination -> [UserPresenceState] + "keyed_edus", # dict of destination -> { key -> Edu } + "edus", # dict of destination -> [Edu] + "failures", # dict of destination -> [failures] + "device_destinations", # set of destinations +)) + + +def process_rows_for_federation(federation_sender, rows): + """Parse a list of rows from the federation stream and them send them out. + + Args: + federation_sender (TransactionQueue) + rows (list(FederationStreamRow)) + """ + + # The federation stream containis a bunch of different types of + # rows that need to be handled differently. We parse the rows, put + # them into the appropriate collection and then send them off. + + buff = BufferedToSend( + presence={}, + keyed_edus={}, + edus={}, + failures={}, + device_destinations=set(), + ) + + # Parse the rows in the stream and add to the buffer + for row in rows: + RowType = TypeToRow[row.type] + parsed_row = RowType.from_data(row.data) + parsed_row.add_to_buffer(buff) + + # We've finished collecting, send everything off + for destination, states in buff.presence.iteritems(): + federation_sender.send_presence(destination, states) + + for destination, edu_map in buff.keyed_edus.iteritems(): + for key, edu in edu_map.items(): + federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=key, + ) + + for destination, edu_list in buff.edus.iteritems(): + for edu in edu_list: + federation_sender.send_edu( + edu.destination, edu.edu_type, edu.content, key=None, + ) + + for destination, failure_list in buff.failures.iteritems(): + for failure in failure_list: + federation_sender.send_failure(destination, failure) + + for destination in buff.device_destinations: + federation_sender.send_device_messages(destination) -- cgit 1.5.1 From d4d176e5d0d130763a5379b317d3d3d039055ba4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Apr 2017 11:51:28 +0100 Subject: Add logging --- synapse/federation/send_queue.py | 8 ++++++++ 1 file changed, 8 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 8a6392c697..867cba0cf1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -38,6 +38,10 @@ import synapse.metrics from blist import sorteddict from collections import namedtuple +import logging + +logger = logging.getLogger(__name__) + metrics = synapse.metrics.get_metrics_for(__name__) @@ -480,6 +484,10 @@ def process_rows_for_federation(federation_sender, rows): # Parse the rows in the stream and add to the buffer for row in rows: + if row.type not in TypeToRow: + logger.error("Unrecognized federation row type %r", row.type) + continue + RowType = TypeToRow[row.type] parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) -- cgit 1.5.1 From a828a64b754322f8a1483ec5256ab925039a6e39 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 7 Apr 2017 11:52:57 +0100 Subject: Comment --- synapse/federation/send_queue.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 867cba0cf1..c26da7acf8 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -312,16 +312,29 @@ class FederationRemoteSendQueue(object): class BaseFederationRow(object): - TypeId = None + """Base class for rows to be sent in the federation stream. + + Specifies how to identify, serialize and deserialize the different types. + """ + + TypeId = None # Unique string that ids the type. Must be overriden in sub classes. @staticmethod def from_data(data): """Parse the data from the federation stream into a row. + + Args: + data: The value of ``data`` from FederationStreamRow.data, type + depends on the type of stream """ raise NotImplementedError() def to_data(self): - """Serialize this row to be sent over the federation stream + """Serialize this row to be sent over the federation stream. + + Returns: + The value to be sent in FederationStreamRow.data. The type depends + on the type of stream. """ raise NotImplementedError() -- cgit 1.5.1 From ab904caf3324de82c338268984c979d66f00aed9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 10:02:17 +0100 Subject: Comments --- synapse/federation/send_queue.py | 10 ++++++---- synapse/replication/tcp/streams.py | 4 ++-- 2 files changed, 8 insertions(+), 6 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index c26da7acf8..657a930497 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -238,6 +238,8 @@ class FederationRemoteSendQueue(object): if from_token > self.pos: from_token = -1 + # list of tuple(int, BaseFederationRow), where the first is the position + # of the federation stream. rows = [] # There should be only one reader, so lets delete everything its @@ -476,14 +478,15 @@ BufferedToSend = namedtuple("BufferedToSend", ( def process_rows_for_federation(federation_sender, rows): - """Parse a list of rows from the federation stream and them send them out. + """Parse a list of rows from the federation stream and put them in the + transaction queue ready for sending to the relevant homeservers. Args: federation_sender (TransactionQueue) - rows (list(FederationStreamRow)) + rows (list(synapse.replication.tcp.streams.FederationStreamRow)) """ - # The federation stream containis a bunch of different types of + # The federation stream contains a bunch of different types of # rows that need to be handled differently. We parse the rows, put # them into the appropriate collection and then send them off. @@ -505,7 +508,6 @@ def process_rows_for_federation(federation_sender, rows): parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) - # We've finished collecting, send everything off for destination, states in buff.presence.iteritems(): federation_sender.send_presence(destination, states) diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 967b459e0e..369d5f2428 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -98,8 +98,8 @@ ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", ( "entity", # str )) FederationStreamRow = namedtuple("FederationStreamRow", ( - "type", # str - "data", # dict + "type", # str, the type of data as defined in the BaseFederationRows + "data", # dict, serialization of a federation.send_queue.BaseFederationRow )) TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", ( "user_id", # str -- cgit 1.5.1 From f8434db549acd400e880a1e2583ec2d077d46ebf Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 10:03:07 +0100 Subject: Change name --- synapse/federation/send_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 657a930497..df807e57a6 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -468,7 +468,7 @@ TypeToRow = { } -BufferedToSend = namedtuple("BufferedToSend", ( +ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( "presence", # dict of destination -> [UserPresenceState] "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] @@ -490,7 +490,7 @@ def process_rows_for_federation(federation_sender, rows): # rows that need to be handled differently. We parse the rows, put # them into the appropriate collection and then send them off. - buff = BufferedToSend( + buff = ParsedFederationStreamData( presence={}, keyed_edus={}, edus={}, -- cgit 1.5.1 From 8c5f03cec746a1414eab3a052b583d9053086d87 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 10:07:18 +0100 Subject: Revert to sending the same data type as before --- synapse/federation/send_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index df807e57a6..95fd20e434 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -447,10 +447,10 @@ class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( @staticmethod def from_data(data): - return DeviceRow(destination=data) + return DeviceRow(destination=data["destination"]) def to_data(self): - return self.destination + return {"destination": self.destination} def add_to_buffer(self, buff): buff.device_destinations.add(self.destination) -- cgit 1.5.1 From 0018491af213e19ae73af4e84e3570762dc83c7c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 12:44:17 +0100 Subject: Rename variable --- synapse/federation/send_queue.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 95fd20e434..748548bbe2 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -477,12 +477,12 @@ ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( )) -def process_rows_for_federation(federation_sender, rows): +def process_rows_for_federation(transaction_queue, rows): """Parse a list of rows from the federation stream and put them in the transaction queue ready for sending to the relevant homeservers. Args: - federation_sender (TransactionQueue) + transaction_queue (TransactionQueue) rows (list(synapse.replication.tcp.streams.FederationStreamRow)) """ @@ -509,23 +509,23 @@ def process_rows_for_federation(federation_sender, rows): parsed_row.add_to_buffer(buff) for destination, states in buff.presence.iteritems(): - federation_sender.send_presence(destination, states) + transaction_queue.send_presence(destination, states) for destination, edu_map in buff.keyed_edus.iteritems(): for key, edu in edu_map.items(): - federation_sender.send_edu( + transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=key, ) for destination, edu_list in buff.edus.iteritems(): for edu in edu_list: - federation_sender.send_edu( + transaction_queue.send_edu( edu.destination, edu.edu_type, edu.content, key=None, ) for destination, failure_list in buff.failures.iteritems(): for failure in failure_list: - federation_sender.send_failure(destination, failure) + transaction_queue.send_failure(destination, failure) for destination in buff.device_destinations: - federation_sender.send_device_messages(destination) + transaction_queue.send_device_messages(destination) -- cgit 1.5.1 From 29574fd5b3537cc272a4d792669b8d5be2a92b6f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 16:48:30 +0100 Subject: Reduce federation presence replication traffic This is mainly done by moving the calculation of where to send presence updates from the presence handler to the transaction queue, so we only need to send the presence event (and not the destinations) across the replication connection. Before we were duplicating by sending the full state across once per destination. --- synapse/app/federation_sender.py | 12 ++++ synapse/app/synchrotron.py | 6 +- synapse/federation/send_queue.py | 47 ++++++-------- synapse/federation/transaction_queue.py | 99 ++++++++++++++++++++++++++--- synapse/handlers/presence.py | 54 ++++------------ synapse/replication/slave/storage/events.py | 1 + 6 files changed, 139 insertions(+), 80 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 477e16e0fa..49efb602bc 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -32,6 +32,7 @@ from synapse.replication.slave.storage.transactions import TransactionStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.storage.engines import create_engine +from synapse.storage.presence import PresenceStore from synapse.util.async import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn @@ -80,6 +81,17 @@ class FederationSenderSlaveStore( return rows[0][0] if rows else -1 + # XXX: This is a bit broken because we don't persist the accepted list in a + # way that can be replicated. This means that we don't have a way to + # invalidate the cache correctly. + # This is fine since in practice nobody uses the presence list stuff... + get_presence_list_accepted = PresenceStore.__dict__[ + "get_presence_list_accepted" + ] + get_presence_list_observers_accepted = PresenceStore.__dict__[ + "get_presence_list_observers_accepted" + ] + class FederationSenderServer(HomeServer): def get_db_conn(self, run_new_connection=True): diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index d39e3161fe..7b6f82abdc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -206,10 +206,8 @@ class SynchrotronPresence(object): @defer.inlineCallbacks def notify_from_replication(self, states, stream_id): - parties = yield self._get_interested_parties( - states, calculate_remote_hosts=False - ) - room_ids_to_states, users_to_states, _ = parties + parties = yield self._get_interested_parties(states) + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 748548bbe2..a12c18f4df 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -53,6 +53,7 @@ class FederationRemoteSendQueue(object): self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() + self.is_mine_id = hs.is_mine_id self.presence_map = {} self.presence_changed = sorteddict() @@ -120,7 +121,9 @@ class FederationRemoteSendQueue(object): del self.presence_changed[key] user_ids = set( - user_id for uids in self.presence_changed.values() for _, user_id in uids + user_id + for uids in self.presence_changed.itervalues() + for user_id in uids ) to_del = [ @@ -187,18 +190,14 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() - def send_presence(self, destination, states): + def send_presence(self, states): """As per TransactionQueue""" pos = self._next_pos() - self.presence_map.update({ - state.user_id: state - for state in states - }) + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - self.presence_changed[pos] = [ - (destination, state.user_id) for state in states - ] + self.presence_map.update({state.user_id: state for state in local_states}) + self.presence_changed[pos] = [state.user_id for state in local_states] self.notifier.on_new_replication_data() @@ -251,15 +250,14 @@ class FederationRemoteSendQueue(object): keys = self.presence_changed.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - dest_user_ids = set( - (pos, dest_user_id) + dest_user_ids = [ + (pos, user_id) for pos in keys[i:j] - for dest_user_id in self.presence_changed[pos] - ) + for user_id in self.presence_changed[pos] + ] - for (key, (dest, user_id)) in dest_user_ids: + for (key, user_id) in dest_user_ids: rows.append((key, PresenceRow( - destination=dest, state=self.presence_map[user_id], ))) @@ -354,7 +352,6 @@ class BaseFederationRow(object): class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( - "destination", # str "state", # UserPresenceState ))): TypeId = "p" @@ -362,18 +359,14 @@ class PresenceRow(BaseFederationRow, namedtuple("PresenceRow", ( @staticmethod def from_data(data): return PresenceRow( - destination=data["destination"], - state=UserPresenceState.from_dict(data["state"]) + state=UserPresenceState.from_dict(data) ) def to_data(self): - return { - "destination": self.destination, - "state": self.state.as_dict() - } + return self.state.as_dict() def add_to_buffer(self, buff): - buff.presence.setdefault(self.destination, []).append(self.state) + buff.presence.append(self.state) class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( @@ -469,7 +462,7 @@ TypeToRow = { ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", ( - "presence", # dict of destination -> [UserPresenceState] + "presence", # list(UserPresenceState) "keyed_edus", # dict of destination -> { key -> Edu } "edus", # dict of destination -> [Edu] "failures", # dict of destination -> [failures] @@ -491,7 +484,7 @@ def process_rows_for_federation(transaction_queue, rows): # them into the appropriate collection and then send them off. buff = ParsedFederationStreamData( - presence={}, + presence=[], keyed_edus={}, edus={}, failures={}, @@ -508,8 +501,8 @@ def process_rows_for_federation(transaction_queue, rows): parsed_row = RowType.from_data(row.data) parsed_row.add_to_buffer(buff) - for destination, states in buff.presence.iteritems(): - transaction_queue.send_presence(destination, states) + if buff.presence: + transaction_queue.send_presence(buff.presence) for destination, edu_map in buff.keyed_edus.iteritems(): for key, edu in edu_map.items(): diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c27ce7c5f3..fd9e1fa01c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -21,7 +21,7 @@ from .units import Transaction, Edu from synapse.api.errors import HttpResponseException from synapse.util.async import run_on_reactor -from synapse.util.logcontext import preserve_context_over_fn +from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id @@ -78,6 +78,7 @@ class TransactionQueue(object): self.pending_edus_by_dest = edus = {} # Presence needs to be separate as we send single aggragate EDUs + self.pending_presence = {} self.pending_presence_by_dest = presence = {} self.pending_edus_keyed_by_dest = edus_keyed = {} @@ -113,6 +114,8 @@ class TransactionQueue(object): self._is_processing = False self._last_poked_id = -1 + self._processing_pending_presence = False + def can_send_to(self, destination): """Can we send messages to the given server? @@ -224,17 +227,95 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - def send_presence(self, destination, states): - if not self.can_send_to(destination): + @preserve_fn + @defer.inlineCallbacks + def send_presence(self, states): + """Send the new presence states to the appropriate destinations. + + Args: + states (list(UserPresenceState)) + """ + + # First we queue up the new presence by user ID, so multiple presence + # updates in quick successtion are correctly handled + self.pending_presence.update({state.user_id: state for state in states}) + + # We then handle the new pending presence in batches, first figuring + # out the destinations we need to send each state to and then poking it + # to attempt a new transaction. We linearize this so that we don't + # accidentally mess up the ordering and send multiple presence updates + # in the wrong order + if self._processing_pending_presence: return - self.pending_presence_by_dest.setdefault(destination, {}).update({ - state.user_id: state for state in states - }) + self._processing_pending_presence = True + try: + while True: + states = self.pending_presence + self.pending_presence = {} - preserve_context_over_fn( - self._attempt_new_transaction, destination - ) + if not states: + break + + yield self._process_presence_inner(states) + finally: + self._processing_pending_presence = False + + @measure_func("txnqueue._process_presence") + @defer.inlineCallbacks + def _process_presence_inner(self, states): + """Given a list of states populate self.pending_presence_by_dest and + poke to send a new transaction to each destination + + Args: + states (list(UserPresenceState)) + """ + # First we look up the rooms each user is in (as well as any explicit + # subscriptions), then for each distinct room we look up the remote + # hosts in those rooms. + room_ids_to_states = {} + users_to_states = {} + for state in states.itervalues(): + room_ids = yield self.store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield self.store.get_presence_list_observers_accepted( + state.user_id, + ) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + hosts_and_states = [] + for room_id, states in room_ids_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + hosts = yield self.store.get_hosts_in_room(room_id) + hosts_and_states.append((hosts, local_states)) + + for user_id, states in users_to_states.items(): + local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + if not local_states: + continue + + host = get_domain_from_id(user_id) + hosts_and_states.append(([host], local_states)) + + # And now finally queue up new transactions + for destinations, states in hosts_and_states: + for destination in destinations: + if not self.can_send_to(destination): + continue + + self.pending_presence_by_dest.setdefault( + destination, {} + ).update({ + state.user_id: state for state in states + }) + + preserve_fn(self._attempt_new_transaction)(destination) def send_edu(self, destination, edu_type, content, key=None): edu = Edu( diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 9ed5af3cb4..c1c0dd4d3d 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -318,11 +318,7 @@ class PresenceHandler(object): if to_federation_ping: federation_presence_out_counter.inc_by(len(to_federation_ping)) - _, _, hosts_to_states = yield self._get_interested_parties( - to_federation_ping.values() - ) - - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(to_federation_ping.values()) def _handle_timeouts(self): """Checks the presence of users that have timed out and updates as @@ -615,12 +611,12 @@ class PresenceHandler(object): defer.returnValue(states) @defer.inlineCallbacks - def _get_interested_parties(self, states, calculate_remote_hosts=True): + def _get_interested_parties(self, states): """Given a list of states return which entities (rooms, users, servers) are interested in the given states. Returns: - 3-tuple: `(room_ids_to_states, users_to_states, hosts_to_states)`, + 2-tuple: `(room_ids_to_states, users_to_states)`, with each item being a dict of `entity_name` -> `[UserPresenceState]` """ room_ids_to_states = {} @@ -637,30 +633,10 @@ class PresenceHandler(object): # Always notify self users_to_states.setdefault(state.user_id, []).append(state) - hosts_to_states = {} - if calculate_remote_hosts: - for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - hosts = yield self.store.get_hosts_in_room(room_id) - - for host in hosts: - hosts_to_states.setdefault(host, []).extend(local_states) - - for user_id, states in users_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - host = get_domain_from_id(user_id) - hosts_to_states.setdefault(host, []).extend(local_states) - # TODO: de-dup hosts_to_states, as a single host might have multiple # of same presence - defer.returnValue((room_ids_to_states, users_to_states, hosts_to_states)) + defer.returnValue((room_ids_to_states, users_to_states)) @defer.inlineCallbacks def _persist_and_notify(self, states): @@ -670,33 +646,32 @@ class PresenceHandler(object): stream_id, max_token = yield self.store.update_presence(states) parties = yield self._get_interested_parties(states) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - self._push_to_remotes(hosts_to_states) + self._push_to_remotes(states) @defer.inlineCallbacks def notify_for_states(self, state, stream_id): parties = yield self._get_interested_parties([state]) - room_ids_to_states, users_to_states, hosts_to_states = parties + room_ids_to_states, users_to_states = parties self.notifier.on_new_event( "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=[UserID.from_string(u) for u in users_to_states.keys()] + users=[UserID.from_string(u) for u in users_to_states] ) - def _push_to_remotes(self, hosts_to_states): + def _push_to_remotes(self, states): """Sends state updates to remote servers. Args: - hosts_to_states (dict): Mapping `server_name` -> `[UserPresenceState]` + hosts_to_states (list): list(state) """ - for host, states in hosts_to_states.items(): - self.federation.send_presence(host, states) + self.federation.send_presence(states) @defer.inlineCallbacks def incoming_presence(self, origin, content): @@ -837,14 +812,13 @@ class PresenceHandler(object): if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) - hosts = set(get_domain_from_id(u) for u in user_ids) - self._push_to_remotes({host: (state,) for host in hosts}) + self._push_to_remotes([state]) else: user_ids = filter(self.is_mine_id, user_ids) states = yield self.current_state_for_users(user_ids) - self._push_to_remotes({user.domain: states.values()}) + self._push_to_remotes(states.values()) @defer.inlineCallbacks def get_presence_list(self, observer_user, accepted=None): diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 5fd47706ef..4ca1e5aa8c 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -71,6 +71,7 @@ class SlavedEventStore(BaseSlavedStore): # to reach inside the __dict__ to extract them. get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] get_users_in_room = RoomMemberStore.__dict__["get_users_in_room"] + get_hosts_in_room = RoomMemberStore.__dict__["get_hosts_in_room"] get_users_who_share_room_with_user = ( RoomMemberStore.__dict__["get_users_who_share_room_with_user"] ) -- cgit 1.5.1 From 40453b3f84c933dddb58cb3d2746091a80272546 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 16:49:51 +0100 Subject: Dedupe KeyedEdu and Devices federation repl traffic --- synapse/federation/send_queue.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 748548bbe2..7d79c069f1 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -267,9 +267,12 @@ class FederationRemoteSendQueue(object): keys = self.keyed_edu_changed.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:j]) + # We purposefully clobber based on the key here, python dict comprehensions + # always use the last value, so this will correctly point to the last + # stream position. + keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]} - for (pos, (destination, edu_key)) in keyed_edus: + for ((destination, edu_key), pos) in keyed_edus.iteritems(): rows.append((pos, KeyedEduRow( key=edu_key, edu=self.keyed_edu[(destination, edu_key)], @@ -279,7 +282,7 @@ class FederationRemoteSendQueue(object): keys = self.edus.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - edus = set((k, self.edus[k]) for k in keys[i:j]) + edus = [(k, self.edus[k]) for k in keys[i:j]] for (pos, edu) in edus: rows.append((pos, EduRow(edu))) @@ -288,7 +291,7 @@ class FederationRemoteSendQueue(object): keys = self.failures.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - failures = set((k, self.failures[k]) for k in keys[i:j]) + failures = [(k, self.failures[k]) for k in keys[i:j]] for (pos, (destination, failure)) in failures: rows.append((pos, FailureRow( @@ -300,9 +303,9 @@ class FederationRemoteSendQueue(object): keys = self.device_messages.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - device_messages = set((k, self.device_messages[k]) for k in keys[i:j]) + device_messages = {self.device_messages[k]: k for k in keys[i:j]} - for (pos, destination) in device_messages: + for (destination, pos) in device_messages.iteritems(): rows.append((pos, DeviceRow( destination=destination, ))) -- cgit 1.5.1 From 84fbb80c8f6cef2e37195e3fd321c3ddab760c1f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 10 Apr 2017 16:55:56 +0100 Subject: Use generators --- synapse/federation/send_queue.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 7d79c069f1..fcd9929990 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -282,7 +282,7 @@ class FederationRemoteSendQueue(object): keys = self.edus.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - edus = [(k, self.edus[k]) for k in keys[i:j]] + edus = ((k, self.edus[k]) for k in keys[i:j]) for (pos, edu) in edus: rows.append((pos, EduRow(edu))) @@ -291,7 +291,7 @@ class FederationRemoteSendQueue(object): keys = self.failures.keys() i = keys.bisect_right(from_token) j = keys.bisect_right(to_token) + 1 - failures = [(k, self.failures[k]) for k in keys[i:j]] + failures = ((k, self.failures[k]) for k in keys[i:j]) for (pos, (destination, failure)) in failures: rows.append((pos, FailureRow( -- cgit 1.5.1 From b9b72bc6e2bdb3c6684db3e05e18b632755c7ccc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 15:15:34 +0100 Subject: Comments --- synapse/federation/transaction_queue.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index fd9e1fa01c..eb361d904c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -78,8 +78,18 @@ class TransactionQueue(object): self.pending_edus_by_dest = edus = {} # Presence needs to be separate as we send single aggragate EDUs + + # Map of user_id -> UserPresenceState for all the pending presence + # to be sent out by user_id. Entries here get processed and put in + # pending_presence_by_dest self.pending_presence = {} + # Map of destination -> user_id -> UserPresenceState of pending presence + # to be sent to each destinations self.pending_presence_by_dest = presence = {} + + # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered + # based on their key (e.g. typing events by room_id) + # Map of destination -> (edu_type, key) -> Edu self.pending_edus_keyed_by_dest = edus_keyed = {} metrics.register_callback( @@ -227,11 +237,14 @@ class TransactionQueue(object): self._attempt_new_transaction, destination ) - @preserve_fn + @preserve_fn # the caller should not yield on this @defer.inlineCallbacks def send_presence(self, states): """Send the new presence states to the appropriate destinations. + This actually queues up the presence states ready for sending and + triggers a background task to process them and send out the transactions. + Args: states (list(UserPresenceState)) """ -- cgit 1.5.1 From 6308ac45b08ff5bf7259f09a2a767212f3f97860 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 15:19:26 +0100 Subject: Move get_interested_remotes back to presence handler --- synapse/federation/transaction_queue.py | 41 ++++---------------------- synapse/handlers/presence.py | 52 +++++++++++++++++++++++++++++++-- 2 files changed, 55 insertions(+), 38 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index eb361d904c..08ceda31a6 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -25,7 +25,7 @@ from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.types import get_domain_from_id -from synapse.handlers.presence import format_user_presence_state +from synapse.handlers.presence import format_user_presence_state, get_interested_remotes import synapse.metrics import logging @@ -251,7 +251,10 @@ class TransactionQueue(object): # First we queue up the new presence by user ID, so multiple presence # updates in quick successtion are correctly handled - self.pending_presence.update({state.user_id: state for state in states}) + self.pending_presence.update({ + state.user_id: state for state in states + if self.is_mine_id(state.user_id) + }) # We then handle the new pending presence in batches, first figuring # out the destinations we need to send each state to and then poking it @@ -283,40 +286,8 @@ class TransactionQueue(object): Args: states (list(UserPresenceState)) """ - # First we look up the rooms each user is in (as well as any explicit - # subscriptions), then for each distinct room we look up the remote - # hosts in those rooms. - room_ids_to_states = {} - users_to_states = {} - for state in states.itervalues(): - room_ids = yield self.store.get_rooms_for_user(state.user_id) - for room_id in room_ids: - room_ids_to_states.setdefault(room_id, []).append(state) - - plist = yield self.store.get_presence_list_observers_accepted( - state.user_id, - ) - for u in plist: - users_to_states.setdefault(u, []).append(state) - - hosts_and_states = [] - for room_id, states in room_ids_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - hosts = yield self.store.get_hosts_in_room(room_id) - hosts_and_states.append((hosts, local_states)) - - for user_id, states in users_to_states.items(): - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) - if not local_states: - continue - - host = get_domain_from_id(user_id) - hosts_and_states.append(([host], local_states)) + hosts_and_states = yield get_interested_remotes(self.store, states) - # And now finally queue up new transactions for destinations, states in hosts_and_states: for destination in destinations: if not self.can_send_to(destination): diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c1c0dd4d3d..685373ff28 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -633,9 +633,6 @@ class PresenceHandler(object): # Always notify self users_to_states.setdefault(state.user_id, []).append(state) - # TODO: de-dup hosts_to_states, as a single host might have multiple - # of same presence - defer.returnValue((room_ids_to_states, users_to_states)) @defer.inlineCallbacks @@ -1318,3 +1315,52 @@ def handle_update(prev_state, new_state, is_mine, wheel_timer, now): persist_and_notify = True return new_state, persist_and_notify, federation_ping + +@defer.inlineCallbacks +def get_interested_remotes(store, states): + """Given a list of presence states figure out which remote servers + should be sent which. + + All the presence states should be for local users only. + + Args: + store (DataStore) + states (list(UserPresenceState)) + + Returns: + Deferred list of ([destinations], [UserPresenceState]), where for + each row the list of UserPresenceState should be sent to each + destination + """ + # First we look up the rooms each user is in (as well as any explicit + # subscriptions), then for each distinct room we look up the remote + # hosts in those rooms. + room_ids_to_states = {} + users_to_states = {} + for state in states.itervalues(): + room_ids = yield store.get_rooms_for_user(state.user_id) + for room_id in room_ids: + room_ids_to_states.setdefault(room_id, []).append(state) + + plist = yield store.get_presence_list_observers_accepted( + state.user_id, + ) + for u in plist: + users_to_states.setdefault(u, []).append(state) + + hosts_and_states = [] + for room_id, states in room_ids_to_states.items(): + if not local_states: + continue + + hosts = yield store.get_hosts_in_room(room_id) + hosts_and_states.append((hosts, local_states)) + + for user_id, states in users_to_states.items(): + if not local_states: + continue + + host = get_domain_from_id(user_id) + hosts_and_states.append(([host], local_states)) + + defer.returnValue(hosts_and_states) -- cgit 1.5.1 From 2be8a281d2aac8e2e3829b9aff6eb366506d22d1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 15:28:24 +0100 Subject: Comments --- synapse/federation/send_queue.py | 14 +++++++------- synapse/handlers/presence.py | 5 +++-- 2 files changed, 10 insertions(+), 9 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index a12c18f4df..7e52a55eda 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -55,17 +55,17 @@ class FederationRemoteSendQueue(object): self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id - self.presence_map = {} - self.presence_changed = sorteddict() + self.presence_map = {} # Pending presence map user_id -> UserPresenceState + self.presence_changed = sorteddict() # Stream position -> user_id - self.keyed_edu = {} - self.keyed_edu_changed = sorteddict() + self.keyed_edu = {} # (destination, key) -> EDU + self.keyed_edu_changed = sorteddict() # stream position -> (destination, key) - self.edus = sorteddict() + self.edus = sorteddict() # stream position -> Edu - self.failures = sorteddict() + self.failures = sorteddict() # stream position -> (destination, Failure) - self.device_messages = sorteddict() + self.device_messages = sorteddict() # stream position -> destination self.pos = 1 self.pos_time = sorteddict() diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 685373ff28..98e736be5b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -666,7 +666,7 @@ class PresenceHandler(object): """Sends state updates to remote servers. Args: - hosts_to_states (list): list(state) + hosts_to_states (list(UserPresenceState)) """ self.federation.send_presence(states) @@ -1332,6 +1332,8 @@ def get_interested_remotes(store, states): each row the list of UserPresenceState should be sent to each destination """ + hosts_and_states = [] # Final result to return + # First we look up the rooms each user is in (as well as any explicit # subscriptions), then for each distinct room we look up the remote # hosts in those rooms. @@ -1348,7 +1350,6 @@ def get_interested_remotes(store, states): for u in plist: users_to_states.setdefault(u, []).append(state) - hosts_and_states = [] for room_id, states in room_ids_to_states.items(): if not local_states: continue -- cgit 1.5.1 From a8c8e4efd4055be316c2b38624e7afd77c57b971 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 11 Apr 2017 15:34:55 +0100 Subject: Comment --- synapse/federation/send_queue.py | 8 +++++++- synapse/federation/transaction_queue.py | 2 ++ 2 files changed, 9 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 7e52a55eda..19cb757acc 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -191,9 +191,15 @@ class FederationRemoteSendQueue(object): self.notifier.on_new_replication_data() def send_presence(self, states): - """As per TransactionQueue""" + """As per TransactionQueue + + Args: + states (list(UserPresenceState)) + """ pos = self._next_pos() + # We only want to send presence for our own users, so lets always just + # filter here just in case. local_states = filter(lambda s: self.is_mine_id(s.user_id), states) self.presence_map.update({state.user_id: state for state in local_states}) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 08ceda31a6..260a472255 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -251,6 +251,8 @@ class TransactionQueue(object): # First we queue up the new presence by user ID, so multiple presence # updates in quick successtion are correctly handled + # We only want to send presence for our own users, so lets always just + # filter here just in case. self.pending_presence.update({ state.user_id: state for state in states if self.is_mine_id(state.user_id) -- cgit 1.5.1 From 11dbceb761feef8c325a28e19805aee0f431f53b Mon Sep 17 00:00:00 2001 From: "Paul \"LeoNerd\" Evans" Date: Tue, 11 Apr 2017 17:16:12 +0100 Subject: Add a counter metric for successfully-sent transactions --- synapse/federation/transaction_queue.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index c27ce7c5f3..0e43891647 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -41,6 +41,8 @@ sent_pdus_destination_dist = client_metrics.register_distribution( ) sent_edus_counter = client_metrics.register_counter("sent_edus") +sent_transactions_counter = client_metrics.register_counter("sent_transactions") + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -374,6 +376,7 @@ class TransactionQueue(object): destination, pending_pdus, pending_edus, pending_failures, ) if success: + sent_transactions_counter.inc() # Remove the acknowledged device messages from the database # Only bother if we actually sent some device messages if device_message_edus: -- cgit 1.5.1 From c7ddb5ef7ac3dc7370010ee6685497ee73f46fa2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Apr 2017 10:11:43 +0100 Subject: Reuse get_interested_parties --- synapse/federation/transaction_queue.py | 6 +++--- synapse/handlers/presence.py | 21 +++++---------------- 2 files changed, 8 insertions(+), 19 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 260a472255..feb1605019 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -269,13 +269,13 @@ class TransactionQueue(object): self._processing_pending_presence = True try: while True: - states = self.pending_presence + states_map = self.pending_presence self.pending_presence = {} - if not states: + if not states_map: break - yield self._process_presence_inner(states) + yield self._process_presence_inner(states_map.values()) finally: self._processing_pending_presence = False diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index b9ce997a94..f3707afcd0 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -641,7 +641,7 @@ class PresenceHandler(object): """Sends state updates to remote servers. Args: - hosts_to_states (list(UserPresenceState)) + states (list(UserPresenceState)) """ self.federation.send_presence(states) @@ -1337,29 +1337,18 @@ def get_interested_remotes(store, states): each row the list of UserPresenceState should be sent to each destination """ - hosts_and_states = [] # Final result to return + hosts_and_states = [] # First we look up the rooms each user is in (as well as any explicit # subscriptions), then for each distinct room we look up the remote # hosts in those rooms. - room_ids_to_states = {} - users_to_states = {} - for state in states.itervalues(): - room_ids = yield store.get_rooms_for_user(state.user_id) - for room_id in room_ids: - room_ids_to_states.setdefault(room_id, []).append(state) - - plist = yield store.get_presence_list_observers_accepted( - state.user_id, - ) - for u in plist: - users_to_states.setdefault(u, []).append(state) + room_ids_to_states, users_to_states = yield get_interested_parties(store, states) - for room_id, states in room_ids_to_states.items(): + for room_id, states in room_ids_to_states.iteritems(): hosts = yield store.get_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) - for user_id, states in users_to_states.items(): + for user_id, states in users_to_states.iteritems(): host = get_domain_from_id(user_id) hosts_and_states.append(([host], states)) -- cgit 1.5.1 From 17450695438c0ed59d64e669ef6861316cef43af Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Apr 2017 10:17:10 +0100 Subject: Comment --- synapse/federation/transaction_queue.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index feb1605019..2919c2351a 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -77,12 +77,11 @@ class TransactionQueue(object): # destination -> list of tuple(edu, deferred) self.pending_edus_by_dest = edus = {} - # Presence needs to be separate as we send single aggragate EDUs - # Map of user_id -> UserPresenceState for all the pending presence # to be sent out by user_id. Entries here get processed and put in # pending_presence_by_dest self.pending_presence = {} + # Map of destination -> user_id -> UserPresenceState of pending presence # to be sent to each destinations self.pending_presence_by_dest = presence = {} -- cgit 1.5.1 From 26ae5178a4705f46f1eef605d6af1cbf8f7b7a85 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 12 Apr 2017 10:36:29 +0100 Subject: Add some comments --- synapse/federation/send_queue.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index fcd9929990..b952e59518 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -383,6 +383,10 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( "key", # tuple(str) - the edu key passed to send_edu "edu", # Edu ))): + """Streams EDUs that have an associated key that is ued to clobber. For example, + typing EDUs clobber based on room_id. + """ + TypeId = "k" @staticmethod @@ -407,6 +411,8 @@ class KeyedEduRow(BaseFederationRow, namedtuple("KeyedEduRow", ( class EduRow(BaseFederationRow, namedtuple("EduRow", ( "edu", # Edu ))): + """Streams EDUs that don't have keys. See KeyedEduRow + """ TypeId = "e" @staticmethod @@ -424,6 +430,11 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( "destination", # str "failure", ))): + """Streams failures to a remote server. Failures are issued when there was + something wrong with a transaction the remote sent us, e.g. it included + an event that was invalid. + """ + TypeId = "f" @staticmethod @@ -446,6 +457,10 @@ class FailureRow(BaseFederationRow, namedtuple("FailureRow", ( class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ( "destination", # str ))): + """Streams the fact that either a) there is pending to device messages for + users on the remote, or b) a local users device has changed and needs to + be sent to the remote. + """ TypeId = "d" @staticmethod -- cgit 1.5.1 From 4903ccf159552319a36708069c8eb52ce53542dd Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 21 Apr 2017 00:46:54 +0100 Subject: Fix some lies, and other clarifications, in docstrings The documentation on get_json has been wrong ever since the very first commit to synapse... --- contrib/cmdclient/http.py | 16 ++++------- synapse/federation/federation_client.py | 50 +++++++++++++++++++++++++++++++-- synapse/federation/transport/client.py | 20 +++++++++++++ synapse/http/matrixfederationclient.py | 21 +++++++++----- 4 files changed, 88 insertions(+), 19 deletions(-) (limited to 'synapse/federation') diff --git a/contrib/cmdclient/http.py b/contrib/cmdclient/http.py index 4186897316..c833f3f318 100644 --- a/contrib/cmdclient/http.py +++ b/contrib/cmdclient/http.py @@ -36,15 +36,13 @@ class HttpClient(object): the request body. This will be encoded as JSON. Returns: - Deferred: Succeeds when we get *any* HTTP response. - - The result of the deferred is a tuple of `(code, response)`, - where `response` is a dict representing the decoded JSON body. + Deferred: Succeeds when we get a 2xx HTTP response. The result + will be the decoded JSON body. """ pass def get_json(self, url, args=None): - """ Get's some json from the given host homeserver and path + """ Gets some json from the given host homeserver and path Args: url (str): The URL to GET data from. @@ -54,10 +52,8 @@ class HttpClient(object): and *not* a string. Returns: - Deferred: Succeeds when we get *any* HTTP response. - - The result of the deferred is a tuple of `(code, response)`, - where `response` is a dict representing the decoded JSON body. + Deferred: Succeeds when we get a 2xx HTTP response. The result + will be the decoded JSON body. """ pass @@ -214,4 +210,4 @@ class _JsonProducer(object): pass def stopProducing(self): - pass \ No newline at end of file + pass diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index deee0f4904..861441708b 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -474,8 +474,13 @@ class FederationClient(FederationBase): content (object): Any additional data to put into the content field of the event. Return: - A tuple of (origin (str), event (object)) where origin is the remote - homeserver which generated the event. + Deferred: resolves to a tuple of (origin (str), event (object)) + where origin is the remote homeserver which generated the event. + + Fails with a ``CodeMessageException`` if the chosen remote server + returns a 300/400 code. + + Fails with a ``RuntimeError`` if no servers were reachable. """ valid_memberships = {Membership.JOIN, Membership.LEAVE} if membership not in valid_memberships: @@ -528,6 +533,27 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def send_join(self, destinations, pdu): + """Sends a join event to one of a list of homeservers. + + Doing so will cause the remote server to add the event to the graph, + and send the event out to the rest of the federation. + + Args: + destinations (str): Candidate homeservers which are probably + participating in the room. + pdu (BaseEvent): event to be sent + + Return: + Deferred: resolves to a dict with members ``origin`` (a string + giving the serer the event was sent to, ``state`` (?) and + ``auth_chain``. + + Fails with a ``CodeMessageException`` if the chosen remote server + returns a 300/400 code. + + Fails with a ``RuntimeError`` if no servers were reachable. + """ + for destination in destinations: if destination == self.server_name: continue @@ -635,6 +661,26 @@ class FederationClient(FederationBase): @defer.inlineCallbacks def send_leave(self, destinations, pdu): + """Sends a leave event to one of a list of homeservers. + + Doing so will cause the remote server to add the event to the graph, + and send the event out to the rest of the federation. + + This is mostly useful to reject received invites. + + Args: + destinations (str): Candidate homeservers which are probably + participating in the room. + pdu (BaseEvent): event to be sent + + Return: + Deferred: resolves to None. + + Fails with a ``CodeMessageException`` if the chosen remote server + returns a non-200 code. + + Fails with a ``RuntimeError`` if no servers were reachable. + """ for destination in destinations: if destination == self.server_name: continue diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 15a03378f5..8887c624da 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -193,6 +193,26 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function def make_membership_event(self, destination, room_id, user_id, membership): + """Asks a remote server to build and sign us a membership event + + Note that this does not append any events to any graphs. + + Args: + destination (str): address of remote homeserver + room_id (str): room to join/leave + user_id (str): user to be joined/left + membership (str): one of join/leave + + Returns: + Deferred: Succeeds when we get a 2xx HTTP response. The result + will be the decoded JSON body (ie, the new event). + + Fails with ``HTTPRequestException`` if we get an HTTP response + code >= 300. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. + """ valid_memberships = {Membership.JOIN, Membership.LEAVE} if membership not in valid_memberships: raise RuntimeError( diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 62b4d7e93d..747a791f83 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -125,6 +125,8 @@ class MatrixFederationHttpClient(object): code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. + (May also fail with plenty of other Exceptions for things like DNS + failures, connection failures, SSL failures.) """ limiter = yield synapse.util.retryutils.get_retry_limiter( destination, @@ -302,8 +304,10 @@ class MatrixFederationHttpClient(object): Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result - will be the decoded JSON body. On a 4xx or 5xx error response a - CodeMessageException is raised. + will be the decoded JSON body. + + Fails with ``HTTPRequestException`` if we get an HTTP response + code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. @@ -360,8 +364,10 @@ class MatrixFederationHttpClient(object): try the request anyway. Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result - will be the decoded JSON body. On a 4xx or 5xx error response a - CodeMessageException is raised. + will be the decoded JSON body. + + Fails with ``HTTPRequestException`` if we get an HTTP response + code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. @@ -410,10 +416,11 @@ class MatrixFederationHttpClient(object): ignore_backoff (bool): true to ignore the historical backoff data and try the request anyway. Returns: - Deferred: Succeeds when we get *any* HTTP response. + Deferred: Succeeds when we get a 2xx HTTP response. The result + will be the decoded JSON body. - The result of the deferred is a tuple of `(code, response)`, - where `response` is a dict representing the decoded JSON body. + Fails with ``HTTPRequestException`` if we get an HTTP response + code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready to retry this server. -- cgit 1.5.1 From 91b39818007e955b15407c3dd9fd2d310d08842b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 21 Apr 2017 01:50:36 +0100 Subject: Try harder when sending leave events When we're rejecting invites, ignore the backoff data, so that we have a better chance of not getting the room out of sync. --- synapse/federation/transport/client.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 8887c624da..52b2a717d2 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -221,11 +221,23 @@ class TransportLayerClient(object): ) path = PREFIX + "/make_%s/%s/%s" % (membership, room_id, user_id) + ignore_backoff = False + retry_on_dns_fail = False + + if membership == Membership.LEAVE: + # we particularly want to do our best to send leave events. The + # problem is that if it fails, we won't retry it later, so if the + # remote server was just having a momentary blip, the room will be + # out of sync. + ignore_backoff = True + retry_on_dns_fail = True + content = yield self.client.get_json( destination=destination, path=path, - retry_on_dns_fail=False, + retry_on_dns_fail=retry_on_dns_fail, timeout=20000, + ignore_backoff=ignore_backoff, ) defer.returnValue(content) @@ -252,6 +264,12 @@ class TransportLayerClient(object): destination=destination, path=path, data=content, + + # we want to do our best to send this through. The problem is + # that if it fails, we won't retry it later, so if the remote + # server was just having a momentary blip, the room will be out of + # sync. + ignore_backoff=True, ) defer.returnValue(response) -- cgit 1.5.1 From 7166854f4169999fee0cd40a5ed389cc684b6dc8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 2 May 2017 10:36:35 +0100 Subject: Add cache for get_current_hosts_in_room --- synapse/federation/transaction_queue.py | 6 +----- synapse/state.py | 11 ++++++++++ synapse/storage/roommember.py | 38 +++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 5 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index dee387eb7f..695f1a7375 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -24,7 +24,6 @@ from synapse.util.async import run_on_reactor from synapse.util.logcontext import preserve_context_over_fn, preserve_fn from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func -from synapse.types import get_domain_from_id from synapse.handlers.presence import format_user_presence_state, get_interested_remotes import synapse.metrics @@ -183,15 +182,12 @@ class TransactionQueue(object): # Otherwise if the last member on a server in a room is # banned then it won't receive the event because it won't # be in the room after the ban. - users_in_room = yield self.state.get_current_user_in_room( + destinations = yield self.state.get_current_hosts_in_room( event.room_id, latest_event_ids=[ prev_id for prev_id, _ in event.prev_events ], ) - destinations = set( - get_domain_from_id(user_id) for user_id in users_in_room - ) if send_on_behalf_of is not None: # If we are sending the event on behalf of another server # then it already has the event and there is no reason to diff --git a/synapse/state.py b/synapse/state.py index f6b83d888a..f8b18a4a2d 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -175,6 +175,17 @@ class StateHandler(object): ) defer.returnValue(joined_users) + @defer.inlineCallbacks + def get_current_hosts_in_room(self, room_id, latest_event_ids=None): + if not latest_event_ids: + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + logger.debug("calling resolve_state_groups from get_current_user_in_room") + entry = yield self.resolve_state_groups(room_id, latest_event_ids) + joined_hosts = yield self.store.get_joined_hosts( + room_id, entry.state_id, entry.state + ) + defer.returnValue(joined_hosts) + @defer.inlineCallbacks def compute_event_context(self, event, old_state=None): """Build an EventContext structure for the event. diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 7ad2198d96..1c0fa8a680 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -482,6 +482,44 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(False) + def get_joined_hosts(self, room_id, state_group, state_ids): + if not state_group: + # If state_group is None it means it has yet to be assigned a + # state group, i.e. we need to make sure that calls with a state_group + # of None don't hit previous cached calls with a None state_group. + # To do this we set the state_group to a new object as object() != object() + state_group = object() + + return self._get_joined_hosts( + room_id, state_group, state_ids + ) + + @cachedInlineCallbacks(num_args=3) + def _get_joined_hosts(self, room_id, state_group, current_state_ids): + # We don't use `state_group`, its there so that we can cache based + # on it. However, its important that its never None, since two current_state's + # with a state_group of None are likely to be different. + # See bulk_get_push_rules_for_room for how we work around this. + assert state_group is not None + + joined_hosts = set() + for (etype, state_key), event_id in current_state_ids.items(): + if etype == EventTypes.Member: + try: + host = get_domain_from_id(state_key) + except: + logger.warn("state_key not user_id: %s", state_key) + continue + + if host in joined_hosts: + continue + + event = yield self.get_event(event_id, allow_none=True) + if event and event.content["membership"] == Membership.JOIN: + joined_hosts.add(host) + + defer.returnValue(joined_hosts) + @defer.inlineCallbacks def _background_add_membership_profile(self, progress, batch_size): target_min_stream_id = progress.get( -- cgit 1.5.1 From db7d0c31272954f01d89b2dbd653d54db0cbb040 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 May 2017 10:34:53 +0100 Subject: Always mark remotes as up if we receive a signed request from them --- synapse/federation/transport/server.py | 7 +++++++ 1 file changed, 7 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index c840da834c..828dcd01a7 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -79,6 +79,7 @@ class Authenticator(object): def __init__(self, hs): self.keyring = hs.get_keyring() self.server_name = hs.hostname + self.store = hs.get_datastore() # A method just so we can pass 'self' as the authenticator to the Servlets @defer.inlineCallbacks @@ -138,6 +139,12 @@ class Authenticator(object): logger.info("Request from %s", origin) request.authenticated_entity = origin + # If we get a valid signed request from the other side, its probably + # alive + retry_timings = yield self.store.get_destination_retry_timings(origin) + if retry_timings and retry_timings["retry_last_ts"]: + self.store.set_destination_retry_timings(origin, 0, 0) + defer.returnValue(origin) -- cgit 1.5.1 From 310b1ccdc1e80164811d4b1287c0a504d0a33c77 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 May 2017 13:41:19 +0100 Subject: Use preserve_fn and add logs --- synapse/federation/transport/server.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 828dcd01a7..3d676e7d8b 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -24,6 +24,7 @@ from synapse.http.servlet import ( ) from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string +from synapse.util.logcontext import preserve_fn from synapse.types import ThirdPartyInstanceID import functools @@ -143,7 +144,8 @@ class Authenticator(object): # alive retry_timings = yield self.store.get_destination_retry_timings(origin) if retry_timings and retry_timings["retry_last_ts"]: - self.store.set_destination_retry_timings(origin, 0, 0) + logger.info("Marking origin %r as up", origin) + preserve_fn(self.store.set_destination_retry_timings)(origin, 0, 0) defer.returnValue(origin) -- cgit 1.5.1 From de042b3b885aba6b1508ca50e033fb7a95893553 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 9 May 2017 19:01:39 +0100 Subject: Do some logging when one-time-keys get claimed might help us figure out if https://github.com/vector-im/riot-web/issues/3868 has happened. --- synapse/federation/federation_server.py | 10 ++++++++++ synapse/handlers/e2e_keys.py | 10 ++++++++++ tests/handlers/test_e2e_keys.py | 34 +++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index bc20b9c201..51e3fdea06 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -440,6 +440,16 @@ class FederationServer(FederationBase): key_id: json.loads(json_bytes) } + logger.info( + "Claimed one-time-keys: %s", + ",".join(( + "%s for %s:%s" % (key_id, user_id, device_id) + for user_id, user_keys in json_result.iteritems() + for device_id, device_keys in user_keys.iteritems() + for key_id, _ in device_keys.iteritems() + )), + ) + defer.returnValue({"one_time_keys": json_result}) @defer.inlineCallbacks diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 9d994a8f71..73921a5307 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -262,6 +262,16 @@ class E2eKeysHandler(object): for destination in remote_queries ])) + logger.info( + "Claimed one-time-keys: %s", + ",".join(( + "%s for %s:%s" % (key_id, user_id, device_id) + for user_id, user_keys in json_result.iteritems() + for device_id, device_keys in user_keys.iteritems() + for key_id, _ in device_keys.iteritems() + )), + ) + defer.returnValue({ "one_time_keys": json_result, "failures": failures diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index f10a80a8e1..19f5ed6bce 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -142,3 +142,37 @@ class E2eKeysHandlerTestCase(unittest.TestCase): self.fail("No error when replacing dict key") except errors.SynapseError: pass + + @unittest.DEBUG + @defer.inlineCallbacks + def test_claim_one_time_key(self): + local_user = "@boris:" + self.hs.hostname + device_id = "xyz" + keys = { + "alg1:k1": "key1", + } + + res = yield self.handler.upload_keys_for_user( + local_user, device_id, {"one_time_keys": keys}, + ) + self.assertDictEqual(res, { + "one_time_key_counts": {"alg1": 1} + }) + + res2 = yield self.handler.claim_one_time_keys({ + "one_time_keys": { + local_user: { + device_id: "alg1" + } + } + }, timeout=None) + self.assertEqual(res2, { + "failures": {}, + "one_time_keys": { + local_user: { + device_id: { + "alg1:k1": "key1" + } + } + } + }) -- cgit 1.5.1 From ec5c4499f4ab24445c6df7310007353b466020ce Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 16 May 2017 14:46:16 +0100 Subject: Make presence use cached users/hosts in room --- synapse/federation/transaction_queue.py | 2 +- synapse/handlers/presence.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 695f1a7375..a15198e05d 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -285,7 +285,7 @@ class TransactionQueue(object): Args: states (list(UserPresenceState)) """ - hosts_and_states = yield get_interested_remotes(self.store, states) + hosts_and_states = yield get_interested_remotes(self.store, states, self.state) for destinations, states in hosts_and_states: for destination in destinations: diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f3707afcd0..c7c0b0a1e2 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -780,12 +780,12 @@ class PresenceHandler(object): # don't need to send to local clients here, as that is done as part # of the event stream/sync. # TODO: Only send to servers not already in the room. - user_ids = yield self.store.get_users_in_room(room_id) if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) self._push_to_remotes([state]) else: + user_ids = yield self.store.get_users_in_room(room_id) user_ids = filter(self.is_mine_id, user_ids) states = yield self.current_state_for_users(user_ids) @@ -1322,7 +1322,7 @@ def get_interested_parties(store, states): @defer.inlineCallbacks -def get_interested_remotes(store, states): +def get_interested_remotes(store, states, state_handler): """Given a list of presence states figure out which remote servers should be sent which. @@ -1345,7 +1345,7 @@ def get_interested_remotes(store, states): room_ids_to_states, users_to_states = yield get_interested_parties(store, states) for room_id, states in room_ids_to_states.iteritems(): - hosts = yield store.get_hosts_in_room(room_id) + hosts = yield state_handler.get_current_hosts_in_room(room_id) hosts_and_states.append((hosts, states)) for user_id, states in users_to_states.iteritems(): -- cgit 1.5.1