From a5798de06784470d941ddc8084655e9d0e038eb7 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 09:58:42 +0000 Subject: Move replication.tcp.streams into a package --- synapse/app/federation_sender.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 9711a7147c..1d43f2b075 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -38,7 +38,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler -from synapse.replication.tcp.streams import ReceiptsStream +from synapse.replication.tcp.streams._base import ReceiptsStream from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.types import ReadReceipt -- cgit 1.4.1 From 1f6d6f918a57183083c2dd1bba6179373102b918 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 15:18:28 +0000 Subject: Make EventStream rows have a type ... as a precursor to combining it with the CurrentStateDelta stream. --- synapse/app/synchrotron.py | 5 +- synapse/replication/slave/storage/events.py | 8 ++- synapse/replication/tcp/protocol.py | 4 +- synapse/replication/tcp/streams/events.py | 98 +++++++++++++++++++++++++---- 4 files changed, 98 insertions(+), 17 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 9163b56d86..5388def28a 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -48,6 +48,7 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.streams.events import EventsStreamEventRow from synapse.rest.client.v1 import events from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.rest.client.v1.room import RoomInitialSyncRestServlet @@ -369,7 +370,9 @@ class SyncReplicationHandler(ReplicationClientHandler): # We shouldn't get multiple rows per token for events stream, so # we don't need to optimise this for multiple rows. for row in rows: - event = yield self.store.get_event(row.event_id) + if row.type != EventsStreamEventRow.TypeId: + continue + event = yield self.store.get_event(row.data.event_id) extra_users = () if event.type == EventTypes.Member: extra_users = (event.state_key,) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 4830c68f35..c57385d92f 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -16,6 +16,7 @@ import logging from synapse.api.constants import EventTypes +from synapse.replication.tcp.streams.events import EventsStreamEventRow from synapse.storage.event_federation import EventFederationWorkerStore from synapse.storage.event_push_actions import EventPushActionsWorkerStore from synapse.storage.events_worker import EventsWorkerStore @@ -79,9 +80,12 @@ class SlavedEventStore(EventFederationWorkerStore, if stream_name == "events": self._stream_id_gen.advance(token) for row in rows: + if row.type != EventsStreamEventRow.TypeId: + continue + data = row.data self.invalidate_caches_for_event( - token, row.event_id, row.room_id, row.type, row.state_key, - row.redacts, + token, data.event_id, data.room_id, data.type, data.state_key, + data.redacts, backfilled=False, ) elif stream_name == "backfill": diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 9daec2c995..b51590cf8f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -42,8 +42,8 @@ indicate which side is sending, these are *not* included on the wire:: > POSITION backfill 1 > POSITION caches 1 > RDATA caches 2 ["get_user_by_id",["@01register-user:localhost:8823"],1490197670513] - > RDATA events 14 ["$149019767112vOHxz:localhost:8823", - "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null] + > RDATA events 14 ["ev", ["$149019767112vOHxz:localhost:8823", + "!AFDCvgApUmpdfVjIXm:localhost:8823","m.room.guest_access","",null]] < PING 1490197675618 > ERROR server stopping * connection closed by server * diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 511dd6bcc7..928028e893 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -13,28 +13,102 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from collections import namedtuple + +import attr + +from twisted.internet import defer from ._base import Stream -EventStreamRow = namedtuple("EventStreamRow", ( - "event_id", # str - "room_id", # str - "type", # str - "state_key", # str, optional - "redacts", # str, optional -)) + +"""Handling of the 'events' replication stream + +This stream contains rows of various types. Each row therefore contains a 'type' +identifier before the real data. For example:: + + RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]] + +An "ev" row is sent for each new event. The fields in the data part are: + + * The new event id + * The room id for the event + * The type of the new event + * The state key of the event, for state events + * The event id of an event which is redacted by this event. + +""" + + +@attr.s(slots=True, frozen=True) +class EventsStreamRow(object): + """A parsed row from the events replication stream""" + type = attr.ib() # str: the TypeId of one of the *EventsStreamRows + data = attr.ib() # BaseEventsStreamRow + + +class BaseEventsStreamRow(object): + """Base class for rows to be sent in the events stream. + + Specifies how to identify, serialize and deserialize the different types. + """ + + TypeId = None # Unique string that ids the type. Must be overriden in sub classes. + + @classmethod + def from_data(cls, data): + """Parse the data from the replication stream into a row. + + By default we just call the constructor with the data list as arguments + + Args: + data: The value of the data object from the replication stream + """ + return cls(*data) + + +@attr.s(slots=True, frozen=True) +class EventsStreamEventRow(BaseEventsStreamRow): + TypeId = "ev" + + event_id = attr.ib() # str + room_id = attr.ib() # str + type = attr.ib() # str + state_key = attr.ib() # str, optional + redacts = attr.ib() # str, optional + + +TypeToRow = { + Row.TypeId: Row + for Row in ( + EventsStreamEventRow, + ) +} class EventsStream(Stream): """We received a new event, or an event went from being an outlier to not """ NAME = "events" - ROW_TYPE = EventStreamRow def __init__(self, hs): - store = hs.get_datastore() - self.current_token = store.get_current_events_token - self.update_function = store.get_all_new_forward_event_rows + self._store = hs.get_datastore() + self.current_token = self._store.get_current_events_token super(EventsStream, self).__init__(hs) + + @defer.inlineCallbacks + def update_function(self, from_token, current_token, limit=None): + event_rows = yield self._store.get_all_new_forward_event_rows( + from_token, current_token, limit, + ) + event_updates = ( + (row[0], EventsStreamEventRow.TypeId, row[1:]) + for row in event_rows + ) + defer.returnValue(event_updates) + + @classmethod + def parse_row(cls, row): + (typ, data) = row + data = TypeToRow[typ].from_data(data) + return EventsStreamRow(typ, data) -- cgit 1.4.1 From 4b91c313a94a4a89998e097e79a96a4423cf1b9f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Wed, 27 Mar 2019 16:15:59 +0000 Subject: Combine the CurrentStateDeltaStream into the EventStream --- synapse/app/user_dir.py | 17 +++++++++------ synapse/replication/tcp/streams/__init__.py | 1 - synapse/replication/tcp/streams/_base.py | 21 ------------------ synapse/replication/tcp/streams/events.py | 34 ++++++++++++++++++++++++++++- synapse/storage/events.py | 3 --- 5 files changed, 43 insertions(+), 33 deletions(-) (limited to 'synapse/app') diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index d1ab9512cd..355f5aa71d 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -36,6 +36,10 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.replication.tcp.streams.events import ( + EventsStream, + EventsStreamCurrentStateRow, +) from synapse.rest.client.v2_alpha import user_directory from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -73,19 +77,18 @@ class UserDirectorySlaveStore( prefilled_cache=curr_state_delta_prefill, ) - self._current_state_delta_pos = events_max - def stream_positions(self): result = super(UserDirectorySlaveStore, self).stream_positions() - result["current_state_deltas"] = self._current_state_delta_pos return result def process_replication_rows(self, stream_name, token, rows): - if stream_name == "current_state_deltas": - self._current_state_delta_pos = token + if stream_name == EventsStream.NAME: + self._stream_id_gen.advance(token) for row in rows: + if row.type != EventsStreamCurrentStateRow.TypeId: + continue self._curr_state_delta_stream_cache.entity_has_changed( - row.room_id, token + row.data.room_id, token ) return super(UserDirectorySlaveStore, self).process_replication_rows( stream_name, token, rows @@ -170,7 +173,7 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): yield super(UserDirectoryReplicationHandler, self).on_rdata( stream_name, token, rows ) - if stream_name == "current_state_deltas": + if stream_name == EventsStream.NAME: run_in_background(self._notify_directory) @defer.inlineCallbacks diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py index 5c715e3bfa..634f636dc9 100644 --- a/synapse/replication/tcp/streams/__init__.py +++ b/synapse/replication/tcp/streams/__init__.py @@ -44,7 +44,6 @@ STREAMS_MAP = { federation.FederationStream, _base.TagAccountDataStream, _base.AccountDataStream, - _base.CurrentStateDeltaStream, _base.GroupServerStream, ) } diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 13ab1bee05..8971a6a22e 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -91,12 +91,6 @@ AccountDataStreamRow = namedtuple("AccountDataStream", ( "data_type", # str "data", # dict )) -CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( - "room_id", # str - "type", # str - "state_key", # str - "event_id", # str, optional -)) GroupsStreamRow = namedtuple("GroupsStreamRow", ( "group_id", # str "user_id", # str @@ -428,21 +422,6 @@ class AccountDataStream(Stream): defer.returnValue(results) -class CurrentStateDeltaStream(Stream): - """Current state for a room was changed - """ - NAME = "current_state_deltas" - ROW_TYPE = CurrentStateDeltaStreamRow - - def __init__(self, hs): - store = hs.get_datastore() - - self.current_token = store.get_max_current_state_delta_stream_id - self.update_function = store.get_all_updated_current_state_deltas - - super(CurrentStateDeltaStream, self).__init__(hs) - - class GroupServerStream(Stream): NAME = "groups" ROW_TYPE = GroupsStreamRow diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index 928028e893..e0f6e29248 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -13,6 +13,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import heapq import attr @@ -26,6 +27,7 @@ from ._base import Stream This stream contains rows of various types. Each row therefore contains a 'type' identifier before the real data. For example:: + RDATA events batch ["state", ["!room:id", "m.type", "", "$event:id"]] RDATA events 12345 ["ev", ["$event:id", "!room:id", "m.type", null, null]] An "ev" row is sent for each new event. The fields in the data part are: @@ -36,6 +38,14 @@ An "ev" row is sent for each new event. The fields in the data part are: * The state key of the event, for state events * The event id of an event which is redacted by this event. +A "state" row is sent whenever the "current state" in a room changes. The fields in the +data part are: + + * The room id for the state change + * The event type of the state which has changed + * The state_key of the state which has changed + * The event id of the new state + """ @@ -77,10 +87,21 @@ class EventsStreamEventRow(BaseEventsStreamRow): redacts = attr.ib() # str, optional +@attr.s(slots=True, frozen=True) +class EventsStreamCurrentStateRow(BaseEventsStreamRow): + TypeId = "state" + + room_id = attr.ib() # str + type = attr.ib() # str + state_key = attr.ib() # str + event_id = attr.ib() # str, optional + + TypeToRow = { Row.TypeId: Row for Row in ( EventsStreamEventRow, + EventsStreamCurrentStateRow, ) } @@ -105,7 +126,18 @@ class EventsStream(Stream): (row[0], EventsStreamEventRow.TypeId, row[1:]) for row in event_rows ) - defer.returnValue(event_updates) + + state_rows = yield self._store.get_all_updated_current_state_deltas( + from_token, current_token, limit + ) + state_updates = ( + (row[0], EventsStreamCurrentStateRow.TypeId, row[1:]) + for row in state_rows + ) + + all_updates = heapq.merge(event_updates, state_updates) + + defer.returnValue(all_updates) @classmethod def parse_row(cls, row): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 428300ea0a..0b0a4dcdd3 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2287,9 +2287,6 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) - def get_max_current_state_delta_stream_id(self): - return self._stream_id_gen.get_current_token() - def get_all_updated_current_state_deltas(self, from_token, to_token, limit): def get_all_updated_current_state_deltas_txn(txn): sql = """ -- cgit 1.4.1 From b25e387c0d0eef77e0ba20cb9f12b67272664bae Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Mon, 8 Apr 2019 15:47:39 +0100 Subject: add context to phonehome stats (#5020) add context to phonehome stats --- changelog.d/5020.feature | 1 + docs/sample_config.yaml | 3 +++ synapse/app/homeserver.py | 2 +- synapse/config/server.py | 4 ++++ 4 files changed, 9 insertions(+), 1 deletion(-) create mode 100644 changelog.d/5020.feature (limited to 'synapse/app') diff --git a/changelog.d/5020.feature b/changelog.d/5020.feature new file mode 100644 index 0000000000..71f7a8db2e --- /dev/null +++ b/changelog.d/5020.feature @@ -0,0 +1 @@ +Add context to phonehome stats. diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index f6b3fac6cd..a380b5f9a8 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -236,6 +236,9 @@ listeners: # - medium: 'email' # address: 'reserved_user@example.com' +# Used by phonehome stats to group together related servers. +#server_context: context + ## TLS ## diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 869c028d1f..79be977ea6 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -518,6 +518,7 @@ def run(hs): uptime = 0 stats["homeserver"] = hs.config.server_name + stats["server_context"] = hs.config.server_context stats["timestamp"] = now stats["uptime_seconds"] = uptime version = sys.version_info @@ -558,7 +559,6 @@ def run(hs): stats["database_engine"] = hs.get_datastore().database_engine_name stats["database_server_version"] = hs.get_datastore().get_server_version() - logger.info("Reporting stats to matrix.org: %s" % (stats,)) try: yield hs.get_simple_http_client().put_json( diff --git a/synapse/config/server.py b/synapse/config/server.py index 08e4e45482..c5e5679d52 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -37,6 +37,7 @@ class ServerConfig(Config): def read_config(self, config): self.server_name = config["server_name"] + self.server_context = config.get("server_context", None) try: parse_and_validate_server_name(self.server_name) @@ -484,6 +485,9 @@ class ServerConfig(Config): #mau_limit_reserved_threepids: # - medium: 'email' # address: 'reserved_user@example.com' + + # Used by phonehome stats to group together related servers. + #server_context: context """ % locals() def read_arguments(self, args): -- cgit 1.4.1 From d5adf297e6d6688d9410e39d7d90a0f9f01173eb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Apr 2019 17:13:16 +0100 Subject: Move some rest endpoints to client reader --- docs/workers.rst | 3 +++ synapse/app/client_reader.py | 11 +++++++---- 2 files changed, 10 insertions(+), 4 deletions(-) (limited to 'synapse/app') diff --git a/docs/workers.rst b/docs/workers.rst index d80fc04d2e..d9d545610b 100644 --- a/docs/workers.rst +++ b/docs/workers.rst @@ -227,6 +227,9 @@ following regular expressions:: ^/_matrix/client/(api/v1|r0|unstable)/account/3pid$ ^/_matrix/client/(api/v1|r0|unstable)/keys/query$ ^/_matrix/client/(api/v1|r0|unstable)/keys/changes$ + ^/_matrix/client/versions$ + ^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$ + ^/_matrix/client/(api/v1|r0|unstable)/pushrules/.*$ Additionally, the following REST endpoints can be handled, but all requests must be routed to the same instance:: diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index beaea64a61..1e9e686107 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -45,6 +45,7 @@ from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.login import LoginRestServlet +from synapse.rest.client.v1.push_rule import PushRuleRestServlet from synapse.rest.client.v1.room import ( JoinedRoomMemberListRestServlet, PublicRoomListRestServlet, @@ -52,9 +53,11 @@ from synapse.rest.client.v1.room import ( RoomMemberListRestServlet, RoomStateRestServlet, ) +from synapse.rest.client.v1.voip import VoipRestServlet from synapse.rest.client.v2_alpha.account import ThreepidRestServlet from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet from synapse.rest.client.v2_alpha.register import RegisterRestServlet +from synapse.rest.client.versions import VersionsRestServlet from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree @@ -109,12 +112,12 @@ class ClientReaderServer(HomeServer): ThreepidRestServlet(self).register(resource) KeyQueryServlet(self).register(resource) KeyChangesServlet(self).register(resource) + VoipRestServlet(self).register(resource) + PushRuleRestServlet(self).register(resource) + VersionsRestServlet(self).register(resource) resources.update({ - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, + "/_matrix/client": resource, }) root_resource = create_resource_tree(resources, NoResource()) -- cgit 1.4.1 From 38642614cffb107bb69abf215eb5144681bc4491 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Apr 2019 19:21:32 +0100 Subject: VersionRestServlet doesn't take a param --- synapse/app/client_reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/app') diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 1e9e686107..864f1eac48 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -114,7 +114,7 @@ class ClientReaderServer(HomeServer): KeyChangesServlet(self).register(resource) VoipRestServlet(self).register(resource) PushRuleRestServlet(self).register(resource) - VersionsRestServlet(self).register(resource) + VersionsRestServlet().register(resource) resources.update({ "/_matrix/client": resource, -- cgit 1.4.1