-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
155 lines (129 loc) · 4.92 KB
/
main.py
File metadata and controls
155 lines (129 loc) · 4.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import asyncio
import time

import boto3
import dagger
from dagger import dag, function, object_type
def _localstack_client(service_name: str, endpoint: str):
    """Return a boto3 client for ``service_name`` pointed at LocalStack.

    Args:
        service_name: AWS service identifier passed to ``boto3.client``
            (the examples below use ``"s3"`` and ``"ecr"``).
        endpoint: Value returned by the Dagger service's ``endpoint()``;
            it is used without a scheme, so ``http://`` is prepended here.

    Returns:
        The client object created by ``boto3.client``.
    """
    # The examples all use the same placeholder credentials and region.
    return boto3.client(
        service_name,
        endpoint_url=f"http://{endpoint}",
        aws_access_key_id="test",
        aws_secret_access_key="test",
        region_name="us-east-1",
    )


@object_type
class Example:
    """Dagger example functions that exercise the LocalStack module."""

    @function
    async def localstack__quickstart(self, auth_token: dagger.Secret) -> str:
        """Example showing how to start LocalStack.

        Starts the service, then creates an S3 bucket, writes one object and
        reads it back through boto3.

        Args:
            auth_token: Secret forwarded to the LocalStack module's ``start``.

        Returns:
            A success message containing the object content read back from S3.
        """
        service = dag.localstack().start(auth_token=auth_token)
        await service.start()
        endpoint = await service.endpoint()
        print(f"LocalStack is running at {endpoint}")

        # Create a test S3 bucket
        s3 = _localstack_client("s3", endpoint)
        s3.create_bucket(Bucket="test-bucket")
        print("S3 bucket created")

        # Create a test object
        s3.put_object(
            Bucket="test-bucket",
            Key="test-object",
            Body="Hello, LocalStack!",
        )
        print("S3 object created")

        # Verify the object was created
        response = s3.get_object(Bucket="test-bucket", Key="test-object")
        content = response["Body"].read().decode("utf-8")
        print(f"S3 object content: {content}")

        # The signature promises a str; previously this fell through to None.
        return f"Success: S3 object content: {content}"

    @function
    async def localstack__pro(self, auth_token: dagger.Secret) -> str:
        """Example showing how to start LocalStack with custom configuration.

        Starts the service with ``DEBUG=1,SERVICES=ecr`` and creates an ECR
        repository through boto3.

        Args:
            auth_token: Secret forwarded to the LocalStack module's ``start``.

        Returns:
            A success message naming the repository that was created.
        """
        # Start LocalStack using the module
        service = dag.localstack().start(
            auth_token=auth_token,
            configuration="DEBUG=1,SERVICES=ecr",
        )
        await service.start()
        endpoint = await service.endpoint()
        print(f"LocalStack is running at {endpoint}")

        # Create a test ECR repository
        ecr = _localstack_client("ecr", endpoint)
        repository_name = "test-ecr-repo"
        ecr.create_repository(repositoryName=repository_name)
        print(f"ECR repository '{repository_name}' created")

        # The signature promises a str; previously this fell through to None.
        return f"Success: ECR repository '{repository_name}' created"

    @function
    async def localstack__state(self, auth_token: dagger.Secret) -> str:
        """Example showing how to manage LocalStack state using Cloud Pods.

        Creates a bucket, then calls the module's ``state`` function three
        times: save to a pod, reset, and load the pod back.

        Args:
            auth_token: Secret forwarded to ``start`` and to the save/load
                ``state`` calls.

        Returns:
            ``"Success: ..."`` when every step completes, otherwise
            ``"Error: <message>"`` built from the caught exception.
        """
        service = dag.localstack().start(auth_token=auth_token)
        await service.start()
        endpoint = await service.endpoint()
        endpoint_url = f"http://{endpoint}"
        pod_name = "test-dagger-example-pod"

        try:
            # Create a test bucket so there is some state to save
            s3 = _localstack_client("s3", endpoint)
            s3.create_bucket(Bucket="test-bucket")

            # Save state to Cloud Pod
            await dag.localstack().state(
                auth_token=auth_token,
                save=pod_name,
                endpoint=endpoint_url,
            )

            # Reset state
            await dag.localstack().state(
                reset=True,
                endpoint=endpoint_url,
            )

            # Load state back
            await dag.localstack().state(
                auth_token=auth_token,
                load=pod_name,
                endpoint=endpoint_url,
            )
            return "Success: State operations completed"
        except Exception as e:
            # Example boundary: report the failure as the function result.
            return f"Error: {str(e)}"

    @function
    async def localstack_ephemeral(self, auth_token: dagger.Secret) -> str:
        """Example showing how to manage LocalStack Ephemeral Instances.

        Creates an instance, lists instances, fetches the instance logs and
        deletes the instance. Once creation has succeeded the delete step
        always runs, even if listing or fetching logs fails.

        Args:
            auth_token: Secret forwarded to every ``ephemeral`` call.

        Returns:
            ``"Success: ..."`` when every step completes, otherwise
            ``"Error: <message>"`` built from the caught exception.
        """
        instance_name = "test-dagger-example-instance"

        try:
            # Create a new ephemeral instance
            # NOTE(review): the unit of ``lifetime`` is defined by the
            # LocalStack module (presumably minutes) - confirm.
            await dag.localstack().ephemeral(
                auth_token=auth_token,
                operation="create",
                name=instance_name,
                lifetime=60,
            )

            try:
                # Wait for instance to be ready. Use the async sleep so the
                # event loop is not blocked while waiting.
                await asyncio.sleep(15)
                print("Instance created")

                # List instances
                list_response = await dag.localstack().ephemeral(
                    auth_token=auth_token,
                    operation="list",
                )
                print(f"Ephemeral instances: {list_response}")

                # Get instance logs
                instance_logs = await dag.localstack().ephemeral(
                    auth_token=auth_token,
                    operation="logs",
                    name=instance_name,
                )
                print(f"Instance logs: {instance_logs}")
            finally:
                # Delete instance, also on failure, so it is not left running.
                await dag.localstack().ephemeral(
                    auth_token=auth_token,
                    operation="delete",
                    name=instance_name,
                )
                print("Instance deleted")

            return "Success: Ephemeral instance operations completed"
        except Exception as e:
            # Example boundary: report the failure as the function result.
            return f"Error: {str(e)}"