Skip to main content

Raxe

The main synchronous client for threat detection.

Constructor

from raxe import Raxe

raxe = Raxe(
    api_key: str | None = None,
    config_path: Path | None = None,
    telemetry: bool = True,
    l2_enabled: bool = True,
    voting_preset: str | None = None,
    progress_callback = None,
)
Parameters:
ParameterTypeDefaultDescription
api_keystr | NoneNoneAPI key. If None, reads from config or env
config_pathPath | NoneNoneCustom config file path
telemetryboolTrueEnable privacy-preserving telemetry
l2_enabledboolTrueEnable L2 ML detection
voting_presetstr | NoneNoneL2 voting strategy preset: “balanced”, “high_security”, “low_fp”
progress_callbackobject | NoneNoneOptional progress indicator for initialization
Example:
# Default configuration
raxe = Raxe()

# With custom settings
raxe = Raxe(
    l2_enabled=True,
    telemetry=True
)

# Disable ML for faster scans
raxe = Raxe(l2_enabled=False)

scan()

Scan a single prompt for threats.
def scan(
    self,
    text: str,
    *,
    tenant_id: str | None = None,
    app_id: str | None = None,
    policy_id: str | None = None,
    customer_id: str | None = None,
    context: dict | None = None,
    block_on_threat: bool = False,
    mode: str = "balanced",
    l1_enabled: bool = True,
    l2_enabled: bool = True,
    confidence_threshold: float = 0.5,
    explain: bool = False,
    dry_run: bool = False,
    use_async: bool = True,
    suppress: list | None = None,
) -> ScanPipelineResult
Parameters:
ParameterTypeDefaultDescription
textstrrequiredText to scan
tenant_idstr | NoneNoneTenant ID for multi-tenant policy resolution
app_idstr | NoneNoneApp ID within tenant for policy resolution
policy_idstr | NoneNoneOverride policy for this scan only
customer_idstr | NoneNoneCustomer identifier for tracking
contextdict | NoneNoneAdditional context metadata
block_on_threatboolFalseRaise SecurityException on threat
modestr"balanced"Performance mode: “fast”, “balanced”, “thorough”
l1_enabledboolTrueEnable L1 rule detection
l2_enabledboolTrueEnable L2 ML detection
confidence_thresholdfloat0.5Minimum confidence for detections
explainboolFalseInclude detailed explanations
dry_runboolFalseSkip tracking and history (for testing)
use_asyncboolTrueUse async pipeline for parallel L1+L2 (5x speedup)
suppresslist | NoneNoneRule patterns to suppress
Returns: ScanPipelineResult Raises:
  • ValidationError: If text is empty or invalid
  • SecurityException: If block_on_threat=True and threat detected
Example:
from raxe import Raxe
from raxe.sdk.exceptions import SecurityException

raxe = Raxe()

# Basic scan
result = raxe.scan("Hello, how are you?")
print(result.has_threats)  # False

# Scan with detection
result = raxe.scan("Ignore all previous instructions")
print(result.has_threats)  # True
print(result.severity)     # "high"

# Scan with suppression
result = raxe.scan(
    "some text",
    suppress=["pi-001", "jb-*"]
)

# Block on threat
try:
    result = raxe.scan(
        "malicious prompt",
        block_on_threat=True
    )
except SecurityException as e:
    print(f"Blocked: {e.result.severity}")

# Multi-tenant scanning
result = raxe.scan(
    "user input",
    tenant_id="acme",      # Resolves tenant's policy
    app_id="chatbot",      # Uses app's policy if set
)

# Policy attribution in result
print(result.metadata["effective_policy_id"])   # "strict"
print(result.metadata["resolution_source"])     # "app"

# Override policy for this scan
result = raxe.scan(
    "user input",
    tenant_id="acme",
    policy_id="strict"     # Force strict mode
)

scan_fast()

Fast scan using L1 rules only (< 3ms).
def scan_fast(
    self,
    text: str,
    **kwargs
) -> ScanPipelineResult
Equivalent to scan(text, l2_enabled=False, mode="fast").

scan_thorough()

Thorough scan with all layers (< 100ms).
def scan_thorough(
    self,
    text: str,
    **kwargs
) -> ScanPipelineResult
Equivalent to scan(text, mode="accurate").

