-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathcli_shared.py
More file actions
944 lines (750 loc) · 31.7 KB
/
cli_shared.py
File metadata and controls
944 lines (750 loc) · 31.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
import argparse
import ast
import base64
import logging
import sys
from argparse import FileType
from copy import deepcopy
from functools import cache
from getpass import getpass
from pathlib import Path
from typing import Any, Optional, Text, Union, cast
from multiversx_sdk import (
Account,
Address,
ApiNetworkProvider,
GasLimitEstimator,
LedgerAccount,
ProxyNetworkProvider,
Token,
TokenComputer,
TokenTransfer,
Transaction,
TransactionComputer,
)
from multiversx_sdk_cli import config, utils
from multiversx_sdk_cli.cli_output import CLIOutputBuilder
from multiversx_sdk_cli.cli_password import (
load_guardian_password,
load_password,
load_relayer_password,
)
from multiversx_sdk_cli.config_env import MxpyEnv, get_address_hrp
from multiversx_sdk_cli.config_wallet import (
get_active_wallet,
read_wallet_config_file,
resolve_wallet_config_path,
)
from multiversx_sdk_cli.constants import (
DEFAULT_GAS_PRICE,
DEFAULT_TX_VERSION,
TCS_SERVICE_ID,
TRANSACTION_OPTIONS_TX_HASH_SIGN,
)
from multiversx_sdk_cli.errors import (
AddressConfigFileError,
ArgumentsNotProvidedError,
BadUsage,
IncorrectWalletError,
LedgerError,
NoWalletProvided,
UnknownWalletAliasError,
WalletError,
)
from multiversx_sdk_cli.guardian_relayer_data import GuardianRelayerData
from multiversx_sdk_cli.interfaces import IAccount
from multiversx_sdk_cli.signing_wrapper import SigningWrapper
from multiversx_sdk_cli.simulation import Simulator
from multiversx_sdk_cli.transactions import send_and_wait_for_result
from multiversx_sdk_cli.utils import log_explorer_transaction, parse_headers_list
from multiversx_sdk_cli.ux import confirm_continuation
logger = logging.getLogger("cli_shared")
trusted_cosigner_service_url_by_chain_id = {
"1": "https://tools.multiversx.com/guardian",
"D": "https://devnet-tools.multiversx.com/guardian",
"T": "https://testnet-tools.multiversx.com/guardian",
}
def get_trusted_cosigner_service_url_by_chain_id(chain_id: str) -> str:
try:
return trusted_cosigner_service_url_by_chain_id[chain_id]
except:
raise BadUsage(f"Could not get Trusted Cosigner Service Url. No match found for chain id: {chain_id}")
def wider_help_formatter(prog: Text):
return argparse.RawDescriptionHelpFormatter(prog, max_help_position=50, width=120)
def add_group_subparser(subparsers: Any, group: str, description: str) -> Any:
parser = subparsers.add_parser(
group,
usage=f"mxpy {group} COMMAND [-h] ...",
description=description,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser._positionals.title = "COMMANDS"
parser._optionals.title = "OPTIONS"
return parser
def build_group_epilog(subparsers: Any) -> str:
epilog = """
----------------
COMMANDS summary
----------------
"""
for choice, sub in subparsers.choices.items():
description_first_line = sub.description.splitlines()[0]
epilog += f"{choice.ljust(30)} {description_first_line}\n"
return epilog
def add_command_subparser(subparsers: Any, group: str, command: str, description: str):
return subparsers.add_parser(
command,
usage=f"mxpy {group} {command} [-h] ...",
description=description,
formatter_class=wider_help_formatter,
)
def add_tx_args(
args: list[str],
sub: Any,
with_nonce: bool = True,
with_receiver: bool = True,
with_data: bool = True,
):
if with_nonce:
sub.add_argument(
"--nonce",
type=int,
required=False,
default=None,
help="# the nonce for the transaction. If not provided, is fetched from the network.",
)
if with_receiver:
sub.add_argument("--receiver", required=False, help="🖄 the address of the receiver")
sub.add_argument("--receiver-username", required=False, help="🖄 the username of the receiver")
sub.add_argument(
"--gas-price",
default=DEFAULT_GAS_PRICE,
type=int,
help="⛽ the gas price (default: %(default)d)",
)
sub.add_argument("--gas-limit", required=False, type=int, help="⛽ the gas limit")
sub.add_argument(
"--gas-limit-multiplier",
required=False,
type=float,
help="if `--gas-limit` is not provided, the estimated value will be multiplied by this multiplier (e.g 1.1)",
)
sub.add_argument("--value", default=0, type=int, help="the value to transfer (default: %(default)s)")
if with_data:
sub.add_argument(
"--data",
default="",
help="the payload, or 'memo' of the transaction (default: %(default)s)",
)
sub.add_argument("--chain", type=str, help="the chain identifier")
sub.add_argument(
"--version",
type=int,
default=DEFAULT_TX_VERSION,
help="the transaction version (default: %(default)s)",
)
sub.add_argument("--options", type=int, default=0, help="the transaction options (default: %(default)s)")
sub.add_argument("--relayer", type=str, help="the bech32 address of the relayer", default="")
sub.add_argument("--guardian", type=str, help="the bech32 address of the guardian", default="")
def add_wallet_args(args: list[str], sub: Any):
sub.add_argument("--sender", type=str, help="the alias of the wallet set in the address config")
sub.add_argument(
"--pem",
required=False,
help="🔑 the PEM file, if keyfile not provided",
)
sub.add_argument(
"--keyfile",
required=False,
help="🔑 a JSON keyfile, if PEM not provided",
)
sub.add_argument(
"--passfile",
help="DEPRECATED, do not use it anymore. Instead, you'll be prompted to enter the password.",
)
sub.add_argument(
"--ledger",
action="store_true",
required=False,
default=False,
help="🔐 bool flag for signing transaction using ledger",
)
sub.add_argument(
"--sender-wallet-index",
type=int,
default=0,
help="🔑 the address index; can be used for PEM files, keyfiles of type mnemonic or Ledger devices (default: %(default)s)",
)
sub.add_argument("--sender-username", required=False, help="🖄 the username of the sender")
sub.add_argument(
"--hrp", required=False, type=str, help="The hrp used to convert the address to its bech32 representation"
)
def add_guardian_wallet_args(args: list[str], sub: Any):
sub.add_argument(
"--guardian-service-url",
type=str,
help="the url of the guardian service",
default="",
)
sub.add_argument(
"--guardian-2fa-code",
type=str,
help="the 2fa code for the guardian",
default="",
)
sub.add_argument(
"--guardian-pem",
help="🔑 the PEM file, if keyfile not provided",
)
sub.add_argument(
"--guardian-keyfile",
help="🔑 a JSON keyfile, if PEM not provided",
)
sub.add_argument(
"--guardian-passfile",
help="DEPRECATED, do not use it anymore. Instead, you'll be prompted to enter the password.",
)
sub.add_argument(
"--guardian-ledger",
action="store_true",
default=False,
help="🔐 bool flag for signing transaction using ledger",
)
sub.add_argument(
"--guardian-wallet-index",
type=int,
default=0,
help="🔑 the address index; can be used for PEM files, keyfiles of type mnemonic or Ledger devices (default: %(default)s)",
)
def add_relayed_v3_wallet_args(args: list[str], sub: Any):
sub.add_argument("--relayer-pem", help="🔑 the PEM file, if keyfile not provided")
sub.add_argument("--relayer-keyfile", help="🔑 a JSON keyfile, if PEM not provided")
sub.add_argument(
"--relayer-passfile",
help="DEPRECATED, do not use it anymore. Instead, you'll be prompted to enter the password.",
)
sub.add_argument(
"--relayer-ledger",
action="store_true",
default=False,
help="🔐 bool flag for signing transaction using ledger",
)
sub.add_argument(
"--relayer-wallet-index",
type=int,
default=0,
help="🔑 the address index; can be used for PEM files, keyfiles of type mnemonic or Ledger devices (default: %(default)s)",
)
def add_proxy_arg(sub: Any):
sub.add_argument("--proxy", type=str, help="🔗 the URL of the proxy")
sub.add_argument(
"--proxy-headers",
nargs="+",
metavar="KEY=VALUE",
help="custom HTTP headers for proxy requests, e.g. 'Api-Key=mytoken'",
)
def parse_proxy_headers(proxy_headers: Optional[list[str]]) -> dict[str, str]:
if not proxy_headers:
return {}
for item in proxy_headers:
if "=" not in item:
raise ArgumentsNotProvidedError(f"Invalid proxy header (expected KEY=VALUE): {item!r}")
return parse_headers_list(proxy_headers)
def add_outfile_arg(sub: Any, what: str = ""):
what = f"({what})" if what else ""
sub.add_argument(
"--outfile",
type=FileType("w"),
default=sys.stdout,
help=f"where to save the output {what} (default: stdout)",
)
def add_infile_arg(sub: Any, what: str = ""):
what = f"({what})" if what else ""
sub.add_argument("--infile", type=FileType("r"), required=True, help=f"input file {what}")
def add_omit_fields_arg(sub: Any):
sub.add_argument(
"--omit-fields",
default="[]",
type=str,
required=False,
help="omit fields in the output payload (default: %(default)s); fields should be passed as a string containing a list of fields (e.g. \"['field1', 'field2']\")",
)
def add_token_transfers_args(sub: Any):
sub.add_argument(
"--token-transfers",
nargs="+",
help="token transfers for transfer & execute, as [token, amount] "
"E.g. --token-transfers NFT-123456-0a 1 ESDT-987654 100000000",
)
def add_metadata_arg(sub: Any):
sub.add_argument(
"--metadata-not-upgradeable",
dest="metadata_upgradeable",
action="store_false",
help="‼ mark the contract as NOT upgradeable (default: upgradeable)",
)
sub.add_argument(
"--metadata-not-readable",
dest="metadata_readable",
action="store_false",
help="‼ mark the contract as NOT readable (default: readable)",
)
sub.add_argument(
"--metadata-payable",
dest="metadata_payable",
action="store_true",
help="‼ mark the contract as payable (default: not payable)",
)
sub.add_argument(
"--metadata-payable-by-sc",
dest="metadata_payable_by_sc",
action="store_true",
help="‼ mark the contract as payable by SC (default: not payable by SC)",
)
sub.set_defaults(metadata_upgradeable=True, metadata_payable=False)
def add_wait_result_and_timeout_args(sub: Any):
sub.add_argument(
"--wait-result",
action="store_true",
default=False,
help="signal to wait for the transaction result - only valid if --send is set",
)
sub.add_argument(
"--timeout",
default=100,
help="max num of seconds to wait for result" " - only valid if --wait-result is set",
)
def parse_omit_fields_arg(args: Any) -> list[str]:
literal = args.omit_fields
parsed = ast.literal_eval(literal)
return cast(list[str], parsed)
def _has_options_set_for_hash_signing(args: Any) -> bool:
if hasattr(args, "options") and args.options:
if args.options & TRANSACTION_OPTIONS_TX_HASH_SIGN == TRANSACTION_OPTIONS_TX_HASH_SIGN:
return True
return False
def prepare_account(args: Any):
hrp = get_address_hrp_with_fallback(args)
if args.pem:
acc = Account.new_from_pem(file_path=Path(args.pem), index=args.sender_wallet_index, hrp=hrp)
if _has_options_set_for_hash_signing(args):
acc.use_hash_signing = True
return acc
elif args.keyfile:
password = load_password(args)
index = args.sender_wallet_index if args.sender_wallet_index != 0 else None
try:
acc = Account.new_from_keystore(Path(args.keyfile), password=password, address_index=index, hrp=hrp)
if _has_options_set_for_hash_signing(args):
acc.use_hash_signing = True
return acc
except Exception as e:
raise WalletError(str(e))
elif args.ledger:
try:
return LedgerAccount(address_index=args.sender_wallet_index)
except Exception as e:
raise LedgerError(str(e))
elif args.sender:
acc = load_wallet_by_alias(alias=args.sender, hrp=hrp)
if _has_options_set_for_hash_signing(args):
acc.use_hash_signing = True
return acc
else:
acc = load_default_wallet(hrp=hrp)
if _has_options_set_for_hash_signing(args):
acc.use_hash_signing = True
return acc
def load_wallet_by_alias(alias: str, hrp: str) -> Account:
file_path = resolve_wallet_config_path()
if not file_path.is_file():
raise AddressConfigFileError("The wallet config file was not found.")
file = read_wallet_config_file()
if file == dict():
raise AddressConfigFileError("Wallet config file is empty.")
wallets: dict[str, Any] = file["wallets"]
wallet = wallets.get(alias, None)
if not wallet:
raise UnknownWalletAliasError(alias)
logger.info(f"Using sender [{alias}] from wallet config.")
return _load_wallet_from_wallet_config(wallet=wallet, hrp=hrp)
def load_default_wallet(hrp: str) -> Account:
active_wallet = get_active_wallet()
if active_wallet == dict():
logger.info("No default wallet found in wallet config.")
raise NoWalletProvided()
alias_of_default_wallet = read_wallet_config_file().get("active", "")
logger.info(f"Using sender [{alias_of_default_wallet}] from wallet config.")
return _load_wallet_from_wallet_config(wallet=active_wallet, hrp=hrp)
def _load_wallet_from_wallet_config(wallet: dict[str, str], hrp: str) -> Account:
wallet_path = wallet.get("path", None)
if not wallet_path:
raise AddressConfigFileError("'path' field must be set in the wallet config.")
path = Path(wallet_path)
index = int(wallet.get("index", 0))
if path.suffix == ".pem":
return Account.new_from_pem(file_path=path, index=index, hrp=hrp)
elif path.suffix == ".json":
logger.info(f"Using keystore wallet at: [{path}].")
password = getpass("Please enter the wallet password: ")
try:
return Account.new_from_keystore(file_path=path, password=password, address_index=index, hrp=hrp)
except Exception as e:
raise WalletError(str(e))
else:
raise WalletError(f"Unsupported wallet file type: [{path.suffix}]. Supported types are: `.pem` and `.json`.")
def get_address_hrp_with_fallback(args: Any) -> str:
"""Use hrp provided by the user. If not provided, fetch from network. If proxy not provided, get hrp from config."""
hrp: str = ""
if hasattr(args, "hrp") and args.hrp:
hrp = args.hrp
return hrp
if hasattr(args, "proxy") and args.proxy:
hrp = _get_hrp_from_proxy(args)
elif hasattr(args, "api") and args.api:
hrp = _get_hrp_from_api(args)
if hrp:
return hrp
return get_address_hrp()
def _get_hrp_from_proxy(args: Any) -> str:
network_provider_config = config.get_config_for_network_providers()
proxy = ProxyNetworkProvider(url=args.proxy, config=network_provider_config)
network_config = proxy.get_network_config()
hrp: str = network_config.raw.get("erd_address_hrp", "")
return hrp
def _get_hrp_from_api(args: Any) -> str:
network_provider_config = config.get_config_for_network_providers()
proxy = ApiNetworkProvider(url=args.api, config=network_provider_config)
network_config = proxy.get_network_config()
hrp: str = network_config.raw.get("erd_address_hrp", "")
return hrp
def load_guardian_account(args: Any) -> Union[IAccount, None]:
hrp = get_address_hrp_with_fallback(args)
if args.guardian_pem:
return Account.new_from_pem(file_path=Path(args.guardian_pem), index=args.guardian_wallet_index, hrp=hrp)
elif args.guardian_keyfile:
password = load_guardian_password(args)
index = args.guardian_wallet_index if args.guardian_wallet_index != 0 else None
try:
return Account.new_from_keystore(
Path(args.guardian_keyfile),
password=password,
address_index=index,
hrp=hrp,
)
except Exception as e:
raise WalletError(str(e))
elif args.guardian_ledger:
try:
return LedgerAccount(address_index=args.guardian_wallet_index)
except Exception as e:
raise LedgerError(str(e))
return None
def get_guardian_address(guardian: Union[IAccount, None], args: Any) -> Union[Address, None]:
address_from_account = guardian.address if guardian else None
address_from_args = Address.new_from_bech32(args.guardian) if hasattr(args, "guardian") and args.guardian else None
if not _is_matching_address(address_from_account, address_from_args):
raise IncorrectWalletError("Guardian wallet does not match the guardian's address set in the arguments.")
return address_from_account or address_from_args
def get_guardian_and_relayer_data(sender: str, args: Any) -> GuardianRelayerData:
guardian = load_guardian_account(args)
# get guardian address from account or from cli args
guardian_address = get_guardian_address(guardian, args)
relayer = load_relayer_account(args)
relayer_address = get_relayer_address(relayer, args)
guardian_and_relayer_data = GuardianRelayerData(
guardian=guardian,
guardian_address=guardian_address,
relayer=relayer,
relayer_address=relayer_address,
)
if guardian_and_relayer_data.guardian_address:
guardian_and_relayer_data.guardian_service_url = args.guardian_service_url
guardian_and_relayer_data.guardian_2fa_code = args.guardian_2fa_code
else:
_get_guardian_data_from_network(sender, args, guardian_and_relayer_data)
return guardian_and_relayer_data
def _get_guardian_data_from_network(sender: str, args: Any, guardian_and_relayer_data: GuardianRelayerData):
"""Updates the `guardian_and_relayer_data` parameter, that is later used."""
# if guardian not provided, get guardian from the network
guardian_data = _get_guardian_data(sender, args.proxy)
if guardian_data:
guardian_and_relayer_data.guardian_address = Address.new_from_bech32(guardian_data["guardian_address"])
# if tcs is used, set url, else get service url from args
tcs_url = guardian_data["cosigner_service_url"]
guardian_and_relayer_data.guardian_service_url = tcs_url if tcs_url else args.guardian_service_url
if guardian_and_relayer_data.guardian_service_url:
guardian_and_relayer_data.guardian_2fa_code = _ask_for_2fa_code(args)
def _get_guardian_data(address: str, proxy_url: str) -> Union[dict[str, str], None]:
if not proxy_url:
return None
guardian_data = _fetch_guardian_data(address, proxy_url)
if not bool(guardian_data.get("guarded", "")):
return None
active_guardian = guardian_data.get("activeGuardian", {})
guardian_address = active_guardian.get("address", "")
service_id = active_guardian.get("serviceUID", "")
cosigner_service_url = ""
if service_id == TCS_SERVICE_ID:
chain_id = _fetch_chain_id(proxy_url)
cosigner_service_url = get_trusted_cosigner_service_url_by_chain_id(chain_id)
return {
"guardian_address": guardian_address,
"cosigner_service_url": cosigner_service_url,
}
def _fetch_guardian_data(address: str, proxy_url: str) -> dict[str, Any]:
network_provider_config = config.get_config_for_network_providers()
proxy = ProxyNetworkProvider(url=proxy_url, config=network_provider_config)
response = proxy.do_get_generic(f"/address/{address}/guardian-data").to_dictionary()
guardian_data: dict[str, Any] = response.get("guardianData", {})
return guardian_data
def _ask_for_2fa_code(args: Any) -> str:
code: str = args.guardian_2fa_code
if not code:
code = input("Please enter the two factor authentication code: ")
return code
def get_relayer_address(relayer: Union[IAccount, None], args: Any) -> Union[Address, None]:
address_from_account = relayer.address if relayer else None
address_from_args = Address.new_from_bech32(args.relayer) if hasattr(args, "relayer") and args.relayer else None
if not _is_matching_address(address_from_account, address_from_args):
raise IncorrectWalletError("Relayer wallet does not match the relayer's address set in the arguments.")
return address_from_account or address_from_args
def _is_matching_address(account_address: Union[Address, None], args_address: Union[Address, None]) -> bool:
if account_address and args_address and account_address != args_address:
return False
return True
def load_relayer_account(args: Any) -> Union[IAccount, None]:
hrp = get_address_hrp_with_fallback(args)
if args.relayer_pem:
return Account.new_from_pem(file_path=Path(args.relayer_pem), index=args.relayer_wallet_index, hrp=hrp)
elif args.relayer_keyfile:
password = load_relayer_password(args)
index = args.relayer_wallet_index if args.relayer_wallet_index != 0 else None
try:
return Account.new_from_keystore(
Path(args.relayer_keyfile),
password=password,
address_index=index,
hrp=hrp,
)
except Exception as e:
raise WalletError(str(e))
elif args.relayer_ledger:
try:
return LedgerAccount(address_index=args.relayer_wallet_index)
except Exception as e:
raise LedgerError(str(e))
return None
def get_current_nonce_for_address(address: Address, proxy_url: Union[str, None]) -> int:
if not proxy_url:
raise ArgumentsNotProvidedError("If `--nonce` is not explicitly provided, `--proxy` must be provided")
network_provider_config = config.get_config_for_network_providers()
proxy = ProxyNetworkProvider(url=proxy_url, config=network_provider_config)
return proxy.get_account(address).nonce
@cache
def get_chain_id(proxy_url: str, chain_id: Optional[str] = None) -> str:
"""We know and have already validated that if chainID is not provided, proxy is provided."""
if chain_id:
return chain_id
return _fetch_chain_id(proxy_url)
@cache
def _fetch_chain_id(proxy_url: str) -> str:
network_provider_config = config.get_config_for_network_providers()
proxy = ProxyNetworkProvider(url=proxy_url, config=network_provider_config)
return proxy.get_network_config().chain_id
def add_broadcast_args(sub: Any, simulate: bool = True):
sub.add_argument(
"--send",
action="store_true",
default=False,
help="✓ whether to broadcast the transaction (default: %(default)s)",
)
if simulate:
sub.add_argument(
"--simulate",
action="store_true",
default=False,
help="whether to simulate the transaction (default: %(default)s)",
)
def send_or_simulate(tx: Transaction, args: Any, dump_output: bool = True) -> CLIOutputBuilder:
network_provider_config = config.get_config_for_network_providers()
proxy = ProxyNetworkProvider(url=args.proxy, config=network_provider_config)
is_set_wait_result = hasattr(args, "wait_result") and args.wait_result
is_set_send = hasattr(args, "send") and args.send
is_set_simulate = hasattr(args, "simulate") and args.simulate
send_wait_result = is_set_wait_result and is_set_send and not is_set_simulate
send_only = is_set_send and not (is_set_wait_result or is_set_simulate)
simulate = is_set_simulate and not (send_only or send_wait_result)
output_builder = CLIOutputBuilder()
output_builder.set_emitted_transaction(tx)
outfile = args.outfile if hasattr(args, "outfile") else None
hash = b""
try:
if send_wait_result:
_confirm_continuation_if_required(tx)
transaction_on_network = send_and_wait_for_result(tx, proxy, args.timeout)
output_builder.set_awaited_transaction(transaction_on_network)
elif send_only:
_confirm_continuation_if_required(tx)
hash = proxy.send_transaction(tx)
output_builder.set_emitted_transaction_hash(hash.hex())
elif simulate:
simulation = Simulator(proxy).run(tx)
output_builder.set_simulation_results(simulation)
finally:
output_transaction = output_builder.build()
if dump_output:
utils.dump_out_json(output_transaction, outfile=outfile)
if send_only and hash:
cli_config = MxpyEnv.from_active_env()
log_explorer_transaction(
chain=output_transaction["emittedTransaction"]["chainID"],
transaction_hash=output_transaction["emittedTransactionHash"],
explorer_url=cli_config.explorer_url,
)
return output_builder
def _confirm_continuation_if_required(tx: Transaction) -> None:
env = MxpyEnv.from_active_env()
if env.ask_confirmation:
transaction = tx.to_dictionary()
# decode the data field from base64 if it exists
data = base64.b64decode(transaction.get("data", "")).decode()
transaction["data"] = data if data else ""
utils.dump_out_json(transaction)
confirm_continuation("You are about to send the above transaction. Do you want to continue?")
def prepare_sender(args: Any):
"""Returns the sender's account.
If no account was provided, will raise an exception."""
sender = prepare_account(args)
sender.nonce = (
int(args.nonce) if args.nonce is not None else get_current_nonce_for_address(sender.address, args.proxy)
)
return sender
def prepare_guardian(args: Any) -> tuple[Union[IAccount, None], Union[Address, None]]:
"""Reurns a tuple containing the guardians's account and the account's address.
If no account or address were provided, will return (None, None)."""
guardian = load_guardian_account(args)
guardian_address = get_guardian_address(guardian, args)
return guardian, guardian_address
def prepare_relayer(args: Any) -> tuple[Union[IAccount, None], Union[Address, None]]:
"""Reurns a tuple containing the relayer's account and the account's address.
If no account or address were provided, will return (None, None)."""
relayer = load_relayer_account(args)
relayer_address = get_relayer_address(relayer, args)
return relayer, relayer_address
def prepare_guardian_relayer_data(args: Any) -> GuardianRelayerData:
guardian, guardian_address = prepare_guardian(args)
relayer, relayer_address = prepare_relayer(args)
return GuardianRelayerData(
guardian=guardian,
guardian_address=guardian_address,
relayer=relayer,
relayer_address=relayer_address,
)
def prepare_token_transfers(transfers: list[str]) -> list[TokenTransfer]:
"""Converts a list of token transfers as received from the CLI to a list of TokenTransfer objects."""
token_computer = TokenComputer()
token_transfers: list[TokenTransfer] = []
for i in range(0, len(transfers) - 1, 2):
extended_identifier = transfers[i]
amount = int(transfers[i + 1])
nonce = token_computer.extract_nonce_from_extended_identifier(extended_identifier)
identifier = token_computer.extract_identifier_from_extended_identifier(extended_identifier)
token = Token(identifier, nonce)
transfer = TokenTransfer(token, amount)
token_transfers.append(transfer)
return token_transfers
def set_proxy_from_config_if_not_provided(args: Any) -> None:
"""This function modifies the `args` object by setting the proxy from the config if not already set. If proxy is not needed (chainID and nonce are provided), the proxy will not be set."""
if hasattr(args, "proxy_headers") and not args.proxy_headers:
env = MxpyEnv.from_active_env()
if env.proxy_headers:
args.proxy_headers = [f"{key}={value}" for key, value in env.proxy_headers.items()]
if not hasattr(args, "proxy"):
return
if not args.proxy:
if hasattr(args, "chain") and args.chain and hasattr(args, "nonce") and args.nonce is not None:
return
env = MxpyEnv.from_active_env()
if env.proxy_url:
logger.info(f"Using proxy URL from config: {env.proxy_url}")
args.proxy = env.proxy_url
def initialize_gas_limit_estimator(args: Any) -> Union[GasLimitEstimator, None]:
# if gas limit is provided, we don't need GasLimitEstimator
if hasattr(args, "gas_limit") and args.gas_limit:
return None
# if proxy is not provided, we can't use GasLimitEstimator
if hasattr(args, "proxy") and not args.proxy:
return None
if hasattr(args, "gas_limit_multiplier") and args.gas_limit_multiplier:
multiplier: float = args.gas_limit_multiplier
else:
multiplier = config.get_gas_limit_multiplier_from_config()
network_provider_config = config.get_config_for_network_providers()
proxy = ProxyNetworkProvider(url=args.proxy, config=network_provider_config)
return GasLimitEstimator(network_provider=proxy, gas_multiplier=multiplier)
def set_options_for_hash_signing_if_needed(
transaction: Transaction,
guardian: Union[IAccount, None],
relayer: Union[IAccount, None],
):
transaction_computer = TransactionComputer()
if guardian and guardian.use_hash_signing:
transaction_computer.apply_options_for_hash_signing(transaction)
return
if relayer and relayer.use_hash_signing:
transaction_computer.apply_options_for_hash_signing(transaction)
def alter_transaction_and_sign_again_if_needed(
args: Any,
tx: Transaction,
sender: IAccount,
guardian_and_relayer_data: GuardianRelayerData,
):
initial_tx = deepcopy(tx)
set_options_for_hash_signing_if_needed(
transaction=tx,
guardian=guardian_and_relayer_data.guardian,
relayer=guardian_and_relayer_data.relayer,
)
altered = _alter_version_and_options_if_provided(
args=args,
initial_tx=initial_tx,
transaction=tx,
)
if altered:
# sign only if something was altered
_sign_transaction(tx, sender, guardian_and_relayer_data)
else:
# sign only with guardian/relayer if needed
_sign_transaction(tx, None, guardian_and_relayer_data)
def _alter_version_and_options_if_provided(
args: Any,
initial_tx: Transaction,
transaction: Transaction,
) -> bool:
"""Alters the transaction version and options if they are provided in args.
Returns True if any alteration was made, False otherwise.
"""
altered = False
if initial_tx.version != transaction.version:
altered = True
if initial_tx.options != transaction.options:
altered = True
if args.version != DEFAULT_TX_VERSION and transaction.version != args.version:
transaction.version = args.version
altered = True
if args.options and transaction.options != args.options:
transaction.options = args.options
altered = True
return altered
def _sign_transaction(
transaction: Transaction,
sender: Optional[IAccount] = None,
guardian_and_relayer_data: GuardianRelayerData = GuardianRelayerData(),
):
signer = SigningWrapper()
signer.sign_transaction(
transaction=transaction,
sender=sender,
guardian_and_relayer=guardian_and_relayer_data,
)