Vier-Augen-Prinzip (Snowflake)
Diese Seite ist eine Anleitung, mit welcher das Vier-Augen-Prinzip in der DMC implementiert werden kann. Da die Implementierung aufgrund der Rollenverwaltung und den angebundenen Folgeprozessen sehr individuell ist, werden Codes/Skripte bereitgestellt, welche nur als Fundament zu sehen sind. An manchen Stellen müssen diese individuell angepasst werden.
Ziel dabei ist folgende Funktionalität: Es wird im Vorhinein festgelegt, welche Tabellen von zwei verschiedenen Personen freigegeben werden müssen. Wenn in einer der Tabellen ein neuer Eintrag existiert, muss die Tabelle nach dem Vier-Augen-Prinzip freigegeben werden. Diese Funktionalität wird durch eine robuste, QS-geprüfte Hilfstabelle realisiert.
Die Freigabetabelle
Die Grundlage der Datenfreigabe ist die Freigabetabelle. Diese hat die Funktion, die Freigaben zu historisieren und als "Ampel" zu fungieren. Sie stellt also die Information bereit, welche Tabellen zur Beladung freigegeben wurden und welche nicht.
Um diese zu erstellen, wird die Funktion Tabelle manuell erstellen genutzt. Es wird empfohlen, die Tabelle wie folgt zu konfigurieren, da alle Prozeduren und Funktionen auf dieser Tabellenstruktur zugreifen:
Schritt 1: Informationen zu Tabelle
DB-Tabellenname: ADM_DATENFREIGABE
DMC-Anzeigename: Datenfreigabe
Beschreibung: Zur Freigabe von Tabellen
DMC-Kategorie: Muss vom Administrator festgelegt werden
Schritt 2: Spalten hinzufügen
Hier sind DB-Spaltenname und DMC-Anzeigename identisch
Spalte 1: SOURCE_TABLE, Datentyp Zeichenkette, Verpflichtend
Spalte 2: TARGET_SYSTEM, Datentyp Zeichenkette
Spalte 3: STATUS, Datentyp Zeichenkette
Spalte 4: RELEASE_FOR_EXPORT, Datentyp Checkbox, Verpflichtend
Spalte 5: DMC_NAME, Datentyp Zeichenkette, Verpflichtend
Nun bitte überpfügen, ob die Tabelle in der richtigen Kategorie zu sehen ist.
Die CHANGEDATES View
Um die folgenden Schritte in Snowflake erfolgreich umsetzen zu können, muss die Rolle, welche auch den Snowflakecontainer gestartet hatdie folgenden Befehle ausführen. Später werden über die DMC mehrere QS-Regeln in der Freigabetabelle sowie eine Prozedur implementiert, welche auf diese View zurückgreifen. Ein QS-Check überprüft, wann die letzte Änderung an einer freizugebenden Tabelle stattgefunden hat. Die Logik hinter dieser View ist die Akkumulierung aller Historisierungseinträge der zu exportierenden Tabellen. Hier sollen die Tabellen eingetragen werden, welche in der Freigabetabelle später gelistet werden sollen:
Klicke hier, um den SQL-Code anzuzeigen
-- THESE ARE ONLY SAMPLE TABLES FOR TESTING! PLEASE UPDATE
CREATE OR REPLACE VIEW ADM_TAB_CHANGEDATES_V
(TABLE_NAME, LAST_CHANGE)
AS
SELECT
'golden_records' AS TABLE_NAME,
MAX(HIST_FROM) AS LAST_CHANGE
FROM
golden_records
UNION ALL
SELECT
'mitarbeiter' AS TABLE_NAME,
MAX(HIST_FROM) AS LAST_CHANGE
FROM
mitarbeiter
UNION ALL
SELECT
'plandaten' AS TABLE_NAME,
MAX(HIST_FROM) AS LAST_CHANGE
FROM
plandaten;
Die FREIGABE_STATUS View
Nun folgt das Skript für eine Ansicht, die nur in der Datenbank zu sehen ist und nicht in der DMC verwaltet wird. Die View liefert Informationen für die Exportprozesse. Dabei werden nur die Tabellen angezeigt, welche den Freigabeprozess erfolgreich abgeschlossen haben. Die View muss an die benutzten Exportprozesse angepasst werden.
Klicke hier, um den SQL-Code anzuzeigen
CREATE OR REPLACE VIEW ADM_FREIGABE_STATUS_V
AS
SELECT
ADF.SOURCE_TABLE,
ADF.STATUS,
ADF.HIST_UNTIL
FROM
ADM_DATENFREIGABE AS ADF
JOIN
ADM_TAB_CHANGEDATES_V AS ATC
ON
ADF.SOURCE_TABLE = ATC.TABLE_NAME -- Zuordnung über DMC_NAME und TABLE_NAME
WHERE
ADF.RELEASE_FOR_EXPORT = 1
AND ADF.STATUS = 'FREIGEGEBEN'
GROUP BY
ADF.SOURCE_TABLE,
ADF.STATUS,
ADF.HIST_UNTIL; -- Gruppierung aller ausgewählten Spalten, um die gewünschten Datensätze zu erhalten
Die Prozedur zur Umsetzung des Vier-Augen-Prinzips
Die nachfolgende Prozedur überprüft, ob es zwei freigegebene Einträge gibt, und historisiert diese anschließend. Diese Prozedur muss in der Datenbank gespeichert werden, damit diese in der DMC genutzt werden kann: Um das Prinzip umzusetzen, ist das Generieren eines RSA Keypaars notwendig. Eine Anleitung ist in diesem Mediumartikel unter Schritt 2 zu finden. Außerdem muss ein technischer User und eine Rolle erstellt werden. Hier ist es DMC_SERVICE_ACCOUNT und DMC_SNOWPARK_ROLE. Danach muss die API-Prozedur in Snowflake erstellt werden:
Wenn der Container, auf welchem die DMC läuft, geupdatet/neugestartet wird, müssen der Endpoint und die Network-Rule updatet werden.
Klicke hier, um den SQL-Code anzuzeigen
USE ROLE DMC_SNOWPARK_ROLE; --Service Management Role with access to all tables. To follow the least privileged paradigm, create a role with access to the necessary tables (Freigabe-Tabelle, Freizugebende Tabellen)
GRANT MODIFY PROGRAMMATIC AUTHENTICATION METHODS ON USER DMC_SERVICE_ACCOUNT --Grant the privilege to assign a public key to a Snowflake user
TO ROLE DMC_SNOWPARK_ROLE;
-- Defining private Key
ALTER USER DMC_SERVICE_ACCOUNT SET RSA_PUBLIC_KEY='<public key>';
-- Creating Private Key
CREATE OR REPLACE SECRET DMC_PRIVATE_KEY
TYPE = GENERIC_STRING
SECRET_STRING = '-----BEGIN PRIVATE KEY-----
<private key>
-----END PRIVATE KEY-----';
GRANT USAGE ON SECRET EMC_SNOWPARK_DB.DMC.DMC_PRIVATE_KEY TO ROLE DMC_SNOWPARK_ROLE;
--Creating network rule to handle network trafic out of Snowflake
CREATE OR REPLACE NETWORK RULE dmc_rule
MODE = EGRESS
TYPE = HOST_PORT
VALUE_LIST = ('<Service Endpoint>', '<Servername>'); -- Service Endpoint by sql: 'SHOW ENDPOINTS IN SERVICE DMC;' | Servername by sql: 'select current_organization_name()||'-'||current_account_name()'
GRANT USAGE ON NETWORK RULE dmc_rule TO ROLE DMC_SNOWPARK_ROLE;
--Creating external access integration to handle external access to Snowflake
USE ROLE ACCOUNTADMIN;
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION dmc_ext_int
ALLOWED_NETWORK_RULES = (dmc_rule)
ENABLED = TRUE
ALLOWED_AUTHENTICATION_SECRETS = (DMC_PRIVATE_KEY);
GRANT OWNERSHIP ON INTEGRATION dmc_ext_int TO ROLE DMC_SNOWPARK_ROLE;
USE ROLE DMC_SNOWPARK_ROLE;
GRANT USAGE ON PROCEDURE CALL_DMC_SERVICE() TO ROLE DMC_SNOWPARK_ROLE;
CREATE OR REPLACE PROCEDURE call_dmc_service(
techUser STRING,
ingress_hostname STRING,
api_endpoint STRING,
payload VARIANT,
call_type STRING )
RETURNS STRING
LANGUAGE PYTHON
RUNTIME_VERSION = '3.10'
PACKAGES = ('requests', 'snowflake-snowpark-python','pyjwt','cryptography')
EXTERNAL_ACCESS_INTEGRATIONS = (dmc_ext_int)
SECRETS = ('DMC_PRIVATE_KEY' = DMC_PRIVATE_KEY)
HANDLER = 'run'
EXECUTE AS CALLER --This to enforce access control security to retrieve public key fingerprint
AS
$$
import os
import requests
import _snowflake
import jwt
import json
from snowflake.snowpark import Session
from datetime import datetime, timezone, timedelta
# Custom exceptions for better error handling
class ApiError(Exception):
"""Base exception for API related errors."""
pass
class ApiHttpError(ApiError):
"""Raised for HTTP errors (4xx/5xx)."""
def __init__(self, status_code, text):
self.status_code = status_code
self.text = text
super().__init__(f"HTTP error {status_code}: {text}")
class ApiEmptyResponseError(ApiError):
"""Raised when the API returns an empty response."""
pass
def run(session: Session, tech_user, ingress_hostname,api_endpoint, payload, call_type) -> str:
try:
priv_key = _snowflake.get_generic_secret_string('DMC_PRIVATE_KEY')
account = session.sql("select current_organization_name()||'-'||current_account_name()").first()[0]
public_key_fp = "SHA256:" + str(session.sql(f"""DESC USER {tech_user}
->> SELECT SUBSTR(
(SELECT "value" FROM $1
WHERE "property" = 'RSA_PUBLIC_KEY_FP'),
LEN('SHA256:') + 1) AS key;""").first()[0])
jwt_token = generate_jwt_token(priv_key, account, tech_user, public_key_fp)
access_token = exchange_jwt_for_access_token(jwt_token, account, ingress_hostname)
return make_api_request(access_token, f"https://{ingress_hostname}" + api_endpoint, payload, call_type)
except ApiEmptyResponseError:
return "Fehler: Die API lieferte eine leere Antwort."
# Function to generate JWT token
def generate_jwt_token(private_key, account, tech_user, public_key_fp):
# Uppercase account and tech_user
account = account.upper()
tech_user = tech_user.upper()
qualified_username = account + "." + tech_user
# Current time and token lifetime
now = datetime.now(timezone.utc)
lifetime = timedelta(seconds=30)
# Payload for the token
payload = {
"iss": qualified_username + '.' + public_key_fp,
"sub": qualified_username,
"iat": now,
"exp": now + lifetime
}
# Generate the JWT token
encoding_algorithm = "RS256"
token = jwt.encode(payload, key=private_key, algorithm=encoding_algorithm)
# Convert to string if necessary
if isinstance(token, bytes):
token = token.decode('utf-8')
return token
def exchange_jwt_for_access_token(jwt, account, ingress_hostname):
account_url = account.replace("_", "-") # Modifies Account name to insert into URL
url = f"https://{account_url}.snowflakecomputing.com/oauth/token"
data = {
"grant_type": "urn:ietf:params:oauth:grant-type:jwt-bearer",
"scope": ingress_hostname,
"assertion": jwt
}
resp = requests.post(url, data=data)
if resp.status_code != 200:
text = resp.text or "<empty response>"
raise Exception(f"Token exchange HTTP error {resp.status_code}: {text}")
return resp.text
def make_api_request(access_token, api_endpoint, payload, call_type):
# Define headers
headers = {
"content-type": "application/json;charset=UTF-8",
"Authorization": f"Snowflake Token=\"{access_token}\"",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "de,de-DE;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
}
call_type = call_type.upper()
# Make the request, Case statements for Call type
if ( call_type == "GET" ):
response = requests.get(api_endpoint, headers=headers)
elif ( call_type == "POST"):
response = requests.post(api_endpoint, headers=headers, json=payload)
elif ( call_type == "PUT"):
response = requests.put(api_endpoint, headers=headers, json=payload)
try:
response.raise_for_status() # Raises HTTPError for 4xx/5xx errors
except requests.exceptions.HTTPError as e:
raise ApiHttpError(status_code=response.status_code, text=response.text)
# Ensure there's a body
if not response.text:
raise ApiEmptyResponseError("API Antwort leer")
# Optionally check Content-Type header
ct = response.headers.get("Content-Type", "")
if "application/json" not in ct:
raise ApiError(f"Expected JSON but got content type: {ct}; body: {response.text}")
# Safe parse
try:
data = response.json()
except ValueError:
raise ApiError(f"Invalid JSON response: {response.text}")
return data;
$$;
Nun muss die Prozedur definiert werden, welche das Vier-Augen-Prinzip umsetzt:
Es wird die TableID der Freigabetabelle benötigt. Um dies herauszufinden, empflieht sich der API Call: CALL call_dmc_service('*TechnischerUser*','*ServiceEndpoint*','/api/tables', '', 'GET');
Klicke hier, um den SQL-Code anzuzeigen
CREATE OR REPLACE PROCEDURE vier_augen_prinzip(
SourceTableParam VARCHAR(50),
HistEditorParam VARCHAR(100) DEFAULT 'admin'
)
RETURNS BOOLEAN
LANGUAGE SQL
EXECUTE AS CALLER
AS
$$
DECLARE
CurrentProcessingTime TIMESTAMP_NTZ(3) DEFAULT CURRENT_TIMESTAMP();
entryCount INT;
jsonArrayPayload1 ARRAY;
jsonArrayPayload2 ARRAY;
BEGIN
-- Temporary table for temporarily storing the active data records to be processed.
CREATE OR REPLACE TEMPORARY TABLE AffectedOldEntries (
SOURCE_TABLE VARCHAR(30),
TARGET_SYSTEM VARCHAR(20),
HIST_BASE VARCHAR(100),
DMC_NAME VARCHAR(50),
HIST_FROM_PK TIMESTAMP_NTZ(3),
NEW_HIST_UNTIL TIMESTAMP_NTZ(3) NULL
);
-- Collect the relevant data records that are currently valid (‘9999-12-31’)
-- and have been released for export (RELEASE_FOR_EXPORT = 1).
INSERT INTO AffectedOldEntries
SELECT
df.SOURCE_TABLE,
df.TARGET_SYSTEM,
df.HIST_BASE,
df.DMC_NAME,
df.HIST_FROM,
tc.LAST_CHANGE
FROM ADM_DATENFREIGABE AS df
LEFT JOIN ADM_TAB_CHANGEDATES_V AS tc
ON df.SOURCE_TABLE = tc.TABLE_NAME
WHERE df.SOURCE_TABLE = :SourceTableParam
AND df.RELEASE_FOR_EXPORT = 1
AND df.HIST_UNTIL = '9999-12-31 00:00:00.000';
-- Counts the number of entries found.
SELECT COUNT(*) INTO :entryCount FROM AffectedOldEntries;
-- Checks whether the condition (exactly 2 entries) is met.
IF (entryCount = 2) THEN
-- Update old values with the history
-- Insert new records
-- Create a new table ‘JsonResults’ with the results
-- First update to set “Status” to “APPROVED”
CREATE OR REPLACE TEMPORARY TABLE JsonResults AS
SELECT
OBJECT_CONSTRUCT(
'target_system', t.TARGET_SYSTEM,
'source_table', t.SOURCE_TABLE,
'dmc_name', t.DMC_NAME,
'release_for_export', 'true',
'status','FREIGEGEBEN',
'hist_from', '',
'hist_until', '',
'hist_editor', '',
'hist_base', t.HIST_BASE,
'hashkey', t.HIST_BASE
) AS GENERATED_JSON
FROM AffectedOldEntries t;
--Storing the JSONs in a JSON array
SELECT ARRAY_AGG(j.GENERATED_JSON)
INTO :jsonArrayPayload1
FROM JsonResults j;
--API CALL
CALL call_dmc_service('DMC_SERVICE_ACCOUNT','<SERVICE ENDPOINT>','/api/tables/17/data', :jsonArrayPayload1, 'PUT');
--second update to set “release_for_export” to 0 and ‘Status’ to “NICHT FREIGEGEBEN ”
CREATE OR REPLACE TEMPORARY TABLE JsonResults AS
SELECT
OBJECT_CONSTRUCT(
'target_system', t.TARGET_SYSTEM,
'source_table', t.SOURCE_TABLE,
'dmc_name', t.DMC_NAME,
'release_for_export', 'false',
'status','NICHT FREIGEGEBEN',
'hist_from', '',
'hist_until', '',
'hist_editor', '',
'hist_base', t.HIST_BASE,
'hashkey', t.HIST_BASE
) AS GENERATED_JSON
FROM AffectedOldEntries t;
--Storing the JSONs in a JSON array
SELECT ARRAY_AGG(j.GENERATED_JSON)
INTO :jsonArrayPayload2
FROM JsonResults j;
--API CALL
CALL call_dmc_service('DMC_SERVICE_ACCOUNT','<SERVICE ENDPOINT>','/api/tables/17/data', :jsonArrayPayload2, 'PUT');
DROP TABLE IF EXISTS AffectedOldEntries;
DROP TABLE IF EXISTS JsonResults;
RETURN TRUE;
END IF;
-- Clears the temporary table and returns FALSE if the condition was not met.
DROP TABLE IF EXISTS AffectedOldEntries;
DROP TABLE IF EXISTS JsonResults;
RETURN FALSE;
END;
$$;
Implementierung der QS-Checks
Nun können in der DMC QS-Regeln für die Freigabetabelle angelegt werden. Es wird empfohlen, folgende drei QS-Regeln in der Freigabetabelle zu implementieren. Der Fehlertyp aller QS-Checks ist Fehler und der Regeltyp ist Datensatzebene
- Freigabe in der Vergangenheit: Die Daten wurden seit der letzten Freigabe nochmal bearbeitet:
RELEASE_FOR_EXPORT = 1
AND EXISTS (
SELECT 1
FROM ADM_TAB_CHANGEDATES_V v_change
WHERE v_change.TABLE_NAME = SOURCE_TABLE
AND HIST_FROM < v_change.LAST_CHANGE
)
- Vier Augen Prozess: Tabelle ADM_DATENFREIGABE - Verletzung des 4-Augen-Prozesses. Derselbe User darf nicht beide Freigaben durchführen:
(source_table, hist_editor) IN (
SELECT
source_table,
hist_editor
FROM
ADM_DATENFREIGABE
WHERE
release_for_export = 1
-- Standard-Datumsformat für Snowflake verwenden
AND HIST_UNTIL = '9999-12-31'::TIMESTAMP_NTZ
GROUP BY
source_table,
hist_editor
HAVING
COUNT(*) > 1
)
- Show QS: Prüft die Datenfreigabe auf Status = ERROR und zeigt die QS-Markierung:
status = 'ERROR'
Einrichten der Commands
Nachdem die QS-Regeln angelegt wurden, muss der Command, welcher die vorher angelegte Prozedur durchführt, angelegt werden. Dazu soll die Commandverwaltung aufgerufen und wie folgt konfiguriert werden:
Tabelle: Freigabe
Name: Start Freigabeprozess
Command-Typ: DB-Prozedur
Befehl:
Call vier_augen_prinzip('Tabelle1');
Anzahl der Sekunden, die Command auf Abschluss einer QS-Prüfung wartet: eigenständig aussuchen, 10 ist ein guter Richtwert
QS-Prüfung vor Ausführung: Bei Fehlern abbrechen
QS-Prüfung nach Ausführung: Keine Prüfung (Dies sollte geändert werden, falls QS-Checks nach der Ausführung aufgerufen werden sollen)
Für jede freizugebende Tabelle muss ein neuer Command angelegt werden.
Einrichten der Tasks
Nun muss auf die Tasks Verwalten navigiert werden. Hier werden zwei Tasks erstellt: einer, welcher die Prozedur ausführt und der andere führt die QS-Checks der Freigabetabelle aus.
Zunächst fügen wir die Prozedur hinzu. Der Task soll mit folgenden Parametern erstellt werden:
Typ: Command ausführen
Bezeichnung: Task Freigabeprozess
Optionen: Start Freigabeprozess
Nun muss der QS Check hinzugefügt werden mit folgenden Parametern:
Typ: QS-Regeln prüfen
Bezeichnung: QS-Checks Freigabe
Tabellen: Freigabe
Einrichten der Trigger
Nun haben wir beide Tasks angelegt. Damit diese automatisch ausgeführt werden, müssen die Trigger konfiguriert werden. Um einen Trigger zu verwalten, muss bei dem Task auf die Uhr gedrückt werden, welche in der Spalte 'Trigger' zu sehen ist.
Der Trigger für die Task 'Task Freigabe' sollte wie folgt konfiguriert werden:
Typ: Zeitgesteuert
Bezeichnung: Trigger für Task Freigabe
Cron-Ausdruck: 0 */5 * * * * (damit wird der Cronjob alle 5 Minuten ausgeführt)
Der Trigger für die Task 'Task QS-Checks Freigabe' soll wie folgt konfiguriert werden:
Typ: Zeigesteuert
Bezeichnung: Trigger für Task QS-Checks Freigabe
Cron-Ausdruck: */30 * * * * * (damit wird der Cronjob alle 30 Sekunden ausgeführt)
Einfügen der Einträge in die Freigabetabelle
Damit das Prinzip funktioniert, müssen zwei Einträge pro Tabelle, welche freigegeben werden sollen, in die Freigabetabelle hinzugefügt werden. Dafür müssen zwei Datensätze hinzugefügt werden. Dabei sollten die Eintäge wie folgt befüllt werden:
Source_table: Hier den Tabellennamen eintragen, welcher in der DB vergeben wurde.
Release_for_export: Nein
Dmc_name: Hier soll der Name der freizugebenden Tabelle eingetragen werden, welcher in der DMC vergeben wurde.
Bevor der Datensatz hinzugefügt wird, soll der Knopf 'Weiterer Datensatz (mit Werten)' genutzt werden. Damit werden gleich die zwei benötigten Datensätze erstellt, ohne die Maske erneut ausfüllen zu müssen.
Berechtigungskonzept
Damit nur berechtigte Personen Einträge in der Freigabetabelle sehen und bearbeiten können, kann der Administrator unter 'Berechtigung' über die 'Datenberechtigung' die Zugriffsberechtigung uaf Datensatzebene für bestimmte User einstellen. Weitere Informationen sind in der Dokumentation zu finden.
Veränderungen in den Tabellen nachvollziehen
Um Veränderungen in den Einträgen der Tabelle nachvollziehen zu können, kann die "Historie" genutzt werden.