Skip to content

Latest commit

 

History

History
513 lines (430 loc) · 13.2 KB

File metadata and controls

513 lines (430 loc) · 13.2 KB

DynamoDB Bulk Insert Example

Mix.install([
  {:dynamo, github: "bmalum/dynamo", branch: "documentation"},
  {:faker, "~> 0.17"},
  {:kino, "~> 0.9.0"}
])

AWS Credentials Setup

defmodule AwsCredentialImporter do
  @moduledoc """
  Copies AWS credentials written as shell `export KEY=value` lines into the
  environment of the running VM via `System.put_env/2`.
  """

  @doc """
  Imports AWS temporary credentials from an export string format.

  The input is split on newlines. Every line of the form `export KEY=value`
  (leading whitespace allowed) sets the environment variable `KEY` to `value`,
  with surrounding whitespace trimmed. Lines that do not have this shape are
  ignored. The name of each variable that was set is printed.

  Always returns `:ok`.

  Example input (one assignment per line):

      export AWS_ACCESS_KEY_ID=ASIA1234567890EXAMPLE
      export AWS_SECRET_ACCESS_KEY=abc123def456ghi789example
      export AWS_SESSION_TOKEN=very-long-session-token-example
  """
  def import_from_string(credentials_string) do
    credentials_string
    |> String.split("\n", trim: true)
    |> Enum.map(&parse_credential_line/1)
    |> Enum.reject(&is_nil/1)
    |> Enum.each(fn {key, value} ->
      System.put_env(key, value)
      IO.puts("Set environment variable: #{key}")
    end)

    :ok
  end

  @doc """
  Reads `file_path` and imports its contents with `import_from_string/1`.

  Returns `:ok` after a successful import, or `{:error, reason}` (the reason
  reported by `File.read/1`) when the file cannot be read.
  """
  def import_from_file(file_path) do
    case File.read(file_path) do
      {:ok, content} ->
        import_from_string(content)
        IO.puts("Successfully imported AWS credentials from #{file_path}")

      {:error, reason} ->
        IO.puts("Failed to read credentials file: #{inspect(reason)}")
        {:error, reason}
    end
  end

  # Turns one `export KEY=value` line into `{key, value}`, or `nil` when the
  # line is not an export assignment.
  #
  # Only the leading "export " is removed: matching on the prefix (instead of
  # replacing every occurrence) keeps values that themselves contain the text
  # "export " intact.
  defp parse_credential_line(line) do
    case String.trim_leading(line) do
      "export " <> assignment ->
        # parts: 2 keeps any further "=" characters inside the value.
        case String.split(assignment, "=", parts: 2) do
          [key, value] when key != "" -> {String.trim(key), String.trim(value)}
          _ -> nil
        end

      _ ->
        nil
    end
  end
end
# Form with one input per part of a temporary AWS credential set
aws_form =
  Kino.Control.form(
    [
      access_key_id: Kino.Input.text("AWS Access Key ID"),
      secret_access_key: Kino.Input.password("AWS Secret Access Key"),
      session_token: Kino.Input.textarea("AWS Session Token")
    ],
    submit: "Set AWS Credentials"
  )

# On every submission, render the submitted values as `export` lines and hand
# them to AwsCredentialImporter, which puts them into the environment.
form_data =
  aws_form
  |> Kino.Control.stream()
  |> Kino.listen(fn event ->
    credentials = """
    export ISENGARD_PRODUCTION_ACCOUNT=false
    export AWS_ACCESS_KEY_ID=#{event.data.access_key_id}
    export AWS_SECRET_ACCESS_KEY=#{event.data.secret_access_key}
    export AWS_SESSION_TOKEN=#{event.data.session_token}
    """

    AwsCredentialImporter.import_from_string(credentials)

    # Feedback for the notebook user
    IO.puts("AWS credentials have been set!")
  end)

Introduction

This Livebook demonstrates how to use the Dynamo library to bulk insert data into a DynamoDB table. We'll create 500 items of 3 different types and insert them into DynamoDB using batch operations.

Setup

First, let's define our schemas for the three different item types we'll be working with: Products, Users, and Orders.

