author     Erik Johnston <erik@matrix.org>  2016-06-07 11:07:27 +0100
committer  Erik Johnston <erik@matrix.org>  2016-06-07 11:07:27 +0100
commit     dbd788c2621d3f6af3dfe1624f81858ea561fcc5 (patch)
tree       99a1c75673f8dc629b1405a1a1ebf5caa3823468
parent     Merge branch 'develop' of github.com:matrix-org/synapse into erikj/timings (diff)
parent     Fix AS retries, but with correct ordering (diff)
download   synapse-erikj/timings.tar.xz

Merge branch 'develop' of github.com:matrix-org/synapse into erikj/timings  (github/erikj/timings, erikj/timings)
-rw-r--r--  synapse/app/pusher.py             |  2
-rw-r--r--  synapse/app/synchrotron.py        |  7
-rw-r--r--  synapse/handlers/presence.py      | 73
-rw-r--r--  synapse/storage/appservice.py     |  2
-rw-r--r--  synapse/storage/events.py         | 50
-rw-r--r--  tests/storage/test_appservice.py  |  2
6 files changed, 97 insertions, 39 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index f1de1e7ce9..3c3fa38053 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -311,7 +311,7 @@ class PusherServer(HomeServer):
                 poke_pushers(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
-                sleep(30)
+                yield sleep(30)
 
 
 def setup(config_options):
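
Note: the one-character fix above is easy to miss. Assuming the sleep here is
synapse's Deferred-based helper (synapse.util.async.sleep), it returns a
Deferred rather than blocking, so calling it without yield inside an
@defer.inlineCallbacks generator discards the Deferred and the loop retries
immediately instead of backing off for 30 seconds. A minimal sketch of the
pattern (do_replication is a hypothetical stand-in for the real replication
work):

    import logging

    from twisted.internet import defer, reactor

    logger = logging.getLogger(__name__)

    def sleep(seconds):
        # Deferred-based sleep: fires after `seconds` without blocking the
        # reactor, mirroring synapse.util.async.sleep.
        d = defer.Deferred()
        reactor.callLater(seconds, d.callback, None)
        return d

    def do_replication():
        # Hypothetical stand-in for the real poke/replicate work.
        return defer.succeed(None)

    @defer.inlineCallbacks
    def replicate_loop():
        while True:
            try:
                yield do_replication()
            except Exception:
                logger.exception("Error replicating")
                # Without the yield, this Deferred is dropped and the loop
                # retries immediately.
                yield sleep(30)
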
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index aa81e1c5da..5c552ffb29 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -187,7 +187,10 @@ class SynchrotronPresence(object):
                 yield self._send_syncing_users_now()
 
         def _end():
-            if affect_presence:
+            # We check that the user_id is in user_to_num_current_syncs because
+            # user_to_num_current_syncs may have been cleared if we are
+            # shutting down.
+            if affect_presence and user_id in self.user_to_num_current_syncs:
                 self.user_to_num_current_syncs[user_id] -= 1
 
         @contextlib.contextmanager
@@ -443,7 +446,7 @@ class SynchrotronServer(HomeServer):
                 notify(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
-                sleep(5)
+                yield sleep(5)
 
     def build_presence_handler(self):
         return SynchrotronPresence(self)
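
Note: the guard added in _end() is the standard defensive pattern for a
counter dict that another code path may clear wholesale. A simplified,
standalone sketch of the failure mode (names mirror the handler above):

    user_to_num_current_syncs = {}

    def start_sync(user_id):
        user_to_num_current_syncs[user_id] = (
            user_to_num_current_syncs.get(user_id, 0) + 1
        )

    def end_sync(user_id):
        # If shutdown cleared the dict between start and end, an unguarded
        # decrement raises KeyError; the membership check makes it a no-op.
        if user_id in user_to_num_current_syncs:
            user_to_num_current_syncs[user_id] -= 1
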
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 0e19f777b8..6b70fa3817 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -183,7 +183,7 @@ class PresenceHandler(object):
         # The initial delay is to allow disconnected clients a chance to
         # reconnect before we treat them as offline.
         self.clock.call_later(
-            30 * 1000,
+            30,
             self.clock.looping_call,
             self._handle_timeouts,
             5000,
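
Note: the change from 30 * 1000 to 30 is a units fix. Assuming synapse's
Clock mirrors Twisted here, call_later() takes its delay in seconds (it wraps
reactor.callLater) while the looping_call() interval is in milliseconds, so
the old value delayed the first timeout sweep by roughly eight hours instead
of thirty. Equivalent in raw Twisted terms:

    from twisted.internet import reactor, task

    def handle_timeouts():
        pass  # sweep logic elided

    loop = task.LoopingCall(handle_timeouts)
    reactor.callLater(
        30,               # was 30 * 1000, i.e. ~8.3 hours, not 30 seconds
        loop.start, 5.0,  # 5000 ms = 5.0 s between sweeps
    )
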
@@ -283,44 +283,48 @@ class PresenceHandler(object):
         """Checks the presence of users that have timed out and updates as
         appropriate.
         """
+        logger.info("Handling presence timeouts")
         now = self.clock.time_msec()
 
-        with Measure(self.clock, "presence_handle_timeouts"):
-            # Fetch the list of users that *may* have timed out. Things may have
-            # changed since the timeout was set, so we won't necessarily have to
-            # take any action.
-            users_to_check = set(self.wheel_timer.fetch(now))
-
-            # Check whether the lists of syncing processes from an external
-            # process have expired.
-            expired_process_ids = [
-                process_id for process_id, last_update
-                in self.external_process_last_update.items()
-                if now - last_update > EXTERNAL_PROCESS_EXPIRY
-            ]
-            for process_id in expired_process_ids:
-                users_to_check.update(
-                    self.external_process_to_current_syncs.pop(process_id, ())
-                )
-                self.external_process_last_update.pop(process_id)
+        try:
+            with Measure(self.clock, "presence_handle_timeouts"):
+                # Fetch the list of users that *may* have timed out. Things may have
+                # changed since the timeout was set, so we won't necessarily have to
+                # take any action.
+                users_to_check = set(self.wheel_timer.fetch(now))
+
+                # Check whether the lists of syncing processes from an external
+                # process have expired.
+                expired_process_ids = [
+                    process_id for process_id, last_update
+                    in self.external_process_last_updated_ms.items()
+                    if now - last_update > EXTERNAL_PROCESS_EXPIRY
+                ]
+                for process_id in expired_process_ids:
+                    users_to_check.update(
+                        self.external_process_to_current_syncs.pop(process_id, ())
+                    )
+                    self.external_process_last_updated_ms.pop(process_id)
 
-            states = [
-                self.user_to_current_state.get(
-                    user_id, UserPresenceState.default(user_id)
-                )
-                for user_id in users_to_check
-            ]
+                states = [
+                    self.user_to_current_state.get(
+                        user_id, UserPresenceState.default(user_id)
+                    )
+                    for user_id in users_to_check
+                ]
 
-            timers_fired_counter.inc_by(len(states))
+                timers_fired_counter.inc_by(len(states))
 
-            changes = handle_timeouts(
-                states,
-                is_mine_fn=self.is_mine_id,
-                syncing_users=self.get_syncing_users(),
-                now=now,
-            )
+                changes = handle_timeouts(
+                    states,
+                    is_mine_fn=self.is_mine_id,
+                    syncing_user_ids=self.get_currently_syncing_users(),
+                    now=now,
+                )
 
-        preserve_fn(self._update_states)(changes)
+            preserve_fn(self._update_states)(changes)
+        except:
+            logger.exception("Exception in _handle_timeouts loop")
 
     @defer.inlineCallbacks
     def bump_presence_active_time(self, user):
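
Note: wrapping the body of _handle_timeouts in a broad try/except is what
keeps the timer alive. Twisted's LoopingCall stops permanently if its
callback raises, so one bad sweep would otherwise end presence timeout
handling until restart. A standalone sketch of the pattern:

    import logging

    from twisted.internet import task

    logger = logging.getLogger(__name__)

    def sweep():
        # Hypothetical stand-in for the timeout-handling body.
        raise RuntimeError("transient failure")

    def handle_timeouts():
        try:
            sweep()
        except Exception:
            # Log and swallow: an exception escaping a LoopingCall callback
            # stops the loop permanently.
            logger.exception("Exception in _handle_timeouts loop")

    loop = task.LoopingCall(handle_timeouts)
    loop.start(5.0)  # keeps running even though sweep() raises
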
@@ -402,7 +406,8 @@ class PresenceHandler(object):
             user_id for user_id, count in self.user_to_num_current_syncs.items()
             if count
         }
-        syncing_user_ids.update(self.external_process_to_current_syncs.values())
+        for user_ids in self.external_process_to_current_syncs.values():
+            syncing_user_ids.update(user_ids)
         return syncing_user_ids
 
     @defer.inlineCallbacks
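
Note: the replaced line was a genuine bug, not a style change.
external_process_to_current_syncs maps a process_id to a *set* of user_ids,
and set.update() adds each element of its argument, so passing .values()
directly tries to add the sets themselves and raises TypeError (sets are
unhashable). Flattening one level fixes it:

    current_syncs = {
        "proc1": {"@alice:hs", "@bob:hs"},  # hypothetical sample data
        "proc2": {"@carol:hs"},
    }

    syncing_user_ids = set()
    for user_ids in current_syncs.values():
        syncing_user_ids.update(user_ids)
    # syncing_user_ids == {"@alice:hs", "@bob:hs", "@carol:hs"}
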
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index ffb7d4a25b..d1ee533fac 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -320,7 +320,7 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
 
         event_ids = json.loads(entry["event_ids"])
 
-        events = yield self.get_events(event_ids)
+        events = yield self._get_events(event_ids)
 
         defer.returnValue(AppServiceTransaction(
             service=service, id=entry["txn_id"], events=events
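
Note: the second parent's commit message ("Fix AS retries, but with correct
ordering") hints at why the lookup switched to the internal helper: the
transaction must replay events to the application service in the order the
event_ids were stored. A sketch of the property the caller relies on (helper
name hypothetical):

    def events_in_stored_order(event_ids, event_map):
        # Rebuild the stored ordering from an unordered {id: event} mapping.
        return [event_map[eid] for eid in event_ids if eid in event_map]
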
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5db24e86f9..6d978ffcd5 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -27,6 +27,9 @@ from synapse.api.constants import EventTypes
 from canonicaljson import encode_canonical_json
 from collections import deque, namedtuple
 
+import synapse
+import synapse.metrics
+
 
 import logging
 import math
@@ -35,6 +38,10 @@ import ujson as json
 logger = logging.getLogger(__name__)
 
 
+metrics = synapse.metrics.get_metrics_for(__name__)
+persist_event_counter = metrics.register_counter("persisted_events")
+
+
 def encode_json(json_object):
     if USE_FROZEN_DICTS:
         # ujson doesn't like frozen_dicts
@@ -261,6 +268,7 @@ class EventsStore(SQLBaseStore):
                         events_and_contexts=chunk,
                         backfilled=backfilled,
                     )
+                    persist_event_counter.inc_by(len(chunk))
 
     @defer.inlineCallbacks
     @log_function
@@ -278,6 +286,7 @@ class EventsStore(SQLBaseStore):
                         current_state=current_state,
                         backfilled=backfilled,
                     )
+                    persist_event_counter.inc()
         except _RollbackButIsFineException:
             pass
 
@@ -635,6 +644,8 @@ class EventsStore(SQLBaseStore):
             ],
         )
 
+        self._add_to_cache(txn, events_and_contexts)
+
         if backfilled:
             # Backfilled events come before the current state so we don't need
             # to update the current state table
@@ -676,6 +687,45 @@ class EventsStore(SQLBaseStore):
 
         return
 
+    def _add_to_cache(self, txn, events_and_contexts):
+        to_prefill = []
+
+        rows = []
+        N = 200
+        for i in range(0, len(events_and_contexts), N):
+            ev_map = {
+                e[0].event_id: e[0]
+                for e in events_and_contexts[i:i + N]
+            }
+            if not ev_map:
+                break
+
+            sql = (
+                "SELECT "
+                " e.event_id as event_id, "
+                " r.redacts as redacts,"
+                " rej.event_id as rejects "
+                " FROM events as e"
+                " LEFT JOIN rejections as rej USING (event_id)"
+                " LEFT JOIN redactions as r ON e.event_id = r.redacts"
+                " WHERE e.event_id IN (%s)"
+            ) % (",".join(["?"] * len(ev_map)),)
+
+            txn.execute(sql, ev_map.keys())
+            rows = self.cursor_to_dict(txn)
+            for row in rows:
+                event = ev_map[row["event_id"]]
+                if not row["rejects"] and not row["redacts"]:
+                    to_prefill.append(_EventCacheEntry(
+                        event=event,
+                        redacted_event=None,
+                    ))
+
+        def prefill():
+            for cache_entry in to_prefill:
+                self._get_event_cache.prefill((cache_entry[0].event_id,), cache_entry)
+        txn.call_after(prefill)
+
     def _store_redaction(self, txn, event):
         # invalidate the cache for the redacted event
         txn.call_after(self._invalidate_get_event_cache, event.redacts)
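
Note: two details of _add_to_cache are worth calling out. First, the query is
chunked in groups of N = 200 so the IN (...) placeholder list stays well under
database limits on bound parameters. Second, the cache is primed via
txn.call_after(prefill), so entries only land in _get_event_cache after the
transaction commits; a rolled-back persist never poisons the cache. A
standalone sketch of the chunking pattern (table name hypothetical; the
cursor is assumed to use DB-API qmark parameters, as above):

    N = 200

    def fetch_rows_in_chunks(txn, event_ids):
        rows = []
        for i in range(0, len(event_ids), N):
            chunk = event_ids[i:i + N]
            sql = (
                "SELECT event_id FROM events WHERE event_id IN (%s)"
                % ",".join(["?"] * len(chunk))
            )
            txn.execute(sql, chunk)
            rows.extend(txn.fetchall())
        return rows
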
diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py
index f44c4870e3..3e2862daae 100644
--- a/tests/storage/test_appservice.py
+++ b/tests/storage/test_appservice.py
@@ -357,7 +357,7 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase):
         other_events = [Mock(event_id="e5"), Mock(event_id="e6")]
 
         # we aren't testing store._base stuff here, so mock this out
-        self.store.get_events = Mock(return_value=events)
+        self.store._get_events = Mock(return_value=events)
 
         yield self._insert_txn(self.as_list[1]["id"], 9, other_events)
         yield self._insert_txn(service.id, 10, events)
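
Note: the test change mirrors the rename in the storage layer: the stub has
to shadow the method the code under test now calls, or the Mock would sit
unused while the real _get_events hit the database. A minimal sketch of the
stubbing pattern (objects here are hypothetical):

    from mock import Mock

    store = Mock()
    store._get_events = Mock(return_value=[Mock(event_id="e1")])

    events = store._get_events(["e1"])
    store._get_events.assert_called_once_with(["e1"])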