I about me

[AI 에이전트 엔지니어링] Ch11. 개선 루프 본문

AI

[AI 에이전트 엔지니어링] Ch11. 개선 루프

ssungni 2026. 5. 30. 23:48

지속적 개선

(1) 피드백 파이프라인

: 자동화 + 인간 결합

 

1. 대규모 시스템의 데이터를 처리하는 1차 분석선

  • 입력: 로그, 에러 메시지, 만족도 점수
  • 이상 탐지 + 클러스터링(DSPy, Trace, APO)
  • 출력: 실행 가능한 인사이트

2. 이슈 탐지와 근본 원인 분석(RCA)

  • 탐지 대상: 반복 실패, 에러율 급증, 사용자 만족도 이상
  • RCA 단계: 로그 분석 → 어디서 문제 발생 → 패턴 인식 중대성 수준
  • 실패 원인은 기술 뿐만 아니라 모호한 작업 정의에서 온다.

3. 인간 개입 리뷰(Human-in-the-Loop)

  • 자동화의 한계를 보완하는 사람 판단

4. 프롬프트 정제

  • 모호성 제거, 예시 추가, 작업 분해, 컨텍스트 확장

 

(2) 실험

: 통제 환경에서 변경 검증

  • Shadow Deployment: 온라인 시스템에는 올려놓고, 외부에 공개하지는 않음
  • A/B 테스트: 50 → 기존 시스템, 50    새로운 시스템
  • 베이지안 밴딧: 승자 쪽으로 지속적으로 동적 재분배 

 

(3) 지속 학습

: 동적 적응 내재화

 

  • In-Context Learning: 프롬프트에 즉시 추가
    • 롤링 윈두우: 일부분 유지
    • 시맨틱 압축: 중요 부분만
    • 백터 기반 검색: 관련성
  • Offline Retraining (오프라인 재학습)
    • 수집된 성공 사례를 시스템에 내재화

 

조직과 문화

  • 조직 전체의 협력 필요
  • 실패를 문화로 보고 개선안 도출
    • 작고 자주 배포
    • 빠른 피드백 루프

 


import dspy
dspy.configure(lm=dspy.LM("gpt-5-mini"))

def lookup_threat_intel(indicator: str) -> str:
    """모의: 인디케이터에 대한 위협 인텔리전스를 조회합니다."""
    return f"Mock intel for {indicator}: potentially malicious"

def query_logs(query: str) -> str:
    """모의: 보안 로그를 검색하고 분석합니다."""
    return f"Mock logs for '{query}': suspicious activity detected"

# 소수의 합성 테스트 케이스(경보 → 기대 응답)
# 실제로는 실제 로그에서 파생하거나 실패 사례에 주석을 달아 수집합니다.
# 더 나은 최적화를 위해 100개+를 목표로 하세요.
trainset = [
    dspy.Example(alert='''Suspicious login attempt from IP 203.0.113.45 to 
                 admin account.''',
                 response='''Lookup threat intel for IP, query logs for activity, 
                     triage as true positive, isolate host if malicious.''')
                     .with_inputs('alert'),
    dspy.Example(alert="Unusual file download from URL example.com/malware.exe.",
                 response='''Lookup threat intel for URL and hash, query logs 
                     for endpoint activity, triage as true positive, isolate 
                     host.''').with_inputs('alert'),
    dspy.Example(alert="High network traffic to domain suspicious-site.net.",
                 response='''Lookup threat intel for domain, query logs for 
                     network and firewall, triage as false positive if 
                     benign.''').with_inputs('alert'),
    dspy.Example(alert='''Alert: Potential phishing email with attachment 
                 hash abc123.''',
                 response='''Lookup threat intel for hash, query logs for email 
                     and endpoint, triage as true positive, send analyst 
                     response.''').with_inputs('alert'),
    dspy.Example(alert='''Anomaly in user behavior: multiple failed logins from 
                 new device.''',
                 response='''Query logs for authentication, lookup threat intel 
                     for device IP, triage as true positive if pattern matches 
                     attack.''').with_inputs('alert'),
]

