1919import java .lang .invoke .MethodHandles ;
2020import java .util .ArrayList ;
2121import java .util .Collections ;
22- import java .util .EnumSet ;
2322import java .util .HashSet ;
2423import java .util .List ;
2524import java .util .Objects ;
3332import org .apache .activemq .artemis .api .core .ActiveMQAddressDoesNotExistException ;
3433import org .apache .activemq .artemis .api .core .ActiveMQBuffer ;
3534import org .apache .activemq .artemis .api .core .ActiveMQException ;
36- import org .apache .activemq .artemis .api .core .ActiveMQQueueExistsException ;
3735import org .apache .activemq .artemis .api .core .ActiveMQSecurityException ;
36+ import org .apache .activemq .artemis .api .core .AutoCreateResult ;
3837import org .apache .activemq .artemis .api .core .ICoreMessage ;
3938import org .apache .activemq .artemis .api .core .QueueConfiguration ;
4039import org .apache .activemq .artemis .api .core .RoutingType ;
41- import org .apache .activemq .artemis .api .core .SimpleString ;
4240import org .apache .activemq .artemis .api .core .client .ActiveMQClient ;
4341import org .apache .activemq .artemis .core .message .impl .CoreMessage ;
4442import org .apache .activemq .artemis .core .protocol .stomp .v10 .StompFrameHandlerV10 ;
4745import org .apache .activemq .artemis .core .remoting .impl .netty .TransportConstants ;
4846import org .apache .activemq .artemis .core .server .ActiveMQServerLogger ;
4947import org .apache .activemq .artemis .core .server .ServerConsumer ;
50- import org .apache .activemq .artemis .core .server .ServerSession ;
51- import org .apache .activemq .artemis .core .server .impl .AddressInfo ;
52- import org .apache .activemq .artemis .core .settings .impl .AddressSettings ;
5348import org .apache .activemq .artemis .selector .filter .FilterException ;
5449import org .apache .activemq .artemis .selector .impl .SelectorParser ;
5550import org .apache .activemq .artemis .spi .core .protocol .AbstractRemotingConnection ;
5651import org .apache .activemq .artemis .spi .core .remoting .Acceptor ;
5752import org .apache .activemq .artemis .spi .core .remoting .Connection ;
5853import org .apache .activemq .artemis .spi .core .remoting .ReadyListener ;
59- import org .apache .activemq .artemis .utils .CompositeAddress ;
6054import org .apache .activemq .artemis .utils .ConfigurationHelper ;
6155import org .apache .activemq .artemis .utils .ExecutorFactory ;
6256import org .apache .activemq .artemis .utils .VersionLoader ;
@@ -166,63 +160,6 @@ public boolean hasBytes() {
166160 this .minLargeMessageSize = ConfigurationHelper .getIntProperty (TransportConstants .STOMP_MIN_LARGE_MESSAGE_SIZE , ConfigurationHelper .getIntProperty (TransportConstants .STOMP_MIN_LARGE_MESSAGE_SIZE_DEPRECATED , ActiveMQClient .DEFAULT_MIN_LARGE_MESSAGE_SIZE , acceptorUsed .getConfiguration ()), acceptorUsed .getConfiguration ());
167161 }
168162
169- // TODO this should take a type - send or receive so it knows whether to check the address or the queue
170- public void checkDestination (String destination ) throws ActiveMQStompException {
171- if (!manager .destinationExists (destination )) {
172- throw BUNDLE .destinationNotExist (destination ).setHandler (frameHandler );
173- }
174- }
175-
176- public void autoCreateDestinationIfPossible (String destination , RoutingType routingType ) throws ActiveMQStompException {
177- try {
178- SimpleString simpleDestination = SimpleString .of (destination );
179- AddressInfo addressInfo = manager .getServer ().getAddressInfo (simpleDestination );
180- AddressSettings addressSettings = manager .getServer ().getAddressSettingsRepository ().getMatch (destination );
181- RoutingType effectiveAddressRoutingType = Objects .requireNonNullElse (routingType , addressSettings .getDefaultAddressRoutingType ());
182- ServerSession session = getSession ().getCoreSession ();
183- /*
184- * If the address doesn't exist then it is created if possible.
185- * If the address does exist but doesn't support the routing-type then the address is updated if possible.
186- */
187- if (addressInfo == null ) {
188- if (addressSettings .isAutoCreateAddresses ()) {
189- session .createAddress (simpleDestination , effectiveAddressRoutingType , true );
190- }
191- } else if (!addressInfo .getRoutingTypes ().contains (effectiveAddressRoutingType )) {
192- if (addressSettings .isAutoCreateAddresses ()) {
193- EnumSet <RoutingType > routingTypes = EnumSet .noneOf (RoutingType .class );
194- for (RoutingType existingRoutingType : addressInfo .getRoutingTypes ()) {
195- routingTypes .add (existingRoutingType );
196- }
197- routingTypes .add (effectiveAddressRoutingType );
198- manager .getServer ().updateAddressInfo (simpleDestination , routingTypes );
199- }
200- }
201-
202- // auto create the queue if the address is ANYCAST or FQQN
203- if ((CompositeAddress .isFullyQualified (destination ) || effectiveAddressRoutingType == RoutingType .ANYCAST ) && addressSettings .isAutoCreateQueues () && manager .getServer ().locateQueue (simpleDestination ) == null ) {
204- session .createQueue (QueueConfiguration .of (destination ).setRoutingType (effectiveAddressRoutingType ).setAutoCreated (true ));
205- }
206- } catch (ActiveMQQueueExistsException e ) {
207- // ignore
208- } catch (Exception e ) {
209- logger .debug ("Exception while auto-creating destination" , e );
210- throw new ActiveMQStompException (e .getMessage (), e ).setHandler (frameHandler );
211- }
212- }
213-
214- public void checkRoutingSemantics (String destination , RoutingType routingType ) throws ActiveMQStompException {
215- AddressInfo addressInfo = manager .getServer ().getAddressInfo (SimpleString .of (destination ));
216-
217- // may be null here if, for example, the management address is being checked
218- if (addressInfo != null ) {
219- Set <RoutingType > actualDeliveryModesOfAddress = addressInfo .getRoutingTypes ();
220- if (routingType != null && !actualDeliveryModesOfAddress .contains (routingType )) {
221- throw BUNDLE .illegalSemantics (routingType .toString (), actualDeliveryModesOfAddress .toString ());
222- }
223- }
224- }
225-
226163 @ Override
227164 public void destroy () {
228165 if (DESTROYED_UPDATER .compareAndSet (this , 0 , 1 )) {
@@ -551,9 +488,48 @@ StompPostReceiptFunction subscribe(String destination,
551488 RoutingType subscriptionType ,
552489 Integer consumerWindowSize ) throws ActiveMQStompException {
553490 validateSelector (selector );
554- autoCreateDestinationIfPossible (destination , subscriptionType );
555- checkDestination (destination );
556- checkRoutingSemantics (destination , subscriptionType );
491+ checkAutoCreate (destination , subscriptionType );
492+ String subscriptionID = getSubscriptionID (destination , id );
493+
494+ try {
495+ return manager .subscribe (this ,
496+ subscriptionID ,
497+ durableSubscriptionName ,
498+ destination ,
499+ getSelector (selector , noLocal ),
500+ Objects .requireNonNullElse (ack , Stomp .Headers .Subscribe .AckModeValues .AUTO ),
501+ noLocal ,
502+ consumerWindowSize );
503+ } catch (ActiveMQStompException e ) {
504+ throw e ;
505+ } catch (Exception e ) {
506+ throw BUNDLE .errorCreatingSubscription (subscriptionID , e ).setHandler (frameHandler );
507+ }
508+ }
509+
510+ protected void checkAutoCreate (String destination , RoutingType subscriptionType ) throws ActiveMQStompException {
511+ AutoCreateResult autoCreateResult ;
512+ try {
513+ RoutingType routingType = getSubscriptionRoutingType (destination , subscriptionType );
514+ autoCreateResult = getSession ().getCoreSession ().checkAutoCreate (QueueConfiguration .of (destination ).setRoutingType (routingType ));
515+ } catch (Exception e ) {
516+ logger .debug ("Exception while auto-creating destination" , e );
517+ throw new ActiveMQStompException (e .getMessage (), e ).setHandler (frameHandler );
518+ }
519+ if (autoCreateResult == AutoCreateResult .NOT_FOUND ) {
520+ throw BUNDLE .destinationNotExist (destination ).setHandler (frameHandler );
521+ }
522+ }
523+
524+ private RoutingType getSubscriptionRoutingType (String destination , RoutingType subscriptionType ) {
525+ if (subscriptionType == null ) {
526+ return getManager ().getServer ().getAddressSettingsRepository ().getMatch (destination ).getDefaultAddressRoutingType ();
527+ } else {
528+ return subscriptionType ;
529+ }
530+ }
531+
532+ private String getSelector (String selector , boolean noLocal ) {
557533 if (noLocal ) {
558534 String noLocalFilter = "(" + CONNECTION_ID_PROPERTY_NAME_STRING + " <> '" + getID ().toString () + "' OR " + CONNECTION_ID_PROPERTY_NAME_STRING + " IS NULL)" ;
559535 if (selector == null ) {
@@ -562,11 +538,10 @@ StompPostReceiptFunction subscribe(String destination,
562538 selector = "(" + selector + ") AND " + noLocalFilter ;
563539 }
564540 }
541+ return selector ;
542+ }
565543
566- if (ack == null ) {
567- ack = Stomp .Headers .Subscribe .AckModeValues .AUTO ;
568- }
569-
544+ private String getSubscriptionID (String destination , String id ) throws ActiveMQStompException {
570545 String subscriptionID = null ;
571546 if (id != null ) {
572547 subscriptionID = id ;
@@ -576,14 +551,7 @@ StompPostReceiptFunction subscribe(String destination,
576551 }
577552 subscriptionID = "subscription/" + destination ;
578553 }
579-
580- try {
581- return manager .subscribe (this , subscriptionID , durableSubscriptionName , destination , selector , ack , noLocal , consumerWindowSize );
582- } catch (ActiveMQStompException e ) {
583- throw e ;
584- } catch (Exception e ) {
585- throw BUNDLE .errorCreatingSubscription (subscriptionID , e ).setHandler (frameHandler );
586- }
554+ return subscriptionID ;
587555 }
588556
589557 private void validateSelector (String selector ) throws ActiveMQStompException {
0 commit comments