Benutzer-Synchronisation zwischen Entra ID und Keycloak per Python-Skript

Die Synchronisation von Benutzerkonten zwischen Microsoft Entra ID und Keycloak kann effizient per Python-Skript erfolgen. Falls du dich mit den grundlegenden Ansätzen der Nutzer-Synchronisation zwischen Entra ID und Keycloak vertraut machen möchtest, empfehlen wir dir die Blogartikel Nutzer-Synchronisation zwischen Entra ID und Keycloak: Python-Skript vs. SCIM und Keycloak als Broker: Ist eine Nutzersynchronisation notwendig?

1. Konfiguration in Entra ID

Damit dein Skript auf die Microsoft Graph API zugreifen und Nutzer in Entra ID ermitteln kann, musst du zunächst eine App-Registrierung in Entra ID vornehmen. Dort notierst du Dir die Client ID, erzeugst ein Client Secret und weist die notwendigen Berechtigungen für die Graph API zu, um Benutzerinformationen abrufen zu können.

2. Konfiguration in Keycloak

In Keycloak muss eine Client-Applikation registriert werden, damit das Python-Skript über die REST API Nutzer verwalten kann.

  • Registrierung eines neuen Clients in Keycloak.
  • Hinterlegung der Client ID und des Secrets im Skript.
  • Erteilung der benötigten Berechtigungen über Service Account Roles, um Nutzer in Keycloak anlegen, aktualisieren und löschen zu können

3. Einführung des source-Attributs in Keycloak

Um eine klare Trennung zwischen synchronisierten und manuell angelegten Nutzern in Keycloak zu gewährleisten, führen wir das benutzerdefinierte Attribut source ein. Dieses Attribut ermöglicht es, Nutzer, die aus Entra ID synchronisiert wurden, eindeutig zu kennzeichnen und von manuell erstellten Benutzern zu unterscheiden.

Dafür legen wir ein neues Attribut source im User Profile an:

Beim Anlegen eines neuen Nutzers über das Skript wird das Attribut mit source=entraID gesetzt. Manuell erstellte Nutzer bleiben unverändert und wird kein Wert für das source Attribut zugeordnet. Dank dieser Kennzeichnung können ausschließlich synchronisierte Nutzer aktualisiert oder entfernt werden, ohne dass manuell erstellte Benutzer unbeabsichtigt betroffen sind. Das Python-Skript nutzt dieses Attribut, um sicherzustellen, dass es nur Nutzer verwaltet, die tatsächlich aus Entra ID stammen.

4. Implementierung des Python-Skripts

Nachdem du die Konfiguration in Entra ID und Keycloak abgeschlossen hast, kannst du das Python-Skript implementieren. Das Python-Skript ruft regelmäßig Benutzerinformationen aus Entra ID über die Microsoft Graph API ab und gleicht diese mit den in Keycloak gespeicherten Nutzern ab. Dabei werden nur Mitglieder synchronisiert, während Gastbenutzer ignoriert werden. Nutzer, die in Entra ID existieren, aber nicht in Keycloak, werden neu angelegt. Falls ein Nutzer in Entra ID nicht mehr vorhanden ist, wird dieser aus Keycloak entfernt. Manuell angelegte Keycloak-Nutzer bleiben unberührt

import requests

"""
Dieses Skript synchronisiert Benutzer aus Entra ID mit Keycloak.

Funktionalität:
- Nutzer aus Entra ID abrufen (keine Gastnutzer, nur Mitglieder).
- Nutzer aus Keycloak abrufen (nur Nutzer mit `source=entraID` werden verwaltet).
- Neue Nutzer aus Entra ID in Keycloak anlegen.
- Nutzer in Keycloak löschen, wenn sie nicht mehr in Entra ID existieren.
- Manuell angelegte Keycloak-Nutzer bleiben unberührt.
"""

# Keycloak Konfigurationswerte
KEYCLOAK_URL = "https://<your-keycloak-url>"
REALM_NAME = "<your keycloak realm>"
CLIENT_ID = "<the client id in keycloak>"
CLIENT_SECRET = "<your client secret>"

# Entra ID Konfigurationswerte
tenant_id = "<your azure tenant id>"
client_id = "<the client id in entra id>"
client_secret = "<the client secret in entra id>"

def get_keycloak_token():
    """Holt ein Zugriffstoken für Keycloak."""
    url = f"{KEYCLOAK_URL}/realms/{REALM_NAME}/protocol/openid-connect/token"
    payload = {"grant_type": "client_credentials", "client_id": CLIENT_ID, "client_secret": CLIENT_SECRET}
    response = requests.post(url, data=payload)
    response.raise_for_status()
    return response.json().get("access_token")