defmodule MyApp.Product do
  @moduledoc """
  Schema for product items stored in `demo_table`.

  The partition key is built from `:type` (default `"product"`) and the sort
  key from `:id`.
  """
  use Dynamo.Schema

  item do
    table_name "demo_table"

    field :type, default: "product"
    field :id
    field :name
    field :category
    field :price
    field :stock
    field :created_at

    partition_key [:type]
    sort_key [:id]
  end

  @doc """
  Prepares a product for writing.

  Prints the incoming value, adds the generated partition and sort keys,
  encodes the result with the `Dynamo.Encodable` implementation for this
  struct and returns what the encoded map holds under `"M"`.
  """
  def before_write(arg) do
    # Prints the item exactly as it was passed in, before keys are added.
    inspected = IO.inspect(arg)

    with_keys =
      inspected
      |> Dynamo.Schema.generate_and_add_partition_key()
      |> Dynamo.Schema.generate_and_add_sort_key()

    encoded = Dynamo.Encodable.MyApp.Product.encode(with_keys, [])

    # NOTE(review): "M" is presumably the DynamoDB map wrapper - confirm
    # against the Dynamo encoder.
    Map.get(encoded, "M")
  end
end

defmodule MyApp.User do
  @moduledoc """
  Schema for user items stored in `demo_table`.

  The partition key is built from `:type` (default `"user"`) and the sort key
  from `:id`.
  """
  use Dynamo.Schema

  item do
    table_name "demo_table"

    field :type, default: "user"
    field :id
    field :email
    field :name
    field :age
    field :address
    field :created_at

    partition_key [:type]
    sort_key [:id]
  end

  @doc """
  Prepares a user for writing.

  Prints the incoming value, adds the generated partition and sort keys,
  encodes the result with the `Dynamo.Encodable` implementation for this
  struct and returns what the encoded map holds under `"M"`.
  """
  def before_write(arg) do
    # Prints the item exactly as it was passed in, before keys are added.
    inspected = IO.inspect(arg)

    with_keys =
      inspected
      |> Dynamo.Schema.generate_and_add_partition_key()
      |> Dynamo.Schema.generate_and_add_sort_key()

    encoded = Dynamo.Encodable.MyApp.User.encode(with_keys, [])

    # NOTE(review): "M" is presumably the DynamoDB map wrapper - confirm
    # against the Dynamo encoder.
    Map.get(encoded, "M")
  end
end

defmodule MyApp.Order do
  @moduledoc """
  Schema for order items stored in `demo_table`.

  The partition key is built from `:type` (default `"order"`) and the sort key
  from `:id`. `:status` defaults to `"pending"`.
  """
  use Dynamo.Schema

  item do
    table_name "demo_table"

    field :type, default: "order"
    field :id
    field :user_id
    field :product_ids
    field :total
    field :status, default: "pending"
    field :created_at

    partition_key [:type]
    sort_key [:id]
  end

  @doc """
  Prepares an order for writing.

  Prints the incoming value, adds the generated partition and sort keys,
  encodes the result with the `Dynamo.Encodable` implementation for this
  struct and returns what the encoded map holds under `"M"`.
  """
  def before_write(arg) do
    # Prints the item exactly as it was passed in, before keys are added.
    inspected = IO.inspect(arg)

    with_keys =
      inspected
      |> Dynamo.Schema.generate_and_add_partition_key()
      |> Dynamo.Schema.generate_and_add_sort_key()

    encoded = Dynamo.Encodable.MyApp.Order.encode(with_keys, [])

    # NOTE(review): "M" is presumably the DynamoDB map wrapper - confirm
    # against the Dynamo encoder.
    Map.get(encoded, "M")
  end
end

Generate Random Data

Now let's create functions to generate random data for each of our item types.

