summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2014-12-04 11:27:59 +0000
committerErik Johnston <erik@matrix.org>2014-12-04 11:27:59 +0000
commit5d7c9ab7898f2721aa3f60ab76c53dc44322be77 (patch)
tree7a6209af97b35e8d62db575327217c23592e68af
parentWIP for new way of managing events. (diff)
downloadsynapse-5d7c9ab7898f2721aa3f60ab76c53dc44322be77.tar.xz
Begin converting things to use the new Event structure
-rw-r--r--synapse/api/auth.py11
-rw-r--r--synapse/events/__init__.py3
-rw-r--r--synapse/federation/replication.py12
-rw-r--r--synapse/handlers/_base.py65
-rw-r--r--synapse/handlers/federation.py9
-rw-r--r--synapse/handlers/room.py1
-rw-r--r--synapse/state.py18
-rw-r--r--synapse/storage/signatures.py16
8 files changed, 96 insertions, 39 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 50d4be113f..5261c3e3bf 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -393,18 +393,11 @@ class Auth(object):
             if member_event.content["membership"] == Membership.JOIN:
                 auth_events.append(member_event.event_id)
 
-        hashes = yield self.store.get_event_reference_hashes(
+        auth_events = yield self.store.add_event_hashes(
             auth_events
         )
-        hashes = [
-            {
-                k: encode_base64(v) for k, v in h.items()
-                if k == "sha256"
-            }
-            for h in hashes
-        ]
 
-        defer.returnValue(zip(auth_events, hashes))
+        defer.returnValue(auth_events)
 
     @log_function
     def _can_send_event(self, event, auth_events):
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 6748198917..6a05ba2d16 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -81,6 +81,9 @@ class EventBase(object):
     type = _event_dict_property("type")
     user_id = _event_dict_property("sender")
 
+    def is_state(self):
+        return hasattr(self, "state_key")
+
     def get_dict(self):
         d = dict(self._original)
         d.update({
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 01f87fe423..bd56a4c108 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -112,7 +112,7 @@ class ReplicationLayer(object):
         self.query_handlers[query_type] = handler
 
     @log_function
-    def send_pdu(self, pdu):
+    def send_pdu(self, pdu, destinations):
         """Informs the replication layer about a new PDU generated within the
         home server that should be transmitted to others.
 
@@ -131,7 +131,7 @@ class ReplicationLayer(object):
         logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
 
         # TODO, add errback, etc.
-        self._transaction_queue.enqueue_pdu(pdu, order)
+        self._transaction_queue.enqueue_pdu(pdu, destinations, order)
 
         logger.debug(
             "[%s] transaction_layer.enqueue_pdu... done",
@@ -705,15 +705,13 @@ class _TransactionQueue(object):
 
     @defer.inlineCallbacks
     @log_function
-    def enqueue_pdu(self, pdu, order):
+    def enqueue_pdu(self, pdu, destinations, order):
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
 
-        destinations = set([
-            d for d in pdu.destinations
-            if d != self.server_name
-        ])
+        destinations = set(destinations)
+        destinations.remove(self.server_name)
 
         logger.debug("Sending to: %s", str(destinations))
 
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index a45715bf60..890b51be30 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -15,11 +15,11 @@
 
 from twisted.internet import defer
 
-from synapse.api.errors import LimitExceededError
+from synapse.api.errors import LimitExceededError, SynapseError
 from synapse.util.async import run_on_reactor
 from synapse.crypto.event_signing import add_hashes_and_signatures
 from synapse.api.events.room import RoomMemberEvent
-from synapse.api.constants import Membership
+from synapse.api.constants import Membership, EventTypes
 
 from synapse.events.snapshot import EventSnapshot, EventContext
 
@@ -59,7 +59,7 @@ class BaseHandler(object):
             )
 
     @defer.inlineCallbacks
-    def _handle_new_client_event(self, builder):
+    def _create_new_client_event(self, builder):
         latest_ret = yield self.store.get_latest_events_in_room(
             builder.room_id,
         )
@@ -67,16 +67,27 @@ class BaseHandler(object):
         depth = max([d for _, _, d in latest_ret])
         prev_events = [(e, h) for e, h, _ in latest_ret]
 
-        group, curr_state = yield self.state_handler.resolve_state_groups(
-            [e for e, _ in prev_events]
-        )
+        state_handler = self.state_handler
+        if builder.is_state():
+            ret = yield state_handler.resolve_state_groups(
+                [e for e, _ in prev_events],
+                event_type=builder.event_type,
+                state_key=builder.state_key,
+            )
 
-        snapshot = EventSnapshot(
-            prev_events=prev_events,
-            depth=depth,
-            current_state=curr_state,
-            current_state_group=group,
-        )
+            group, curr_state, prev_state = ret
+
+            prev_state = yield self.store.add_event_hashes(
+                prev_state
+            )
+
+            builder.prev_state = prev_state
+        else:
+            group, curr_state, _ = yield state_handler.resolve_state_groups(
+                [e for e, _ in prev_events],
+            )
+
+        builder.internal_metadata.state_group = group
 
         builder.prev_events = prev_events
         builder.depth = depth
@@ -103,9 +114,39 @@ class BaseHandler(object):
             auth_events=curr_auth_events,
         )
 
+        defer.returnValue(
+            (event, context,)
+        )
+
+    @defer.inlineCallbacks
+    def _handle_new_client_event(self, event, context):
+        # We now need to go and hit out to wherever we need to hit out to.
+
         self.auth.check(event, auth_events=context.auth_events)
 
+        yield self.store.persist_event(event)
 
+        destinations = set()
+        for k, s in context.current_state.items():
+            try:
+                if k[0] == EventTypes.Member:
+                    if s.content["membership"] == Membership.JOIN:
+                        destinations.add(
+                            self.hs.parse_userid(s.state_key).domain
+                        )
+            except SynapseError:
+                logger.warn(
+                    "Failed to get destination from event %s", s.event_id
+                )
+
+        yield self.notifier.on_new_room_event(event)
+
+        federation_handler = self.hs.get_handlers().federation_handler
+        yield federation_handler.handle_new_event(
+            event,
+            None,
+            destinations=destinations,
+        )
 
     @defer.inlineCallbacks
     def _on_new_room_event(self, event, snapshot, extra_destinations=[],
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 925eb5376e..7bd36e415e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -76,7 +76,7 @@ class FederationHandler(BaseHandler):
 
     @log_function
     @defer.inlineCallbacks
-    def handle_new_event(self, event, snapshot):
+    def handle_new_event(self, event, snapshot, destinations):
         """ Takes in an event from the client to server side, that has already
         been authed and handled by the state module, and sends it to any
         remote home servers that may be interested.
@@ -92,12 +92,7 @@ class FederationHandler(BaseHandler):
 
         yield run_on_reactor()
 
-        pdu = event
-
-        if not hasattr(pdu, "destinations") or not pdu.destinations:
-            pdu.destinations = []
-
-        yield self.replication_layer.send_pdu(pdu)
+        yield self.replication_layer.send_pdu(event, destinations)
 
     @log_function
     @defer.inlineCallbacks
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 6e1c37df03..52a9788823 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -378,6 +378,7 @@ class RoomMemberHandler(BaseHandler):
         else:
             # This is not a JOIN, so we can handle it normally.
 
+            # FIXME: This isn't idempotency.
             if prev_state and prev_state.membership == event.membership:
                 # double same action, treat this event as a NOOP.
                 defer.returnValue({})
diff --git a/synapse/state.py b/synapse/state.py
index 430665f7ba..8a556a27f6 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -89,7 +89,7 @@ class StateHandler(object):
         ids = [e for e, _ in event.prev_events]
 
         ret = yield self.resolve_state_groups(ids)
-        state_group, new_state = ret
+        state_group, new_state, _ = ret
 
         event.old_state_events = copy.deepcopy(new_state)
 
@@ -137,7 +137,7 @@ class StateHandler(object):
 
     @defer.inlineCallbacks
     @log_function
-    def resolve_state_groups(self, event_ids):
+    def resolve_state_groups(self, event_ids, event_type=None, state_key=""):
         """ Given a list of event_ids this method fetches the state at each
         event, resolves conflicts between them and returns them.
 
@@ -156,7 +156,10 @@ class StateHandler(object):
                 (e.type, e.state_key): e
                 for e in state_list
             }
-            defer.returnValue((name, state))
+            prev_state = state.get((event_type, state_key), None)
+            if prev_state:
+                prev_state = prev_state.event_id
+            defer.returnValue((name, state, [prev_state]))
 
         state = {}
         for group, g_state in state_groups.items():
@@ -177,6 +180,13 @@ class StateHandler(object):
             if len(v.values()) > 1
         }
 
+        if event_type:
+            prev_states = conflicted_state.get(
+                (event_type, state_key), {}
+            ).keys()
+        else:
+            prev_states = []
+
         try:
             new_state = {}
             new_state.update(unconflicted_state)
@@ -186,7 +196,7 @@ class StateHandler(object):
             logger.exception("Failed to resolve state")
             raise
 
-        defer.returnValue((None, new_state))
+        defer.returnValue((None, new_state, prev_states))
 
     def _get_power_level_from_event_state(self, event, user_id):
         if hasattr(event, "old_state_events") and event.old_state_events:
diff --git a/synapse/storage/signatures.py b/synapse/storage/signatures.py
index eea4f21065..e2f11c7ffc 100644
--- a/synapse/storage/signatures.py
+++ b/synapse/storage/signatures.py
@@ -15,6 +15,8 @@
 
 from _base import SQLBaseStore
 
+from syutil.base64util import encode_base64
+
 
 class SignatureStore(SQLBaseStore):
     """Persistence for event signatures and hashes"""
@@ -67,6 +69,20 @@ class SignatureStore(SQLBaseStore):
             f
         )
 
+    def add_event_hashes(self, event_ids):
+        hashes = yield self.store.get_event_reference_hashes(
+            event_ids
+        )
+        hashes = [
+            {
+                k: encode_base64(v) for k, v in h.items()
+                if k == "sha256"
+            }
+            for h in hashes
+        ]
+
+        defer.returnValue(zip(event_ids, hashes))
+
     def _get_event_reference_hashes_txn(self, txn, event_id):
         """Get all the hashes for a given PDU.
         Args: