-
-
Notifications
You must be signed in to change notification settings - Fork 52
Expand file tree
/
Copy pathgoogle_gemini.py
More file actions
3400 lines (3008 loc) · 138 KB
/
google_gemini.py
File metadata and controls
3400 lines (3008 loc) · 138 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
"""
title: Google Gemini Pipeline
author: owndev, olivier-lacroix
author_url: https://github.com/owndev/
project_url: https://github.com/owndev/Open-WebUI-Functions
funding_url: https://github.com/sponsors/owndev
version: 1.14.1
required_open_webui_version: 0.8.0
license: Apache License 2.0
description: Highly optimized Google Gemini pipeline with advanced image and video generation capabilities, intelligent compression, and streamlined processing workflows.
features:
- Optimized asynchronous API calls for maximum performance
- Intelligent model caching with configurable TTL
- Streamlined dynamic model specification with automatic prefix handling
- Smart streaming response handling with safety checks
- Advanced multimodal input support (text and images)
- Unified image generation and editing with Gemini 2.5 Flash Image Preview
- Intelligent image optimization with size-aware compression algorithms
- Automated image upload to Open WebUI with robust fallback support
- Optimized text-to-image and image-to-image workflows
- Non-streaming mode for image generation to prevent chunk overflow
- Progressive status updates for optimal user experience
- Consolidated error handling and comprehensive logging
- Seamless Google Generative AI and Vertex AI integration
- Advanced generation parameters (temperature, max tokens, etc.)
- Configurable safety settings with environment variable support
- Encrypted storage of sensitive API keys (Fernet, key derived from WEBUI_SECRET_KEY)
- Intelligent grounding with Google search integration
- Vertex AI Search grounding for RAG
- Native tool calling support with automatic signature management
- URL context grounding for specified web pages
- Unified image processing with consolidated helper methods
- Optimized payload creation for image generation models
- Configurable image processing parameters (size, quality, compression)
- Flexible upload fallback options and optimization controls
- Configurable thinking levels for Gemini 3 models with model-specific validation
- Configurable thinking budgets (0-32768 tokens) for Gemini 2.5 models
- Configurable image generation aspect ratio (1:1, 16:9, etc.) and resolution (1K, 2K, 4K)
- Model whitelist for filtering available models
- Additional model support for SDK-unsupported models
- Video generation with Google Veo models (Veo 3.1, 3, 2)
- Configurable video generation parameters (aspect ratio, resolution, duration)
- Asynchronous video generation with progressive polling status updates
- Automatic video upload to Open WebUI with embedded playback
- Image-to-video generation support for Veo models
- Negative prompt and person generation controls for video
"""
import os
import re
import time
import asyncio
import base64
import hashlib
import logging
import io
import uuid
import aiofiles
from PIL import Image
from google import genai
from google.genai import types
from google.genai.errors import ClientError, ServerError, APIError
from typing import List, Union, Optional, Dict, Any, Tuple, AsyncIterator, Callable
from pydantic_core import core_schema
from pydantic import BaseModel, Field, GetCoreSchemaHandler
from cryptography.fernet import Fernet, InvalidToken
from open_webui.env import SRC_LOG_LEVELS
from fastapi import Request, UploadFile, BackgroundTasks
from open_webui.routers.files import upload_file
from open_webui.models.users import UserModel, Users
from starlette.datastructures import Headers
# Allowed aspect-ratio values for image generation; "default" defers to the
# model's own default.
ASPECT_RATIO_OPTIONS: List[str] = [
    "default",
    "1:1",
    "2:3",
    "3:2",
    "3:4",
    "4:3",
    "4:5",
    "5:4",
    "9:16",
    "16:9",
    "21:9",
]
# Allowed output resolutions for image generation.
RESOLUTION_OPTIONS: List[str] = [
    "default",
    "1K",
    "2K",
    "4K",
]
# Video (Veo) generation only supports landscape (16:9) or portrait (9:16).
VIDEO_ASPECT_RATIO_OPTIONS: List[str] = [
    "default",
    "16:9",
    "9:16",
]
# Allowed output resolutions for video generation.
VIDEO_RESOLUTION_OPTIONS: List[str] = [
    "default",
    "720p",
    "1080p",
    "4k",
]
# Allowed clip durations in seconds; availability varies by Veo model.
VIDEO_DURATION_OPTIONS: List[str] = [
    "default",
    "4",
    "5",
    "6",
    "8",
]
# Person-generation policy values accepted by the video generation API.
VIDEO_PERSON_GENERATION_OPTIONS: List[str] = [
    "default",
    "allow_all",
    "allow_adult",
    "dont_allow",
]
# Simplified encryption implementation with automatic handling
class EncryptedStr(str):
    """A string type that automatically handles encryption/decryption.

    Values are encrypted with Fernet using a key derived (SHA-256) from the
    WEBUI_SECRET_KEY environment variable; encrypted values carry an
    "encrypted:" prefix. When no secret key is configured, values pass
    through unchanged.
    """

    @classmethod
    def _get_encryption_key(cls) -> Optional[bytes]:
        """
        Generate encryption key from WEBUI_SECRET_KEY if available
        Returns None if no key is configured
        """
        secret = os.getenv("WEBUI_SECRET_KEY")
        if not secret:
            return None
        # Fernet requires a urlsafe-base64-encoded 32-byte key.
        hashed_key = hashlib.sha256(secret.encode()).digest()
        return base64.urlsafe_b64encode(hashed_key)

    @classmethod
    def encrypt(cls, value: str) -> str:
        """
        Encrypt a string value if a key is available
        Returns the original value if no key is available

        Empty and already-encrypted values are returned unchanged.
        """
        if not value or value.startswith("encrypted:"):
            return value
        key = cls._get_encryption_key()
        if not key:  # No encryption if no key
            return value
        f = Fernet(key)
        encrypted = f.encrypt(value.encode())
        return f"encrypted:{encrypted.decode()}"

    @classmethod
    def decrypt(cls, value: str) -> str:
        """
        Decrypt an encrypted string value if a key is available
        Returns the original value if no key is available or decryption fails

        Non-prefixed values are returned as-is. With no key configured, the
        "encrypted:" prefix is stripped and the remainder returned.
        """
        if not value or not value.startswith("encrypted:"):
            return value
        key = cls._get_encryption_key()
        if not key:  # No decryption if no key
            return value[len("encrypted:") :]  # Return without prefix
        try:
            encrypted_part = value[len("encrypted:") :]
            f = Fernet(key)
            decrypted = f.decrypt(encrypted_part.encode())
            return decrypted.decode()
        except Exception:
            # Best-effort fallback: return the stored (still-prefixed) value
            # on any failure (InvalidToken, malformed base64, ...). The
            # previous `except (InvalidToken, Exception)` tuple was redundant
            # since Exception already covers InvalidToken.
            return value

    # Pydantic integration
    @classmethod
    def __get_pydantic_core_schema__(
        cls, _source_type: Any, _handler: GetCoreSchemaHandler
    ) -> core_schema.CoreSchema:
        """Accept existing instances or encrypt plain strings; serialize as str."""
        return core_schema.union_schema(
            [
                core_schema.is_instance_schema(cls),
                core_schema.chain_schema(
                    [
                        core_schema.str_schema(),
                        core_schema.no_info_plain_validator_function(
                            lambda value: cls(cls.encrypt(value) if value else value)
                        ),
                    ]
                ),
            ],
            serialization=core_schema.plain_serializer_function_ser_schema(
                lambda instance: str(instance)
            ),
        )