defmodule DataGenerator do
  @moduledoc """
  Builds lists of randomly populated `MyApp.Product`, `MyApp.User` and
  `MyApp.Order` structs for the bulk-insert demo.

  Ids are sequential (`"prod-1"`, `"user-1"`, `"order-1"`, ...); all other
  fields are random. Every generator iterates `1..count//1`, so a `count` of
  zero (or less) yields an empty list rather than walking the descending
  range `1..0`.
  """

  @doc """
  Returns `count` products with ids `"prod-1"` through `"prod-<count>"`.

  `price` is a float between 0.01 and 100.0, `stock` an integer from 1 to 100.
  """
  def generate_products(count) do
    for i <- 1..count//1 do
      %MyApp.Product{
        id: "prod-#{i}",
        name: Faker.Commerce.product_name(),
        category: Enum.random(["Electronics", "Clothing", "Books", "Home", "Beauty"]),
        price: :rand.uniform(10000) / 100,
        stock: :rand.uniform(100),
        created_at: random_timestamp()
      }
    end
  end

  @doc """
  Returns `count` users with ids `"user-1"` through `"user-<count>"`.

  `age` is an integer from 19 to 88 (`18 + :rand.uniform(70)`).
  """
  def generate_users(count) do
    for i <- 1..count//1 do
      %MyApp.User{
        id: "user-#{i}",
        email: Faker.Internet.email(),
        name: Faker.Person.name(),
        age: 18 + :rand.uniform(70),
        address: Faker.Address.street_address(),
        created_at: random_timestamp()
      }
    end
  end

  @doc """
  Returns `count` orders with ids `"order-1"` through `"order-<count>"`.

  Each order references a random user id in `1..max_user_id` and between one
  and five random product ids in `1..max_product_id` (duplicates possible).
  Both maxima are passed to `:rand.uniform/1` and therefore must be positive
  integers whenever `count` is positive.
  """
  def generate_orders(count, max_user_id, max_product_id) do
    for i <- 1..count//1 do
      user_id = "user-#{:rand.uniform(max_user_id)}"
      product_count = :rand.uniform(5)

      # product_count is always >= 1, so this range is never descending.
      product_ids =
        for _ <- 1..product_count do
          "prod-#{:rand.uniform(max_product_id)}"
        end

      %MyApp.Order{
        id: "order-#{i}",
        user_id: user_id,
        product_ids: product_ids,
        total: :rand.uniform(50000) / 100,
        status: Enum.random(["pending", "processing", "shipped", "delivered"]),
        created_at: random_timestamp()
      }
    end
  end

  # ISO 8601 timestamp between 1 and 365 whole days before the current UTC time.
  defp random_timestamp do
    days_ago = :rand.uniform(365)

    DateTime.utc_now()
    |> DateTime.add(-days_ago * 24 * 60 * 60, :second)
    |> DateTime.to_iso8601()
  end
end

Generate Items

Let's generate our 500 items: 167 products, 167 users, and 166 orders.

# How many items of each type to build (167 + 167 + 166 = 500)
product_count = 167
user_count = 167
order_count = 166

# Orders pick their user and product ids from the ranges generated above
products = DataGenerator.generate_products(product_count)
users = DataGenerator.generate_users(user_count)
orders = DataGenerator.generate_orders(order_count, user_count, product_count)

# One flat list holding every item, whatever its type
all_items = Enum.concat([products, users, orders])

IO.puts("Generated #{length(products)} products")
IO.puts("Generated #{length(users)} users")
IO.puts("Generated #{length(orders)} orders")
IO.puts("Total: #{length(all_items)} items")

# Show the first two items of each type
products |> Enum.take(2) |> IO.inspect(label: "Sample Products")
users |> Enum.take(2) |> IO.inspect(label: "Sample Users")
orders |> Enum.take(2) |> IO.inspect(label: "Sample Orders")

Create DynamoDB Table

Before inserting data, we need to make sure our table exists. Let's create it if it doesn't:

