Skip to content
This repository was archived by the owner on Apr 22, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ application.conf

divolte-collector-*
flask_secret
*.avro
*.avro.partial
*.avro.crc
*.avro.partial.crc

# Maven
dependency-reduced-pom.xml
Expand Down
80 changes: 80 additions & 0 deletions data/generate/generate-event-files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import argparse
import random
from time import time
from datetime import datetime, timezone, timedelta
import base36
import secrets
from elasticsearch import Elasticsearch
import pandas as pd
import json
import requests
from collections import Counter

def generate_divolte_id():
ts = int(round(time() * 1000))
b = base36.dumps(ts)
s = secrets.token_urlsafe(24)
return "0:{timestamp}:{id}".format(timestamp=b, id = s)

def generate_event_id():
s = secrets.token_urlsafe(36)
return "0:{id}".format(id = s)


def get_items():
es = Elasticsearch()
products = es.search(index='catalog', _source_include=['id','categories'], size=200)
return [{'item_id': int(product['_source']['id'])} for product in products['hits']['hits']]

def get_events(dt):
timeframe=random.uniform(1,24)
pdf = pd.date_range(start=dt+timedelta(hours=-timeframe), end=dt, freq='S').to_frame()
sampled = pdf.sample(frac=random.uniform(1, 200)/pdf.count())
sampled.columns = ['ts']
sorted = sampled.sort_values('ts')

# add a column containing previous timestamp
combined = pd.concat([sorted,
sorted.transform(lambda x:x.shift(1))]
,axis=1)
combined.columns = ['ts','prev_ts']

# create the new session column
combined['is_new_party'] = (combined['prev_ts'].isnull())

# create the new session column
combined['is_new_session'] = ((combined['prev_ts'].isnull()) | ((combined['ts']
- combined['prev_ts'])>=timedelta(seconds=30*60)))

# create the session_id
combined['increment'] = combined['is_new_session'].cumsum()
combined['session_id'] = combined['is_new_session'].groupby(combined['increment']).transform(lambda x:generate_divolte_id())

combined['event_type'] = 'preview'
combined['event_id'] = combined['event_type'].transform(lambda x: generate_event_id())
combined['client_timestamp_iso'] = combined['ts'].transform(lambda x: x.replace(tzinfo=timezone(offset=timedelta(hours=2))).isoformat())

serie = pd.Series(get_items()[:combined['ts'].count()])
combined['parameters'] = serie.values

return combined[['session_id', 'event_id', 'is_new_party', 'is_new_session', 'client_timestamp_iso', 'event_type', 'parameters']].to_json(orient='records')

if __name__ == "__main__":

parser = argparse.ArgumentParser(description='Create Divolte AVRO files.')
parser.add_argument('--nr-of-parties', '-n', type=int, required=True, help='The number of unique parties to generate data for.')
arguments = parser.parse_args()

responses = Counter()
for i in range(0,arguments.nr_of_parties):
party = generate_divolte_id()
for j in range(0,int(random.uniform(1,6))):
data = get_events(datetime.now()-timedelta(days=j))
for event in json.loads(data):
response = requests.post('http://localhost:8290/json?p={party}'.format(party=party), json=event)
# print('POST status %d' % response.status_code)
responses.update([response.status_code])

print('\nSummary:')
for status, count in responses.items():
print('\t%d times status code %d' % (count,status))
Empty file.
Empty file.
4 changes: 4 additions & 0 deletions data/generate/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
base36
elasticsearch
pandas
requests
9 changes: 9 additions & 0 deletions data/generate/spark-read-avro.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import com.databricks.spark.avro._

val df = spark.read.format("com.databricks.spark.avro").load("data/generate/mount-data/finished")

df.show()

df.count()

df.select("sessionId").distinct.count()
1 change: 1 addition & 0 deletions divolte/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ ENV DIVOLTE_CONF_DIR ${DIVOLTE_CONF_DIR:-/etc/shop/divolte}
RUN mkdir -p /etc/shop

