Skip to content

Commit 9675d0e

Browse files
authored
Optimise type inference, add support for new data types and command line args (#2)
* Each CSV is now read parallelly thus making the schema infer step much faster * Support for integer and float data types have been added * Ability to pass options as command-line arguments in addition to environment variables is now possible.
1 parent 2c226a9 commit 9675d0e

7 files changed

Lines changed: 535 additions & 97 deletions

File tree

README.md

Lines changed: 242 additions & 29 deletions
Large diffs are not rendered by default.

config.env.sample

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,20 @@
11
# Configurations for csv2sql
22

3-
# You must replace the question marks with the correct values,
3+
# You can replace any required configuration below
44
# You can edit the other configurations and change their default values to improve performance
55
# (However this may lead to crashes or unexpected behaviour)
66

7-
# The location were the generated schema file will be stored
8-
export csv2sql_schema_file_path=?
7+
# The location were the generated schema file will be stored, default to the csv2sql_source_csv_directory
8+
export csv2sql_schema_file_path=
99

10-
# The source directory where the csvs are located
11-
export csv2sql_source_csv_directory=?
10+
# The source directory where the csvs are located, default to the current directory
11+
export csv2sql_source_csv_directory=
1212

13-
# The directory were the csvs will be moved after importing to database, make sure it is present and is empty
14-
export csv2sql_imported_csv_directory=?
13+
# The directory were the csvs will be moved after importing to database, defaults to the <csv2sql_source_csv_directory>/imported
14+
export csv2sql_imported_csv_directory=
1515

16-
# The directory were the csvs will be moved after they are validated, make sure it is present and is empty
17-
export csv2sql_validated_csv_directory=?
16+
# The directory were the csvs will be moved after they are validated, defaults to the <csv2sql_source_csv_directory>/validated
17+
export csv2sql_validated_csv_directory=
1818

1919
# Set whether you want to make schema file
2020
export csv2sql_set_make_schema="true"
@@ -28,24 +28,29 @@ export csv2sql_set_insert_data="true"
2828
# Set whether to validate if the csvs have been inserted correctly
2929
export csv2sql_set_validate="true"
3030

31-
# mysql username
32-
export csv2sql_username=?
31+
# mysql username, required field if database access is required
32+
export csv2sql_username=
3333

34-
# mysql password
35-
export csv2sql_password=?
34+
# mysql password, required field if database access is required
35+
export csv2sql_password=
3636

3737
# mysql host
3838
export csv2sql_host="localhost"
3939

40-
# This is the name of the database which will be created (if not present already)
41-
export csv2sql_database_name=?
40+
# This is the name of the database which will be created (if not present already), required field if database access is required
41+
export csv2sql_database_name=
4242

4343
# The socket file location for mysql
4444
export csv2sql_socket="/var/run/mysqld/mysqld.sock"
4545

4646
# The character limit after which a db field becomes a text from a varchar
4747
export csv2sql_varchar_limit=100
4848

49+
# The chunk size to use when the schema for a csv will be infered parallely .
50+
# For example: A chunk size 100 means the csv will be read 100 rows at a time
51+
# and separate processes will be used to infer the schema of each 100 row chunk
52+
export csv2sql_schema_infer_chunk_size=100
53+
4954
# The number of workers
5055
# Increasing worker count may result in errors
5156
export csv2sql_worker_count=10
@@ -60,10 +65,10 @@ export csv2sql_insertion_chunk_size=100
6065
# Number of chunks to keep in memory (Memory required=insertion_chunk_size * job_count_limit)
6166
export csv2sql_job_count_limit=10
6267

63-
# When false, does not log the query
68+
# When false, does not log the query, possible values are false, debug, info, warn
6469
export csv2sql_log=false
6570

66-
# The time in milliseconds to wait for the query call to finishThe time in milliseconds to wait for the query call to finish
71+
# The time in milliseconds to wait for the query call to finish
6772
# (default: 15000)
6873
export csv2sql_timeout=60000
6974

executable/csv2sql.zip

4.94 KB
Binary file not shown.

lib/csv2sql.ex

Lines changed: 187 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
defmodule Csv2sql do
2-
def main(_args \\ []) do
2+
def main(args) do
33
Csv2sql.Helpers.greet()
44
# Load configuration varaibles dynamically for escripts, this is required
55
# since configuration variables are set to whatever they where when the
66
# escript was build and cannot be changed later
7-
update_config()
7+
update_config(args)
88

99
Csv2sql.Application.start(:no_args, :no_args)
1010

@@ -16,53 +16,208 @@ defmodule Csv2sql do
1616
end
1717
end
1818

19-
def update_config() do
19+
defp update_config(args) do
20+
{opts, _, _} =
21+
OptionParser.parse(args,
22+
strict: [
23+
schema_file_path: :string,
24+
source_csv_directory: :string,
25+
imported_csv_directory: :string,
26+
validated_csv_directory: :string,
27+
skip_make_schema: :boolean,
28+
skip_insert_schema: :boolean,
29+
skip_insert_data: :boolean,
30+
skip_validate_import: :boolean,
31+
db_connection_string: :string,
32+
connection_socket: :string,
33+
varchar_limit: :integer,
34+
schema_infer_chunk_size: :integer,
35+
worker_count: :integer,
36+
db_worker_count: :integer,
37+
insertion_chunk_size: :integer,
38+
job_count_limit: :integer,
39+
log: :string,
40+
timeout: :integer,
41+
connect_timeout: :integer,
42+
pool_size: :integer,
43+
queue_target: :integer,
44+
queue_interval: :integer
45+
]
46+
)
47+
48+
source_csv_directory =
49+
opts[:source_csv_directory] || System.get_env("csv2sql_source_csv_directory") || "."
50+
51+
schema_file_path =
52+
opts[:schema_file_path] || is_blank(System.get_env("csv2sql_schema_file_path")) ||
53+
source_csv_directory
54+
55+
imported_csv_directory =
56+
opts[:imported_csv_directory] ||
57+
(is_blank(System.get_env("csv2sql_imported_csv_directory")) ||
58+
"#{source_csv_directory}/imported")
59+
60+
validated_csv_directory =
61+
opts[:validated_csv_directory] ||
62+
(is_blank(System.get_env("csv2sql_validated_csv_directory")) ||
63+
"#{source_csv_directory}/validated")
64+
65+
make_schema =
66+
if opts[:skip_make_schema],
67+
do: false,
68+
else: if(System.get_env("csv2sql_set_make_schema") == "false", do: false, else: true)
69+
70+
insert_schema =
71+
if opts[:skip_insert_schema],
72+
do: false,
73+
else: if(System.get_env("csv2sql_set_insert_schema") == "false", do: false, else: true)
74+
75+
insert_data =
76+
if opts[:skip_insert_data],
77+
do: false,
78+
else: if(System.get_env("csv2sql_set_insert_data") == "false", do: false, else: true)
79+
80+
validate_import =
81+
if opts[:skip_validate_import],
82+
do: false,
83+
else: if(System.get_env("csv2sql_set_validate") == "false", do: false, else: true)
84+
85+
[username, password, host, database_name] =
86+
if opts[:db_connection_string] do
87+
str = opts[:db_connection_string]
88+
89+
[username, tmp] = String.split(str, ":")
90+
91+
[password, tmp] = String.split(tmp, "@")
92+
93+
[host, database_name] = String.split(tmp, "/")
94+
95+
[username, password, host, database_name]
96+
else
97+
[
98+
System.get_env("csv2sql_username"),
99+
System.get_env("csv2sql_password"),
100+
System.get_env("csv2sql_host"),
101+
System.get_env("csv2sql_database_name")
102+
]
103+
end
104+
105+
connection_socket =
106+
opts[:connection_socket] || System.get_env("csv2sql_socket") ||
107+
"/var/run/mysqld/mysqld.sock"
108+
109+
varchar_limit =
110+
opts[:varchar_limit] || System.get_env("csv2sql_varchar_limit") |> to_int() ||
111+
100
112+
113+
schema_infer_chunk_size =
114+
opts[:schema_infer_chunk_size] ||
115+
System.get_env("csv2sql_schema_infer_chunk_size") |> to_int() || 100
116+
117+
worker_count =
118+
opts[:worker_count] || System.get_env("csv2sql_db_worker_count") |> to_int() ||
119+
10
120+
121+
db_worker_count =
122+
opts[:db_worker_count] || System.get_env("csv2sql_db_worker_count") |> to_int() ||
123+
15
124+
125+
insertion_chunk_size =
126+
opts[:insertion_chunk_size] ||
127+
System.get_env("csv2sql_insertion_chunk_size") |> to_int() || 100
128+
129+
job_count_limit =
130+
opts[:job_count_limit] || System.get_env("csv2sql_job_count_limit") |> to_int() ||
131+
10
132+
133+
log =
134+
if(opts[:log],
135+
do: String.to_atom(opts[:log]),
136+
else: false
137+
) ||
138+
if(System.get_env("csv2sql_log") == "false") do
139+
false
140+
else
141+
if System.get_env("csv2sql_log"),
142+
do: String.to_atom(System.get_env("csv2sql_log")),
143+
else: false
144+
end
145+
146+
timeout = opts[:timeout] || System.get_env("csv2sql_timeout") |> to_int() || 60000
147+
148+
connect_timeout =
149+
opts[:connect_timeout] || System.get_env("csv2sql_connect_timeout") |> to_int() ||
150+
60000
151+
152+
pool_size = opts[:pool_size] || System.get_env("csv2sql_pool_size") |> to_int() || 20
153+
154+
queue_target =
155+
opts[:queue_target] || System.get_env("csv2sql_queue_target") |> to_int() || 5000
156+
157+
queue_interval =
158+
opts[:queue_interval] || System.get_env("csv2sql_queue_interval") |> to_int() ||
159+
1000
160+
20161
current_config = [
21162
csv2sql: [
22163
{Csv2sql.SchemaMaker,
23164
[
24-
varchar_limit: System.get_env("csv2sql_varchar_limit") |> String.to_integer(),
25-
schema_file_path: System.get_env("csv2sql_schema_file_path")
165+
varchar_limit: varchar_limit,
166+
schema_file_path: schema_file_path,
167+
schema_infer_chunk_size: schema_infer_chunk_size
26168
]},
27169
{Csv2sql.MainServer,
28170
[
29-
worker_count: System.get_env("csv2sql_worker_count") |> String.to_integer(),
30-
db_worker_count: System.get_env("csv2sql_db_worker_count") |> String.to_integer(),
31-
source_csv_directory: System.get_env("csv2sql_source_csv_directory"),
32-
imported_csv_directory: System.get_env("csv2sql_imported_csv_directory"),
33-
validated_csv_directory: System.get_env("csv2sql_validated_csv_directory"),
34-
set_validate:
35-
if(System.get_env("csv2sql_set_validate") == "true", do: true, else: false)
171+
worker_count: worker_count,
172+
db_worker_count: db_worker_count,
173+
source_csv_directory: source_csv_directory,
174+
imported_csv_directory: imported_csv_directory,
175+
validated_csv_directory: validated_csv_directory,
176+
set_validate: validate_import
36177
]},
37178
{Csv2sql.Worker,
38179
[
39-
set_make_schema:
40-
if(System.get_env("csv2sql_set_make_schema") == "true", do: true, else: false),
41-
set_insert_schema:
42-
if(System.get_env("csv2sql_set_insert_schema") == "true", do: true, else: false),
43-
set_insert_data:
44-
if(System.get_env("csv2sql_set_insert_data") == "true", do: true, else: false)
180+
set_make_schema: make_schema,
181+
set_insert_schema: insert_schema,
182+
set_insert_data: insert_data
45183
]},
46184
{Csv2sql.Repo,
47185
[
48-
username: System.get_env("csv2sql_username"),
49-
password: System.get_env("csv2sql_password"),
50-
host: System.get_env("csv2sql_host"),
51-
database_name: System.get_env("csv2sql_database_name"),
52-
insertion_chunk_size:
53-
System.get_env("csv2sql_insertion_chunk_size") |> String.to_integer(),
54-
job_count_limit: System.get_env("csv2sql_job_count_limit") |> String.to_integer(),
55-
socket: System.get_env("csv2sql_socket"),
56-
log: if(System.get_env("csv2sql_log") == "false", do: false, else: true),
57-
timeout: System.get_env("csv2sql_timeout") |> String.to_integer(),
58-
connect_timeout: System.get_env("csv2sql_connect_timeout") |> String.to_integer(),
59-
pool_size: System.get_env("csv2sql_pool_size") |> String.to_integer(),
60-
queue_target: System.get_env("csv2sql_queue_target") |> String.to_integer(),
61-
queue_interval: System.get_env("csv2sql_queue_interval") |> String.to_integer()
186+
username: username,
187+
password: password,
188+
host: host,
189+
database_name: database_name,
190+
insertion_chunk_size: insertion_chunk_size,
191+
job_count_limit: job_count_limit,
192+
socket: connection_socket,
193+
log: log,
194+
timeout: timeout,
195+
connect_timeout: connect_timeout,
196+
pool_size: pool_size,
197+
queue_target: queue_target,
198+
queue_interval: queue_interval
62199
]}
63200
]
64201
]
65202

66203
Application.put_all_env(current_config)
67204
end
205+
206+
defp is_blank(item, int \\ false) do
207+
if item == nil || item == "",
208+
do: false,
209+
else: if(int, do: to_int(item), else: item)
210+
end
211+
212+
defp to_int(str) do
213+
if is_nil(str) || str == "" do
214+
nil
215+
else
216+
try do
217+
String.to_integer(str)
218+
rescue
219+
_e in ArgumentError -> nil
220+
end
221+
end
222+
end
68223
end

lib/csv2sql/file_server.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ defmodule Csv2sql.FileServer do
2929

3030
if extension == ".csv", do: path, else: get_next_file(dir_walker_pid)
3131

32-
# Retuns nil when ther are no fore files to return
32+
# Retuns nil when ther are no more files to return
3333
other ->
3434
other
3535
end

lib/csv2sql/main_server.ex

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ defmodule Csv2sql.MainServer do
1515
# server is then free to pick up this message, which triggers the handle_info
1616
# callback, and the workers get started
1717
def init(_) do
18+
make_directories_if_not_present()
1819
Process.send_after(self(), :kickoff, 0)
1920
worker_count = Application.get_env(:csv2sql, Csv2sql.MainServer)[:worker_count]
2021
{:ok, worker_count}
@@ -102,4 +103,24 @@ defmodule Csv2sql.MainServer do
102103
true -> nil
103104
end
104105
end
106+
107+
defp make_directories_if_not_present() do
108+
source_csv_directory =
109+
Application.get_env(:csv2sql, Csv2sql.MainServer)[:source_csv_directory]
110+
111+
imported_csv_directory =
112+
Application.get_env(:csv2sql, Csv2sql.MainServer)[:imported_csv_directory]
113+
114+
validated_csv_directory =
115+
Application.get_env(:csv2sql, Csv2sql.MainServer)[:validated_csv_directory]
116+
117+
if source_csv_directory && !File.exists?(source_csv_directory) do
118+
Csv2sql.Helpers.print_msg("ERROR: csv source directory does not exists !", :red)
119+
System.halt(0)
120+
end
121+
122+
if imported_csv_directory, do: File.mkdir(imported_csv_directory)
123+
124+
if validated_csv_directory, do: File.mkdir(validated_csv_directory)
125+
end
105126
end

0 commit comments

Comments
 (0)