# services.py
from core_shared import JSONValue
from core_shared.secrets import EncryptedSecret, SecretCipher

from app.db.models import ToolBinding, ToolCredential, ToolDefinition, ToolVersion
from app.domain.repositories import (
    ToolBindingRepository,
    ToolCredentialRepository,
    ToolDefinitionRepository,
    ToolVersionRepository,
)
from app.schemas.tool import (
    ToolBindingCreateRequest,
    ToolCreateRequest,
    ToolCredentialCreateRequest,
    ToolVersionCreateRequest,
)


  16. class ToolApplicationService:
  17. def __init__(
  18. self,
  19. tool_definition_repository: ToolDefinitionRepository,
  20. tool_version_repository: ToolVersionRepository,
  21. tool_binding_repository: ToolBindingRepository,
  22. tool_credential_repository: ToolCredentialRepository,
  23. secret_cipher: SecretCipher) -> None:
  24. self.tool_definition_repository = tool_definition_repository
  25. self.tool_version_repository = tool_version_repository
  26. self.tool_binding_repository = tool_binding_repository
  27. self.tool_credential_repository = tool_credential_repository
  28. self.secret_cipher = secret_cipher
  29. def create_tool_definition(self, payload: ToolCreateRequest) -> ToolDefinition:
  30. return self.tool_definition_repository.create(
  31. plugin_id=payload.plugin_id,
  32. code=payload.code,
  33. name=payload.name,
  34. tool_type=payload.tool_type,
  35. description=payload.description)
  36. def list_tool_definitions(self) -> list[ToolDefinition]:
  37. return self.tool_definition_repository.list_all()
  38. def create_tool_version(self, payload: ToolVersionCreateRequest) -> ToolVersion:
  39. return self.tool_version_repository.create(
  40. tool_id=payload.tool_id,
  41. input_schema_json=payload.input_schema_json,
  42. output_schema_json=payload.output_schema_json,
  43. invoke_config_json=payload.invoke_config_json,
  44. timeout_ms=payload.timeout_ms,
  45. retry_policy_json=payload.retry_policy_json)
  46. def list_tool_versions(self, tool_id: str | None = None) -> list[ToolVersion]:
  47. if tool_id is None:
  48. return self.tool_version_repository.list_all()
  49. return self.tool_version_repository.list_by_tool(tool_id=tool_id)
  50. def create_tool_binding(self, payload: ToolBindingCreateRequest) -> ToolBinding:
  51. if payload.credential_id is not None:
  52. credential = self.tool_credential_repository.get_by_id(
  53. credential_id=payload.credential_id)
  54. if credential is None:
  55. raise ValueError(f"tool credential not found: {payload.credential_id}")
  56. return self.tool_binding_repository.create(
  57. app_id=payload.app_id,
  58. tool_version_id=payload.tool_version_id,
  59. credential_id=payload.credential_id,
  60. binding_scope=payload.binding_scope,
  61. enabled=payload.enabled,
  62. config_json=payload.config_json)
  63. def list_tool_bindings(self, app_id: str | None = None) -> list[ToolBinding]:
  64. return self.tool_binding_repository.list_by_scope(app_id=app_id)
  65. def create_tool_credential(self, payload: ToolCredentialCreateRequest) -> ToolCredential:
  66. encrypted = self.secret_cipher.encrypt_json(payload.secret_json)
  67. return self.tool_credential_repository.create(
  68. name=payload.name,
  69. credential_type=payload.credential_type,
  70. encrypted_payload_text=encrypted.ciphertext,
  71. secret_fingerprint=encrypted.fingerprint,
  72. encryption_algorithm=encrypted.algorithm,
  73. metadata_json=payload.metadata_json)
  74. def list_tool_credentials(self) -> list[ToolCredential]:
  75. return self.tool_credential_repository.list_all()
  76. def reveal_tool_credential(
  77. self,
  78. *,
  79. credential_id: str) -> tuple[ToolCredential, dict[str, JSONValue]] | None:
  80. credential = self.tool_credential_repository.get_by_id(
  81. credential_id=credential_id)
  82. if credential is None:
  83. return None
  84. secret_json = self.secret_cipher.decrypt_json(
  85. EncryptedSecret(
  86. ciphertext=credential.encrypted_payload_text,
  87. fingerprint=credential.secret_fingerprint,
  88. algorithm=credential.encryption_algorithm)
  89. )
  90. return credential, secret_json
  91. def get_tool_binding_detail(
  92. self,
  93. *,
  94. binding_id: str) -> tuple[ToolBinding, ToolVersion, ToolDefinition] | None:
  95. binding = self.tool_binding_repository.get_by_id(binding_id=binding_id)
  96. if binding is None:
  97. return None
  98. tool_version = self.tool_version_repository.get_by_id(
  99. tool_version_id=binding.tool_version_id)
  100. if tool_version is None:
  101. return None
  102. tool_definition = self.tool_definition_repository.get_by_id(
  103. tool_id=tool_version.tool_id)
  104. if tool_definition is None:
  105. return None
  106. return binding, tool_version, tool_definition