| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117 |
- import hashlib
- import math
- import re
- from collections import Counter
- from core_shared import JSONValue
- TOKEN_PATTERN = re.compile(r"[\w\u4e00-\u9fff]+", re.UNICODE)
- def split_text(text: str, *, chunk_size: int, chunk_overlap: int) -> list[str]:
- normalized_text = text.strip()
- if not normalized_text:
- return []
- safe_overlap = min(chunk_overlap, max(chunk_size - 1, 0))
- chunks: list[str] = []
- start = 0
- while start < len(normalized_text):
- end = min(start + chunk_size, len(normalized_text))
- chunks.append(normalized_text[start:end])
- if end == len(normalized_text):
- break
- start = end - safe_overlap
- return chunks
- def tokenize(text: str) -> list[str]:
- return [item.lower() for item in TOKEN_PATTERN.findall(text)]
- def build_hash_embedding(text: str, *, dimensions: int) -> list[float]:
- vector = [0.0 for _ in range(dimensions)]
- tokens = tokenize(text)
- if not tokens:
- return vector
- for token in tokens:
- digest = hashlib.sha256(token.encode("utf-8")).digest()
- index = int.from_bytes(digest[:4], "big") % dimensions
- sign = 1.0 if digest[4] % 2 == 0 else -1.0
- vector[index] += sign
- norm = math.sqrt(sum(item * item for item in vector))
- if norm == 0:
- return vector
- return [round(item / norm, 6) for item in vector]
- def cosine_similarity(left: list[float] | None, right: list[float] | None) -> float:
- if not left or not right or len(left) != len(right):
- return 0.0
- left_norm = math.sqrt(sum(item * item for item in left))
- right_norm = math.sqrt(sum(item * item for item in right))
- if left_norm == 0 or right_norm == 0:
- return 0.0
- return sum(a * b for a, b in zip(left, right, strict=True)) / (left_norm * right_norm)
- def keyword_score(query: str, text: str) -> float:
- query_tokens = tokenize(query)
- if not query_tokens:
- return 0.0
- text_counts = Counter(tokenize(text))
- if not text_counts:
- return 0.0
- matched = sum(1 for token in query_tokens if token in text_counts)
- frequency = sum(text_counts.get(token, 0) for token in query_tokens)
- return matched / len(set(query_tokens)) + min(frequency / 20.0, 1.0)
- def rerank_score(*, query: str, chunk_text: str, document_title: str | None = None) -> float:
- query_tokens = tokenize(query)
- if not query_tokens:
- return 0.0
- chunk_tokens = tokenize(chunk_text)
- title_tokens = tokenize(document_title or "")
- if not chunk_tokens and not title_tokens:
- return 0.0
- unique_query_tokens = set(query_tokens)
- chunk_token_set = set(chunk_tokens)
- title_token_set = set(title_tokens)
- coverage = len(unique_query_tokens & chunk_token_set) / len(unique_query_tokens)
- title_bonus = min(len(unique_query_tokens & title_token_set) / len(unique_query_tokens), 1.0)
- phrase_bonus = 1.0 if query.lower() in chunk_text.lower() else 0.0
- density = sum(1 for token in chunk_tokens if token in unique_query_tokens) / max(
- len(chunk_tokens),
- 1,
- )
- return min(coverage * 0.55 + title_bonus * 0.2 + phrase_bonus * 0.15 + density * 0.1, 1.0)
- def stable_content_hash(text: str) -> str:
- return hashlib.sha256(text.encode("utf-8")).hexdigest()
- def build_chunk_payloads(
- *,
- content_text: str,
- chunk_size: int,
- chunk_overlap: int,
- ) -> list[dict[str, JSONValue]]:
- chunks = split_text(
- content_text,
- chunk_size=chunk_size,
- chunk_overlap=chunk_overlap,
- )
- payloads: list[dict[str, JSONValue]] = []
- for index, chunk_text in enumerate(chunks):
- payloads.append(
- {
- "chunk_index": index,
- "content_text": chunk_text,
- "token_count": len(tokenize(chunk_text)),
- "embedding_model": None,
- "embedding_json": None,
- "metadata_json": {},
- }
- )
- return payloads
|