Mix.install([
{:dynamo, github: "bmalum/dynamo", branch: "documentation"},
{:faker, "~> 0.17"},
{:kino, "~> 0.9.0"}
])

defmodule AwsCredentialImporter do
@doc """
Imports AWS temporary credentials from a block of shell `export` statements.

The input is split into lines. Every line that `parse_credential_line/1`
recognises as an export statement sets the corresponding environment variable
through `System.put_env/2`, and the name of the variable is printed. All other
lines are ignored. Statements must be separated by newlines; several `export`s
written on one line are not split apart.

Always returns `:ok`.

Example input:

    export AWS_ACCESS_KEY_ID=ASIA1234567890EXAMPLE
    export AWS_SECRET_ACCESS_KEY=abc123def456ghi789example
    export AWS_SESSION_TOKEN=very-long-session-token-example
"""
def import_from_string(credentials_string) do
  credentials_string
  |> String.split("\n", trim: true)
  |> Enum.map(&parse_credential_line/1)
  |> Enum.reject(&is_nil/1)
  |> Enum.each(fn {key, value} ->
    System.put_env(key, value)
    # Only the variable name is printed; the value never reaches the output.
    IO.puts("Set environment variable: #{key}")
  end)

  :ok
end
@doc """
Reads `file_path` and imports the export statements it contains.

On success the content is handed to `import_from_string/1`, a confirmation is
printed, and the result of that `IO.puts/1` call (`:ok`) is returned. If the
file cannot be read, the reason is printed and `{:error, reason}` is returned.
"""
def import_from_file(file_path) do
  with {:ok, content} <- File.read(file_path) do
    import_from_string(content)
    IO.puts("Successfully imported AWS credentials from #{file_path}")
  else
    {:error, reason} = failure ->
      IO.puts("Failed to read credentials file: #{inspect(reason)}")
      failure
  end
end
# Parses one line of the form `export KEY=value` into `{key, value}`.
#
# Returns `nil` when the line is not an export statement, contains no `=`, or
# has an empty variable name. Leading whitespace before `export` is tolerated.
# Only the leading `export ` keyword is stripped, so a value that itself
# contains the text "export " is kept intact. The line is split on the first
# `=` only, so `=` characters inside the value are preserved. Key and value are
# trimmed of surrounding whitespace.
defp parse_credential_line(line) do
  with "export " <> assignment <- String.trim_leading(line),
       [raw_key, raw_value] <- String.split(assignment, "=", parts: 2),
       key when key != "" <- String.trim(raw_key) do
    {key, String.trim(raw_value)}
  else
    _ -> nil
  end
end
end

# Create a form for AWS credentials
# The field names below are the keys read from `event.data` in the submit handler.
aws_form =
  [
    access_key_id: Kino.Input.text("AWS Access Key ID"),
    secret_access_key: Kino.Input.password("AWS Secret Access Key"),
    session_token: Kino.Input.textarea("AWS Session Token")
  ]
  |> Kino.Control.form(submit: "Set AWS Credentials")

# Handle form submission
form_data = Kino.Control.stream(aws_form)
|> Kino.listen(fn event ->
# Build credentials string from form data
credentials = """
export ISENGARD_PRODUCTION_ACCOUNT=false
export AWS_ACCESS_KEY_ID=#{event.data.access_key_id}
export AWS_SECRET_ACCESS_KEY=#{event.data.secret_access_key}
export AWS_SESSION_TOKEN=#{event.data.session_token}
"""
# Import credentials
AwsCredentialImporter.import_from_string(credentials)
# Provide feedback
IO.puts("AWS credentials have been set!")
end)

