from pathlib import Path

from tests.conftest import (
    build_fastapi_test_client,
    build_postgres_database_url,
    build_postgres_engine,
    prepare_known_service_import,
)


def test_memory_service_post_contract_matches_frontend(
    tmp_path: Path,
    monkeypatch,
) -> None:
    """Walk the memory-service POST endpoints through one memory's lifecycle.

    The test creates a memory, then lists, searches, fetches, updates,
    re-statuses and deletes it. Every call must answer HTTP 200 and wrap its
    result under a top-level ``data`` key. Responses are expected to expose
    camelCase keys (``scopeType``, ``scoreDetails``) and not the snake_case
    names ``scope_type`` / ``score_json``.
    """
    prepare_known_service_import("memory-service")

    # Imported only after the helper call above; presumably the helper makes
    # the service's ``app`` package importable -- confirm in tests/conftest.
    from app.bootstrap.app import create_app
    from app.db.models import Base
    from core_db import create_session_factory

    db_url = build_postgres_database_url(tmp_path, "memory-api")
    monkeypatch.setenv("AGENT_PLATFORM_DATABASE_URL", db_url)
    # NOTE(review): an empty Redis URL looks intended to run without Redis;
    # verify against the service settings.
    monkeypatch.setenv("AGENT_PLATFORM_REDIS_URL", "")

    db_engine = build_postgres_engine(db_url)
    Base.metadata.create_all(db_engine)

    application = create_app()
    application.state.session_factory = create_session_factory(db_engine)
    http = build_fastapi_test_client(application)

    def post_ok(route, body):
        """POST ``body`` as JSON to ``route``, require 200, return ``data``."""
        reply = http.post(route, json=body)
        assert reply.status_code == 200
        return reply.json()["data"]

    # --- create -----------------------------------------------------------
    initial_text = "用户喜欢先看结论,再看细节。"
    memory = post_ok(
        "/memories/create",
        {
            "scopeType": "user",
            "scopeId": "user-demo",
            "memoryType": "preference",
            "contentText": initial_text,
            "metadata": {"source": "test"},
            "importanceScore": 80,
        },
    )
    assert memory["scopeType"] == "user"
    assert memory["contentText"] == initial_text
    assert memory["createdTime"]
    assert "scope_type" not in memory

    # --- list -------------------------------------------------------------
    listing = post_ok(
        "/memories/list",
        {"page": 1, "pageSize": 20, "scopeType": "user", "keyword": "结论"},
    )
    assert listing["total"] == 1
    assert listing["items"][0]["id"] == memory["id"]
    memory_id = memory["id"]

    # --- search -----------------------------------------------------------
    hits = post_ok(
        "/memories/search/query",
        {
            "query": "用户偏好",
            "scopeType": "user",
            "scopeId": "user-demo",
            "limit": 5,
        },
    )
    assert hits
    top_hit = hits[0]
    assert "scoreDetails" in top_hit
    assert "score_json" not in top_hit

    # --- detail -----------------------------------------------------------
    detail = post_ok("/memories/detail", {"memoryId": memory_id})
    assert detail["lastAccessedTime"] is not None

    # --- update -----------------------------------------------------------
    updated = post_ok(
        "/memories/update",
        {
            "memoryId": memory_id,
            "contentText": "用户喜欢先看结论,然后再展开执行细节。",
            "importanceScore": 90,
        },
    )
    assert updated["importanceScore"] == 90

    # --- status -----------------------------------------------------------
    archived = post_ok(
        "/memories/status",
        {"memoryId": memory_id, "status": "archived"},
    )
    assert archived["status"] == "archived"

    # --- delete -----------------------------------------------------------
    removed = post_ok("/memories/delete", {"memoryId": memory_id})
    assert removed == {"deleted": True, "memoryId": memory_id}