class Pipe:
"""
Pipeline for interacting with Google Gemini models.
"""
# User-overridable configuration valves
class UserValves(BaseModel):
    """Per-user overrides for image/video generation defaults.

    Each field is seeded from its environment variable and falls back to
    "default", which defers to the model/API default.
    """

    # Aspect ratio for generated images (one of ASPECT_RATIO_OPTIONS).
    IMAGE_GENERATION_ASPECT_RATIO: str = Field(
        default=os.getenv("GOOGLE_IMAGE_GENERATION_ASPECT_RATIO", "default"),
        description="Default aspect ratio for image generation.",
        json_schema_extra={"enum": ASPECT_RATIO_OPTIONS},
    )
    # Output resolution for generated images (one of RESOLUTION_OPTIONS).
    IMAGE_GENERATION_RESOLUTION: str = Field(
        default=os.getenv("GOOGLE_IMAGE_GENERATION_RESOLUTION", "default"),
        description="Default resolution for image generation.",
        json_schema_extra={"enum": RESOLUTION_OPTIONS},
    )
    # Aspect ratio for generated videos (16:9 landscape or 9:16 portrait).
    VIDEO_GENERATION_ASPECT_RATIO: str = Field(
        default=os.getenv("GOOGLE_VIDEO_GENERATION_ASPECT_RATIO", "default"),
        description="Default aspect ratio for video generation (16:9 landscape or 9:16 portrait).",
        json_schema_extra={"enum": VIDEO_ASPECT_RATIO_OPTIONS},
    )
    # Output resolution for generated videos.
    VIDEO_GENERATION_RESOLUTION: str = Field(
        default=os.getenv("GOOGLE_VIDEO_GENERATION_RESOLUTION", "default"),
        description="Default resolution for video generation (720p, 1080p, or 4k).",
        json_schema_extra={"enum": VIDEO_RESOLUTION_OPTIONS},
    )
    # Clip duration in seconds; supported values vary by Veo model.
    VIDEO_GENERATION_DURATION: str = Field(
        default=os.getenv("GOOGLE_VIDEO_GENERATION_DURATION", "default"),
        description="Default duration in seconds for video generation (4, 5, 6, or 8 - availability varies by model).",
        json_schema_extra={"enum": VIDEO_DURATION_OPTIONS},
    )