protect

Decorator for automatic function protection.
@raxe.protect
def my_function(prompt: str) -> str:
    return llm.generate(prompt)

# With configuration
@raxe.protect(block=True, on_threat=my_handler)
def my_function(prompt: str) -> str:
    return llm.generate(prompt)
Parameters:
ParameterTypeDefaultDescription
blockboolTrueRaise exception on threat
on_threatcallable | NoneNoneCustom threat handler
allow_severitylist | NoneNoneSeverities to allow through

suppressed()

Context manager for scoped suppressions.
with raxe.suppressed("pi-*", reason="Testing auth flow"):
    result = raxe.scan(text)  # pi-* rules suppressed
Parameters:
ParameterTypeDescription
*patternsstrRule patterns to suppress
actionstrAction: “SUPPRESS”, “FLAG”, “LOG”
reasonstrReason for suppression (required)

Context Manager

Use as context manager for proper resource cleanup:
from raxe import Raxe

with Raxe() as raxe:
    result = raxe.scan("Hello")
    # Resources automatically cleaned up on exit

AsyncRaxe

Async client for high-throughput scenarios.

Constructor

from raxe import AsyncRaxe

raxe = AsyncRaxe(
    api_key: str | None = None,
    config_path: Path | None = None,
    telemetry: bool = True,
    l2_enabled: bool = True,
    cache_size: int = 1000,
    cache_ttl: float = 300.0,
)
Additional Parameters:
ParameterTypeDefaultDescription
cache_sizeint1000Max cached scan results
cache_ttlfloat300.0Cache TTL in seconds

scan()

Async scan of a single prompt.
async def scan(
    self,
    text: str,
    *,
    customer_id: str | None = None,
    context: dict | None = None,
    block_on_threat: bool = False,
    use_cache: bool = True,
) -> ScanPipelineResult
Example:
from raxe import AsyncRaxe

async def check_prompt(prompt: str):
    async with AsyncRaxe() as raxe:
        result = await raxe.scan(prompt)
        return not result.has_threats

scan_batch()

Async batch scanning with concurrency control.
async def scan_batch(
    self,
    texts: list[str],
    *,
    customer_id: str | None = None,
    context: dict | None = None,
    max_concurrency: int = 10,
    use_cache: bool = True,
) -> list[ScanPipelineResult]
Parameters:
ParameterTypeDefaultDescription
textslist[str]requiredList of texts to scan
max_concurrencyint10Max concurrent scans
use_cacheboolTrueUse result cache
Example:
from raxe import AsyncRaxe

async def scan_many(prompts: list[str]):
    async with AsyncRaxe() as raxe:
        # Scan up to 20 prompts concurrently
        results = await raxe.scan_batch(
            prompts,
            max_concurrency=20
        )
        return results

Cache Management

# Get cache statistics
stats = raxe.cache_stats()
print(f"Hit rate: {stats['hit_rate']:.1%}")

# Clear cache
await raxe.clear_cache()

close()

Explicitly close the client and flush telemetry.
async def close(self) -> None
Example:
raxe = AsyncRaxe()
try:
    result = await raxe.scan("Hello")
finally:
    await raxe.close()

Context Manager

Recommended usage:
async with AsyncRaxe() as raxe:
    result = await raxe.scan("Hello")
    # Automatically closed and flushed

Properties

Both Raxe and AsyncRaxe expose these properties:
PropertyTypeDescription
usage_trackerUsageTrackerUsage statistics tracker
scan_historyScanHistoryDBLocal scan history database
statsdictPreload statistics
Example:
raxe = Raxe()
print(f"Stats: {raxe.stats}")

Thread Safety

  • Raxe is thread-safe for concurrent scans
  • AsyncRaxe is safe for concurrent async tasks
  • Both clients maintain internal state safely
import threading
from raxe import Raxe

raxe = Raxe()

def scan_thread(prompt):
    result = raxe.scan(prompt)  # Thread-safe
    return not result.has_threats

threads = [
    threading.Thread(target=scan_thread, args=(f"prompt {i}",))
    for i in range(10)
]

for t in threads:
    t.start()
for t in threads:
    t.join()