def calculate_baseline(pet_id):
    """
    Compute one pet's glucose baseline over the trailing 7 days.

    Parameters
    ----------
    pet_id : str
        Identifier of the pet whose readings are aggregated.

    Returns
    -------
    pyspark.sql.Row or None
        Row with ``baseline_mean`` and ``baseline_std`` (both may be
        NULL when the pet has no readings in the window; ``baseline_std``
        is NULL with only one reading), or None if the query is empty.
    """
    # SECURITY: pet_id was previously f-string-interpolated into the SQL
    # text — an injection vector if the id ever comes from外部 input.
    # Use a named parameter marker instead (supported by spark.sql since
    # Spark 3.4 via the `args` argument).
    recent_data = spark.sql(
        """
        SELECT
            AVG(glucose) as baseline_mean,
            STDDEV(glucose) as baseline_std
        FROM glucose_raw
        WHERE pet_id = :pet_id
            AND timestamp >= NOW() - INTERVAL 7 DAYS
        """,
        args={"pet_id": pet_id},
    )
    return recent_data.first()
def detect_personalized_anomaly(df):
    """
    Flag glucose readings that deviate from each pet's personal baseline.

    A per-pet mean/std over the trailing 7 days is joined onto the input,
    a z-score is computed, and rows beyond dynamic thresholds are kept.

    Parameters
    ----------
    df : pyspark.sql.DataFrame
        Current readings; must contain ``pet_id`` and ``glucose`` columns.

    Returns
    -------
    pyspark.sql.DataFrame
        Input rows augmented with ``z_score`` and ``alert_type``
        (CRITICAL_ANOMALY / WARNING_ANOMALY / CRITICAL_LOW_ANOMALY /
        WARNING_LOW_ANOMALY), filtered to alerting rows only.
    """
    # 1. Per-pet baseline over the trailing 7 days.
    baseline_df = spark.sql("""
        SELECT
            pet_id,
            AVG(glucose) as mean,
            STDDEV(glucose) as std
        FROM glucose_raw
        WHERE timestamp >= NOW() - INTERVAL 7 DAYS
        GROUP BY pet_id
    """)
    # 2. Inner join: pets with no readings in the window have no baseline
    # and are (intentionally) excluded from anomaly detection.
    df_with_baseline = df.join(baseline_df, "pet_id")
    # 3. Z-score. Guard the divisor: STDDEV is NULL for a single sample
    # and 0 for constant readings — without the guard the division yields
    # NULL (or raises under ANSI mode). Guarded rows get a NULL z_score
    # and therefore no alert below.
    df_with_zscore = df_with_baseline.withColumn(
        "z_score",
        when(
            col("std").isNotNull() & (col("std") > 0),
            (col("glucose") - col("mean")) / col("std"),
        ),
    )
    # 4. Threshold on the z-score (per-tail probabilities for a normal
    # distribution: |Z| > 2 ~ 2.3%, |Z| > 3 ~ 0.13%).
    return df_with_zscore.withColumn(
        "alert_type",
        when(col("z_score") > 3, "CRITICAL_ANOMALY")
        .when(col("z_score") > 2, "WARNING_ANOMALY")
        .when(col("z_score") < -3, "CRITICAL_LOW_ANOMALY")
        .when(col("z_score") < -2, "WARNING_LOW_ANOMALY")
        .otherwise(None),
    ).filter(col("alert_type").isNotNull())