This Livebook demonstrates how to use the Dynamo library to bulk insert data into a DynamoDB table. We'll create 500 items of 3 different types and insert them into DynamoDB using batch operations.
First, let's define our schemas for the three different item types we'll be working with: Products, Users, and Orders.
defmodule MyApp.Product do
  @moduledoc """
  Dynamo schema for product items stored in `"demo_table"`.

  The partition key is built from `type` (default `"product"`) and the sort key
  from `id`.
  """

  use Dynamo.Schema

  item do
    table_name "demo_table"
    field :type, default: "product"
    field :id
    field :name
    field :category
    field :price
    field :stock
    field :created_at
    partition_key [:type]
    sort_key [:id]
  end

  @doc """
  Prepares a product for writing.

  Prints the struct with `IO.inspect/1`, adds the generated partition and sort
  keys, encodes the result and returns the value stored under `"M"` in the
  encoded map.
  """
  def before_write(product) do
    with_keys =
      product
      |> IO.inspect()
      |> Dynamo.Schema.generate_and_add_partition_key()
      |> Dynamo.Schema.generate_and_add_sort_key()

    encoded = Dynamo.Encodable.MyApp.Product.encode(with_keys, [])
    Map.get(encoded, "M")
  end
end
defmodule MyApp.User do
  @moduledoc """
  Dynamo schema for user items stored in `"demo_table"`.

  The partition key is built from `type` (default `"user"`) and the sort key
  from `id`.
  """

  use Dynamo.Schema

  item do
    table_name "demo_table"
    field :type, default: "user"
    field :id
    field :email
    field :name
    field :age
    field :address
    field :created_at
    partition_key [:type]
    sort_key [:id]
  end

  @doc """
  Prepares a user for writing.

  Prints the struct with `IO.inspect/1`, adds the generated partition and sort
  keys, encodes the result and returns the value stored under `"M"` in the
  encoded map.
  """
  def before_write(user) do
    with_keys =
      user
      |> IO.inspect()
      |> Dynamo.Schema.generate_and_add_partition_key()
      |> Dynamo.Schema.generate_and_add_sort_key()

    encoded = Dynamo.Encodable.MyApp.User.encode(with_keys, [])
    Map.get(encoded, "M")
  end
end
defmodule MyApp.Order do
use Dynamo.Schema
# Order schema: orders are stored in "demo_table" with `type` as the partition
# key and `id` as the sort key. `type` defaults to "order" and `status` to
# "pending"; the remaining fields have no default.
item do
table_name "demo_table"
field :type, default: "order"
field :id
field :user_id
field :product_ids
field :total
field :status, default: "pending"
field :created_at
partition_key [:type]
sort_key [:id]
end
# Prepares an order for writing: prints the struct with IO.inspect/1, adds the
# generated partition and sort keys, encodes the result and returns the value
# stored under "M" in the encoded map.
def before_write(order) do
  with_keys =
    order
    |> IO.inspect()
    |> Dynamo.Schema.generate_and_add_partition_key()
    |> Dynamo.Schema.generate_and_add_sort_key()

  encoded = Dynamo.Encodable.MyApp.Order.encode(with_keys, [])
  Map.get(encoded, "M")
end
end

Now let's create functions to generate random data for each of our item types.
defmodule DataGenerator do
@doc """
Builds `count` random products with ids `"prod-1"` through `"prod-<count>"`.

Each product gets a Faker product name, one of five fixed categories, a price
between 0.01 and 100.0, a stock level between 1 and 100 and a random creation
timestamp.

Returns an empty list when `count` is zero or negative.
"""
def generate_products(count) do
  # The explicit `//1` step keeps the range empty for count <= 0; a plain
  # `1..0` counts downwards and would yield two products.
  Enum.map(1..count//1, fn i ->
    %MyApp.Product{
      id: "prod-#{i}",
      name: Faker.Commerce.product_name(),
      category: Enum.random(["Electronics", "Clothing", "Books", "Home", "Beauty"]),
      price: :rand.uniform(10000) / 100,
      stock: :rand.uniform(100),
      created_at: random_timestamp()
    }
  end)
end
@doc """
Builds `count` random users with ids `"user-1"` through `"user-<count>"`.

Each user gets a Faker e-mail address, name and street address, an age between
19 and 88 and a random creation timestamp.

Returns an empty list when `count` is zero or negative.
"""
def generate_users(count) do
  # The explicit `//1` step keeps the range empty for count <= 0; a plain
  # `1..0` counts downwards and would yield two users.
  Enum.map(1..count//1, fn i ->
    %MyApp.User{
      id: "user-#{i}",
      email: Faker.Internet.email(),
      name: Faker.Person.name(),
      age: 18 + :rand.uniform(70),
      address: Faker.Address.street_address(),
      created_at: random_timestamp()
    }
  end)