# SOC 인시던트 처리를 위한 리액트 모듈 정의
react = dspy.ReAct("alert -> response", tools=[lookup_threat_intel, query_logs])

# 단순 지표를 사용하는 옵티마이저
# (예시를 위해 exact match 사용. 프로덕션에서는
# 시맨틱 유사도 같은 더 정교한 지표를 사용하세요)
tp = dspy.MIPROv2(metric=dspy.evaluate.answer_exact_match, auto="light", 
                  num_threads=24)
optimized_react = tp.compile(react, trainset=trainset)

 

1. ReACT: 추론하고 도구를 사용하는 에이전트 방식

  • 수행 과정
    alert(경고 입력) → 생각 
     
    lookup_threat_intel(외부 위협 정보 확인) →  생각
      query_logs(실제 공격 흔적 확인) 생각  respone(기대 대응)

2. MIPROv2

  • Multi-prompt Instruction Proposal Optimizer v2

3. compile

  • 평가·탐색·최적화

import dspy
dspy.configure(lm=dspy.LM("gpt-5-mini"))

# 위협 분류 작업을 위한 DSPy 시그니처 정의
class ThreatClassifier(dspy.Signature):
   """주어진 인디케이터(IP, URL, 해시 등)의 위협 수준을 
   'benign', 'suspicious', 'malicious' 중 하나로 분류합니다."""
   indicator: str = dspy.InputField(desc="IP 주소, URL, 파일 해시 등 분류할 인디케이터.")
   threat_level: str = dspy.OutputField(desc="분류된 위협 수준: 'benign', 'suspicious', 또는 'malicious'.")

# 사려 있는 분류를 위한 ChainOfThought 기반 DSPy 모듈
class ThreatClassificationModule(dspy.Module):
   def __init__(self):
       super().__init__()
       self.classify = dspy.ChainOfThought(ThreatClassifier)
  
   def forward(self, indicator):
       return self.classify(indicator=indicator)

# 최적화를 위한 합성/수기 주석 데이터셋(실무에서는 실제 SOC 로그에서 50~200+ 예시 사용)
# 각 예시는 인디케이터와 정답 위협 수준을 포함합니다.
trainset = [
   dspy.Example(indicator="203.0.113.45", 
       threat_level="suspicious").with_inputs('indicator'),  # 알려진 악성 IP
   dspy.Example(indicator="example.com/malware.exe", 
       threat_level="malicious").with_inputs('indicator'),  # 악성 URL
   dspy.Example(indicator="benign-site.net", 
       threat_level="benign").with_inputs('indicator'),  # 안전한 도메인
   dspy.Example(indicator="abc123def456", 
       threat_level="malicious").with_inputs('indicator'),  # 멀웨어 해시
   dspy.Example(indicator="192.168.1.1", 
       threat_level="benign").with_inputs('indicator'),  # 로컬 IP
   dspy.Example(indicator="obfuscated.url/with?params", 
       threat_level="suspicious").with_inputs('indicator'),  # 난독화 URL 엣지 케이스
   dspy.Example(indicator="new-attack-vector-hash789", 
       threat_level="malicious").with_inputs('indicator'),  # 새로운 위협
]

# 평가 지표(위협 수준의 exact match.
# 프로덕션에서는 시맨틱 매치나 커스텀 스코어러 권장)
def threat_match_metric(example, pred, trace=None):
   return example.threat_level.lower() == pred.threat_level.lower()

# 모듈 최적화(다양한 사례 처리를 위한 내부 프롬프트를 정제)
optimizer = dspy.BootstrapFewShotWithRandomSearch(metric=threat_match_metric, 
    max_bootstrapped_demos=4, max_labeled_demos=4)
optimized_module = optimizer.compile(ThreatClassificationModule(), 
                                     trainset=trainset)

# 도구에서의 사용 예: 최적화 후 classify_threat에 사용
def classify_threat(indicator: str) -> str:
   """최적화된 DSPy 모듈을 사용해 위협 수준을 분류합니다."""
   prediction = optimized_module(indicator=indicator)
   return prediction.threat_level
  • compile()을 호출하는 순간 DSPy가 trainset을 이용해 평가, 탐색, 최적화를 시작함