|
1 | 1 | require 'riak/client' |
2 | 2 | require 'riak/bucket' |
| 3 | +require 'riak/multi' |
3 | 4 |
|
4 | 5 | module Riak |
5 | | - # Coordinates a parallel fetch operation for multiple values. |
6 | | - class Multiget |
7 | | - include Util::Translation |
8 | | - |
9 | | - # @return [Riak::Client] the associated client |
10 | | - attr_reader :client |
11 | | - |
12 | | - # @return [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch |
13 | | - attr_reader :fetch_list |
14 | | - |
15 | | - # @return [Hash<fetch_list_entry, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances |
16 | | - attr_accessor :result_hash |
17 | | - |
18 | | - # @return [Boolean] finished if the fetch operation has completed |
19 | | - attr_reader :finished |
20 | | - |
21 | | - # @return [Integer] The number of threads to use |
22 | | - attr_accessor :thread_count |
23 | | - |
24 | | - # Perform a Riak Multiget operation. |
25 | | - # @param [Client] client the {Riak::Client} that will perform the multiget |
26 | | - # @param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch |
27 | | - # @return [Hash<fetch_list_entry, RObject] result_hash a {Hash} of {Bucket} and {String} key pairs to {RObject} instances |
28 | | - def self.get_all(client, fetch_list) |
29 | | - multi = new client, fetch_list |
30 | | - multi.fetch |
31 | | - multi.results |
32 | | - end |
33 | | - |
34 | | - # Create a Riak Multiget operation. |
35 | | - # @param [Client] client the {Riak::Client} that will perform the multiget |
36 | | - # @param [Array<Bucket, String>] fetch_list an {Array} of {Bucket} and {String} keys to fetch |
37 | | - def initialize(client, fetch_list) |
38 | | - raise ArgumentError, t('client_type', :client => client.inspect) unless client.is_a? Riak::Client |
39 | | - raise ArgumentError, t('array_type', :array => fetch_list.inspect) unless fetch_list.is_a? Array |
40 | | - |
41 | | - validate_fetch_list fetch_list |
42 | | - @client, @fetch_list = client, fetch_list.uniq |
43 | | - self.result_hash = Hash.new |
44 | | - @finished = false |
45 | | - self.thread_count = client.multiget_threads |
46 | | - end |
47 | | - |
48 | | - # Starts the parallelized fetch operation |
49 | | - # @raise [ArgumentError] when a non-positive-Integer count is given |
50 | | - def fetch |
51 | | - queue = fetch_list.dup |
52 | | - queue_mutex = Mutex.new |
53 | | - result_mutex = Mutex.new |
54 | | - |
55 | | - unless thread_count.is_a?(Integer) && thread_count > 0 |
56 | | - raise ArgumentError, t("invalid_multiget_thread_count") |
57 | | - end |
58 | | - |
59 | | - @threads = 1.upto(thread_count).map do |_node| |
60 | | - Thread.new do |
61 | | - loop do |
62 | | - pair = queue_mutex.synchronize do |
63 | | - queue.shift |
64 | | - end |
65 | | - |
66 | | - break if pair.nil? |
67 | | - |
68 | | - found = attempt_fetch(*pair) |
69 | | - result_mutex.synchronize do |
70 | | - result_hash[pair] = found |
71 | | - end |
72 | | - end |
73 | | - end |
74 | | - end |
75 | | - end |
76 | | - |
77 | | - def results |
78 | | - wait_for_finish |
79 | | - result_hash |
80 | | - end |
81 | | - |
82 | | - def finished? |
83 | | - set_finished_for_thread_liveness |
84 | | - finished |
85 | | - end |
86 | | - |
87 | | - def wait_for_finish |
88 | | - return if finished? |
89 | | - @threads.each {|t| t.join } |
90 | | - @finished = true |
| 6 | + # Coordinates a parallel fetch operation for multiple keys. |
| 7 | + class Multiget < Multi |
| 8 | + # @deprecated use perform |
| 9 | + class << self |
| 10 | + alias_method :get_all, :perform |
91 | 11 | end |
92 | 12 |
|
93 | 13 | private |
94 | 14 |
|
95 | | - def attempt_fetch(bucket, key) |
| 15 | + def work(bucket, key) |
96 | 16 | bucket[key] |
97 | 17 | rescue Riak::FailedRequest => e |
98 | 18 | raise e unless e.not_found? |
99 | 19 | nil |
100 | 20 | end |
101 | | - |
102 | | - def set_finished_for_thread_liveness |
103 | | - return if @finished # already done |
104 | | - |
105 | | - all_dead = @threads.none? {|t| t.alive? } |
106 | | - return unless all_dead # still working |
107 | | - |
108 | | - @finished = true |
109 | | - return |
110 | | - end |
111 | | - |
112 | | - def validate_fetch_list(fetch_list) |
113 | | - return unless erroneous = fetch_list.detect do |e| |
114 | | - bucket, key = e |
115 | | - next true unless bucket.is_a? Bucket |
116 | | - next true unless key.is_a? String |
117 | | - end |
118 | | - |
119 | | - raise ArgumentError, t('fetch_list_type', :problem => erroneous) |
120 | | - end |
121 | 21 | end |
122 | 22 | end |
0 commit comments