end
@doc """
Builds `count` random orders with ids `"order-1"` through `"order-<count>"`.

Each order references a random user id in `1..max_user_id` and a list of one
to five random product ids in `1..max_product_id` (the same product may appear
more than once). It also gets a total between 0.01 and 500.0, a random status
and a random creation timestamp.

Returns an empty list when `count` is zero or negative. `max_user_id` and
`max_product_id` must be at least 1 whenever orders are generated, because
they are passed to `:rand.uniform/1`.
"""
def generate_orders(count, max_user_id, max_product_id) do
  # The explicit `//1` step keeps the range empty for count <= 0; a plain
  # `1..0` counts downwards and would yield two orders.
  Enum.map(1..count//1, fn i ->
    user_id = "user-#{:rand.uniform(max_user_id)}"
    product_count = :rand.uniform(5)

    product_ids =
      Enum.map(1..product_count, fn _ ->
        "prod-#{:rand.uniform(max_product_id)}"
      end)

    %MyApp.Order{
      id: "order-#{i}",
      user_id: user_id,
      product_ids: product_ids,
      total: :rand.uniform(50000) / 100,
      status: Enum.random(["pending", "processing", "shipped", "delivered"]),
      created_at: random_timestamp()
    }
  end)
end
# Returns an ISO 8601 UTC timestamp lying between 1 and 365 whole days before
# the current time.
defp random_timestamp do
  seconds_per_day = 24 * 60 * 60
  offset_days = :rand.uniform(365)
  now = DateTime.utc_now()
  shifted = DateTime.add(now, -offset_days * seconds_per_day, :second)
  DateTime.to_iso8601(shifted)
end
end

Let's generate our 500 items (approximately 167 of each type):
# Generate items
product_count = 167
user_count = 167
order_count = 166

products = DataGenerator.generate_products(product_count)
users = DataGenerator.generate_users(user_count)
# Orders pick random user and product ids, so they receive the upper bound of both id ranges.
orders = DataGenerator.generate_orders(order_count, user_count, product_count)

# Combine all items
all_items = Enum.concat([products, users, orders])

IO.puts("Generated #{length(products)} products")
IO.puts("Generated #{length(users)} users")
IO.puts("Generated #{length(orders)} orders")
IO.puts("Total: #{length(all_items)} items")

# Preview a few items of each type
products |> Enum.take(2) |> IO.inspect(label: "Sample Products")
users |> Enum.take(2) |> IO.inspect(label: "Sample Users")
IO.inspect(Enum.take(orders, 2), label: "Sample Orders")