ADD schema/src/main/docker/divolte-collector.conf /etc/shop/divolte/divolte-collector.conf
ADD schema/src/main/docker/logback.xml /etc/shop/divolte/logback.xml
ADD schema/src/main/groovy/mapping.groovy /etc/shop/divolte/mapping.groovy
ADD schema/src/main/resources/ShopEventRecord.avsc /etc/shop/divolte/ShopEventRecord.avsc
RUN chown root:root /etc/shop/divolte/*
Expand Down
35 changes: 35 additions & 0 deletions divolte/schema/src/main/docker/divolte-collector.conf
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ divolte {
browser = {
type = browser
}
// Here's the low-level JSON source we're adding.
json = {
type = json
event_path = /json
}
}

mappings {
Expand All @@ -126,5 +131,35 @@ divolte {
sources = [browser]
sinks = [kafka]
}
file_mapping = {
schema_file = /etc/shop/divolte/ShopEventRecord.avsc
mapping_script_file = /etc/shop/divolte/mapping.groovy
sources = [json]
sinks = [hdfs]
}
}

sinks {
kafka {
type = kafka
topic = divolte
topic = ${?DIVOLTE_KAFKA_TOPIC}
}

hdfs = {
type = hdfs
file_strategy = {
sync_file_after_records = 1000
sync_file_after_records = ${?DIVOLTE_HDFS_SINK_SYNC_NR_OF_RECORDS}
sync_file_after_duration = 30 minutes
sync_file_after_duration = ${?DIVOLTE_HDFS_SINK_SYNC_DURATION}
roll_every = 60 minutes
roll_every = ${?DIVOLTE_HDFS_SINK_ROLL_DURATION}
working_dir = /tmp/working
working_dir = ${?DIVOLTE_HDFS_SINK_WORKING_DIR}
publish_dir = /tmp/processed
publish_dir = ${?DIVOLTE_HDFS_SINK_PUBLISH_DIR}
}
}
}
}
33 changes: 33 additions & 0 deletions divolte/schema/src/main/docker/logback.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright 2014 GoDataDriven B.V.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<configuration>
<appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
<target>System.out</target>
<encoder>
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSSX} [%thread] %-6level[%logger{0}]: %msg%n</pattern>
</encoder>
</appender>

<logger name="io.divolte" level="info" />
<logger name="io.divolte.server.IncomingRequestProcessor" level="inherited" />
<logger name="io.divolte.server.Server" level="inherited" />

<root level="info">
<appender-ref ref="CONSOLE"/>
</root>
</configuration>
14 changes: 14 additions & 0 deletions divolte/schema/src/main/docker/start.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
#!/bin/bash

if [ "$ENABLE_KERBEROS" = "yes" ]; then
/opt/divolte/configureKerberosClient.sh
export DIVOLTE_JAVA_OPTS="-Djava.security.krb5.conf=/etc/krb5.conf -Dsun.security.krb5.debug=true"
fi

DIVOLTE_HDFS_SINK_WORKING_DIR=${DIVOLTE_HDFS_SINK_WORKING_DIR:-/tmp/working}
mkdir -p "$DIVOLTE_HDFS_SINK_WORKING_DIR"

DIVOLTE_HDFS_SINK_PUBLISH_DIR=${DIVOLTE_HDFS_SINK_PUBLISH_DIR:-/tmp/processed}
mkdir -p "$DIVOLTE_HDFS_SINK_PUBLISH_DIR"

/opt/divolte/divolte-collector/bin/divolte-collector
9 changes: 9 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,17 @@ services:
environment:
DIVOLTE_KAFKA_BROKER_LIST: kafka:9092
DIVOLTE_CONF_DIR: /etc/shop/divolte
DIVOLTE_HDFS_ENABLED: "true"
DIVOLTE_HDFS_SINK_WORKING_DIR: /tmp/processing
DIVOLTE_HDFS_SINK_PUBLISH_DIR: /tmp/finished
DIVOLTE_HDFS_SINK_SYNC_NR_OF_RECORDS: 1000
DIVOLTE_HDFS_SINK_SYNC_DURATION: 5 minutes
DIVOLTE_HDFS_SINK_ROLL_DURATION: 15 minutes
ports:
- 8290:8290
volumes:
- ${PWD}/data/generate/mount-data/finished:/tmp/finished
- ${PWD}/data/generate/mount-data/processing:/tmp/processing
depends_on:
- kafka
links:
Expand Down
57 changes: 57 additions & 0 deletions generating-large-data.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Generating large data sets

## How to

Idea is to fire events to divolte to simulate users.
Divolte is configured to write to HDFS if the json endpoint is used.

The following command fires custom requests to that json endpoint:

```
curl -XPOST -d postdata.json -H 'Content-Type=application/json' 'http://localhost:8290/json?p=0:is8tiwk4:GKv5gCc5TtrvBTs9bXfVD8KIQ3oO~sEg'
```

The postdata.json file contains the POST data that is used. Something like this:
```
{"session_id": "0:is8tiwk4:XLEUVj9hA6AXRUOp2zuIdUpaeFOC~7AU", "event_id": "AruZ~Em0WNlAnbyzVmwM~GR0cMb6Xl9s", "is_new_party": true, "is_new_session": true, "client_timestamp_iso": "2018-08-24T13:29:39.412+02:00", "event_type": "preview", "parameters": {"item_id": "123456768"}}
```

The idea is to generate this postdata by querying elasticsearch to find out the products and categories which are available:

```
http://localhost:9200/catalog/_search?_source_includes=id,categories&size=100
```

## Usage
```
workon shop-gen-py3
cd data/generate
pip install -r requirements.txt
python generate-event-input-files.py
```

## TODO
Only preview event is currently implemented: Use the 5 known event types
Verify the statistical distribution of events to see if it really mimics users

90MB after 1st time 1000 parties
176MB after 2nd time 1000 parties

This is only for preview events.

ToDo:
- impression event
- addToBasket event
- removeFromBasket event
- default pageView events
- localhost:9011
- localhost:9011/category/animals (other categories: cars, flowers, architecture, landscape, cities, nautical)
- localhost:9011/category/animals/1 (page 2)
- localhost:9011/category/animals/1 has the add to basket button
- localhost:9011/product/<id> also has the add to basket button and a back to overview
- localhost:9011/basket (from add to basket button -> take me to checkout)
- localhost:9011/basket has the trash button (removeFromBasket event)
- localhost:9011/checkout (from basket)
- localhost:9011/download/<uuid> (after captha) can be bookmarked
- localhost:9011/search?q=tiger
- localhost:9011/search?q=tiger&page=1 (page 2 etc.)
6 changes: 6 additions & 0 deletions postdata.json.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"session_id": "0:is8tiwk4:XLEUVj9hA6AXRUOp2zuIdUpaeFOC~7AU", "event_id": "AruZ~Em0WNlAnbyzVmwM~GR0cMb6Xl9s", "is_new_party": true, "is_new_session": true, "client_timestamp_iso": "2018-08-24T13:29:39.412+02:00", "event_type": "preview", "parameters": {"item_id": "123456768"}}
{"session_id": "0:is8tiwk4:XLEUVj9hA6AXRUOp2zuIdUpaeFOC~7AU", "event_id": "AruZ~Em0WNlAnbyzVmwM~GR0cMb6Xl9s", "is_new_party": true, "is_new_session": true, "client_timestamp_iso": "2018-08-24T13:29:39.412+02:00", "event_type": "preview", "parameters": {"item_id": "123456769"}}
{"session_id": "0:is8tiwk4:XLEUVj9hA6AXRUOp2zuIdUpaeFOC~7AU", "event_id": "AruZ~Em0WNlAnbyzVmwM~GR0cMb6Xl9s", "is_new_party": true, "is_new_session": true, "client_timestamp_iso": "2018-08-24T13:29:39.412+02:00", "event_type": "preview", "parameters": {"item_id": "123456770"}}
{"session_id": "0:is8tiwk4:XLEUVj9hA6AXRUOp2zuIdUpaeFOC~7AU", "event_id": "AruZ~Em0WNlAnbyzVmwM~GR0cMb6Xl9s", "is_new_party": true, "is_new_session": true, "client_timestamp_iso": "2018-08-24T13:29:39.412+02:00", "event_type": "preview", "parameters": {"item_id": "123456771"}}
{"session_id": "0:is8tiwk4:XLEUVj9hA6AXRUOp2zuIdUpaeFOC~7AU", "event_id": "AruZ~Em0WNlAnbyzVmwM~GR0cMb6Xl9s", "is_new_party": true, "is_new_session": true, "client_timestamp_iso": "2018-08-24T13:29:39.412+02:00", "event_type": "preview", "parameters": {"item_id": "123456772"}}
{"session_id": "0:is8tiwk4:XLEUVj9hA6AXRUOp2zuIdUpaeFOC~7AU", "event_id": "AruZ~Em0WNlAnbyzVmwM~GR0cMb6Xl9s", "is_new_party": true, "is_new_session": true, "client_timestamp_iso": "2018-08-24T13:29:39.412+02:00", "event_type": "preview", "parameters": {"item_id": "123456773"}}