# Configuration valves for the pipeline
class Valves(BaseModel):
    """Admin-level configuration for the pipeline.

    Every field is seeded from an environment variable so the pipeline can
    be configured without touching the UI. The API key uses EncryptedStr so
    it is stored encrypted when WEBUI_SECRET_KEY is set.
    """

    # --- API endpoint and credentials ---
    BASE_URL: str = Field(
        default=os.getenv(
            "GOOGLE_GENAI_BASE_URL", "https://generativelanguage.googleapis.com/"
        ),
        description="Base URL for the Google Generative AI API.",
    )
    GOOGLE_API_KEY: EncryptedStr = Field(
        default=os.getenv("GOOGLE_API_KEY", ""),
        description="API key for Google Generative AI (used if USE_VERTEX_AI is false).",
        json_schema_extra={"input": {"type": "password"}},
    )
    API_VERSION: str = Field(
        default=os.getenv("GOOGLE_API_VERSION", "v1alpha"),
        description="API version to use for Google Generative AI (e.g., v1alpha, v1beta, v1).",
    )
    # --- Response / thinking behavior ---
    STREAMING_ENABLED: bool = Field(
        default=os.getenv("GOOGLE_STREAMING_ENABLED", "true").lower() == "true",
        description="Enable streaming responses (set false to force non-streaming mode).",
    )
    INCLUDE_THOUGHTS: bool = Field(
        default=os.getenv("GOOGLE_INCLUDE_THOUGHTS", "true").lower() == "true",
        description="Enable Gemini thoughts outputs (set false to disable).",
    )
    THINKING_BUDGET: int = Field(
        default=int(os.getenv("GOOGLE_THINKING_BUDGET", "-1")),
        description="Thinking budget for Gemini 2.5 models (0=disabled, -1=dynamic, 1-32768=fixed token limit). "
        "Not used for Gemini 3 models which use THINKING_LEVEL instead.",
    )
    THINKING_LEVEL: str = Field(
        default=os.getenv("GOOGLE_THINKING_LEVEL", ""),
        description="Thinking level for Gemini 3 models. Most Gemini 3 models support 'low'/'high', "
        "while gemini-3.1-flash-image-preview supports 'minimal'/'high'. "
        "Ignored for other models. Empty string means use model default.",
    )
    # --- Vertex AI ---
    USE_VERTEX_AI: bool = Field(
        default=os.getenv("GOOGLE_GENAI_USE_VERTEXAI", "false").lower() == "true",
        description="Whether to use Google Cloud Vertex AI instead of the Google Generative AI API.",
    )
    VERTEX_PROJECT: str | None = Field(
        default=os.getenv("GOOGLE_CLOUD_PROJECT"),
        description="The Google Cloud project ID to use with Vertex AI.",
    )
    VERTEX_LOCATION: str = Field(
        default=os.getenv("GOOGLE_CLOUD_LOCATION", "global"),
        description="The Google Cloud region to use with Vertex AI.",
    )
    VERTEX_AI_RAG_STORE: str | None = Field(
        default=os.getenv("GOOGLE_VERTEX_AI_RAG_STORE"),
        description="Vertex AI RAG Store path for grounding (e.g., projects/PROJECT/locations/LOCATION/ragCorpora/DATA_STORE_ID). Only used when USE_VERTEX_AI is true.",
    )
    # --- General pipeline behavior ---
    USE_PERMISSIVE_SAFETY: bool = Field(
        default=os.getenv("GOOGLE_USE_PERMISSIVE_SAFETY", "false").lower()
        == "true",
        description="Use permissive safety settings for content generation.",
    )
    MODEL_CACHE_TTL: int = Field(
        default=int(os.getenv("GOOGLE_MODEL_CACHE_TTL", "600")),
        description="Time in seconds to cache the model list before refreshing",
    )
    RETRY_COUNT: int = Field(
        default=int(os.getenv("GOOGLE_RETRY_COUNT", "2")),
        description="Number of times to retry API calls on temporary failures",
    )
    DEFAULT_SYSTEM_PROMPT: str = Field(
        default=os.getenv("GOOGLE_DEFAULT_SYSTEM_PROMPT", ""),
        description="Default system prompt applied to all chats. If a user-defined system prompt exists, "
        "this is prepended to it. Leave empty to disable.",
    )
    ENABLE_FORWARD_USER_INFO_HEADERS: bool = Field(
        default=os.getenv(
            "GOOGLE_ENABLE_FORWARD_USER_INFO_HEADERS", "false"
        ).lower()
        == "true",
        description="Whether to forward user information headers.",
    )
    MODEL_ADDITIONAL: str = Field(
        default=os.getenv("GOOGLE_MODEL_ADDITIONAL", ""),
        description="A comma-separated list of model IDs to manually add to the list of available models. "
        "These are models not returned by the SDK but that you want to make available. "
        "Non-Gemini model IDs must be explicitly included in MODEL_WHITELIST to be available.",
    )
    MODEL_WHITELIST: str = Field(
        default=os.getenv("GOOGLE_MODEL_WHITELIST", ""),
        description="A comma-separated list of model IDs to show in the models list. "
        "If set, only these models will be available (after MODEL_ADDITIONAL is applied). "
        "Leave empty to show all models.",
    )
    USE_ENTERPRISE_WEB_SEARCH: bool = Field(
        default=os.getenv("GOOGLE_USE_ENTERPRISE_WEB_SEARCH", "false").lower()
        == "true",
        description="Whether to use Enterprise Web Search instead of standard Google search when grounding is enabled. "
        "Only available on Vertex AI.",
    )
    # Image Processing Configuration
    IMAGE_GENERATION_ASPECT_RATIO: str = Field(
        default=os.getenv("GOOGLE_IMAGE_GENERATION_ASPECT_RATIO", "default"),
        description="Default aspect ratio for image generation.",
        json_schema_extra={"enum": ASPECT_RATIO_OPTIONS},
    )
    IMAGE_GENERATION_RESOLUTION: str = Field(
        default=os.getenv("GOOGLE_IMAGE_GENERATION_RESOLUTION", "default"),
        description="Default resolution for image generation.",
        json_schema_extra={"enum": RESOLUTION_OPTIONS},
    )
    IMAGE_MAX_SIZE_MB: float = Field(
        default=float(os.getenv("GOOGLE_IMAGE_MAX_SIZE_MB", "15.0")),
        description="Maximum image size in MB before compression is applied",
    )
    IMAGE_MAX_DIMENSION: int = Field(
        default=int(os.getenv("GOOGLE_IMAGE_MAX_DIMENSION", "2048")),
        description="Maximum width or height in pixels before resizing",
    )
    IMAGE_COMPRESSION_QUALITY: int = Field(
        default=int(os.getenv("GOOGLE_IMAGE_COMPRESSION_QUALITY", "85")),
        description="JPEG compression quality (1-100, higher = better quality but larger size)",
    )
    IMAGE_ENABLE_OPTIMIZATION: bool = Field(
        default=os.getenv("GOOGLE_IMAGE_ENABLE_OPTIMIZATION", "true").lower()
        == "true",
        description="Enable intelligent image optimization for API compatibility",
    )
    IMAGE_PNG_COMPRESSION_THRESHOLD_MB: float = Field(
        default=float(os.getenv("GOOGLE_IMAGE_PNG_THRESHOLD_MB", "0.5")),
        description="PNG files above this size (MB) will be converted to JPEG for better compression",
    )
    IMAGE_HISTORY_MAX_REFERENCES: int = Field(
        default=int(os.getenv("GOOGLE_IMAGE_HISTORY_MAX_REFERENCES", "5")),
        description="Maximum total number of images (history + current message) to include in a generation call",
    )
    IMAGE_ADD_LABELS: bool = Field(
        default=os.getenv("GOOGLE_IMAGE_ADD_LABELS", "true").lower() == "true",
        description="If true, add small text labels like [Image 1] before each image part so the model can reference them.",
    )
    IMAGE_DEDUP_HISTORY: bool = Field(
        default=os.getenv("GOOGLE_IMAGE_DEDUP_HISTORY", "true").lower() == "true",
        description="If true, deduplicate identical images (by hash) when constructing history context",
    )
    IMAGE_HISTORY_FIRST: bool = Field(
        default=os.getenv("GOOGLE_IMAGE_HISTORY_FIRST", "true").lower() == "true",
        description="If true (default), history images precede current message images; if false, current images first.",
    )
    # Video Generation Configuration (Veo models)
    VIDEO_GENERATION_ASPECT_RATIO: str = Field(
        default=os.getenv("GOOGLE_VIDEO_GENERATION_ASPECT_RATIO", "default"),
        description="Default aspect ratio for video generation (16:9 landscape or 9:16 portrait).",
        json_schema_extra={"enum": VIDEO_ASPECT_RATIO_OPTIONS},
    )
    VIDEO_GENERATION_RESOLUTION: str = Field(
        default=os.getenv("GOOGLE_VIDEO_GENERATION_RESOLUTION", "default"),
        description="Default resolution for video generation (720p, 1080p, or 4k).",
        json_schema_extra={"enum": VIDEO_RESOLUTION_OPTIONS},
    )
    VIDEO_GENERATION_DURATION: str = Field(
        default=os.getenv("GOOGLE_VIDEO_GENERATION_DURATION", "default"),
        description="Default duration in seconds for video generation (4, 5, 6, or 8 - availability varies by model).",
        json_schema_extra={"enum": VIDEO_DURATION_OPTIONS},
    )
    VIDEO_GENERATION_NEGATIVE_PROMPT: str = Field(
        default=os.getenv("GOOGLE_VIDEO_GENERATION_NEGATIVE_PROMPT", ""),
        description="Default negative prompt for video generation (describes what not to include).",
    )
    VIDEO_GENERATION_PERSON_GENERATION: str = Field(
        default=os.getenv("GOOGLE_VIDEO_GENERATION_PERSON_GENERATION", "default"),
        description="Controls generation of people in videos (allow_all, allow_adult, dont_allow).",
        json_schema_extra={"enum": VIDEO_PERSON_GENERATION_OPTIONS},
    )
    VIDEO_GENERATION_ENHANCE_PROMPT: bool = Field(
        default=os.getenv("GOOGLE_VIDEO_GENERATION_ENHANCE_PROMPT", "true").lower()
        == "true",
        description="Enable prompt enhancement for video generation.",
    )
    VIDEO_POLL_INTERVAL: int = Field(
        default=int(os.getenv("GOOGLE_VIDEO_POLL_INTERVAL", "10")),
        description="Polling interval in seconds when waiting for video generation to complete.",
    )
    VIDEO_POLL_TIMEOUT: int = Field(
        default=int(os.getenv("GOOGLE_VIDEO_POLL_TIMEOUT", "600")),
        description="Maximum time in seconds to wait for video generation before timing out (0=no limit).",
    )