Before inserting data, we need to make sure our table exists. Let's create it if it doesn't:
defmodule TableSetup do
  @moduledoc """
  Makes sure the DynamoDB table used by this notebook exists before data is
  written to it.
  """

  # Pause between two status polls while waiting for a table, in milliseconds.
  @poll_interval_ms 5_000

  @doc """
  Ensures that `table_name` exists and is ready for writes.

  The table is described first:

    * if it is `ACTIVE`, nothing else is done;
    * if it is still `CREATING`, the function waits for it to become active
      instead of reporting it as ready straight away;
    * if it exists in any other state, `:ok` is returned without waiting;
    * if the describe call fails, the table is created with string `pk`/`sk`
      keys and on-demand billing, and the function waits for it to become
      active.

  Returns `:ok`, `{:error, :timeout}` when the table does not become active
  within the polling budget, or `{:error, error}` when creation fails.
  """
  def ensure_table_exists(table_name) do
    client = Dynamo.AWS.client()

    case AWS.DynamoDB.describe_table(client, %{"TableName" => table_name}) do
      {:ok, %{"Table" => %{"TableStatus" => "CREATING"}}, _} ->
        IO.puts("Table #{table_name} already exists")
        wait_for_table_active(client, table_name)

      {:ok, _, _} ->
        IO.puts("Table #{table_name} already exists")
        :ok

      {:error, _} ->
        # Every describe failure is treated as "table missing". If the real
        # cause is something else, the create call's own error is printed and
        # returned by create_table/2.
        create_table(client, table_name)
    end
  end

  # Creates the table with a string partition key `pk` and string sort key
  # `sk`, billed per request, then waits until it is active.
  defp create_table(client, table_name) do
    params = %{
      "TableName" => table_name,
      "AttributeDefinitions" => [
        %{"AttributeName" => "pk", "AttributeType" => "S"},
        %{"AttributeName" => "sk", "AttributeType" => "S"}
      ],
      "KeySchema" => [
        %{"AttributeName" => "pk", "KeyType" => "HASH"},
        %{"AttributeName" => "sk", "KeyType" => "RANGE"}
      ],
      "BillingMode" => "PAY_PER_REQUEST"
    }

    case AWS.DynamoDB.create_table(client, params) do
      {:ok, _, _} ->
        IO.puts("Table #{table_name} created successfully")
        wait_for_table_active(client, table_name)

      {:error, error} ->
        IO.puts("Error creating table: #{inspect(error)}")
        {:error, error}
    end
  end

  # Polls the table status until it is ACTIVE, sleeping between polls.
  # Gives up with `{:error, :timeout}` once `attempts` polls have been used.
  defp wait_for_table_active(client, table_name, attempts \\ 10)

  defp wait_for_table_active(_client, _table_name, attempts) when attempts <= 0 do
    IO.puts("Timed out waiting for table to become active")
    {:error, :timeout}
  end

  defp wait_for_table_active(client, table_name, attempts) do
    case AWS.DynamoDB.describe_table(client, %{"TableName" => table_name}) do
      {:ok, %{"Table" => %{"TableStatus" => "ACTIVE"}}, _} ->
        IO.puts("Table is now active")
        :ok

      _ ->
        # Errors and non-active states are both retried until attempts run out.
        IO.puts("Waiting for table to become active...")
        Process.sleep(@poll_interval_ms)
        wait_for_table_active(client, table_name, attempts - 1)
    end
  end
end
TableSetup.ensure_table_exists("demo_table")

Now let's insert our data into DynamoDB using batch operations:
defmodule BatchInserter do
  @moduledoc """
  Writes a list of schema structs to DynamoDB in fixed-size batches.
  """

  @doc """
  Inserts `items` in chunks of `batch_size` (default 25) and prints progress
  for every chunk.

  Returns `{successful, failed}`, where `successful` is the number of items
  reported as processed and `failed` counts both the items reported as
  unprocessed and the items of any write that returned an error.
  """
  def insert_in_batches(items, batch_size \\ 25) do
    batches =
      items
      # struct!/1 hands back a value that is already a struct unchanged.
      |> Enum.map(&struct!/1)
      |> Enum.chunk_every(batch_size)

    # Counted once here instead of recomputing length(items) for every batch.
    total_batches = length(batches)

    batches
    |> Enum.with_index(1)
    |> Enum.reduce({0, 0}, fn {batch, batch_num}, {successful, failed} ->
      IO.puts("Processing batch #{batch_num}/#{total_batches}...")

      %{processed_items: processed, unprocessed_items: unprocessed} = insert_batch(batch)
      failed_count = length(unprocessed)

      IO.puts(" Batch #{batch_num} results: #{processed} successful, #{failed_count} failed")
      {successful + processed, failed + failed_count}
    end)
  end

  # Writes one batch. The items are grouped by their struct module and each
  # group is written with its own batch_write_item call. When a call fails,
  # the error is printed and all items of that group are counted as
  # unprocessed.
  defp insert_batch(items) do
    items
    |> Enum.group_by(& &1.__struct__)
    |> Enum.reduce(%{processed_items: 0, unprocessed_items: []}, fn {type, type_items}, acc ->
      case Dynamo.Table.batch_write_item(type_items) do
        {:ok, result} ->
          %{
            acc
            | processed_items: acc.processed_items + result.processed_items,
              unprocessed_items: acc.unprocessed_items ++ result.unprocessed_items
          }

        {:error, error} ->
          # Surface the reason instead of discarding it, so a failing table or
          # credential problem is visible in the notebook output.
          IO.puts(" Batch write for #{inspect(type)} failed: #{inspect(error)}")
          %{acc | unprocessed_items: acc.unprocessed_items ++ type_items}
      end
    end)
  end
