-
Notifications
You must be signed in to change notification settings - Fork 132
Expand file tree
/
Copy pathLLMExtractor.py
More file actions
148 lines (120 loc) · 4.62 KB
/
LLMExtractor.py
File metadata and controls
148 lines (120 loc) · 4.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python3
# -*-coding:UTF-8 -*
"""
The LLM-Extractor Module
================================
This module extracts geolocation information from an unstructured text
"""
##################################
# Import External packages
##################################
from modules.abstract_module import AbstractModule
import os
import sys
from openai import Client
from itertools import chain
from pprint import pprint
from typing import List, Dict
from lib.ConfigLoader import ConfigLoader
sys.path.append(os.environ['AIL_BIN'])
##################################
# Import Project packages
##################################
# from lib.ConfigLoader import ConfigLoader
# from lib import Statistics
class LLMExtractor(AbstractModule):
    """AIL module that extracts geolocation references from unstructured text.

    It sends the object's content to an OpenAI chat model with a geospatial
    extraction prompt, and tags the object when any location is reported.
    """

    def __init__(self, queue=True):
        super().__init__(queue=queue)
        # Kept for the config-file alternative documented below.
        config_loader = ConfigLoader()
        # Waiting time in seconds between two processed messages
        self.pending_seconds = 0
        # Send module state to logs
        self.logger.info(f'Module {self.module_name} initialized')
        self.api_key = os.getenv('OPENAI_API_KEY')
        if not self.api_key:
            # Raise explicitly: `assert` is stripped when running under `python -O`,
            # which would let the module start without credentials.
            raise ValueError("API key not found. Please set the OPENAI_API_KEY environment variable.")
            # or take it from the config file: api_key=config_loader.get_config_str("OpenAI", "OPENAI_API_KEY"))
        self.openai_client = Client(api_key=self.api_key)
        self.model = 'o3-mini'

    def find_geolocation(self, text: str) -> List[str]:
        """Find all occurrences of geolocation information in the given text.

        Args:
            text: the unstructured text to analyze (sent as the user message).

        Returns:
            List[str]: one raw JSON string per model choice. Each string is
            expected to contain a JSON array of objects such as:
            [
                {"location": "San Francisco", "type": "city", "latitude": 37.8176155, "longitude": -122.4783123},
                {"location": "Terrace Avenue, Fresno County, California", "type": "full_address", "latitude": 36.7783, "longitude": -119.4179},
                {"location": "USA", "type": "country", "latitude": 37.0902, "longitude": -95.7129},
            ]
            NOTE: the content is NOT parsed here — callers must json.loads() it
            if they need structured data.
        """
        prompt = """
You are a highly skilled geospatial intelligence analyst, adept at extracting precise geolocation information from unstructured text. Please analyze the following text and identify all references to locations. Then, return only a JSON array of objects in the format:
[
{
"location": "<location string>",
"type": "<most specific type from: full_address, city, country, region, continent, ocean, sea, river, mountain, lake>",
"latitude": "<decimal degrees>",
"longitude": "<decimal degrees>"
}
]
Instructions:
For each mention of a place, use the most specific location type that applies (e.g., if it’s an address, use “full_address”; if it’s a city, use “city”; etc.).
Your “location” field should exactly match the text’s most specific place name or address.
“latitude” and “longitude” must be as accurate as possible for the identified location.
Do not add extra fields or information beyond the specified JSON structure.
Return only the JSON array, with no additional explanation or text.
Example:
[
{
"location": "San Francisco",
"type": "city",
"latitude": 37.7749,
"longitude": -122.4194
},
{
"location": "Terrace Avenue, Fresno County, California",
"type": "full_address",
"latitude": 36.7783,
"longitude": -119.4179
},
{
"location": "USA",
"type": "country",
"latitude": 37.0902,
"longitude": -95.7129
}
]
Now, process the text provided in the user message and return the resulting JSON. Do not include any additional commentary.
"""
        # The text is passed as the user message; the original prompt ended with a
        # dead "{{text}}" template placeholder that was never substituted and has
        # been removed to avoid confusing the model.
        response = self.openai_client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "system", "content": prompt},
                {"role": "user", "content": text},
            ],
            response_format={
                "type": "json_object",
            },
            reasoning_effort="medium",
            # temperature=0, # for other models
            store=False
        )
        # Extract the raw JSON answer(s) from the response
        geolocation_info = [
            answer.message.content for answer in response.choices]
        return geolocation_info

    def compute(self, message):
        """Process one queued object: extract geolocations from its content
        and tag the object when any location is found."""
        obj = self.get_obj()
        geolocations = self.find_geolocation(obj.get_content())
        # Use the module logger (consistent with __init__) instead of print().
        self.logger.info(f'Geolocation found: {geolocations}')
        if geolocations:
            # Tags
            tag = 'infoleak:automatic-detection="geolocation"'
            self.add_message_to_queue(message=tag, queue='Tags')
if __name__ == '__main__':
    # Run as a standalone AIL module: instantiate and enter the processing loop.
    extractor = LLMExtractor()
    extractor.run()