|
1 | 1 | import os, sys, re, csv, socket, struct, ipaddress, binascii |
2 | 2 | import time |
3 | | -from io import open |
| 3 | +from io import open, StringIO |
| 4 | +import pandas as pd |
| 5 | +import pyarrow as pa |
| 6 | +import pyarrow.parquet as pq |
| 7 | +from decimal import Decimal |
| 8 | + |
4 | 9 |
|
5 | 10 | conversion_mode = 'range' |
6 | 11 | write_mode = 'replace' |
@@ -179,3 +184,182 @@ def convert_to_csv(input_file, output_file, conversion_mode, write_mode): |
179 | 184 | # Stop the loop if there are no more rows |
180 | 185 | if not chunk: |
181 | 186 | break |
| 187 | + |
| 188 | +def get_last_row(file_path): |
| 189 | + with open(file_path, 'rb') as f: |
| 190 | + f.seek(-2, 2) # Move to the second-last byte |
| 191 | + while f.read(1) != b'\n': |
| 192 | + f.seek(-2, 1) |
| 193 | + last_line = f.readline().decode() |
| 194 | + return last_line |
| 195 | + |
| 196 | +def detect_ip_version_from_number(ip_num): |
| 197 | + try: |
| 198 | + ip = ipaddress.ip_address(int(ip_num)) |
| 199 | + return 'IPv4' if ip.version == 4 else 'IPv6' |
| 200 | + except ValueError: |
| 201 | + return 'Invalid' |
| 202 | + |
| 203 | +def detect_versions_from_chunk(chunk): |
| 204 | + versions = set() |
| 205 | + |
| 206 | + # Combine both start and end IP columns (col 0 and col 1) |
| 207 | + all_ips = pd.concat([chunk.iloc[:, 0], chunk.iloc[:, 1]], ignore_index=True) |
| 208 | + |
| 209 | + for ip_num in all_ips: |
| 210 | + try: |
| 211 | + version = ipaddress.ip_address(int(ip_num)).version |
| 212 | + versions.add(version) |
| 213 | + if len(versions) > 1: |
| 214 | + break # early stop if we detect both v4 and v6 |
| 215 | + except ValueError: |
| 216 | + continue # skip invalid numbers |
| 217 | + |
| 218 | + return versions |
| 219 | + |
| 220 | +# Scan the file in chunks |
| 221 | +def check_ip_versions(csv_path): |
| 222 | + seen_versions = set() |
| 223 | + for chunk in pd.read_csv(csv_path, chunksize=50_000, header=None, usecols=[0, 1]): |
| 224 | + seen_versions.update(detect_versions_from_chunk(chunk)) |
| 225 | + if len(seen_versions) > 1: |
| 226 | + break # early exit if both found |
| 227 | + |
| 228 | + if seen_versions == {4, 6}: |
| 229 | + print(f'Your csv file {csv_path} contains mixture of IPv4 and IPv6 addresses, which will causing issue when converting to parquet file.') |
| 230 | + print(f'It is advisable to separate IPv4 and IPv6 addresses into two identical csv file.') |
| 231 | + sys.exit(1) |
| 232 | + |
| 233 | +# Convert CSV to Parquet |
| 234 | +def csv_to_parquet(input_file, output_file, db_type): |
| 235 | + column_names = '' |
| 236 | + csv_file = input_file |
| 237 | + parquet_file = output_file |
| 238 | + parquet_chunksize = 50_000 |
| 239 | + parquet_writer = None |
| 240 | + |
| 241 | + # Need to determine the column names |
| 242 | + column_names_list = { |
| 243 | + 'DB1': ["ip_from", "ip_to", "country_code", "country_name"], |
| 244 | + 'DB2': ["ip_from", "ip_to", "country_code", "country_name", "isp"], |
| 245 | + 'DB3': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name"], |
| 246 | + 'DB4': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "isp"], |
| 247 | + 'DB5': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude"], |
| 248 | + 'DB6': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "isp"], |
| 249 | + 'DB7': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "isp", "domain"], |
| 250 | + 'DB8': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "isp", "domain"], |
| 251 | + 'DB9': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code"], |
| 252 | + 'DB10': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code", "isp", "domain"], |
| 253 | + 'DB11': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code", "time_zone"], |
| 254 | + 'DB12': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code", "time_zone", "isp", "domain"], |
| 255 | + 'DB13': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "time_zone", "net_speed"], |
| 256 | + 'DB14': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code", "time_zone", "isp", "domain", "time_zone", "net_speed"], |
| 257 | + 'DB15': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code", "time_zone", "idd_code", "area_code"], |
| 258 | + 'DB16': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code", "time_zone", "isp", "domain", "time_zone", "net_speed", "idd_code", "area_code"], |
| 259 | + 'DB17': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "time_zone", "net_speed", "weather_station_code", "weather_station_name"], |
| 260 | + 'DB18': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code", "time_zone", "isp", "domain", "time_zone", "net_speed", "idd_code", "area_code", "weather_station_code", "weather_station_name"], |
| 261 | + 'DB19': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "isp", "domain", "mcc", "mnc", "mobile_brand"], |
| 262 | + 'DB20': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code", "time_zone", "isp", "domain", "time_zone", "net_speed", "idd_code", "area_code", "weather_station_code", "weather_station_name", "mcc", "mnc", "mobile_brand"], |
| 263 | + 'DB21': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code", "time_zone", "idd_code", "area_code", "elevation"], |
| 264 | + 'DB22': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code", "time_zone", "isp", "domain", "time_zone", "net_speed", "idd_code", "area_code", "weather_station_code", "weather_station_name", "mcc", "mnc", "mobile_brand", "elevation"], |
| 265 | + 'DB23': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "isp", "domain", "mcc", "mnc", "mobile_brand", "usage_type"], |
| 266 | + 'DB24': ["ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", "latitude", "longitude", "zip_code", "time_zone", "isp", "domain", "time_zone", "net_speed", "idd_code", "area_code", "weather_station_code", "weather_station_name", "mcc", "mnc", "mobile_brand", "elevation", "usage_type"], |
| 267 | + 'DB25': [ |
| 268 | + "ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", |
| 269 | + "latitude", "longitude", "zip_code", "time_zone", "isp", "domain", "net_speed", |
| 270 | + "idd_code", "area_code", "weather_station_code", "weather_station_name", |
| 271 | + "mcc", "mnc", "mobile_brand", "elevation", "usage_type", "address_type", |
| 272 | + "category" |
| 273 | + ], |
| 274 | + 'DB26': [ |
| 275 | + "ip_from", "ip_to", "country_code", "country_name", "region_name", "city_name", |
| 276 | + "latitude", "longitude", "zip_code", "time_zone", "isp", "domain", "net_speed", |
| 277 | + "idd_code", "area_code", "weather_station_code", "weather_station_name", |
| 278 | + "mcc", "mnc", "mobile_brand", "elevation", "usage_type", "address_type", |
| 279 | + "category", "district", "asn", "as_name" |
| 280 | + ], |
| 281 | + 'PX1': ['ip_from', 'ip_to', 'country_code', 'country_name'], |
| 282 | + 'PX2': ['ip_from', 'ip_to', 'proxy_type', 'country_code', 'country_name'], |
| 283 | + 'PX3': ['ip_from', 'ip_to', 'proxy_type', 'country_code', 'country_name', 'region_name', 'city_name'], |
| 284 | + 'PX4': ['ip_from', 'ip_to', 'proxy_type', 'country_code', 'country_name', 'region_name', 'city_name', 'isp'], |
| 285 | + 'PX5': ['ip_from', 'ip_to', 'proxy_type', 'country_code', 'country_name', 'region_name', 'city_name', 'isp', 'domain'], |
| 286 | + 'PX6': ['ip_from', 'ip_to', 'proxy_type', 'country_code', 'country_name', 'region_name', 'city_name', 'isp', 'domain', 'usage_type'], |
| 287 | + 'PX7': ['ip_from', 'ip_to', 'proxy_type', 'country_code', 'country_name', 'region_name', 'city_name', 'isp', 'domain', 'usage_type', 'asn', 'as'], |
| 288 | + 'PX8': ['ip_from', 'ip_to', 'proxy_type', 'country_code', 'country_name', 'region_name', 'city_name', 'isp', 'domain', 'usage_type', 'asn', 'as', 'last_seen'], |
| 289 | + 'PX9': ['ip_from', 'ip_to', 'proxy_type', 'country_code', 'country_name', 'region_name', 'city_name', 'isp', 'domain', 'usage_type', 'asn', 'as', 'last_seen', 'threat'], |
| 290 | + 'PX10': ['ip_from', 'ip_to', 'proxy_type', 'country_code', 'country_name', 'region_name', 'city_name', 'isp', 'domain', 'usage_type', 'asn', 'as', 'last_seen', 'threat'], |
| 291 | + 'PX11': ['ip_from', 'ip_to', 'proxy_type', 'country_code', 'country_name', 'region_name', 'city_name', 'isp', 'domain', 'usage_type', 'asn', 'as', 'last_seen', 'threat', 'provider'], |
| 292 | + 'PX12': ['ip_from', 'ip_to', 'proxy_type', 'country_code', 'country_name', 'region_name', 'city_name', 'isp', 'domain', 'usage_type', 'asn', 'as', 'last_seen', 'threat', 'provider', 'fraud_score'] |
| 293 | + } |
| 294 | + if db_type != '': |
| 295 | + try: |
| 296 | + column_names = column_names_list[db_type] |
| 297 | + except Exception: |
| 298 | + print(f'Invalid db_type value foundm the valid value should be range from DB1 to DB26. Your input: {db_type}.') |
| 299 | + sys.exit(1) |
| 300 | + |
| 301 | + # check_ip_versions(csv_file) |
| 302 | + |
| 303 | + # Determine ipv4 or ipv6 based on the last row of the file |
| 304 | + # Get last line |
| 305 | + last_line = get_last_row(csv_file) |
| 306 | + |
| 307 | + df_last = pd.read_csv(StringIO(last_line), header=None) |
| 308 | + ip_value = df_last.iloc[0, 0] # Replace with actual index |
| 309 | + # print("Is IPv6?", is_ipv6(ip_value)) |
| 310 | + # print(f"{ip_value} is {detect_ip_version_from_number(ip_value)}") |
| 311 | + ip_ver = detect_ip_version_from_number(ip_value) |
| 312 | + |
| 313 | + if column_names != '': |
| 314 | + try: |
| 315 | + schema_list = [] |
| 316 | + for column in column_names: |
| 317 | + if column in ["ip_from", "ip_to"]: |
| 318 | + if ip_ver == 'IPv4': |
| 319 | + schema_list.append(pa.field(column, pa.uint32())) |
| 320 | + elif ip_ver == 'IPv6': |
| 321 | + schema_list.append(pa.field(column, pa.string())) |
| 322 | + elif column in ["latitude", "longitude"]: |
| 323 | + schema_list.append(pa.field(column, pa.float64())) |
| 324 | + elif column in ['last_seen', 'fraud_score', "elevation"]: |
| 325 | + schema_list.append(pa.field(column, pa.int32())) |
| 326 | + else: |
| 327 | + schema_list.append(pa.field(column, pa.string())) |
| 328 | + schema = pa.schema(schema_list) |
| 329 | + for chunk in pd.read_csv( |
| 330 | + csv_file, |
| 331 | + names=column_names, |
| 332 | + header=None, |
| 333 | + chunksize=parquet_chunksize, |
| 334 | + low_memory=True, |
| 335 | + dtype=str # initially read all as string to control parsing |
| 336 | + ): |
| 337 | + if ip_ver == 'IPv4': |
| 338 | + chunk["ip_from"] = pd.to_numeric(chunk["ip_from"], errors="coerce").astype("uint32") |
| 339 | + chunk["ip_to"] = pd.to_numeric(chunk["ip_to"], errors="coerce").astype("uint32") |
| 340 | + elif ip_ver == 'IPv6': |
| 341 | + chunk["ip_from"] = chunk["ip_from"].apply(int) |
| 342 | + chunk["ip_from"] = chunk["ip_from"].apply(lambda x: format(x, '032x')) |
| 343 | + chunk["ip_to"] = chunk["ip_to"].apply(int) |
| 344 | + chunk["ip_to"] = chunk["ip_to"].apply(lambda x: format(x, '032x')) |
| 345 | + if "latitude" in column_names: |
| 346 | + chunk["latitude"] = pd.to_numeric(chunk["latitude"], errors="coerce").astype("float64") |
| 347 | + if "longitude" in column_names: |
| 348 | + chunk["longitude"] = pd.to_numeric(chunk["longitude"], errors="coerce").astype("float64") |
| 349 | + if "elevation" in column_names: |
| 350 | + chunk["elevation"] = pd.to_numeric(chunk["elevation"], errors="coerce") |
| 351 | + |
| 352 | + table = pa.Table.from_pandas(chunk, schema=schema, preserve_index=False) |
| 353 | + |
| 354 | + if parquet_writer is None: |
| 355 | + parquet_writer = pq.ParquetWriter(parquet_file, table.schema) |
| 356 | + |
| 357 | + parquet_writer.write_table(table) |
| 358 | + |
| 359 | + if parquet_writer: |
| 360 | + parquet_writer.close() |
| 361 | + except Exception as e: |
| 362 | + print(f'Unexcepted error occured, will abort now...') |
| 363 | + print(str(e)) |
| 364 | + sys.exit(1) |
| 365 | + |
0 commit comments