Skip to content

Commit

Permalink
feat(usersessions): Trigger signals on IP and ua changes
Browse files Browse the repository at this point in the history
Co-authored-by: Benjamin Gervan <[email protected]>
Co-committed-by: Benjamin Gervan <[email protected]>
  • Loading branch information
bgervan authored and pennersr committed Sep 12, 2024
1 parent 200d5f5 commit a68c38d
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 9 deletions.
51 changes: 42 additions & 9 deletions allauth/usersessions/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from django.conf import settings
from django.contrib.auth import get_user
from django.core.exceptions import ImproperlyConfigured
from django.db import models
from django.db import models, transaction
from django.http import HttpRequest
from django.utils import timezone
from django.utils.translation import gettext_lazy as _
Expand Down Expand Up @@ -36,16 +36,49 @@ def create_from_request(self, request):
ua = request.META.get("HTTP_USER_AGENT", "")[
0 : UserSession._meta.get_field("user_agent").max_length
]
UserSession.objects.update_or_create(
session_key=request.session.session_key,
defaults=dict(
user=request.user,
ip=get_adapter().get_client_ip(request),
user_agent=ua,
last_seen_at=timezone.now(),
),

defaults = dict(
user=request.user,
ip=get_adapter().get_client_ip(request),
user_agent=ua,
)

with transaction.atomic():
from allauth.usersessions.signals import session_client_changed

session, created = UserSession.objects.get_or_create(
session_key=request.session.session_key, defaults=defaults
)

if not created:
from_session = UserSession(
session_key=session.session_key,
user=session.user,
ip=session.ip,
user_agent=session.user_agent,
data=session.data,
created_at=session.created_at,
last_seen_at=session.last_seen_at,
)
# Update session
session.user = defaults["user"]
session.ip = defaults["ip"]
session.user_agent = defaults["user_agent"]
session.last_seen_at = timezone.now()

session.save()

if (
from_session.ip != session.ip
or from_session.user_agent != session.user_agent
):
session_client_changed.send(
sender=UserSession,
request=request,
from_session=from_session,
to_session=session,
)


class UserSession(models.Model):
objects = UserSessionManager()
Expand Down
6 changes: 6 additions & 0 deletions allauth/usersessions/signals.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
from django.dispatch import Signal

from allauth.account import app_settings

from .models import UserSession


# Provides the arguments "request", "from_session", "to_session"
session_client_changed = Signal()


def on_user_logged_in(sender, **kwargs):
request = kwargs["request"]
UserSession.objects.create_from_request(request)
Expand Down
51 changes: 51 additions & 0 deletions allauth/usersessions/tests/test_middleware.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from unittest.mock import Mock

from django.contrib.auth.models import AnonymousUser
from django.test.utils import override_settings

import pytest

from allauth.usersessions.middleware import UserSessionsMiddleware
from allauth.usersessions.models import UserSession
from allauth.usersessions.signals import session_client_changed


def test_mw_without_request_user(rf, db, settings):
Expand Down Expand Up @@ -40,3 +42,52 @@ def test_mw_with_anonymous_request_user(rf, db, settings):
request.session.session_key = "sess-123"
mw(request)
assert not UserSession.objects.exists()


@override_settings(USERSESSIONS_TRACK_ACTIVITY=True)
def test_mw_change_ip_and_useragent(rf, db, user):
mw = UserSessionsMiddleware(lambda request: None)

# First request
request1 = rf.get("/")
request1.user = user
request1.session = Mock()
request1.session.session_key = "sess-123"
request1.META["HTTP_USER_AGENT"] = "Old User Agent"
request1.META["REMOTE_ADDR"] = "1.1.1.1"
mw(request1)

# Second request with changed IP and User Agent
request2 = rf.get("/")
request2.user = user
request2.session = Mock()
request2.session.session_key = "sess-123"
request2.META["HTTP_USER_AGENT"] = "New User Agent"
request2.META["REMOTE_ADDR"] = "2.2.2.2"

# Set up signal receiver
signal_received = []

def signal_handler(sender, request, from_session, to_session, **kwargs):
signal_received.append((from_session, to_session))

session_client_changed.connect(signal_handler)

# Process second request
mw(request2)

# Check if UserSession was updated
user_session = UserSession.objects.get(session_key="sess-123", user=user)
assert user_session.ip == "2.2.2.2"
assert user_session.user_agent == "New User Agent"

# Check if signal was triggered
assert len(signal_received) == 1
from_session, to_session = signal_received[0]
assert from_session.ip == "1.1.1.1"
assert from_session.user_agent == "Old User Agent"
assert to_session.ip == "2.2.2.2"
assert to_session.user_agent == "New User Agent"

# Clean up signal connection
session_client_changed.disconnect(signal_handler)
10 changes: 10 additions & 0 deletions docs/usersessions/signals.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Signals
=======

There is a signal emitted during usersession store. You can
hook to them for your own needs.


- ``allauth.usersessions.signals.session_client_changed(request, from_session, to_session)``
Sent when IP or useragent changed for a user session.

0 comments on commit a68c38d

Please sign in to comment.