defmodule TableSetup do
  @moduledoc """
  Creates the demo DynamoDB table when it is not there yet.
  """

  @doc """
  Makes sure `table_name` exists.

  Returns `:ok` when `describe_table` succeeds. Any error from
  `describe_table` is treated as "table missing" and leads to a create
  attempt, whose result (`:ok`, `{:error, :timeout}` or `{:error, error}`)
  is returned.
  """
  def ensure_table_exists(table_name) do
    client = Dynamo.AWS.client()
    lookup = AWS.DynamoDB.describe_table(client, %{"TableName" => table_name})

    case lookup do
      {:error, _} ->
        create_table(client, table_name)

      {:ok, _, _} ->
        IO.puts("Table #{table_name} already exists")
        :ok
    end
  end

  # Issues the CreateTable request and, on success, blocks until the table
  # reports ACTIVE (or the wait times out).
  defp create_table(client, table_name) do
    case AWS.DynamoDB.create_table(client, table_definition(table_name)) do
      {:error, error} ->
        IO.puts("Error creating table: #{inspect(error)}")
        {:error, error}

      {:ok, _, _} ->
        IO.puts("Table #{table_name} created successfully")
        wait_for_table_active(client, table_name)
    end
  end

  # CreateTable parameters: string hash key "pk", string range key "sk",
  # on-demand billing.
  defp table_definition(table_name) do
    string_attribute = fn name ->
      %{"AttributeName" => name, "AttributeType" => "S"}
    end

    key = fn name, type ->
      %{"AttributeName" => name, "KeyType" => type}
    end

    %{
      "TableName" => table_name,
      "AttributeDefinitions" => [string_attribute.("pk"), string_attribute.("sk")],
      "KeySchema" => [key.("pk", "HASH"), key.("sk", "RANGE")],
      "BillingMode" => "PAY_PER_REQUEST"
    }
  end

  # Polls describe_table every 5 seconds until the status is "ACTIVE".
  # Gives up with {:error, :timeout} once `attempts` polls have been used.
  defp wait_for_table_active(client, table_name, attempts \\ 10)

  defp wait_for_table_active(_client, _table_name, attempts) when attempts <= 0 do
    IO.puts("Timed out waiting for table to become active")
    {:error, :timeout}
  end

  defp wait_for_table_active(client, table_name, attempts) do
    case AWS.DynamoDB.describe_table(client, %{"TableName" => table_name}) do
      {:ok, %{"Table" => %{"TableStatus" => "ACTIVE"}}, _} ->
        IO.puts("Table is now active")
        :ok

      _ ->
        # Any other status, and any error, counts as "not ready yet".
        IO.puts("Waiting for table to become active...")
        :timer.sleep(5000)
        wait_for_table_active(client, table_name, attempts - 1)
    end
  end
end

# "demo_table" is the table_name declared by the Product, User and Order
# schemas; create it now if describe_table cannot find it.
TableSetup.ensure_table_exists("demo_table")

Insert Data in Batches

Now let's insert our data into DynamoDB using batch operations:

defmodule BatchInserter do
  @moduledoc """
  Writes a list of schema structs to DynamoDB in fixed-size batches and
  reports progress on standard output.
  """

  @doc """
  Inserts `items` in chunks of `batch_size` (default 25).

  Every item must be a struct; anything else raises before any write is
  attempted. Within a chunk, items are grouped by struct module and each
  group is written with `Dynamo.Table.batch_write_item/1`.

  Returns `{successful, failed}`: the number of items reported as processed
  and the number that were left unprocessed or belonged to a write that
  returned an error. An empty `items` list returns `{0, 0}` without calling
  DynamoDB.
  """
  def insert_in_batches(items, batch_size \\ 25) do
    batches =
      items
      # struct!/1 hands a struct back unchanged and raises for other terms.
      |> Enum.map(&struct!/1)
      |> Enum.chunk_every(batch_size)

    # Computed once instead of on every iteration.
    total_batches = length(batches)

    batches
    |> Enum.with_index(1)
    |> Enum.reduce({0, 0}, fn {batch, batch_num}, {successful, failed} ->
      IO.puts("Processing batch #{batch_num}/#{total_batches}...")

      {processed, unprocessed} = insert_batch(batch)
      failed_count = length(unprocessed)

      IO.puts("  Batch #{batch_num} results: #{processed} successful, #{failed_count} failed")

      {successful + processed, failed + failed_count}
    end)
  end

  # Writes one chunk, one batch_write_item call per struct module.
  #
  # Returns {processed_count, unprocessed_items}. A failed call is reported
  # on stdout and all of its items are counted as unprocessed, so a single
  # failing group never hides the reason or aborts the remaining groups.
  defp insert_batch(items) do
    items
    |> Enum.group_by(fn item -> item.__struct__ end)
    |> Enum.reduce({0, []}, fn {type, type_items}, {processed, unprocessed} ->
      case Dynamo.Table.batch_write_item(type_items) do
        {:ok, result} ->
          {processed + result.processed_items, unprocessed ++ result.unprocessed_items}

        {:error, error} ->
          IO.puts("  Write failed for #{length(type_items)} #{inspect(type)} item(s): #{inspect(error)}")
          {processed, unprocessed ++ type_items}
      end
    end)
  end