def get_entraid_token():
    """Holt ein Zugriffstoken für Entra ID."""
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    payload = {"grant_type": "client_credentials", "client_id": client_id, "client_secret": client_secret, "scope": "https://graph.microsoft.com/.default"}
    response = requests.post(url, data=payload)
    response.raise_for_status()
    return response.json().get("access_token")

def get_keycloak_users():
    """Ruft alle verwalteten Keycloak-Nutzer (mit `source=entraID`) ab."""
    token = get_keycloak_token()
    url = f"{KEYCLOAK_URL}/admin/realms/{REALM_NAME}/users"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.get(url, headers=headers)
    response.raise_for_status()
    users = {}
    for user in response.json():
        username = user["username"]
        user_id = user["id"]
        attributes = user.get("attributes", {})
        if attributes.get("source") == ["entraID"]:
            users[username] = user_id  # Nur Entra ID Nutzer tracken
    return users

def get_entraid_users():
    """Ruft alle Mitglieder-Nutzer aus Entra ID ab (keine Gäste)."""
    token = get_entraid_token()
    url = "https://graph.microsoft.com/v1.0/users"
    headers = {"Authorization": f"Bearer {token}"}
    users = {}
    while url:
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        data = response.json()
        for user in data.get("value", []):
            if "#EXT#" not in user["userPrincipalName"]:  # Ignorieren von Gastbenutzern
                users[user["userPrincipalName"]] = user
        url = data.get("@odata.nextLink")
    return users

def create_keycloak_user(username, given_name, surname, email):
    """Erstellt einen neuen Benutzer in Keycloak mit dem Attribut `source=entraID`."""
    token = get_keycloak_token()
    url = f"{KEYCLOAK_URL}/admin/realms/{REALM_NAME}/users"
    headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
    payload = {
        "username": username,
        "enabled": True,
        "firstName": given_name,
        "lastName": surname,
        "email": email,
        "emailVerified": True,
        "attributes": {"source": "entraID"}  # Kennzeichnen als synchronisiert
    }
    response = requests.post(url, headers=headers, json=payload)
    if response.status_code != 201:
        print(f"Fehler beim Erstellen des Benutzers {username}: {response.status_code}, {response.text}")
    response.raise_for_status()

def delete_keycloak_user(user_id):
    """Löscht einen Benutzer in Keycloak anhand der ID."""
    token = get_keycloak_token()
    url = f"{KEYCLOAK_URL}/admin/realms/{REALM_NAME}/users/{user_id}"
    headers = {"Authorization": f"Bearer {token}"}
    response = requests.delete(url, headers=headers)
    response.raise_for_status()

def sync_users():
    """Vergleicht Entra ID mit Keycloak und synchronisiert die Benutzer."""
    keycloak_users = get_keycloak_users()
    entraid_users = get_entraid_users()
    
    # Nutzer anlegen, die in Entra ID existieren, aber nicht in Keycloak
    for username, user_data in entraid_users.items():
        if username not in keycloak_users:
            create_keycloak_user(username, user_data.get("givenName", ""), user_data.get("surname", ""), user_data.get("mail", ""))
            print(f"Benutzer {username} in Keycloak erstellt.")
    
    # Nur Entra ID synchronisierte Nutzer löschen
    for username, user_id in keycloak_users.items():
        if username not in entraid_users:
            delete_keycloak_user(user_id)
            print(f"Benutzer {username} aus Keycloak entfernt.")

if __name__ == "__main__":
    try:
        sync_users()
    except Exception as e:
        print(f"Fehler: {e}")

5. Skalierung und Performance-Optimierung

Das Skript wurde nicht für eine effiziente Verarbeitung sehr großer Nutzermengen optimiert. Bei mehreren Tausend Benutzern kann die Laufzeit erheblich ansteigen, da jeder Nutzer einzeln abgefragt, erstellt oder gelöscht wird. In einem produktiven Einsatz wäre es sinnvoll, Optimierungen wie Batch-Operationen, asynchrone Verarbeitung oder Delta-Synchronisation einzubauen, um die Performance zu verbessern. Zudem sollten mögliche API-Limits von Entra ID und Keycloak berücksichtigt werden, um Rate-Limit-Fehler zu vermeiden. Eine Skalierung für größere Datenmengen müsste daher vor einer produktiven Nutzung noch erfolgen.

Nach oben scrollen