forked from pytorch/executorch
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patharm_vela.py
More file actions
144 lines (116 loc) · 5.18 KB
/
arm_vela.py
File metadata and controls
144 lines (116 loc) · 5.18 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
# Copyright 2023-2026 Arm Limited and/or its affiliates.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.
import os
import struct
import tempfile
from typing import List
import numpy as np
try:
from ethosu.vela import vela # type: ignore
has_vela = True
except ImportError:
has_vela = False
def _as_int32(value, name: str) -> int:
"""Convert numpy scalars to signed int32 with a clear error on overflow."""
arr = np.asarray(value)
if np.issubdtype(arr.dtype, np.unsignedinteger):
# Interpret unsigned values as signed (e.g., uint64 max -> -1).
arr = arr.astype(np.int64)
v = int(arr)
if v < -(2**31) or v > 2**31 - 1:
raise ValueError(f"{name} out of int32 range: {v}")
return v
# Pack either input or output tensor block, compose the related arrays into
# per-io structs to simplify runtime use.
def vela_bin_pack_io(prefix, data):
    """Serialize the io descriptors named by *prefix* into one byte blob.

    Reads the ``<prefix>_shape``, ``<prefix>_elem_size``, ``<prefix>_offset``
    and ``<prefix>_region`` arrays from *data* and emits: an int32 count,
    then per io a little-endian struct of the 6D shape followed by
    elem_size, offset and region (nine int32 values in total).

    Raises ValueError if a shape is not 6D or a field overflows int32.
    """
    shapes = data[prefix + "_shape"]
    # Vela input/output shape is fixed to 6D
    expected_dims = 6
    blob = struct.pack("<i", len(shapes))
    for idx, io_shape in enumerate(shapes):
        elem_size = _as_int32(data[prefix + "_elem_size"][idx], f"{prefix}_elem_size")
        offset = _as_int32(data[prefix + "_offset"][idx], f"{prefix}_offset")
        region = _as_int32(data[prefix + "_region"][idx], f"{prefix}_region")
        if len(io_shape) != expected_dims:
            raise ValueError(
                f"Expected {expected_dims}D shape, got {len(io_shape)}D"
            )
        blob += struct.pack(
            "<iiiiiiiii", *io_shape.tolist(), elem_size, offset, region
        )
    return blob
# Output via Vela to binary stream for ArmBackendEthosU
# WARNING: Do not change this without changing VelaBinStream.cpp as that
# function consumes this format and the two need to align.
def vela_compile(
    tosa_flatbuffer: bytes,
    args: List[str],
    verbose: bool = False,
    intermediate_path: str | None = None,
) -> bytes:
    """Compile a TOSA graph to a binary stream for ArmBackendEthosU using
    Vela.

    Args:
        tosa_flatbuffer: Serialized TOSA graph to compile.
        args: Extra command-line arguments forwarded to Vela. The list is
            not modified; an internal copy is extended instead.
        verbose: If True, additionally pass ``--verbose-all`` to Vela.
        intermediate_path: Directory for intermediate artifacts. When None,
            a temporary directory is created and removed afterwards.

    Returns:
        The packed binary stream consumed by VelaBinStream.cpp.

    Raises:
        RuntimeError: If ethos-u-vela is not installed, or the scratch
            shape in Vela's output has an unexpected dtype.
        ValueError: If an io field does not fit in int32 (via _as_int32).
    """
    if not has_vela:
        raise RuntimeError(
            "ethos-u-vela pip package couldn't be imported. Make sure it's installed!"
        )

    def run(work_dir: str) -> bytes:
        # One compile pass inside work_dir; returns the packed stream.
        tosaname = "out.tosa"
        tosa_path = os.path.join(work_dir, tosaname)
        with open(tosa_path, "wb") as f:
            f.write(tosa_flatbuffer)

        # Invoke vela on a copy of the caller's args so repeated calls do
        # not accumulate --output-dir / tosa-path entries in their list.
        output_dir = os.path.join(work_dir, "output")
        vela_args = list(args)
        vela_args.append(f"--output-dir={output_dir}")
        vela_args.append(tosa_path)
        if verbose:
            vela_args.append("--verbose-all")
        # NOTE: join+split preserves the historical behavior that an
        # argument containing spaces is split into separate argv entries.
        vela.main(" ".join(vela_args).split(" "))

        np_path = os.path.join(work_dir, "output", "out_vela.npz")
        blocks = b""

        with np.load(np_path, allow_pickle=False) as data:
            # Construct our modified output_blocks with data in a form easily
            # digested on the device side
            bin_blocks = {"vela_bin_stream": b""}

            # copy command data through unmodified
            bin_blocks["cmd_data"] = data["cmd_data"].tobytes()

            # copy weight data through unmodified
            bin_blocks["weight_data"] = data["weight_data"].tobytes()

            # Add a block for scratch, inputs and outputs; scratch shape is a 1 element
            # array giving us size in bytes so extract this and add a block of 0's.
            # Currently we preallocated this on the host to provide SRAM for computation.
            if not isinstance(data["scratch_shape"][0], np.int64):
                raise RuntimeError("Expected scratch to be int64")
            block_length = int(data["scratch_shape"][0])
            bin_blocks["scratch_size"] = struct.pack("<I", block_length)

            # Capture inputs and outputs
            bin_blocks["inputs"] = vela_bin_pack_io("input", data)
            bin_blocks["outputs"] = vela_bin_pack_io("output", data)

            bin_blocks["vela_end_stream"] = b""

            # Emit the NPZ regions as:
            # - 16 byte block name null terminated string (padded to 16 if name shorter)
            # - 4 bytes of int32 block length and 12 bytes of 0's
            # - block data (padded to 16 byte alignment at end)
            # Repeat for all blocks
            for key, block_data in bin_blocks.items():
                block_name = bytes(key, "utf8")[:15]
                block_name = block_name + b"\x00" * (16 - len(block_name))

                # We need the actual unpadded block lengths for hw setup
                block_length_bytes = struct.pack("<iiii", len(block_data), 0, 0, 0)

                # Pad block data to multiple of 16 bytes
                block_data = block_data + b"\x00" * (15 - (len(block_data) - 1) % 16)

                block = block_name + block_length_bytes + block_data
                blocks = blocks + block

        return blocks

    if intermediate_path is not None:
        return run(intermediate_path)
    else:
        with tempfile.TemporaryDirectory() as tmpdir:
            return run(tmpdir)