end

# Write every generated item using the default batch size
{successful, failed} = BatchInserter.insert_in_batches(all_items)

# Final summary, one line per message
[
  "\nInsertion complete!",
  "Successfully inserted: #{successful} items",
  "Failed to insert: #{failed} items"
]
|> Enum.each(&IO.puts/1)

Query the Data

Let's verify our data was inserted correctly by querying each type:

defmodule DataVerifier do
  @moduledoc """
  Read-back checks for the bulk insert: per-type item counts and two sample
  filtered queries. All results are printed to standard output.
  """

  # Display name and schema module of every item type in the demo table.
  @schemas [
    {"Products", MyApp.Product},
    {"Users", MyApp.User},
    {"Orders", MyApp.Order}
  ]

  @doc """
  Prints how many items `list_items/1` returns for each schema.

  Each schema is queried with its default struct, whose `:type` default
  supplies the partition key value. Returns `:ok`.
  """
  def count_items_by_type do
    Enum.each(@schemas, fn {type_name, module} ->
      # `module` is a module name, not a map, so the struct has to be built
      # with struct/1; map-update syntax on it would raise BadMapError.
      case module.list_items(struct(module)) do
        {:ok, items} ->
          IO.puts("#{type_name} count: #{length(items)}")

        {:error, error} ->
          IO.puts("Error querying #{type_name}: #{inspect(error)}")
      end
    end)
  end

  @doc """
  Runs two filtered queries and prints the match count plus up to three
  sample items for each:

    * products whose `category` is `"Electronics"`
    * orders whose `created_at` is later than 30 days before now
  """
  def sample_query do
    electronics_filter = [
      filter_expression: "category = :category",
      expression_attribute_values: %{
        ":category" => %{"S" => "Electronics"}
      }
    ]

    case MyApp.Product.list_items(%MyApp.Product{type: "product"}, electronics_filter) do
      {:ok, items} ->
        IO.puts("Found #{length(items)} electronics products")
        print_samples(items, "Sample Electronics Products")

      {:error, error} ->
        IO.puts("Error querying electronics products: #{inspect(error)}")
    end

    # created_at is stored as an ISO 8601 string, so the cutoff is compared
    # as a string too.
    one_month_ago =
      DateTime.utc_now()
      |> DateTime.add(-30 * 24 * 60 * 60, :second)
      |> DateTime.to_iso8601()

    recent_filter = [
      filter_expression: "created_at > :date",
      expression_attribute_values: %{
        ":date" => %{"S" => one_month_ago}
      }
    ]

    case MyApp.Order.list_items(%MyApp.Order{type: "order"}, recent_filter) do
      {:ok, items} ->
        IO.puts("Found #{length(items)} orders from the last 30 days")
        print_samples(items, "Sample Recent Orders")

      {:error, error} ->
        IO.puts("Error querying recent orders: #{inspect(error)}")
    end
  end

  # Prints up to three items under `label`; prints nothing (and returns nil)
  # for an empty result.
  defp print_samples([], _label), do: nil

  defp print_samples(items, label) do
    items
    |> Enum.take(3)
    |> IO.inspect(label: label)
  end
end

# Print the number of stored items per type, then run the two sample
# filtered queries (Electronics products, orders from the last 30 days).
DataVerifier.count_items_by_type()
DataVerifier.sample_query()

Conclusion

In this Livebook, we've demonstrated how to:

  1. Define schemas for different item types using Dynamo
  2. Generate random data for testing
  3. Create a DynamoDB table if it doesn't exist
  4. Insert data in batches for better performance
  5. Query the data to verify it was inserted correctly

This approach can be used to seed test data, migrate data between tables, or perform bulk operations on DynamoDB tables.

# Insert a single hand-written order with Dynamo.Table.insert/1.
# product_ids is a list of product ids, the same shape DataGenerator gives
# every generated order, so all orders in the table share one attribute type.
%MyApp.Order{
  created_at: "2024-10-26T18:15:09.369514Z",
  status: "shipped",
  total: 479.95,
  product_ids: ["prod-80"],
  user_id: "user-123",
  id: "order-1",
  type: "order"
}
|> Dynamo.Table.insert()