From 0dfad21ebf2a4e6e11e332544b2ac6eb2d322bca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Mon, 16 Sep 2019 20:17:09 +0300 Subject: [PATCH 01/13] allow client to reconnect if other side closes connection --- lib/thrift/binary/framed/client.ex | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index 18be648b..39fdb22b 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -42,6 +42,7 @@ defmodule Thrift.Binary.Framed.Client do {:tcp_opts, [tcp_option]} | {:ssl_opts, [SSL.option()]} | {:gen_server_opts, [genserver_call_option]} + | {:reconnect, boolean} @type options :: [option] @@ -55,7 +56,8 @@ defmodule Thrift.Binary.Framed.Client do ssl_opts: [SSL.option()], timeout: integer, sock: {:gen_tcp, :gen_tcp.socket()} | {:ssl, :ssl.sslsocket()}, - seq_id: integer + seq_id: integer, + reconnect: boolean } defstruct host: nil, port: nil, @@ -64,7 +66,8 @@ defmodule Thrift.Binary.Framed.Client do ssl_opts: nil, timeout: 5000, sock: nil, - seq_id: 0 + seq_id: 0, + reconnect: false end require Logger @@ -74,6 +77,7 @@ defmodule Thrift.Binary.Framed.Client do def init({host, port, opts}) do tcp_opts = Keyword.get(opts, :tcp_opts, []) ssl_opts = Keyword.get(opts, :ssl_opts, []) + reconnect = Keyword.get(opts, :reconnect, false) {timeout, tcp_opts} = Keyword.pop(tcp_opts, :timeout, 5000) @@ -82,7 +86,8 @@ defmodule Thrift.Binary.Framed.Client do port: port, tcp_opts: tcp_opts, ssl_opts: ssl_opts, - timeout: timeout + timeout: timeout, + reconnect: reconnect } {:connect, :init, s} @@ -246,7 +251,7 @@ defmodule Thrift.Binary.Framed.Client do def handle_call( {:call, rpc_name, serialized_args, tcp_opts}, _, - %{sock: {transport, sock}, seq_id: seq_id, timeout: default_timeout} = s + %{sock: {transport, sock}, seq_id: seq_id, timeout: default_timeout, reconnect: reconnect} = s ) do s = %{s | seq_id: seq_id + 1} message = Binary.serialize(:message_begin, {:call, seq_id, rpc_name}) @@ -257,6 +262,13 @@ defmodule Thrift.Binary.Framed.Client do reply = deserialize_message_reply(message, rpc_name, seq_id) {:reply, reply, s} else + {:error, :closed} = error -> + if reconnect do + {:connect, :reconnect, s} + else + {:disconnect, error, error, s} + end + {:error, :timeout} = error -> {:disconnect, {:error, :timeout, timeout}, error, s} @@ -277,7 +289,7 @@ defmodule Thrift.Binary.Framed.Client do def handle_cast( {:oneway, rpc_name, serialized_args}, - %{sock: {transport, sock}, seq_id: seq_id} = s + %{sock: {transport, sock}, seq_id: seq_id, reconnect: reconnect} = s ) do s = %{s | seq_id: seq_id + 1} message = Binary.serialize(:message_begin, {:oneway, seq_id, rpc_name}) @@ -286,6 +298,13 @@ defmodule Thrift.Binary.Framed.Client do :ok -> {:noreply, s} + {:error, :closed} = error -> + if reconnect do + {:connect, :reconnect, s} + else + {:disconnect, error, s} + end + {:error, _} = error -> {:disconnect, error, s} end From d3b0edcfbbeb2badcb0f5fd2fef7ce4ae903ce35 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Mon, 16 Sep 2019 20:51:05 +0300 Subject: [PATCH 02/13] format fixed, added cleaning sock in state before reconnect --- lib/thrift/binary/framed/client.ex | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index 39fdb22b..2aa6ec7e 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -251,7 +251,12 @@ defmodule Thrift.Binary.Framed.Client do def handle_call( {:call, rpc_name, serialized_args, tcp_opts}, _, - %{sock: {transport, sock}, seq_id: seq_id, timeout: default_timeout, reconnect: reconnect} = s + %{ + sock: {transport, sock}, + seq_id: seq_id, + timeout: default_timeout, + reconnect: reconnect + } = s ) do s = %{s | seq_id: seq_id + 1} message = Binary.serialize(:message_begin, {:call, seq_id, rpc_name}) @@ -264,7 +269,7 @@ defmodule Thrift.Binary.Framed.Client do else {:error, :closed} = error -> if reconnect do - {:connect, :reconnect, s} + {:connect, :reconnect, %{s | sock: nil}} else {:disconnect, error, error, s} end @@ -300,7 +305,7 @@ defmodule Thrift.Binary.Framed.Client do {:error, :closed} = error -> if reconnect do - {:connect, :reconnect, s} + {:connect, :reconnect, %{s | sock: nil}} else {:disconnect, error, s} end From ea2e2462a95406d7633ba8780cad1416b4b65393 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Mon, 16 Sep 2019 22:27:03 +0300 Subject: [PATCH 03/13] added: resending data after succesfull reconnect --- lib/thrift/binary/framed/client.ex | 49 +++++++++++++++++++++++++----- 1 file changed, 42 insertions(+), 7 deletions(-) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index 2aa6ec7e..f403226f 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -142,15 +142,20 @@ defmodule Thrift.Binary.Framed.Client do def close(conn), do: Connection.call(conn, :close) @impl Connection - def connect(_info, %{sock: nil, host: host, port: port, tcp_opts: opts, timeout: timeout} = s) do + def connect(info, %{sock: nil, host: host, port: port, tcp_opts: opts, timeout: timeout} = s) do opts = opts |> Keyword.merge(@immutable_tcp_opts) |> Keyword.put_new(:send_timeout, 1000) + # reset sequence id for newly created connection + s = %{s | seq_id: 0} + case :gen_tcp.connect(host, port, opts, timeout) do {:ok, sock} -> - maybe_ssl_handshake(sock, host, port, s) + sock + |> maybe_ssl_handshake(host, port, s) + |> maybe_resend_data(info) {:error, :timeout} = error -> Logger.error("Failed to connect to #{host}:#{port} due to timeout after #{timeout}ms") @@ -249,8 +254,8 @@ defmodule Thrift.Binary.Framed.Client do end def handle_call( - {:call, rpc_name, serialized_args, tcp_opts}, - _, + {:call, rpc_name, serialized_args, tcp_opts} = msg, + from, %{ sock: {transport, sock}, seq_id: seq_id, @@ -269,7 +274,7 @@ defmodule Thrift.Binary.Framed.Client do else {:error, :closed} = error -> if reconnect do - {:connect, :reconnect, %{s | sock: nil}} + {:connect, {:reconnect, :call, msg, from}, %{s | sock: nil}} else {:disconnect, error, error, s} end @@ -293,7 +298,7 @@ defmodule Thrift.Binary.Framed.Client do end def handle_cast( - {:oneway, rpc_name, serialized_args}, + {:oneway, rpc_name, serialized_args} = msg, %{sock: {transport, sock}, seq_id: seq_id, reconnect: reconnect} = s ) do s = %{s | seq_id: seq_id + 1} @@ -305,7 +310,7 @@ defmodule Thrift.Binary.Framed.Client do {:error, :closed} = error -> if reconnect do - {:connect, :reconnect, %{s | sock: nil}} + {:connect, {:reconnect, :cast, msg}, %{s | sock: nil}} else {:disconnect, error, s} end @@ -396,4 +401,34 @@ defmodule Thrift.Binary.Framed.Client do {:stop, error, s} end end + + defp maybe_resend_data({:ok, s}, {:reconnect, :call, msg, from}) do + case handle_call(msg, from, s) do + {:reply, reply, s} -> + GenServer.reply(from, reply) + {:ok, s} + + {:disconnect, info, error, s} -> + GenServer.reply(from, error) + disconnect(info, s) + + _ -> + {:ok, s} + end + end + + defp maybe_resend_data({:ok, s}, {:reconnect, :cast, msg}) do + case handle_cast(msg, s) do + {:noreply, s} -> + {:ok, s} + + {:disconnect, info, s} -> + disconnect(info, s) + + _ -> + {:ok, s} + end + end + + defp maybe_resend_data(reply, _), do: reply end From 6f97f5614fcaf28c03c094366f01b830a135168d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Tue, 17 Sep 2019 18:46:26 +0300 Subject: [PATCH 04/13] connection is active now, tests added --- lib/thrift/binary/framed/client.ex | 39 +++++++++++++++++++---- test/thrift/binary/framed/server_test.exs | 20 ++++++++++++ 2 files changed, 52 insertions(+), 7 deletions(-) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index f403226f..75394be5 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -21,7 +21,7 @@ defmodule Thrift.Binary.Framed.Client do alias Thrift.TApplicationException alias Thrift.Transport.SSL - @immutable_tcp_opts [active: false, packet: 4, mode: :binary] + @immutable_tcp_opts [active: true, packet: 4, mode: :binary] @type error :: {:error, atom} | {:error, {:exception, struct}} @type success :: {:ok, binary} @@ -168,10 +168,13 @@ defmodule Thrift.Binary.Framed.Client do end @impl Connection - def disconnect(info, %{sock: {transport, sock}}) do + def disconnect(info, %{sock: {transport, sock}} = s) do :ok = transport.close(sock) case info do + {:reconnect, _} = reconnect -> + {:connect, info, %{s | sock: nil}} + {:close, from} -> Connection.reply(from, :ok) {:stop, :normal, nil} @@ -268,13 +271,13 @@ defmodule Thrift.Binary.Framed.Client do timeout = Keyword.get(tcp_opts, :timeout, default_timeout) with :ok <- transport.send(sock, [message | serialized_args]), - {:ok, message} <- transport.recv(sock, 0, timeout) do + {:ok, message} <- receive_message(sock, timeout) do reply = deserialize_message_reply(message, rpc_name, seq_id) {:reply, reply, s} else {:error, :closed} = error -> if reconnect do - {:connect, {:reconnect, :call, msg, from}, %{s | sock: nil}} + {:disconnect, {:reconnect, {:call, msg, from}}, s} else {:disconnect, error, error, s} end @@ -310,7 +313,7 @@ defmodule Thrift.Binary.Framed.Client do {:error, :closed} = error -> if reconnect do - {:connect, {:reconnect, :cast, msg}, %{s | sock: nil}} + {:disconnect, {:reconnect, {:cast, msg}}, s} else {:disconnect, error, s} end @@ -320,10 +323,32 @@ defmodule Thrift.Binary.Framed.Client do end end + @impl Connection + def handle_info({:tcp_closed, sock}, %{reconnect: true, sock: {_transport, sock}} = s) do + {:disconnect, {:reconnect, nil}, s} + end + + def handle_info(_, s) do + {:noreply, s} + end + def deserialize_message_reply(message, rpc_name, seq_id) do handle_message(Binary.deserialize(:message_begin, message), seq_id, rpc_name) end + defp receive_message(sock, timeout) do + receive do + {:tcp, ^sock, data} -> {:ok, data} + {:tcp_closed, ^sock} -> {:error, :closed} + {:tcp_error, ^sock, error} -> {:error, error} + {:ssl, ^sock, data} -> {:ok, data} + {:ssl_closed, ^sock} -> {:error, :closed} + {:ssl_error, ^sock, error} -> {:error, error} + after + timeout -> {:error, :timeout} + end + end + defp handle_message({:ok, {:reply, seq_id, rpc_name, serialized_response}}, seq_id, rpc_name) do {:ok, serialized_response} end @@ -402,7 +427,7 @@ defmodule Thrift.Binary.Framed.Client do end end - defp maybe_resend_data({:ok, s}, {:reconnect, :call, msg, from}) do + defp maybe_resend_data({:ok, s}, {:reconnect, {:call, msg, from}}) do case handle_call(msg, from, s) do {:reply, reply, s} -> GenServer.reply(from, reply) @@ -417,7 +442,7 @@ defmodule Thrift.Binary.Framed.Client do end end - defp maybe_resend_data({:ok, s}, {:reconnect, :cast, msg}) do + defp maybe_resend_data({:ok, s}, {:reconnect, {:cast, msg}}) do case handle_cast(msg, s) do {:noreply, s} -> {:ok, s} diff --git a/test/thrift/binary/framed/server_test.exs b/test/thrift/binary/framed/server_test.exs index 43b73e5c..ad9b8006 100644 --- a/test/thrift/binary/framed/server_test.exs +++ b/test/thrift/binary/framed/server_test.exs @@ -177,4 +177,24 @@ defmodule Servers.Binary.Framed.IntegrationTest do thrift_test "client methods can be called by name instead of pid", %{client_name: name} do assert {:ok, true} == Client.ping(name) end + + @ping_reply <<128, 1, 0, 2, 0, 0, 0, 4, 112, 105, 110, 103, 0, 0, 0, 0, 2, 0, 0, 1, 0>> + thrift_test "client can reconnect when connection closed by server", ctx do + {:ok, sock} = :gen_tcp.listen(0, [:binary, packet: 4, active: false]) + {:ok, port} = :inet.port(sock) + first_conn = Task.async(fn -> + {:ok, conn} = :gen_tcp.accept(sock) + :ok = :gen_tcp.close(conn) + end) + name = String.to_atom("#{ctx.client_name}_1") + {:ok, client} = Client.start_link("localhost", port, name: name, reconnect: true) + second_conn = Task.async(fn -> + {:ok, conn} = :gen_tcp.accept(sock) + {:ok, _} = :gen_tcp.recv(conn, 0) + :ok = :gen_tcp.send(conn, @ping_reply) + end) + assert {:ok, true} == Client.ping(client) + Task.await(first_conn) + Task.await(second_conn) + end end From a0be2d935345689959359845c1af261a30595076 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Tue, 17 Sep 2019 19:00:11 +0300 Subject: [PATCH 05/13] format fixed --- lib/thrift/binary/framed/client.ex | 2 +- test/thrift/binary/framed/server_test.exs | 24 ++++++++++++++--------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index 75394be5..f6ac7355 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -172,7 +172,7 @@ defmodule Thrift.Binary.Framed.Client do :ok = transport.close(sock) case info do - {:reconnect, _} = reconnect -> + {:reconnect, _} -> {:connect, info, %{s | sock: nil}} {:close, from} -> diff --git a/test/thrift/binary/framed/server_test.exs b/test/thrift/binary/framed/server_test.exs index ad9b8006..ad4b4821 100644 --- a/test/thrift/binary/framed/server_test.exs +++ b/test/thrift/binary/framed/server_test.exs @@ -182,17 +182,23 @@ defmodule Servers.Binary.Framed.IntegrationTest do thrift_test "client can reconnect when connection closed by server", ctx do {:ok, sock} = :gen_tcp.listen(0, [:binary, packet: 4, active: false]) {:ok, port} = :inet.port(sock) - first_conn = Task.async(fn -> - {:ok, conn} = :gen_tcp.accept(sock) - :ok = :gen_tcp.close(conn) - end) + + first_conn = + Task.async(fn -> + {:ok, conn} = :gen_tcp.accept(sock) + :ok = :gen_tcp.close(conn) + end) + name = String.to_atom("#{ctx.client_name}_1") {:ok, client} = Client.start_link("localhost", port, name: name, reconnect: true) - second_conn = Task.async(fn -> - {:ok, conn} = :gen_tcp.accept(sock) - {:ok, _} = :gen_tcp.recv(conn, 0) - :ok = :gen_tcp.send(conn, @ping_reply) - end) + + second_conn = + Task.async(fn -> + {:ok, conn} = :gen_tcp.accept(sock) + {:ok, _} = :gen_tcp.recv(conn, 0) + :ok = :gen_tcp.send(conn, @ping_reply) + end) + assert {:ok, true} == Client.ping(client) Task.await(first_conn) Task.await(second_conn) From 719ad3f711ec4624b79c2ed825505deb59cc8e10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Tue, 17 Sep 2019 19:31:07 +0300 Subject: [PATCH 06/13] changed: separate handling of incoming messages for tcp and ssl --- lib/thrift/binary/framed/client.ex | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index f6ac7355..eb39d1b9 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -271,7 +271,7 @@ defmodule Thrift.Binary.Framed.Client do timeout = Keyword.get(tcp_opts, :timeout, default_timeout) with :ok <- transport.send(sock, [message | serialized_args]), - {:ok, message} <- receive_message(sock, timeout) do + {:ok, message} <- receive_message(transport, sock, timeout) do reply = deserialize_message_reply(message, rpc_name, seq_id) {:reply, reply, s} else @@ -336,11 +336,18 @@ defmodule Thrift.Binary.Framed.Client do handle_message(Binary.deserialize(:message_begin, message), seq_id, rpc_name) end - defp receive_message(sock, timeout) do + defp receive_message(:gen_tcp, sock, timeout) do receive do {:tcp, ^sock, data} -> {:ok, data} {:tcp_closed, ^sock} -> {:error, :closed} {:tcp_error, ^sock, error} -> {:error, error} + after + timeout -> {:error, :timeout} + end + end + + defp receive_message(:ssl, sock, timeout) do + receive do {:ssl, ^sock, data} -> {:ok, data} {:ssl_closed, ^sock} -> {:error, :closed} {:ssl_error, ^sock, error} -> {:error, error} From 29bca94750288228210b47099d44396fb5844a53 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Wed, 18 Sep 2019 18:39:22 +0300 Subject: [PATCH 07/13] removed: resending data after reconnect --- lib/thrift/binary/framed/client.ex | 71 ++++------------------- mix.lock | 2 +- test/thrift/binary/framed/server_test.exs | 13 ++++- test/thrift/generator/utils_test.exs | 20 +++---- 4 files changed, 35 insertions(+), 71 deletions(-) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index eb39d1b9..7ba078aa 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -142,7 +142,7 @@ defmodule Thrift.Binary.Framed.Client do def close(conn), do: Connection.call(conn, :close) @impl Connection - def connect(info, %{sock: nil, host: host, port: port, tcp_opts: opts, timeout: timeout} = s) do + def connect(_info, %{sock: nil, host: host, port: port, tcp_opts: opts, timeout: timeout} = s) do opts = opts |> Keyword.merge(@immutable_tcp_opts) @@ -153,9 +153,7 @@ defmodule Thrift.Binary.Framed.Client do case :gen_tcp.connect(host, port, opts, timeout) do {:ok, sock} -> - sock - |> maybe_ssl_handshake(host, port, s) - |> maybe_resend_data(info) + maybe_ssl_handshake(sock, host, port, s) {:error, :timeout} = error -> Logger.error("Failed to connect to #{host}:#{port} due to timeout after #{timeout}ms") @@ -257,14 +255,9 @@ defmodule Thrift.Binary.Framed.Client do end def handle_call( - {:call, rpc_name, serialized_args, tcp_opts} = msg, - from, - %{ - sock: {transport, sock}, - seq_id: seq_id, - timeout: default_timeout, - reconnect: reconnect - } = s + {:call, rpc_name, serialized_args, tcp_opts}, + _from, + %{sock: {transport, sock}, seq_id: seq_id, timeout: default_timeout} = s ) do s = %{s | seq_id: seq_id + 1} message = Binary.serialize(:message_begin, {:call, seq_id, rpc_name}) @@ -275,13 +268,6 @@ defmodule Thrift.Binary.Framed.Client do reply = deserialize_message_reply(message, rpc_name, seq_id) {:reply, reply, s} else - {:error, :closed} = error -> - if reconnect do - {:disconnect, {:reconnect, {:call, msg, from}}, s} - else - {:disconnect, error, error, s} - end - {:error, :timeout} = error -> {:disconnect, {:error, :timeout, timeout}, error, s} @@ -301,8 +287,8 @@ defmodule Thrift.Binary.Framed.Client do end def handle_cast( - {:oneway, rpc_name, serialized_args} = msg, - %{sock: {transport, sock}, seq_id: seq_id, reconnect: reconnect} = s + {:oneway, rpc_name, serialized_args}, + %{sock: {transport, sock}, seq_id: seq_id} = s ) do s = %{s | seq_id: seq_id + 1} message = Binary.serialize(:message_begin, {:oneway, seq_id, rpc_name}) @@ -311,20 +297,17 @@ defmodule Thrift.Binary.Framed.Client do :ok -> {:noreply, s} - {:error, :closed} = error -> - if reconnect do - {:disconnect, {:reconnect, {:cast, msg}}, s} - else - {:disconnect, error, s} - end - {:error, _} = error -> {:disconnect, error, s} end end @impl Connection - def handle_info({:tcp_closed, sock}, %{reconnect: true, sock: {_transport, sock}} = s) do + def handle_info({:tcp_closed, sock}, %{reconnect: true, sock: {:gen_tcp, sock}} = s) do + {:disconnect, {:reconnect, nil}, s} + end + + def handle_info({:ssl_closed, sock}, %{reconnect: true, sock: {:ssl, sock}} = s) do {:disconnect, {:reconnect, nil}, s} end @@ -433,34 +416,4 @@ defmodule Thrift.Binary.Framed.Client do {:stop, error, s} end end - - defp maybe_resend_data({:ok, s}, {:reconnect, {:call, msg, from}}) do - case handle_call(msg, from, s) do - {:reply, reply, s} -> - GenServer.reply(from, reply) - {:ok, s} - - {:disconnect, info, error, s} -> - GenServer.reply(from, error) - disconnect(info, s) - - _ -> - {:ok, s} - end - end - - defp maybe_resend_data({:ok, s}, {:reconnect, {:cast, msg}}) do - case handle_cast(msg, s) do - {:noreply, s} -> - {:ok, s} - - {:disconnect, info, s} -> - disconnect(info, s) - - _ -> - {:ok, s} - end - end - - defp maybe_resend_data(reply, _), do: reply end diff --git a/mix.lock b/mix.lock index de28de65..4651a56f 100644 --- a/mix.lock +++ b/mix.lock @@ -14,7 +14,7 @@ "makeup_elixir": {:hex, :makeup_elixir, "0.14.0", "cf8b7c66ad1cff4c14679698d532f0b5d45a3968ffbcbfd590339cb57742f1ae", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm"}, "metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], [], "hexpm"}, "mimerl": {:hex, :mimerl, "1.2.0", "67e2d3f571088d5cfd3e550c383094b47159f3eee8ffa08e64106cdf5e981be3", [:rebar3], [], "hexpm"}, - "nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [], [], "hexpm"}, + "nimble_parsec": {:hex, :nimble_parsec, "0.5.1", "c90796ecee0289dbb5ad16d3ad06f957b0cd1199769641c961cfe0b97db190e0", [:mix], [], "hexpm"}, "parse_trans": {:hex, :parse_trans, "3.3.0", "09765507a3c7590a784615cfd421d101aec25098d50b89d7aa1d66646bc571c1", [:rebar3], [], "hexpm"}, "ranch": {:hex, :ranch, "1.7.1", "6b1fab51b49196860b733a49c07604465a47bdb78aa10c1c16a3d199f7f8c881", [:rebar3], [], "hexpm"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.4", "f0eafff810d2041e93f915ef59899c923f4568f4585904d010387ed74988e77b", [:make, :mix, :rebar3], [], "hexpm"}, diff --git a/test/thrift/binary/framed/server_test.exs b/test/thrift/binary/framed/server_test.exs index ad4b4821..9e25ddad 100644 --- a/test/thrift/binary/framed/server_test.exs +++ b/test/thrift/binary/framed/server_test.exs @@ -182,7 +182,9 @@ defmodule Servers.Binary.Framed.IntegrationTest do thrift_test "client can reconnect when connection closed by server", ctx do {:ok, sock} = :gen_tcp.listen(0, [:binary, packet: 4, active: false]) {:ok, port} = :inet.port(sock) + test_pid = self() + # in first connection we emulate closing connection by server first_conn = Task.async(fn -> {:ok, conn} = :gen_tcp.accept(sock) @@ -192,15 +194,24 @@ defmodule Servers.Binary.Framed.IntegrationTest do name = String.to_atom("#{ctx.client_name}_1") {:ok, client} = Client.start_link("localhost", port, name: name, reconnect: true) + # in second connection we emulate reconnection to same server port second_conn = Task.async(fn -> {:ok, conn} = :gen_tcp.accept(sock) + send(test_pid, :connected) {:ok, _} = :gen_tcp.recv(conn, 0) :ok = :gen_tcp.send(conn, @ping_reply) end) - assert {:ok, true} == Client.ping(client) Task.await(first_conn) + # wait for reconnection success + :ok = + receive do + :connected -> :ok + end + + assert {:ok, true} == Client.ping(client) + Task.await(second_conn) end end diff --git a/test/thrift/generator/utils_test.exs b/test/thrift/generator/utils_test.exs index d1461bdb..e533ee69 100644 --- a/test/thrift/generator/utils_test.exs +++ b/test/thrift/generator/utils_test.exs @@ -9,15 +9,15 @@ defmodule Thrift.Generator.UtilsTest do end test "optimize_iolist" do - check <<0>>, <<0>> - check [<<0>>], <<0>> - check [<<1>>, <<2>>], <<1, 2>> - check [<<1>>, [<<2>>]], <<1, 2>> - check [[<<1>>], <<2>>], <<1, 2>> - check [[[[<<1>>]], [<<2>>]]], <<1, 2>> - check [<<1>>, x, [<<2>>, y]], [<<1>>, x, <<2>> | y] - check [x, <<1>>, [<<2>>, y]], [x, <<1, 2>> | y] - check [<<1, 2>>, <<0>>], <<1, 2, 0>> - check [<<1, 2>>, "foo"], <<1, 2, "foo">> + check(<<0>>, <<0>>) + check([<<0>>], <<0>>) + check([<<1>>, <<2>>], <<1, 2>>) + check([<<1>>, [<<2>>]], <<1, 2>>) + check([[<<1>>], <<2>>], <<1, 2>>) + check([[[[<<1>>]], [<<2>>]]], <<1, 2>>) + check([<<1>>, x, [<<2>>, y]], [<<1>>, x, <<2>> | y]) + check([x, <<1>>, [<<2>>, y]], [x, <<1, 2>> | y]) + check([<<1, 2>>, <<0>>], <<1, 2, 0>>) + check([<<1, 2>>, "foo"], <<1, 2, "foo">>) end end From 3120978ebf5afe24bacc3403f4c238f3cc02bb9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Wed, 18 Sep 2019 18:54:53 +0300 Subject: [PATCH 08/13] removed: resending data after reconnect --- lib/thrift/binary/framed/client.ex | 6 +++--- test/thrift/binary/framed/server_test.exs | 7 +++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index 7ba078aa..0ef524c8 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -170,7 +170,7 @@ defmodule Thrift.Binary.Framed.Client do :ok = transport.close(sock) case info do - {:reconnect, _} -> + :reconnect -> {:connect, info, %{s | sock: nil}} {:close, from} -> @@ -304,11 +304,11 @@ defmodule Thrift.Binary.Framed.Client do @impl Connection def handle_info({:tcp_closed, sock}, %{reconnect: true, sock: {:gen_tcp, sock}} = s) do - {:disconnect, {:reconnect, nil}, s} + {:disconnect, :reconnect, s} end def handle_info({:ssl_closed, sock}, %{reconnect: true, sock: {:ssl, sock}} = s) do - {:disconnect, {:reconnect, nil}, s} + {:disconnect, :reconnect, s} end def handle_info(_, s) do diff --git a/test/thrift/binary/framed/server_test.exs b/test/thrift/binary/framed/server_test.exs index 9e25ddad..2a359894 100644 --- a/test/thrift/binary/framed/server_test.exs +++ b/test/thrift/binary/framed/server_test.exs @@ -184,7 +184,7 @@ defmodule Servers.Binary.Framed.IntegrationTest do {:ok, port} = :inet.port(sock) test_pid = self() - # in first connection we emulate closing connection by server + # in the first connection we emulate disconnection by server first_conn = Task.async(fn -> {:ok, conn} = :gen_tcp.accept(sock) @@ -194,7 +194,7 @@ defmodule Servers.Binary.Framed.IntegrationTest do name = String.to_atom("#{ctx.client_name}_1") {:ok, client} = Client.start_link("localhost", port, name: name, reconnect: true) - # in second connection we emulate reconnection to same server port + # in the second connection we emulate reconnection to the same server port second_conn = Task.async(fn -> {:ok, conn} = :gen_tcp.accept(sock) @@ -204,14 +204,13 @@ defmodule Servers.Binary.Framed.IntegrationTest do end) Task.await(first_conn) - # wait for reconnection success + # wait for sucessfull connection to re-opened port :ok = receive do :connected -> :ok end assert {:ok, true} == Client.ping(client) - Task.await(second_conn) end end From 2db60eb97f36162b0ab54e574183b7c6849d7377 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Tue, 15 Oct 2019 18:39:37 +0300 Subject: [PATCH 09/13] handle_info fixed --- lib/thrift/binary/framed/client.ex | 40 ++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index 0ef524c8..369bcc48 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -303,14 +303,54 @@ defmodule Thrift.Binary.Framed.Client do end @impl Connection + def handle_info({:tcp, sock, _data}, %{reconnect: true, sock: {:gen_tcp, sock}} = s) do + {:disconnect, :reconnect, s} + end + def handle_info({:tcp_closed, sock}, %{reconnect: true, sock: {:gen_tcp, sock}} = s) do {:disconnect, :reconnect, s} end + def handle_info({:tcp_error, sock, _error}, %{reconnect: true, sock: {:gen_tcp, sock}} = s) do + {:disconnect, :reconnect, s} + end + + def handle_info({:ssl, sock, _data}, %{reconnect: true, sock: {:ssl, sock}} = s) do + {:disconnect, :reconnect, s} + end + def handle_info({:ssl_closed, sock}, %{reconnect: true, sock: {:ssl, sock}} = s) do {:disconnect, :reconnect, s} end + def handle_info({:ssl_error, sock, _error}, %{reconnect: true, sock: {:ssl, sock}} = s) do + {:disconnect, :reconnect, s} + end + + def handle_info({:tcp, sock, _data}, %{sock: {:gen_tcp, sock}} = s) do + {:disconnect, :unexpected_payload, s} + end + + def handle_info({:tcp_closed, sock}, %{sock: {:gen_tcp, sock}} = s) do + {:disconnect, :closed, s} + end + + def handle_info({:tcp_error, sock, error}, %{sock: {:gen_tcp, sock}} = s) do + {:disconnect, error, s} + end + + def handle_info({:ssl, sock, _data}, %{sock: {:ssl, sock}} = s) do + {:disconnect, :unexpected_payload, s} + end + + def handle_info({:ssl_closed, sock}, %{sock: {:ssl, sock}} = s) do + {:disconnect, :closed, s} + end + + def handle_info({:ssl_error, sock, error}, %{sock: {:ssl, sock}} = s) do + {:disconnect, error, s} + end + def handle_info(_, s) do {:noreply, s} end From 0827acf6502c2ebef7fc56ded222b6f7a2ceaa33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Tue, 15 Oct 2019 18:47:53 +0300 Subject: [PATCH 10/13] handle_info fixed --- lib/thrift/binary/framed/client.ex | 24 ------------------------ 1 file changed, 24 deletions(-) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index 369bcc48..6ff0f816 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -327,30 +327,6 @@ defmodule Thrift.Binary.Framed.Client do {:disconnect, :reconnect, s} end - def handle_info({:tcp, sock, _data}, %{sock: {:gen_tcp, sock}} = s) do - {:disconnect, :unexpected_payload, s} - end - - def handle_info({:tcp_closed, sock}, %{sock: {:gen_tcp, sock}} = s) do - {:disconnect, :closed, s} - end - - def handle_info({:tcp_error, sock, error}, %{sock: {:gen_tcp, sock}} = s) do - {:disconnect, error, s} - end - - def handle_info({:ssl, sock, _data}, %{sock: {:ssl, sock}} = s) do - {:disconnect, :unexpected_payload, s} - end - - def handle_info({:ssl_closed, sock}, %{sock: {:ssl, sock}} = s) do - {:disconnect, :closed, s} - end - - def handle_info({:ssl_error, sock, error}, %{sock: {:ssl, sock}} = s) do - {:disconnect, error, s} - end - def handle_info(_, s) do {:noreply, s} end From 64bd3ad6232174bf5e0581baeb54e9f5d21d3caf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Tue, 15 Oct 2019 19:02:15 +0300 Subject: [PATCH 11/13] :reconnect option added to start_link/3 docs --- lib/thrift/binary/framed/client.ex | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index 6ff0f816..db87d6cf 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -129,6 +129,9 @@ defmodule Thrift.Binary.Framed.Client do Additionally, the options `:name`, `:debug`, and `:spawn_opt`, if specified, will be passed to the underlying `GenServer`. See `GenServer.start_link/3` for details on these options. + + The `:reconnect` option if setted to `true` forces client to reopen tcp connection whenever + it closed. """ @spec start_link(String.t(), 0..65_535, options) :: GenServer.on_start() def start_link(host, port, opts) do From 46ce001adb94c3aedcda1e272855231692b4545c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Tue, 15 Oct 2019 19:04:33 +0300 Subject: [PATCH 12/13] :reconnect option doc typo fixed --- lib/thrift/binary/framed/client.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index db87d6cf..38058336 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -130,7 +130,7 @@ defmodule Thrift.Binary.Framed.Client do will be passed to the underlying `GenServer`. See `GenServer.start_link/3` for details on these options. - The `:reconnect` option if setted to `true` forces client to reopen tcp connection whenever + The `:reconnect` option if set to `true` forces client to reopen tcp connection whenever it closed. """ @spec start_link(String.t(), 0..65_535, options) :: GenServer.on_start() From a1c23e3666580dfd3edeb759d7653f3ceb5ddf41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BB=D0=B5=D0=BA=D1=81=D0=B5=D0=B9=20=D0=9E=D0=B2?= =?UTF-8?q?=D1=87=D0=B8=D0=BD=D0=BD=D0=B8=D0=BA=D0=BE=D0=B2?= Date: Tue, 15 Oct 2019 19:05:59 +0300 Subject: [PATCH 13/13] :reconnect option doc typo fixed --- lib/thrift/binary/framed/client.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/thrift/binary/framed/client.ex b/lib/thrift/binary/framed/client.ex index 38058336..f761353e 100644 --- a/lib/thrift/binary/framed/client.ex +++ b/lib/thrift/binary/framed/client.ex @@ -130,7 +130,7 @@ defmodule Thrift.Binary.Framed.Client do will be passed to the underlying `GenServer`. See `GenServer.start_link/3` for details on these options. - The `:reconnect` option if set to `true` forces client to reopen tcp connection whenever + The `:reconnect` option if set to `true` forces client to reopen TCP connection whenever it closed. """ @spec start_link(String.t(), 0..65_535, options) :: GenServer.on_start()