end
# Write every generated item to DynamoDB; the result is a {successful, failed} pair of counts.
{successful, failed} =
  all_items
  |> BatchInserter.insert_in_batches()

["\nInsertion complete!", "Successfully inserted: #{successful} items"]
|> Enum.each(&IO.puts/1)
IO.puts("Failed to insert: #{failed} items")

Let's verify our data was inserted correctly by querying each type:
defmodule DataVerifier do
  @moduledoc """
  Reads the inserted items back to confirm that the bulk insert worked.
  """

  @doc """
  Prints the number of stored items for each of the three schemas, or the error
  returned by the query. Returns `:ok`.
  """
  def count_items_by_type do
    [
      {"Products", MyApp.Product},
      {"Users", MyApp.User},
      {"Orders", MyApp.Order}
    ]
    |> Enum.each(fn {type_name, module} ->
      # `module` is a module name (an atom), so the `%{map | key: value}` update
      # syntax cannot be applied to it. struct/1 builds the schema's default
      # struct instead.
      # NOTE(review): this relies on the `default:` declared for `:type` in each
      # schema ending up in the struct defaults; confirm against Dynamo.Schema.
      case module.list_items(struct(module)) do
        {:ok, items} ->
          IO.puts("#{type_name} count: #{length(items)}")

        {:error, error} ->
          IO.puts("Error querying #{type_name}: #{inspect(error)}")
      end
    end)
  end

  @doc """
  Runs two example queries and prints their results: products in the
  "Electronics" category, and orders created during the last 30 days.

  Up to three matching items are printed for each query.
  """
  def sample_query do
    # Query for products in the Electronics category
    %MyApp.Product{type: "product"}
    |> MyApp.Product.list_items(
      filter_expression: "category = :category",
      expression_attribute_values: %{
        ":category" => %{"S" => "Electronics"}
      }
    )
    |> report_query(
      found: "electronics products",
      error: "electronics products",
      label: "Sample Electronics Products"
    )

    # Query for recent orders
    one_month_ago =
      DateTime.utc_now()
      |> DateTime.add(-30 * 24 * 60 * 60, :second)
      |> DateTime.to_iso8601()

    %MyApp.Order{type: "order"}
    |> MyApp.Order.list_items(
      filter_expression: "created_at > :date",
      expression_attribute_values: %{
        ":date" => %{"S" => one_month_ago}
      }
    )
    |> report_query(
      found: "orders from the last 30 days",
      error: "recent orders",
      label: "Sample Recent Orders"
    )
  end

  # Prints the outcome of a list_items call. `found` and `error` are the
  # phrases used in the success and failure messages, `label` is the
  # IO.inspect label for the sample items.
  defp report_query(result, found: found, error: error_subject, label: label) do
    case result do
      {:ok, items} ->
        IO.puts("Found #{length(items)} #{found}")

        if items != [] do
          IO.inspect(Enum.take(items, 3), label: label)
        end

      {:error, error} ->
        IO.puts("Error querying #{error_subject}: #{inspect(error)}")
    end
  end
end
DataVerifier.count_items_by_type()
DataVerifier.sample_query()

In this Livebook, we've demonstrated how to:
- Define schemas for different item types using Dynamo
- Generate random data for testing
- Create a DynamoDB table if it doesn't exist
- Insert data in batches for better performance
- Query the data to verify it was inserted correctly
This approach can be used to seed test data, migrate data between tables, or perform bulk operations on DynamoDB tables.
# Inserts a single hand-written order.
# `product_ids` is a list, matching the orders built by
# DataGenerator.generate_orders/3, so every order in the table stores the same
# attribute type.
%MyApp.Order{
  created_at: "2024-10-26T18:15:09.369514Z",
  status: "shipped",
  total: 479.95,
  product_ids: ["prod-80"],
  user_id: "user-123",
  id: "order-1",
  type: "order"
}
|> Dynamo.Table.insert()