# ---------------- Internal Helpers ---------------- #
async def _gather_history_images(
    self,
    messages: List[Dict[str, Any]],
    last_user_msg: Dict[str, Any],
    optimization_stats: List[Dict[str, Any]],
) -> List[Dict[str, Any]]:
    """Collect image parts from prior user/assistant messages.

    The current user message (``last_user_msg``) is excluded; optimization
    metrics recorded during extraction are appended to
    ``optimization_stats``.
    """
    collected: List[Dict[str, Any]] = []
    relevant_roles = {"user", "assistant"}
    for entry in messages:
        if entry is last_user_msg or entry.get("role") not in relevant_roles:
            continue
        _text, image_parts = await self._extract_images_from_message(
            entry, stats_list=optimization_stats
        )
        collected.extend(image_parts or [])
    return collected
def _deduplicate_images(self, images: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Remove duplicate image parts by hashing their base64 payloads.

    Returns ``images`` unchanged when the IMAGE_DEDUP_HISTORY valve is
    disabled. Parts whose payload cannot be read (malformed or missing
    ``inline_data``) are skipped, matching the logged intent.
    """
    if not self.valves.IMAGE_DEDUP_HISTORY:
        return images
    seen: set[str] = set()
    result: List[Dict[str, Any]] = []
    for part in images:
        try:
            data = part["inline_data"]["data"]
            # Hash full base64 payload for stronger dedup reliability
            h = hashlib.sha256(data.encode()).hexdigest()
        except Exception as e:
            # Skip images with malformed or missing data, but log for debugging.
            # Bug fix: previously the malformed part was still appended
            # despite the "Skipping" log message.
            self.log.debug(f"Skipping image in deduplication due to error: {e}")
            continue
        if h in seen:
            continue
        seen.add(h)
        result.append(part)
    return result
def _combine_system_prompts(
    self, user_system_prompt: Optional[str]
) -> Optional[str]:
    """Merge the configured default system prompt with the user's.

    When both DEFAULT_SYSTEM_PROMPT and ``user_system_prompt`` are set,
    the default is prepended; when only one is set it is used as-is.

    Args:
        user_system_prompt: The user-defined system prompt (may be None)

    Returns:
        Combined system prompt, or None when neither is set.
    """
    default_prompt = self.valves.DEFAULT_SYSTEM_PROMPT.strip()
    user_prompt = (user_system_prompt or "").strip()
    if not default_prompt and not user_prompt:
        return None
    if not default_prompt:
        return user_prompt
    if not user_prompt:
        self.log.debug(f"Using default system prompt ({len(default_prompt)} chars)")
        return default_prompt
    combined = f"{default_prompt}\n\n{user_prompt}"
    self.log.debug(
        f"Combined system prompts: default ({len(default_prompt)} chars) + "
        f"user ({len(user_prompt)} chars) = {len(combined)} chars"
    )
    return combined
def _apply_order_and_limit(
    self,
    history: List[Dict[str, Any]],
    current: List[Dict[str, Any]],
) -> Tuple[List[Dict[str, Any]], List[bool]]:
    """Combine history & current image parts honoring order & global limit.

    Returns:
        (combined_parts, reused_flags) where reused_flags[i] == True indicates
        the image originated from history, False if from current message.
    """
    cap = max(1, self.valves.IMAGE_HISTORY_MAX_REFERENCES)
    # Order the two pools per the IMAGE_HISTORY_FIRST valve; each pool is
    # tagged with whether its images are reused from history.
    if self.valves.IMAGE_HISTORY_FIRST:
        pools = ((history, True), (current, False))
    else:
        pools = ((current, False), (history, True))
    selected: List[Dict[str, Any]] = []
    flags: List[bool] = []
    for pool, from_history in pools:
        for item in pool:
            if len(selected) >= cap:
                break
            selected.append(item)
            flags.append(from_history)
    return selected, flags
async def _emit_image_stats(
    self,
    ordered_stats: List[Dict[str, Any]],
    reused_flags: List[bool],
    total_limit: int,
    __event_emitter__: Callable,
) -> None:
    """Emit per-image optimization stats aligned with final combined order.

    ordered_stats: stats in the exact order images will be sent (parallel to
    the combined image list); reused_flags marks history-sourced images.
    """
    if not ordered_stats:
        return
    flag_count = len(reused_flags)
    for position, stat in enumerate(ordered_stats):
        idx = position + 1
        reused = reused_flags[position] if position < flag_count else False
        details = dict(stat) if stat else {}
        details["index"] = idx
        details["reused"] = reused
        # Human-readable summary line for this image.
        if stat and stat.get("original_size_mb") is not None:
            desc = f"Image {idx}: {stat['original_size_mb']:.2f}MB -> {stat['final_size_mb']:.2f}MB"
            if stat.get("quality") is not None:
                desc += f" (Q{stat['quality']})"
        else:
            desc = f"Image {idx}: (no metrics)"
        reasons = stat.get("reasons") if stat else None
        if reasons:
            desc += " | " + ", ".join(reasons[:3])
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "action": "image_optimization",
                    "description": desc,
                    "index": idx,
                    "done": False,
                    "details": details,
                },
            }
        )
    # Closing summary event once all per-image updates are out.
    await __event_emitter__(
        {
            "type": "status",
            "data": {
                "action": "image_optimization",
                "description": f"{len(ordered_stats)} image(s) processed (limit {total_limit}).",
                "done": True,
            },
        }
    )
async def _build_image_generation_contents(
    self,
    messages: List[Dict[str, Any]],
    __event_emitter__: Callable,
) -> Tuple[List[Dict[str, Any]], Optional[str]]:
    """Construct the contents payload for image-capable models.

    Returns tuple (contents, system_instruction) where system_instruction is extracted from system messages.

    Flow: extract system prompt -> gather history + current images ->
    deduplicate -> order/limit -> emit stats & reference-map events ->
    assemble text/image parts. The returned system_instruction is always
    None because it is folded into the prompt text below.
    """
    # Extract user-defined system instruction first
    user_system_instruction = next(
        (msg["content"] for msg in messages if msg.get("role") == "system"),
        None,
    )
    # Combine with default system prompt if configured
    system_instruction = self._combine_system_prompts(user_system_instruction)
    last_user_msg = next(
        (m for m in reversed(messages) if m.get("role") == "user"), None
    )
    if not last_user_msg:
        raise ValueError("No user message found")
    optimization_stats: List[Dict[str, Any]] = []
    history_images = await self._gather_history_images(
        messages, last_user_msg, optimization_stats
    )
    prompt, current_images = await self._extract_images_from_message(
        last_user_msg, stats_list=optimization_stats
    )
    # Deduplicate
    history_images = self._deduplicate_images(history_images)
    current_images = self._deduplicate_images(current_images)
    combined, reused_flags = self._apply_order_and_limit(
        history_images, current_images
    )
    if not prompt and not combined:
        raise ValueError("No prompt or images provided")
    if not prompt and combined:
        # Images with no text: fall back to a generic analysis prompt.
        prompt = "Analyze and describe the provided images."
    # Build ordered stats aligned with combined list
    ordered_stats: List[Dict[str, Any]] = []
    if optimization_stats:
        # Build map from final_hash -> stat (first wins)
        hash_map: Dict[str, Dict[str, Any]] = {}
        for s in optimization_stats:
            fh = s.get("final_hash")
            if fh and fh not in hash_map:
                hash_map[fh] = s
        for part in combined:
            try:
                # Re-hash each combined payload to look up its stat entry.
                fh = hashlib.sha256(
                    part["inline_data"]["data"].encode()
                ).hexdigest()
                ordered_stats.append(hash_map.get(fh) or {})
            except Exception:
                # Unreadable payload: keep positions aligned with an empty stat.
                ordered_stats.append({})
    # Emit stats AFTER final ordering so labels match
    await self._emit_image_stats(
        ordered_stats,
        reused_flags,
        self.valves.IMAGE_HISTORY_MAX_REFERENCES,
        __event_emitter__,
    )
    # Emit mapping
    if combined:
        mapping = [
            {
                "index": i + 1,
                "label": (
                    f"Image {i + 1}" if self.valves.IMAGE_ADD_LABELS else str(i + 1)
                ),
                "reused": reused_flags[i],
                "origin": "history" if reused_flags[i] else "current",
            }
            for i in range(len(combined))
        ]
        await __event_emitter__(
            {
                "type": "status",
                "data": {
                    "action": "image_reference_map",
                    "description": f"{len(combined)} image(s) included (limit {self.valves.IMAGE_HISTORY_MAX_REFERENCES}).",
                    "images": mapping,
                    "done": True,
                },
            }
        )
    # Build parts
    parts: List[Dict[str, Any]] = []
    # For image generation models, prepend system instruction to the prompt
    # since system_instruction parameter may not be supported
    final_prompt = prompt
    if system_instruction and prompt:
        final_prompt = f"{system_instruction}\n\n{prompt}"
        self.log.debug(
            f"Prepended system instruction to prompt for image generation. "
            f"System instruction length: {len(system_instruction)}, "
            f"Original prompt length: {len(prompt)}, "
            f"Final prompt length: {len(final_prompt)}"
        )
    elif system_instruction and not prompt:
        final_prompt = system_instruction
        self.log.debug(
            f"Using system instruction as prompt for image generation "
            f"(length: {len(system_instruction)})"
        )
    if final_prompt:
        parts.append({"text": final_prompt})
    if self.valves.IMAGE_ADD_LABELS:
        # Interleave "[Image N]" text markers so the model can reference
        # specific images by label.
        for idx, part in enumerate(combined, start=1):
            parts.append({"text": f"[Image {idx}]"})
            parts.append(part)
    else:
        parts.extend(combined)
    self.log.debug(
        f"Image-capable payload: history={len(history_images)} current={len(current_images)} used={len(combined)} limit={self.valves.IMAGE_HISTORY_MAX_REFERENCES} history_first={self.valves.IMAGE_HISTORY_FIRST} prompt_len={len(final_prompt)}"
    )
    # Return None for system_instruction since we've incorporated it into the prompt
    return [{"role": "user", "parts": parts}], None
def __init__(self):
    """Initializes the Pipe instance and configures the genai library."""
    self.name: str = "Google Gemini: "
    self.valves = self.Valves()
    # Lazily-populated model-list cache (see get_google_models).
    self._model_cache: Optional[List[Dict[str, str]]] = None
    self._model_cache_time: float = 0
    # Logger follows Open WebUI's configured OPENAI log level.
    self.log = logging.getLogger("google_ai.pipe")
    self.log.setLevel(SRC_LOG_LEVELS.get("OPENAI", logging.INFO))
def _get_client(self) -> genai.Client:
    """
    Validates API credentials and returns a genai.Client instance.

    Vertex AI mode relies on ADC / service-account auth; API-key mode
    decrypts the stored key and optionally forwards sanitized user-identity
    headers.
    """
    self._validate_api_key()
    if self.valves.USE_VERTEX_AI:
        self.log.debug(
            f"Initializing Vertex AI client (Project: {self.valves.VERTEX_PROJECT}, Location: {self.valves.VERTEX_LOCATION})"
        )
        return genai.Client(
            vertexai=True,
            project=self.valves.VERTEX_PROJECT,
            location=self.valves.VERTEX_LOCATION,
        )
    else:
        self.log.debug("Initializing Google Generative AI client with API Key")
        headers = {}
        if (
            self.valves.ENABLE_FORWARD_USER_INFO_HEADERS
            and hasattr(self, "user")
            and self.user
        ):
            # Sanitize header values to prevent header injection via
            # control characters and to bound their length.
            def sanitize_header_value(value: Any, max_length: int = 255) -> str:
                if value is None:
                    return ""
                # Convert to string and remove all control characters
                sanitized = re.sub(r"[\x00-\x1F\x7F]", "", str(value))
                sanitized = sanitized.strip()
                return (
                    sanitized[:max_length]
                    if len(sanitized) > max_length
                    else sanitized
                )

            user_attrs = {
                "X-OpenWebUI-User-Name": sanitize_header_value(
                    getattr(self.user, "name", None)
                ),
                "X-OpenWebUI-User-Id": sanitize_header_value(
                    getattr(self.user, "id", None)
                ),
                "X-OpenWebUI-User-Email": sanitize_header_value(
                    getattr(self.user, "email", None)
                ),
                "X-OpenWebUI-User-Role": sanitize_header_value(
                    getattr(self.user, "role", None)
                ),
            }
            # Drop headers whose value sanitized down to empty.
            headers = {k: v for k, v in user_attrs.items() if v not in (None, "")}
        options = types.HttpOptions(
            api_version=self.valves.API_VERSION,
            base_url=self.valves.BASE_URL,
            headers=headers,
        )
        # The stored key may carry the "encrypted:" prefix; decrypt before use.
        return genai.Client(
            api_key=EncryptedStr.decrypt(self.valves.GOOGLE_API_KEY),
            http_options=options,
        )
def _validate_api_key(self) -> None:
    """
    Validates that the necessary Google API credentials are set.
    Raises:
        ValueError: If the required credentials are not set.
    """
    if not self.valves.USE_VERTEX_AI:
        # API-key mode: the key itself is the only requirement.
        if not self.valves.GOOGLE_API_KEY:
            self.log.error("GOOGLE_API_KEY is not set (and not using Vertex AI).")
            raise ValueError(
                "GOOGLE_API_KEY is not set. Please provide the API key in the environment variables or valves."
            )
        self.log.debug("Using Google Generative AI API with API Key.")
        return
    # For Vertex AI, location has a default, so project is the main thing to check.
    # Actual authentication will be handled by ADC or environment.
    if not self.valves.VERTEX_PROJECT:
        self.log.error("USE_VERTEX_AI is true, but VERTEX_PROJECT is not set.")
        raise ValueError(
            "VERTEX_PROJECT is not set. Please provide the Google Cloud project ID."
        )
    self.log.debug(
        "Using Vertex AI. Ensure ADC or service account is configured."
    )
def strip_prefix(self, model_name: str) -> str:
    """
    Extract the model identifier using regex, handling various naming conventions.
    e.g., "google_gemini_pipeline.gemini-2.5-flash-preview-04-17" -> "gemini-2.5-flash-preview-04-17"
    e.g., "models/gemini-1.5-flash-001" -> "gemini-1.5-flash-001"
    e.g., "publishers/google/models/gemini-1.5-pro" -> "gemini-1.5-pro"
    """
    # Drop everything up to and including the last '/', or failing that a
    # leading dot-free segment plus its '.' (pipeline-prefix form).
    return re.sub(r"^(?:.*/|[^.]*\.)", "", model_name)
def get_google_models(self, force_refresh: bool = False) -> List[Dict[str, str]]:
    """
    Retrieve available Google models suitable for content generation.
    Uses caching to reduce API calls.
    Args:
        force_refresh: Whether to force refreshing the model cache
    Returns:
        List of dictionaries containing model id and name.
        Each entry also carries "image_generation" / "video_generation"
        capability flags; on failure a single {"id": "error", ...} entry
        is returned instead (and the cache is left untouched).
    """
    # Check cache first: serve the cached list while it is younger than
    # MODEL_CACHE_TTL seconds, unless the caller forces a refresh.
    current_time = time.time()
    if (
        not force_refresh
        and self._model_cache is not None
        and (current_time - self._model_cache_time) < self.valves.MODEL_CACHE_TTL
    ):
        self.log.debug("Using cached model list")
        return self._model_cache
    try:
        client = self._get_client()
        self.log.debug("Fetching models from Google API")
        models = list(client.models.list())
        # Process additional models (models not returned by SDK but that we want to add)
        additional = self.valves.MODEL_ADDITIONAL
        if additional:
            self.log.debug(f"Processing additional models: {additional}")
            existing_model_names = {self.strip_prefix(m.name) for m in models}
            # MODEL_ADDITIONAL is a comma/whitespace separated list of IDs;
            # only IDs not already present are appended.
            additional_ids = set(re.findall(r"[^,\s]+", additional))
            for model_id in additional_ids.difference(existing_model_names):
                self.log.debug(f"Adding additional model '{model_id}'.")
                models.append(types.Model(name=f"models/{model_id}"))
        available_models = []
        for model in models:
            actions = model.supported_actions
            model_id_stripped = self.strip_prefix(model.name)
            # actions is None for manually-added models (no SDK metadata),
            # which are treated as content-capable.
            is_content_model = actions is None or "generateContent" in actions
            is_video_model = (
                actions is not None and "generateVideos" in actions
            ) or model_id_stripped.startswith("veo-")
            if is_content_model or is_video_model:
                model_id = model_id_stripped
                model_name = model.display_name or model_id
                # Check if model supports image generation
                supports_image_generation = self._check_image_generation_support(
                    model_id
                )
                if supports_image_generation:
                    model_name += " 🎨"  # Add image generation indicator
                # Check if model supports video generation
                supports_video_generation = self._check_video_generation_support(
                    model_id
                )
                if supports_video_generation:
                    model_name += " 🎬"  # Add video generation indicator
                available_models.append(
                    {
                        "id": model_id,
                        "name": model_name,
                        "image_generation": supports_image_generation,
                        "video_generation": supports_video_generation,
                    }
                )
        # Keyed by id; also deduplicates (last occurrence wins).
        model_map = {model["id"]: model for model in available_models}
        # Apply MODEL_WHITELIST filter if configured (takes priority)
        whitelist = self.valves.MODEL_WHITELIST
        if whitelist:
            self.log.debug(f"Applying model whitelist: {whitelist}")
            whitelisted_ids = set(re.findall(r"[^,\s]+", whitelist))
            # Filter to only include whitelisted models
            filtered_models = {
                k: v for k, v in model_map.items() if k in whitelisted_ids
            }
            self.log.debug(f"After whitelist filter: {len(filtered_models)} models")
        else:
            # If no whitelist, filter to only include models starting with 'gemini-' or 'veo-'
            filtered_models = {
                k: v
                for k, v in model_map.items()
                if k.startswith("gemini-") or k.startswith("veo-")
            }
            self.log.debug(f"After prefix filter: {len(filtered_models)} models")
        # Update cache
        self._model_cache = list(filtered_models.values())
        self._model_cache_time = current_time
        self.log.debug(f"Found {len(self._model_cache)} Gemini models")
        return self._model_cache
    except Exception as e:
        self.log.exception(f"Could not fetch models from Google: {str(e)}")
        # Return a specific error entry for the UI
        return [{"id": "error", "name": f"Could not fetch models: {str(e)}"}]
def _check_image_generation_support(self, model_id: str) -> bool:
"""
Check if a model supports image generation.
Args:
model_id: The model ID to check
Returns:
True if the model supports image generation, False otherwise
"""
# Known image generation models (both Gemini 2.5 and Gemini 3)
image_generation_models = [
"gemini-2.5-flash-image",
"gemini-2.5-flash-image-preview",
"gemini-3-flash-image",
"gemini-3-flash-image-preview",
"gemini-3.1-flash-image-preview",
"gemini-3-pro-image",
"gemini-3-pro-image-preview",
]
# Check for exact matches or pattern matches
for pattern in image_generation_models:
if model_id == pattern or pattern in model_id:
return True
# Additional pattern checking for future models
if "image" in model_id.lower() and (
"generation" in model_id.lower() or "preview" in model_id.lower()
):
return True
return False
def _is_gemini_3_family_model(self, model_id: str) -> bool:
"""Return True for Gemini 3.x model IDs, including Gemini 3.1."""
model_lower = model_id.lower()
return model_lower.startswith("gemini-3-") or model_lower.startswith(
"gemini-3."
)
def _is_gemini_3_image_model(self, model_id: str) -> bool:
"""Return True for Gemini 3.x image generation models."""
return self._is_gemini_3_family_model(
model_id
) and self._check_image_generation_support(model_id)
def _check_image_config_support(self, model_id: str) -> bool:
"""
Check if a model supports ImageConfig (aspect_ratio and image_size parameters).
ImageConfig is only supported by Gemini 3 image generation models.
Gemini 2.5 image models support image generation but not ImageConfig.
Args:
model_id: The model ID to check
Returns:
True if the model supports ImageConfig, False otherwise
"""
return self._is_gemini_3_image_model(model_id)
def _check_thinking_support(self, model_id: str) -> bool:
"""
Check if a model supports the thinking feature.
Args:
model_id: The model ID to check
Returns:
True if the model supports thinking, False otherwise
"""
# Models that do NOT support thinking
non_thinking_models = [
"gemini-2.5-flash-image-preview",
"gemini-2.5-flash-image",
]
# Check for exact matches
for pattern in non_thinking_models:
if model_id == pattern or pattern in model_id:
return False
# Gemini 3 image models support thinking and thinking-level controls.