Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ offline_store:

entity_key_serialization_version: 3

mlflow:
enabled: true
tracking_uri: "http://localhost:5000"
enable_tracing: true

feature_server:
type: mcp
enabled: true
Expand Down
7 changes: 7 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ mssql = ["ibis-framework[mssql]>=10.0.0"]
oracle = ["ibis-framework[oracle]>=10.0.0"]
mysql = ["pymysql", "types-PyMySQL"]
openlineage = ["openlineage-python>=1.40.0"]
mlflow = [
"mlflow>=2.14.0",
"opentelemetry-api>=1.28.0",
"opentelemetry-sdk>=1.28.0",
"opentelemetry-instrumentation-fastapi>=0.49b0",
"opentelemetry-instrumentation-httpx>=0.49b0",
]
opentelemetry = ["prometheus_client", "psutil"]
spark = ["pyspark>=4.0.0"]
trino = ["trino>=0.305.0,<0.400.0", "regex"]
Expand Down
229 changes: 157 additions & 72 deletions sdk/python/feast/feature_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,39 @@ async def load_static_artifacts(app: FastAPI, store):
logger.warning(f"Failed to load static artifacts: {e}")


def _instrument_app_for_tracing(app: FastAPI, store: "feast.FeatureStore") -> None:
    """Attach OpenTelemetry instrumentation to ``app`` when MLflow tracing is on.

    With the instrumentation in place, ``traceparent`` headers on incoming
    requests are picked up so that server spans join the caller's trace
    (the "Tier 3" bridge: an agent that sends ``traceparent`` sees the
    server spans as children in its own trace tree).

    The function is a no-op when any of the following holds:

    * ``store.config.mlflow`` is ``None``, not enabled, or has tracing off;
    * the store is reported as embedded by ``_is_embedded_store`` and the
      configured tracking URI starts with ``"http"``;
    * the ``opentelemetry-instrumentation-fastapi`` package cannot be
      imported (only a debug message is logged).

    Args:
        app: The FastAPI application to instrument.
        store: Feature store whose ``config.mlflow`` section drives the
            decision.
    """
    cfg = store.config.mlflow
    tracing_requested = cfg is not None and cfg.enabled and cfg.enable_tracing
    if not tracing_requested:
        return

    from feast.tracing import _is_embedded_store

    uri = cfg.get_tracking_uri()
    embedded = _is_embedded_store(store)
    if embedded and uri and uri.startswith("http"):
        logger.info(
            "Skipping FastAPI OTEL instrumentation (embedded store + HTTP tracking)"
        )
        return

    try:
        # Imported lazily: the OTEL FastAPI instrumentation is an optional extra.
        from opentelemetry.instrumentation.fastapi import (
            FastAPIInstrumentor as _Instrumentor,
        )

        _Instrumentor.instrument_app(app)
        logger.info("FastAPI OTEL instrumentation enabled for trace propagation")
    except ImportError:
        logger.debug(
            "opentelemetry-instrumentation-fastapi not installed; "
            "cross-process trace linking disabled"
        )


