Skip to content

Commit 07c5826

Browse files
committed
stash eventbus changes
1 parent 74745ee commit 07c5826

11 files changed

Lines changed: 1062 additions & 0 deletions

File tree

iotdb-core/confignode/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@
116116
<groupId>commons-io</groupId>
117117
<artifactId>commons-io</artifactId>
118118
</dependency>
119+
<dependency>
120+
<groupId>com.github.ben-manes.caffeine</groupId>
121+
<artifactId>caffeine</artifactId>
122+
</dependency>
119123
<dependency>
120124
<groupId>com.timecho.ratis</groupId>
121125
<artifactId>ratis-common</artifactId>
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package com.google.common.base;
21+
22+
import org.checkerframework.checker.nullness.qual.Nullable;
23+
24+
import javax.annotation.CheckForNull;
25+
26+
import java.util.Arrays;
27+
28+
/**
29+
* Helper functions that can operate on any {@code Object}.
30+
*
31+
* <p>See the Guava User Guide on <a
32+
* href="https://github.com/google/guava/wiki/CommonObjectUtilitiesExplained">writing {@code Object}
33+
* methods with {@code Objects}</a>.
34+
*
35+
* @author Laurence Gonsalves
36+
* @since 2.0
37+
*/
38+
@ElementTypesAreNonnullByDefault
39+
public final class Objects extends ExtraObjectsMethodsForWeb {
40+
private Objects() {}
41+
42+
/**
43+
* Determines whether two possibly-null objects are equal. Returns:
44+
*
45+
* <ul>
46+
* <li>{@code true} if {@code a} and {@code b} are both null.
47+
* <li>{@code true} if {@code a} and {@code b} are both non-null and they are equal according to
48+
* {@link Object#equals(Object)}.
49+
* <li>{@code false} in all other situations.
50+
* </ul>
51+
*
52+
* <p>This assumes that any non-null objects passed to this function conform to the {@code
53+
* equals()} contract.
54+
*
55+
* <p><b>Note for Java 7 and later:</b> This method should be treated as deprecated; use {@link
56+
* java.util.Objects#equals} instead.
57+
*/
58+
public static boolean equal(@CheckForNull Object a, @CheckForNull Object b) {
59+
return a == b || (a != null && a.equals(b));
60+
}
61+
62+
/**
63+
* Generates a hash code for multiple values. The hash code is generated by calling {@link
64+
* Arrays#hashCode(Object[])}. Note that array arguments to this method, with the exception of a
65+
* single Object array, do not get any special handling; their hash codes are based on identity
66+
* and not contents.
67+
*
68+
* <p>This is useful for implementing {@link Object#hashCode()}. For example, in an object that
69+
* has three properties, {@code x}, {@code y}, and {@code z}, one could write:
70+
*
71+
* <pre>{@code
72+
* public int hashCode() {
73+
* return Objects.hashCode(getX(), getY(), getZ());
74+
* }
75+
* }</pre>
76+
*
77+
* <p><b>Warning:</b> When a single object is supplied, the returned hash code does not equal the
78+
* hash code of that object.
79+
*
80+
* <p><b>Note for Java 7 and later:</b> This method should be treated as deprecated; use {@link
81+
* java.util.Objects#hash} instead.
82+
*/
83+
public static int hashCode(@CheckForNull @Nullable Object... objects) {
84+
return Arrays.hashCode(objects);
85+
}
86+
}
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package com.google.common.eventbus;
21+
22+
import static com.google.common.base.Preconditions.checkNotNull;
23+
24+
/**
25+
* Wraps an event that was posted, but which had no subscribers and thus could not be delivered.
26+
*
27+
* <p>Registering a DeadEvent subscriber is useful for debugging or logging, as it can detect
28+
* misconfigurations in a system's event distribution.
29+
*
30+
* @author Cliff Biffle
31+
* @since 10.0
32+
*/
33+
@ElementTypesAreNonnullByDefault
34+
public class DeadEvent {
35+
36+
private final Object source;
37+
private final Object event;
38+
39+
/**
40+
* Creates a new DeadEvent.
41+
*
42+
* @param source object broadcasting the DeadEvent (generally the {@link EventBus}).
43+
* @param event the event that could not be delivered.
44+
*/
45+
public DeadEvent(Object source, Object event) {
46+
this.source = checkNotNull(source);
47+
this.event = checkNotNull(event);
48+
}
49+
50+
/**
51+
* Returns the object that originated this event (<em>not</em> the object that originated the
52+
* wrapped event). This is generally an {@link EventBus}.
53+
*
54+
* @return the source of this event.
55+
*/
56+
public Object getSource() {
57+
return source;
58+
}
59+
60+
/**
61+
* Returns the wrapped, 'dead' event, which the system was unable to deliver to any registered
62+
* subscriber.
63+
*
64+
* @return the 'dead' event that could not be delivered.
65+
*/
66+
public Object getEvent() {
67+
return event;
68+
}
69+
70+
@Override
71+
public String toString() {
72+
return "DeadEvent{" + "source=" + source + ", event=" + event + '}';
73+
}
74+
}
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package com.google.common.eventbus;
21+
22+
import com.google.common.collect.Queues;
23+
24+
import java.util.Iterator;
25+
import java.util.Queue;
26+
import java.util.concurrent.ConcurrentLinkedQueue;
27+
28+
import static com.google.common.base.Preconditions.checkNotNull;
29+
30+
/**
31+
* Handler for dispatching events to subscribers, providing different event ordering guarantees that
32+
* make sense for different situations.
33+
*
34+
* <p><b>Note:</b> The dispatcher is orthogonal to the subscriber's {@code Executor}. The dispatcher
35+
* controls the order in which events are dispatched, while the executor controls how (i.e. on which
36+
* thread) the subscriber is actually called when an event is dispatched to it.
37+
*
38+
* @author Colin Decker
39+
*/
40+
@ElementTypesAreNonnullByDefault
41+
abstract class Dispatcher {
42+
43+
/**
44+
* Returns a dispatcher that queues events that are posted reentrantly on a thread that is already
45+
* dispatching an event, guaranteeing that all events posted on a single thread are dispatched to
46+
* all subscribers in the order they are posted.
47+
*
48+
* <p>When all subscribers are dispatched to using a <i>direct</i> executor (which dispatches on
49+
* the same thread that posts the event), this yields a breadth-first dispatch order on each
50+
* thread. That is, all subscribers to a single event A will be called before any subscribers to
51+
* any events B and C that are posted to the event bus by the subscribers to A.
52+
*/
53+
static Dispatcher perThreadDispatchQueue() {
54+
return new PerThreadQueuedDispatcher();
55+
}
56+
57+
/**
58+
* Returns a dispatcher that queues events that are posted in a single global queue. This behavior
59+
* matches the original behavior of AsyncEventBus exactly, but is otherwise not especially useful.
60+
* For async dispatch, an {@linkplain #immediate() immediate} dispatcher should generally be
61+
* preferable.
62+
*/
63+
static Dispatcher legacyAsync() {
64+
return new LegacyAsyncDispatcher();
65+
}
66+
67+
/**
68+
* Returns a dispatcher that dispatches events to subscribers immediately as they're posted
69+
* without using an intermediate queue to change the dispatch order. This is effectively a
70+
* depth-first dispatch order, vs. breadth-first when using a queue.
71+
*/
72+
static Dispatcher immediate() {
73+
return ImmediateDispatcher.INSTANCE;
74+
}
75+
76+
/** Dispatches the given {@code event} to the given {@code subscribers}. */
77+
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
78+
79+
/** Implementation of a {@link #perThreadDispatchQueue()} dispatcher. */
80+
private static final class PerThreadQueuedDispatcher extends Dispatcher {
81+
82+
// This dispatcher matches the original dispatch behavior of EventBus.
83+
84+
/** Per-thread queue of events to dispatch. */
85+
private final ThreadLocal<Queue<Event>> queue =
86+
new ThreadLocal<Queue<Event>>() {
87+
@Override
88+
protected Queue<Event> initialValue() {
89+
return Queues.newArrayDeque();
90+
}
91+
};
92+
93+
/** Per-thread dispatch state, used to avoid reentrant event dispatching. */
94+
private final ThreadLocal<Boolean> dispatching =
95+
new ThreadLocal<Boolean>() {
96+
@Override
97+
protected Boolean initialValue() {
98+
return false;
99+
}
100+
};
101+
102+
@Override
103+
void dispatch(Object event, Iterator<Subscriber> subscribers) {
104+
checkNotNull(event);
105+
checkNotNull(subscribers);
106+
Queue<Event> queueForThread = queue.get();
107+
queueForThread.offer(new Event(event, subscribers));
108+
109+
if (!dispatching.get()) {
110+
dispatching.set(true);
111+
try {
112+
Event nextEvent;
113+
while ((nextEvent = queueForThread.poll()) != null) {
114+
while (nextEvent.subscribers.hasNext()) {
115+
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
116+
}
117+
}
118+
} finally {
119+
dispatching.remove();
120+
queue.remove();
121+
}
122+
}
123+
}
124+
125+
private static final class Event {
126+
private final Object event;
127+
private final Iterator<Subscriber> subscribers;
128+
129+
private Event(Object event, Iterator<Subscriber> subscribers) {
130+
this.event = event;
131+
this.subscribers = subscribers;
132+
}
133+
}
134+
}
135+
136+
/** Implementation of a {@link #legacyAsync()} dispatcher. */
137+
private static final class LegacyAsyncDispatcher extends Dispatcher {
138+
139+
// This dispatcher matches the original dispatch behavior of AsyncEventBus.
140+
//
141+
// We can't really make any guarantees about the overall dispatch order for this dispatcher in
142+
// a multithreaded environment for a couple of reasons:
143+
//
144+
// 1. Subscribers to events posted on different threads can be interleaved with each other
145+
// freely. (A event on one thread, B event on another could yield any of
146+
// [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.)
147+
// 2. It's possible for subscribers to actually be dispatched to in a different order than they
148+
// were added to the queue. It's easily possible for one thread to take the head of the
149+
// queue, immediately followed by another thread taking the next element in the queue. That
150+
// second thread can then dispatch to the subscriber it took before the first thread does.
151+
//
152+
// All this makes me really wonder if there's any value in queueing here at all. A dispatcher
153+
// that simply loops through the subscribers and dispatches the event to each would actually
154+
// probably provide a stronger order guarantee, though that order would obviously be different
155+
// in some cases.
156+
157+
/** Global event queue. */
158+
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
159+
Queues.newConcurrentLinkedQueue();
160+
161+
@Override
162+
void dispatch(Object event, Iterator<Subscriber> subscribers) {
163+
checkNotNull(event);
164+
while (subscribers.hasNext()) {
165+
queue.add(new EventWithSubscriber(event, subscribers.next()));
166+
}
167+
168+
EventWithSubscriber e;
169+
while ((e = queue.poll()) != null) {
170+
e.subscriber.dispatchEvent(e.event);
171+
}
172+
}
173+
174+
private static final class EventWithSubscriber {
175+
private final Object event;
176+
private final Subscriber subscriber;
177+
178+
private EventWithSubscriber(Object event, Subscriber subscriber) {
179+
this.event = event;
180+
this.subscriber = subscriber;
181+
}
182+
}
183+
}
184+
185+
/** Implementation of {@link #immediate()}. */
186+
private static final class ImmediateDispatcher extends Dispatcher {
187+
private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();
188+
189+
@Override
190+
void dispatch(Object event, Iterator<Subscriber> subscribers) {
191+
checkNotNull(event);
192+
while (subscribers.hasNext()) {
193+
subscribers.next().dispatchEvent(event);
194+
}
195+
}
196+
}
197+
}

0 commit comments

Comments
 (0)