|
| 1 | +import hashlib |
| 2 | +from typing import List, Literal, Optional, Type, Union |
| 3 | + |
| 4 | +from pydantic import ConfigDict, Field |
| 5 | + |
| 6 | +from inference.core.cache.lru_cache import LRUCache |
| 7 | +from inference.core.entities.requests.perception_encoder import ( |
| 8 | + PerceptionEncoderImageEmbeddingRequest, |
| 9 | + PerceptionEncoderTextEmbeddingRequest, |
| 10 | +) |
| 11 | +from inference.core.env import ( |
| 12 | + HOSTED_CORE_MODEL_URL, |
| 13 | + LOCAL_INFERENCE_API_URL, |
| 14 | + WORKFLOWS_REMOTE_API_TARGET, |
| 15 | +) |
| 16 | +from inference.core.managers.base import ModelManager |
| 17 | +from inference.core.workflows.core_steps.common.entities import StepExecutionMode |
| 18 | +from inference.core.workflows.core_steps.common.utils import load_core_model |
| 19 | +from inference.core.workflows.execution_engine.entities.base import ( |
| 20 | + OutputDefinition, |
| 21 | + WorkflowImageData, |
| 22 | +) |
| 23 | +from inference.core.workflows.execution_engine.entities.types import ( |
| 24 | + EMBEDDING_KIND, |
| 25 | + IMAGE_KIND, |
| 26 | + STRING_KIND, |
| 27 | + Selector, |
| 28 | +) |
| 29 | +from inference.core.workflows.prototypes.block import ( |
| 30 | + BlockResult, |
| 31 | + WorkflowBlock, |
| 32 | + WorkflowBlockManifest, |
| 33 | +) |
| 34 | +from inference_sdk import InferenceHTTPClient |
| 35 | + |
| 36 | +LONG_DESCRIPTION = """ |
| 37 | +Use the Meta Perception Encoder model to create semantic embeddings of text and images. |
| 38 | +
|
| 39 | +This block accepts an image or string and returns an embedding. The embedding can be used to compare |
| 40 | +similarity between different images or between images and text. |
| 41 | +""" |
| 42 | + |
| 43 | + |
| 44 | +class BlockManifest(WorkflowBlockManifest): |
| 45 | + model_config = ConfigDict( |
| 46 | + json_schema_extra={ |
| 47 | + "name": "Perception Encoder Embedding Model", |
| 48 | + "version": "v1", |
| 49 | + "short_description": "Generate an embedding of an image or string.", |
| 50 | + "long_description": LONG_DESCRIPTION, |
| 51 | + "license": "MIT", |
| 52 | + "block_type": "model", |
| 53 | + "ui_manifest": { |
| 54 | + "section": "model", |
| 55 | + "icon": "far fa-paperclip", |
| 56 | + "blockPriority": 9.9, |
| 57 | + }, |
| 58 | + } |
| 59 | + ) |
| 60 | + type: Literal["roboflow_core/perception_encoder@v1"] |
| 61 | + name: str = Field(description="Unique name of step in workflows") |
| 62 | + data: Union[Selector(kind=[IMAGE_KIND, STRING_KIND]), str] = Field( |
| 63 | + title="Data", |
| 64 | + description="The string or image to generate an embedding for.", |
| 65 | + examples=["$inputs.image", "$steps.cropping.crops"], |
| 66 | + ) |
| 67 | + version: Union[ |
| 68 | + Literal[ |
| 69 | + "PE-Core-B16-224", |
| 70 | + "PE-Core-L14-336", |
| 71 | + "PE-Core-G14-448", |
| 72 | + ], |
| 73 | + Selector(kind=[STRING_KIND]), |
| 74 | + ] = Field( |
| 75 | + default="PE-Core-L14-336", |
| 76 | + description="Variant of Perception Encoder model", |
| 77 | + examples=["PE-Core-B16-224", "$inputs.variant"], |
| 78 | + ) |
| 79 | + |
| 80 | + @classmethod |
| 81 | + def describe_outputs(cls) -> List[OutputDefinition]: |
| 82 | + return [OutputDefinition(name="embedding", kind=[EMBEDDING_KIND])] |
| 83 | + |
| 84 | + @classmethod |
| 85 | + def get_execution_engine_compatibility(cls) -> Optional[str]: |
| 86 | + return ">=1.3.0,<2.0.0" |
| 87 | + |
| 88 | + |
| 89 | +text_cache = LRUCache() |
| 90 | + |
| 91 | + |
| 92 | +class PerceptionEncoderModelBlockV1(WorkflowBlock): |
| 93 | + def __init__( |
| 94 | + self, |
| 95 | + model_manager: ModelManager, |
| 96 | + api_key: Optional[str], |
| 97 | + step_execution_mode: StepExecutionMode, |
| 98 | + ): |
| 99 | + self._model_manager = model_manager |
| 100 | + self._api_key = api_key |
| 101 | + self._step_execution_mode = step_execution_mode |
| 102 | + |
| 103 | + @classmethod |
| 104 | + def get_init_parameters(cls) -> List[str]: |
| 105 | + return ["model_manager", "api_key", "step_execution_mode"] |
| 106 | + |
| 107 | + @classmethod |
| 108 | + def get_manifest(cls) -> Type[WorkflowBlockManifest]: |
| 109 | + return BlockManifest |
| 110 | + |
| 111 | + def run( |
| 112 | + self, |
| 113 | + data: Union[WorkflowImageData, str], |
| 114 | + version: str, |
| 115 | + ) -> BlockResult: |
| 116 | + if self._step_execution_mode is StepExecutionMode.LOCAL: |
| 117 | + return self.run_locally(data=data, version=version) |
| 118 | + elif self._step_execution_mode is StepExecutionMode.REMOTE: |
| 119 | + return self.run_remotely(data=data, version=version) |
| 120 | + else: |
| 121 | + raise ValueError( |
| 122 | + f"Unknown step execution mode: {self._step_execution_mode}" |
| 123 | + ) |
| 124 | + |
| 125 | + def run_locally( |
| 126 | + self, |
| 127 | + data: Union[WorkflowImageData, str], |
| 128 | + version: str, |
| 129 | + ) -> BlockResult: |
| 130 | + if isinstance(data, str): |
| 131 | + hash_key = hashlib.md5((version + data).encode("utf-8")).hexdigest() |
| 132 | + cached_value = text_cache.get(hash_key) |
| 133 | + if cached_value is not None: |
| 134 | + return {"embedding": cached_value} |
| 135 | + inference_request = PerceptionEncoderTextEmbeddingRequest( |
| 136 | + perception_encoder_version_id=version, |
| 137 | + text=[data], |
| 138 | + api_key=self._api_key, |
| 139 | + ) |
| 140 | + pe_model_id = load_core_model( |
| 141 | + model_manager=self._model_manager, |
| 142 | + inference_request=inference_request, |
| 143 | + core_model="perception_encoder", |
| 144 | + ) |
| 145 | + predictions = self._model_manager.infer_from_request_sync( |
| 146 | + pe_model_id, inference_request |
| 147 | + ) |
| 148 | + text_cache.set(hash_key, predictions.embeddings[0]) |
| 149 | + return {"embedding": predictions.embeddings[0]} |
| 150 | + else: |
| 151 | + inference_request = PerceptionEncoderImageEmbeddingRequest( |
| 152 | + perception_encoder_version_id=version, |
| 153 | + image=[data.to_inference_format(numpy_preferred=True)], |
| 154 | + api_key=self._api_key, |
| 155 | + ) |
| 156 | + pe_model_id = load_core_model( |
| 157 | + model_manager=self._model_manager, |
| 158 | + inference_request=inference_request, |
| 159 | + core_model="perception_encoder", |
| 160 | + ) |
| 161 | + predictions = self._model_manager.infer_from_request_sync( |
| 162 | + pe_model_id, inference_request |
| 163 | + ) |
| 164 | + return {"embedding": predictions.embeddings[0]} |
| 165 | + |
| 166 | + def run_remotely( |
| 167 | + self, |
| 168 | + data: Union[WorkflowImageData, str], |
| 169 | + version: str, |
| 170 | + ) -> BlockResult: |
| 171 | + api_url = ( |
| 172 | + LOCAL_INFERENCE_API_URL |
| 173 | + if WORKFLOWS_REMOTE_API_TARGET != "hosted" |
| 174 | + else HOSTED_CORE_MODEL_URL |
| 175 | + ) |
| 176 | + client = InferenceHTTPClient(api_url=api_url, api_key=self._api_key) |
| 177 | + if WORKFLOWS_REMOTE_API_TARGET == "hosted": |
| 178 | + client.select_api_v0() |
| 179 | + if isinstance(data, str): |
| 180 | + result = client.get_perception_encoder_text_embeddings( |
| 181 | + text=data, |
| 182 | + perception_encoder_version=version, |
| 183 | + ) |
| 184 | + else: |
| 185 | + result = client.get_perception_encoder_image_embeddings( |
| 186 | + inference_input=data.base64_image, |
| 187 | + perception_encoder_version=version, |
| 188 | + ) |
| 189 | + return {"embedding": result["embeddings"][0]} |
0 commit comments