Skip to content
Draft
20 changes: 9 additions & 11 deletions libraries/app/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
#include <fc/crypto/base64.hpp>
#include <fc/crypto/hex.hpp>
#include <fc/rpc/api_connection.hpp>
#include <fc/thread/future.hpp>
#include <fc/thread/async.hpp>

#include <boost/fiber/future.hpp>

template class fc::api<graphene::app::block_api>;
template class fc::api<graphene::app::network_broadcast_api>;
Expand Down Expand Up @@ -187,12 +189,13 @@ namespace graphene { namespace app {

fc::variant network_broadcast_api::broadcast_transaction_synchronous(const precomputable_transaction& trx)
{
fc::promise<fc::variant>::ptr prom = fc::promise<fc::variant>::create();
broadcast_transaction_with_callback( [prom]( const fc::variant& v ){
prom->set_value(v);
boost::fibers::promise<fc::variant> prom;
boost::fibers::future<fc::variant> result = prom.get_future();
broadcast_transaction_with_callback( [&prom]( const fc::variant& v ) {
prom.set_value(v);
}, trx );

return fc::future<fc::variant>(prom).wait();
return result.get();
}

void network_broadcast_api::broadcast_block( const signed_block& b )
Expand Down Expand Up @@ -356,12 +359,7 @@ namespace graphene { namespace app {
if(_app.is_plugin_enabled("elasticsearch")) {
auto es = _app.get_plugin<elasticsearch::elasticsearch_plugin>("elasticsearch");
if(es.get()->get_running_mode() != elasticsearch::mode::only_save) {
if(!_app.elasticsearch_thread)
_app.elasticsearch_thread= std::make_shared<fc::thread>("elasticsearch");

return _app.elasticsearch_thread->async([&es, &account, &stop, &limit, &start]() {
return es->get_account_history(account, stop, limit, start);
}, "thread invoke for method " BOOST_PP_STRINGIZE(method_name)).wait();
return es->get_account_history(account, stop, limit, start);
}
}

Expand Down
1 change: 1 addition & 0 deletions libraries/app/database_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

#include <fc/crypto/hex.hpp>
#include <fc/rpc/api_connection.hpp>
#include <fc/thread/async.hpp>

#include <boost/range/iterator_range.hpp>

Expand Down
2 changes: 0 additions & 2 deletions libraries/app/include/graphene/app/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ namespace graphene { namespace app {

bool is_plugin_enabled(const string& name) const;

std::shared_ptr<fc::thread> elasticsearch_thread;

private:
void add_available_plugin( std::shared_ptr<abstract_plugin> p );
std::shared_ptr<detail::application_impl> my;
Expand Down
15 changes: 10 additions & 5 deletions libraries/chain/db_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include <graphene/protocol/fee_schedule.hpp>

#include <fc/asio.hpp>
#include <fc/io/raw.hpp>
#include <fc/thread/parallel.hpp>

Expand Down Expand Up @@ -822,9 +823,9 @@ void database::_precompute_parallel( const Trx* trx, const size_t count, const u
}
}

fc::future<void> database::precompute_parallel( const signed_block& block, const uint32_t skip )const
boost::fibers::future<void> database::precompute_parallel( const signed_block& block, const uint32_t skip )const
{ try {
std::vector<fc::future<void>> workers;
std::vector<boost::fibers::future<void>> workers;
if( !block.transactions.empty() )
{
if( (skip & skip_expensive) == skip_expensive )
Expand All @@ -850,16 +851,20 @@ fc::future<void> database::precompute_parallel( const signed_block& block, const
block.id();

if( workers.empty() )
return fc::future< void >( fc::promise< void >::create( true ) );
{
boost::fibers::promise< void > done;
done.set_value();
return done.get_future();
}

auto first = workers.begin();
auto worker = first;
while( ++worker != workers.end() )
worker->wait();
return *first;
return std::move( *first );
} FC_LOG_AND_RETHROW() }

fc::future<void> database::precompute_parallel( const precomputable_transaction& trx )const
boost::fibers::future<void> database::precompute_parallel( const precomputable_transaction& trx )const
{
return fc::do_parallel([this,&trx] () {
_precompute_parallel( &trx, 1, skip_nothing );
Expand Down
4 changes: 2 additions & 2 deletions libraries/chain/db_management.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ void database::reindex( fc::path data_dir )

size_t total_block_size = _block_id_to_block.total_block_size();
const auto& gpo = get_global_properties();
std::queue< std::tuple< size_t, signed_block, fc::future< void > > > blocks;
std::queue< std::tuple< size_t, signed_block, boost::fibers::future< void > > > blocks;
uint32_t next_block_num = head_block_num() + 1;
uint32_t i = next_block_num;
while( next_block_num <= last_block_num || !blocks.empty() )
Expand All @@ -93,7 +93,7 @@ void database::reindex( fc::path data_dir )
{
if( block->timestamp >= last_block->timestamp - gpo.parameters.maximum_time_until_expiration )
skip &= ~skip_transaction_dupe_check;
blocks.emplace( processed_block_size, std::move(*block), fc::future<void>() );
blocks.emplace( processed_block_size, std::move(*block), boost::fibers::future<void>() );
std::get<2>(blocks.back()) = precompute_parallel( std::get<1>(blocks.back()), skip );
}
else
Expand Down
9 changes: 6 additions & 3 deletions libraries/chain/include/graphene/chain/database.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,12 @@
#include <graphene/db/object_database.hpp>
#include <graphene/db/object.hpp>
#include <graphene/db/simple_index.hpp>
#include <fc/signals.hpp>

#include <fc/signals.hpp>
#include <fc/log/logger.hpp>

#include <boost/fiber/future.hpp>

#include <map>

namespace graphene { namespace chain {
Expand Down Expand Up @@ -453,7 +455,8 @@ namespace graphene { namespace chain {
* @return a future that will resolve to the input block with
* precomputations applied
*/
fc::future<void> precompute_parallel( const signed_block& block, const uint32_t skip = skip_nothing )const;
boost::fibers::future<void> precompute_parallel( const signed_block& block,
const uint32_t skip = skip_nothing )const;

/** Precomputes digests, signatures and operation validations.
* "Expensive" computations may be done in a parallel thread.
Expand All @@ -462,7 +465,7 @@ namespace graphene { namespace chain {
* @return a future that will resolve to the input transaction with
* precomputations applied
*/
fc::future<void> precompute_parallel( const precomputable_transaction& trx )const;
boost::fibers::future<void> precompute_parallel( const precomputable_transaction& trx )const;
private:
template<typename Trx>
void _precompute_parallel( const Trx* trx, const size_t count, const uint32_t skip )const;
Expand Down
4 changes: 2 additions & 2 deletions libraries/db/object_database.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void object_database::flush()
{
// ilog("Save object_database in ${d}", ("d", _data_dir));
fc::create_directories( _data_dir / "object_database.tmp" / "lock" );
std::vector<fc::future<void>> tasks;
std::vector<boost::fibers::future<void>> tasks;
tasks.reserve(200);
for( uint32_t space = 0; space < _index.size(); ++space )
{
Expand Down Expand Up @@ -109,7 +109,7 @@ void object_database::open(const fc::path& data_dir)
wlog("Ignoring locked object_database");
return;
}
std::vector<fc::future<void>> tasks;
std::vector<boost::fibers::future<void>> tasks;
tasks.reserve(200);
ilog("Opening object database from ${d} ...", ("d", data_dir));
for( uint32_t space = 0; space < _index.size(); ++space )
Expand Down
2 changes: 1 addition & 1 deletion libraries/fc
Submodule fc updated 64 files
+6 −9 CMakeLists.txt
+4 −1 include/fc/api.hpp
+65 −177 include/fc/asio.hpp
+0 −1 include/fc/io/stdio.hpp
+0 −10 include/fc/network/http/websocket.hpp
+1 −5 include/fc/network/tcp_socket.hpp
+1 −0 include/fc/rpc/api_connection.hpp
+9 −11 include/fc/rpc/cli.hpp
+11 −8 include/fc/rpc/state.hpp
+0 −15 include/fc/signals.hpp
+74 −0 include/fc/thread/async.hpp
+100 −0 include/fc/thread/fibers.hpp
+0 −344 include/fc/thread/future.hpp
+0 −109 include/fc/thread/mutex.hpp
+0 −35 include/fc/thread/non_preemptable_scope_check.hpp
+20 −17 include/fc/thread/parallel.hpp
+0 −22 include/fc/thread/priority.hpp
+0 −11 include/fc/thread/scoped_lock.hpp
+0 −35 include/fc/thread/spin_lock.hpp
+0 −35 include/fc/thread/spin_yield_lock.hpp
+0 −171 include/fc/thread/task.hpp
+0 −275 include/fc/thread/thread.hpp
+0 −84 include/fc/thread/thread_specific.hpp
+0 −42 include/fc/thread/unique_lock.hpp
+26 −89 src/asio.cpp
+12 −23 src/crypto/aes.cpp
+3 −160 src/io/iostream.cpp
+167 −0 src/io/stdio.cpp
+5 −7 src/log/appender.cpp
+4 −6 src/log/console_appender.cpp
+53 −45 src/log/file_appender.cpp
+1 −1 src/log/gelf_appender.cpp
+3 −6 src/log/log_message.cpp
+7 −8 src/log/logger.cpp
+173 −164 src/network/http/websocket.cpp
+120 −83 src/network/rate_limiting.cpp
+34 −26 src/network/tcp_socket.cpp
+37 −22 src/network/udp_socket.cpp
+59 −47 src/rpc/cli.cpp
+10 −16 src/rpc/state.cpp
+2 −2 src/rpc/websocket_api.cpp
+0 −256 src/thread/context.hpp
+214 −0 src/thread/fibers.cpp
+0 −140 src/thread/future.cpp
+0 −215 src/thread/mutex.cpp
+0 −20 src/thread/non_preemptable_scope_check.cpp
+121 −106 src/thread/parallel.cpp
+0 −46 src/thread/spin_lock.cpp
+0 −51 src/thread/spin_yield_lock.cpp
+0 −113 src/thread/task.cpp
+0 −530 src/thread/thread.cpp
+0 −835 src/thread/thread_d.hpp
+0 −65 src/thread/thread_specific.cpp
+0 −12 tests/CMakeLists.txt
+31 −18 tests/api_tests.cpp
+3 −5 tests/logging_tests.cpp
+41 −12 tests/network/http/websocket_test.cpp
+7 −5 tests/stacktrace_test.cpp
+108 −82 tests/thread/parallel_tests.cpp
+0 −249 tests/thread/task_cancel.cpp
+22 −27 tests/thread/thread_tests.cpp
+54 −0 tests/thread/worker_thread.hxx
+328 −0 vendor/boost/fiber/asio/detail/yield.hpp
+63 −0 vendor/boost/fiber/asio/yield.hpp
2 changes: 1 addition & 1 deletion libraries/net/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ set(SOURCES node.cpp
add_library( graphene_net ${SOURCES} ${HEADERS} )

target_link_libraries( graphene_net
PUBLIC fc graphene_db graphene_protocol )
PUBLIC fc graphene_db graphene_protocol graphene_utilities )
target_include_directories( graphene_net
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/include"
PRIVATE "${CMAKE_SOURCE_DIR}/libraries/chain/include"
Expand Down
28 changes: 2 additions & 26 deletions libraries/net/include/graphene/net/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <graphene/protocol/types.hpp>

#include <list>
#include <thread>

namespace graphene { namespace net {

Expand Down Expand Up @@ -293,35 +294,10 @@ namespace graphene { namespace net {
void disable_peer_advertising();
fc::variant_object get_call_statistics() const;
private:
std::unique_ptr<detail::node_impl, detail::node_impl_deleter> my;
std::unique_ptr<detail::node_impl> my;
};

class simulated_network : public node
{
public:
~simulated_network();
simulated_network(const std::string& user_agent) : node(user_agent) {}
void listen_to_p2p_network() override {}
void connect_to_p2p_network() override {}
void connect_to_endpoint(const fc::ip::endpoint& ep) override {}

fc::ip::endpoint get_actual_listening_endpoint() const override { return fc::ip::endpoint(); }

void sync_from(const item_id& current_head_block, const std::vector<uint32_t>& hard_fork_block_numbers) override {}
void broadcast(const message& item_to_broadcast) override;
void add_node_delegate(node_delegate* node_delegate_to_add);

virtual uint32_t get_connection_count() const override { return 8; }
private:
struct node_info;
void message_sender(node_info* destination_node);
std::list<node_info*> network_nodes;
};


typedef std::shared_ptr<node> node_ptr;
typedef std::shared_ptr<simulated_network> simulated_network_ptr;

} } // graphene::net

FC_REFLECT(graphene::net::message_propagation_data, (received_time)(validated_time)(originating_peer));
Expand Down
29 changes: 21 additions & 8 deletions libraries/net/include/graphene/net/peer_connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <graphene/net/peer_database.hpp>
#include <graphene/net/message_oriented_connection.hpp>
#include <graphene/net/config.hpp>
#include <graphene/utilities/recurring_task.hpp>

#include <boost/tuple/tuple.hpp>

Expand All @@ -39,10 +40,25 @@

#include <queue>
#include <boost/container/deque.hpp>
#include <fc/thread/future.hpp>

namespace graphene { namespace net
{
namespace graphene { namespace net {
class peer_connection;

namespace detail {

class send_queued_messages_task : public graphene::utilities::recurring_task {
public:
send_queued_messages_task() : _conn(nullptr) {}
explicit send_queued_messages_task( peer_connection& conn ) : _conn(&conn) {}

protected:
virtual void run();

peer_connection* _conn;
};

} // detail

struct firewall_check_state_data
{
node_id_t expected_node_id;
Expand All @@ -58,7 +74,6 @@ namespace graphene { namespace net
node_id_t requesting_peer;
};

class peer_connection;
class peer_connection_delegate
{
public:
Expand All @@ -69,7 +84,6 @@ namespace graphene { namespace net
virtual message get_message_for_item(const item_id& item) = 0;
};

class peer_connection;
typedef std::shared_ptr<peer_connection> peer_connection_ptr;
class peer_connection : public message_oriented_connection_delegate,
public std::enable_shared_from_this<peer_connection>
Expand Down Expand Up @@ -166,7 +180,7 @@ namespace graphene { namespace net

size_t _total_queued_messages_size = 0;
std::queue<std::unique_ptr<queued_message>, std::list<std::unique_ptr<queued_message> > > _queued_messages;
fc::future<void> _send_queued_messages_done;
detail::send_queued_messages_task _send_queued_messages;
public:
fc::time_point connection_initiation_time;
fc::time_point connection_closed_time;
Expand Down Expand Up @@ -260,8 +274,6 @@ namespace graphene { namespace net

uint32_t last_known_fork_block_number = 0;

fc::future<void> accept_or_connect_task_done;

firewall_check_state_data *firewall_check_state = nullptr;
private:
#ifndef NDEBUG
Expand Down Expand Up @@ -313,6 +325,7 @@ namespace graphene { namespace net
void send_queued_messages_task();
void accept_connection_task();
void connect_to_task(const fc::ip::endpoint& remote_endpoint);
friend class detail::send_queued_messages_task;
};
typedef std::shared_ptr<peer_connection> peer_connection_ptr;

Expand Down
Loading