def get_app(
store: "feast.FeatureStore",
registry_ttl_sec: int = DEFAULT_FEATURE_SERVER_REGISTRY_TTL,
Expand Down Expand Up @@ -360,86 +393,123 @@ async def lifespan(app: FastAPI):

app = FastAPI(lifespan=lifespan)

_instrument_app_for_tracing(app, store)

@app.post(
"/get-online-features",
dependencies=[Depends(inject_user_details)],
response_model=OnlineFeaturesResponse,
)
async def get_online_features(request: GetOnlineFeaturesRequest) -> Any:
with feast_metrics.track_request_latency(
"/get-online-features",
) as metrics_ctx:
features = await _get_features(request, store)
feat_count, fv_count = _resolve_feature_counts(features)
metrics_ctx.feature_count = feat_count
metrics_ctx.feature_view_count = fv_count

entity_count = len(next(iter(request.entities.values()), []))
feast_metrics.track_online_features_entities(entity_count)

read_params = dict(
features=features,
entity_rows=request.entities,
full_feature_names=request.full_feature_names,
include_feature_view_version_metadata=request.include_feature_view_version_metadata,
)
async def get_online_features(
request: GetOnlineFeaturesRequest, raw_request: Request
) -> Any:
from feast.tracing import traced_tool_span

if store._get_provider().async_supported.online.read:
response = await store.get_online_features_async(**read_params) # type: ignore
else:
response = await run_in_threadpool(
lambda: store.get_online_features(**read_params) # type: ignore
session_id = raw_request.headers.get("mcp-session-id", "")
feature_refs = ",".join(request.features) if request.features else ""
entity_count = len(next(iter(request.entities.values()), []))

with traced_tool_span(
store,
"feast.get_online_features",
attributes={
"feast.mcp_session_id": session_id,
"feast.feature_refs": feature_refs,
"feast.entity_count": str(entity_count),
"feast.project": store.config.project,
"feast.retrieval_type": "online",
},
):
with feast_metrics.track_request_latency(
"/get-online-features",
) as metrics_ctx:
features = await _get_features(request, store)
feat_count, fv_count = _resolve_feature_counts(features)
metrics_ctx.feature_count = feat_count
metrics_ctx.feature_view_count = fv_count

feast_metrics.track_online_features_entities(entity_count)

read_params = dict(
features=features,
entity_rows=request.entities,
full_feature_names=request.full_feature_names,
include_feature_view_version_metadata=request.include_feature_view_version_metadata,
)

response_dict = await run_in_threadpool(
MessageToDict,
response.proto,
preserving_proto_field_name=True,
float_precision=18,
)
return response_dict
if store._get_provider().async_supported.online.read:
response = await store.get_online_features_async(**read_params) # type: ignore
else:
response = await run_in_threadpool(
lambda: store.get_online_features(**read_params) # type: ignore
)

response_dict = await run_in_threadpool(
MessageToDict,
response.proto,
preserving_proto_field_name=True,
float_precision=18,
)
return response_dict

@app.post(
"/retrieve-online-documents",
dependencies=[Depends(inject_user_details)],
response_model=OnlineFeaturesResponse,
)
async def retrieve_online_documents(
request: GetOnlineDocumentsRequest,
request: GetOnlineDocumentsRequest, raw_request: Request
) -> Any:
with feast_metrics.track_request_latency("/retrieve-online-documents"):
logger.warning(
"This endpoint is in alpha and will be moved to /get-online-features when stable."
)
features = await _get_features(request, store)
from feast.tracing import traced_tool_span

read_params = dict(
features=features,
query=request.query,
top_k=request.top_k,
)
if request.api_version == 2 and request.query_string is not None:
read_params["query_string"] = request.query_string
session_id = raw_request.headers.get("mcp-session-id", "")
feature_refs = ",".join(request.features) if request.features else ""
top_k = str(request.top_k) if request.top_k else ""

if request.api_version == 2:
read_params["include_feature_view_version_metadata"] = (
request.include_feature_view_version_metadata
)
response = await run_in_threadpool(
lambda: store.retrieve_online_documents_v2(**read_params) # type: ignore
with traced_tool_span(
store,
"feast.retrieve_online_documents",
attributes={
"feast.mcp_session_id": session_id,
"feast.feature_refs": feature_refs,
"feast.top_k": top_k,
"feast.project": store.config.project,
"feast.retrieval_type": "document",
},
):
with feast_metrics.track_request_latency("/retrieve-online-documents"):
logger.warning(
"This endpoint is in alpha and will be moved to /get-online-features when stable."
)
else:
response = await run_in_threadpool(
lambda: store.retrieve_online_documents(**read_params) # type: ignore
features = await _get_features(request, store)

read_params = dict(
features=features,
query=request.query,
top_k=request.top_k,
)
if request.api_version == 2 and request.query_string is not None:
read_params["query_string"] = request.query_string

response_dict = await run_in_threadpool(
MessageToDict,
response.proto,
preserving_proto_field_name=True,
float_precision=18,
)
return response_dict
if request.api_version == 2:
read_params["include_feature_view_version_metadata"] = (
request.include_feature_view_version_metadata
)
response = await run_in_threadpool(
lambda: store.retrieve_online_documents_v2(**read_params) # type: ignore
)
else:
response = await run_in_threadpool(
lambda: store.retrieve_online_documents(**read_params) # type: ignore
)

response_dict = await run_in_threadpool(
MessageToDict,
response.proto,
preserving_proto_field_name=True,
float_precision=18,
)
return response_dict

@app.post("/push", dependencies=[Depends(inject_user_details)])
async def push(request: PushFeaturesRequest) -> Response:
Expand Down Expand Up @@ -550,19 +620,34 @@ async def _get_feast_object(
)

@app.post("/write-to-online-store", dependencies=[Depends(inject_user_details)])
async def write_to_online_store(request: WriteToFeatureStoreRequest) -> None:
df = pd.DataFrame(request.df)
feature_view_name = request.feature_view_name
allow_registry_cache = request.allow_registry_cache
resource = await _get_feast_object(feature_view_name, allow_registry_cache)
assert_permissions(resource=resource, actions=[AuthzedAction.WRITE_ONLINE])
await run_in_threadpool(
store.write_to_online_store,
feature_view_name=feature_view_name,
df=df,
allow_registry_cache=allow_registry_cache,
transform_on_write=request.transform_on_write,
)
async def write_to_online_store(
request: WriteToFeatureStoreRequest, raw_request: Request
) -> None:
from feast.tracing import traced_tool_span

session_id = raw_request.headers.get("mcp-session-id", "")

with traced_tool_span(
store,
"feast.write_to_online_store",
attributes={
"feast.mcp_session_id": session_id,
"feast.feature_view": request.feature_view_name,
"feast.project": store.config.project,
},
):
df = pd.DataFrame(request.df)
feature_view_name = request.feature_view_name
allow_registry_cache = request.allow_registry_cache
resource = await _get_feast_object(feature_view_name, allow_registry_cache)
assert_permissions(resource=resource, actions=[AuthzedAction.WRITE_ONLINE])
await run_in_threadpool(
store.write_to_online_store,
feature_view_name=feature_view_name,
df=df,
allow_registry_cache=allow_registry_cache,
transform_on_write=request.transform_on_write,
)

@app.get("/health")
async def health():
Expand Down
34 changes: 33 additions & 1 deletion sdk/python/feast/infra/mcp_servers/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,20 @@ def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastAp
return None

try:
# Create MCP server from the FastAPI app
# Create MCP server from the FastAPI app.
# Forward mcp-session-id so endpoint handlers can tag spans with it.
mcp = FastApiMCP(
app,
name=getattr(config, "mcp_server_name", "feast-feature-store"),
description="Feast Feature Store MCP Server - Access feature store data and operations through MCP",
headers=["authorization", "mcp-session-id"],
)

# Instrument the internal httpx client with OTEL so that trace
# context propagates from the /mcp server span into the internal
# ASGI calls to the actual FastAPI endpoints (Tier 3 enabler).
_instrument_mcp_http_client(mcp)

transport = getattr(config, "mcp_transport", "sse")
if transport == "http":
mount_http = getattr(mcp, "mount_http", None)
Expand Down Expand Up @@ -83,3 +90,28 @@ def add_mcp_support_to_app(app, store: FeatureStore, config) -> Optional["FastAp
except Exception as e:
logger.error(f"Failed to initialize MCP integration: {e}", exc_info=True)
return None


def _instrument_mcp_http_client(mcp: "FastApiMCP") -> None:
    """Add OTEL instrumentation to the httpx client held by ``fastapi_mcp``.

    ``fastapi_mcp`` reaches the real FastAPI endpoints through internal ASGI
    calls made with its own httpx client. Instrumenting that client lets the
    active trace context (from the ``/mcp`` server span) travel along as
    ``traceparent`` headers; otherwise the endpoint spans would not be
    attached to the incoming trace.

    This is best-effort: if the ``_http_client`` attribute is absent or
    ``None``, or ``opentelemetry-instrumentation-httpx`` cannot be imported,
    a debug message is logged and nothing else happens.

    Args:
        mcp: The ``FastApiMCP`` instance whose internal client is instrumented.
    """
    try:
        # Imported lazily: the OTEL httpx instrumentation is an optional extra.
        from opentelemetry.instrumentation.httpx import (
            HTTPXClientInstrumentor as _Instrumentor,
        )

        # ``_http_client`` is a private attribute of fastapi_mcp, so look it
        # up defensively instead of assuming it exists.
        client = getattr(mcp, "_http_client", None)
        if client is None:
            logger.debug("Could not access fastapi_mcp internal httpx client")
            return

        _Instrumentor.instrument_client(client)
        logger.info("MCP internal httpx client instrumented for trace propagation")
    except ImportError:
        logger.debug(
            "opentelemetry-instrumentation-httpx not installed; "
            "internal trace propagation disabled"
        )
Loading
Loading