diff options
-rw-r--r-- | synapse/app/synchrotron.py | 21 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 1 | ||||
-rw-r--r-- | synapse/storage/_base.py | 1 |
3 files changed, 23 insertions, 0 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 623a73f67a..6a0592f41d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -16,10 +16,12 @@ import synapse +from synapse.api.constants import EventTypes from synapse.config._base import ConfigError from synapse.config.database import DatabaseConfig from synapse.config.logger import LoggingConfig from synapse.config.appservice import AppServiceConfig +from synapse.events import FrozenEvent from synapse.http.site import SynapseSite from synapse.http.server import JsonResource from synapse.metrics.resource import MetricsResource, METRICS_PREFIX @@ -49,6 +51,7 @@ from daemonize import Daemonize import sys import logging import contextlib +import ujson as json logger = logging.getLogger("synapse.app.synchrotron") @@ -226,10 +229,27 @@ class SynchrotronServer(HomeServer): store = self.get_datastore() replication_url = self.config.replication_url clock = self.get_clock() + notifier = self.get_notifier() def expire_broken_caches(): store.who_forgot_in_room.invalidate_all() + def notify(result): + stream = result.get("events") + if stream: + max_position = stream["position"] + for row in stream["rows"]: + position = row[0] + internal = json.loads(row[1]) + event_json = json.loads(row[2]) + event = FrozenEvent(event_json, internal_metadata_dict=internal) + extra_users = () + if event.type == EventTypes.Member: + extra_users = (event.state_key,) + notifier.on_new_room_event( + event, position, max_position, extra_users + ) + next_expire_broken_caches_ms = 0 while True: try: @@ -244,6 +264,7 @@ class SynchrotronServer(HomeServer): now_ms + store.BROKEN_CACHE_EXPIRY_MS ) yield store.process_replication(result) + notify(result) except: logger.exception("Error replicating from %r", replication_url) sleep(5) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index d768e2aabe..cbc1ae4190 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -123,6 +123,7 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_rooms = ( DataStore.get_room_events_stream_for_rooms.__func__ ) + get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__ _set_before_and_after = staticmethod(DataStore._set_before_and_after) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 56a0dd80f3..e0d7098692 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -153,6 +153,7 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs self._db_pool = hs.get_db_pool() + self._clock = hs.get_clock() self._previous_txn_total_time = 0 self._current_txn_total_time = 0 |