@@ -22,8 +22,6 @@ use dust_dds::{
2222 } ,
2323 listener:: NO_LISTENER ,
2424 publication:: data_writer:: DataWriter ,
25- runtime:: DdsRuntime ,
26- std_runtime:: StdRuntime ,
2725 subscription:: data_reader:: DataReader ,
2826} ;
2927use rand:: { Rng , random, thread_rng} ;
@@ -227,6 +225,16 @@ struct Options {
227225 /// uses take()/read() instead of take_next_instance() read_next_instance()
228226 #[ clap( short = 'K' , long = "take-read" ) ]
229227 take_read : bool ,
228+
229+ /// ContentFilteredTopic filter expression (quotes required around the expression). Cannot be used with -c on
230+ /// subscriber applications
231+ #[ clap( short = 'F' , long = "cft" ) ]
232+ cft_expression : Option < String > ,
233+
234+ /// If set, the modulo operation is applied to the shapesize. This will make that shapesize is in the range [1,N].
235+ /// This only applies if shapesize is increased (-z 0)
236+ #[ clap( short = 'Q' , long = "size-modulo" ) ]
237+ size_modulo : Option < i32 > ,
230238}
231239
232240impl Options {
@@ -240,7 +248,7 @@ impl Options {
240248 Ok ( ( ) )
241249 }
242250
243- fn color_for_publisher ( & self ) -> String {
251+ fn interpret_color ( & self ) -> String {
244252 match self . color . clone ( ) {
245253 Some ( color) => color,
246254 None => {
@@ -331,10 +339,10 @@ impl Options {
331339}
332340
333341struct Listener ;
334- impl < R : DdsRuntime > DomainParticipantListener < R > for Listener {
342+ impl DomainParticipantListener for Listener {
335343 async fn on_inconsistent_topic (
336344 & mut self ,
337- the_topic : TopicAsync < R > ,
345+ the_topic : TopicAsync ,
338346 _status : InconsistentTopicStatus ,
339347 ) {
340348 println ! (
@@ -346,7 +354,7 @@ impl<R: DdsRuntime> DomainParticipantListener<R> for Listener {
346354
347355 async fn on_offered_incompatible_qos (
348356 & mut self ,
349- the_writer : dust_dds:: dds_async:: data_writer:: DataWriterAsync < R , ( ) > ,
357+ the_writer : dust_dds:: dds_async:: data_writer:: DataWriterAsync < ( ) > ,
350358 status : dust_dds:: infrastructure:: status:: OfferedIncompatibleQosStatus ,
351359 ) {
352360 let policy_name = qos_policy_name ( status. last_policy_id ) ;
@@ -361,7 +369,7 @@ impl<R: DdsRuntime> DomainParticipantListener<R> for Listener {
361369
362370 async fn on_publication_matched (
363371 & mut self ,
364- the_writer : dust_dds:: dds_async:: data_writer:: DataWriterAsync < R , ( ) > ,
372+ the_writer : dust_dds:: dds_async:: data_writer:: DataWriterAsync < ( ) > ,
365373 status : dust_dds:: infrastructure:: status:: PublicationMatchedStatus ,
366374 ) {
367375 if !the_writer. get_topic ( ) . get_name ( ) . starts_with ( "DCPS" ) {
@@ -377,7 +385,7 @@ impl<R: DdsRuntime> DomainParticipantListener<R> for Listener {
377385
378386 async fn on_offered_deadline_missed (
379387 & mut self ,
380- the_writer : dust_dds:: dds_async:: data_writer:: DataWriterAsync < R , ( ) > ,
388+ the_writer : dust_dds:: dds_async:: data_writer:: DataWriterAsync < ( ) > ,
381389 status : dust_dds:: infrastructure:: status:: OfferedDeadlineMissedStatus ,
382390 ) {
383391 println ! (
@@ -391,7 +399,7 @@ impl<R: DdsRuntime> DomainParticipantListener<R> for Listener {
391399
392400 async fn on_liveliness_lost (
393401 & mut self ,
394- the_writer : dust_dds:: dds_async:: data_writer:: DataWriterAsync < R , ( ) > ,
402+ the_writer : dust_dds:: dds_async:: data_writer:: DataWriterAsync < ( ) > ,
395403 status : dust_dds:: infrastructure:: status:: LivelinessLostStatus ,
396404 ) {
397405 println ! (
@@ -405,7 +413,7 @@ impl<R: DdsRuntime> DomainParticipantListener<R> for Listener {
405413
406414 async fn on_requested_incompatible_qos (
407415 & mut self ,
408- the_reader : dust_dds:: dds_async:: data_reader:: DataReaderAsync < R , ( ) > ,
416+ the_reader : dust_dds:: dds_async:: data_reader:: DataReaderAsync < ( ) > ,
409417 status : dust_dds:: infrastructure:: status:: RequestedIncompatibleQosStatus ,
410418 ) {
411419 let policy_name = qos_policy_name ( status. last_policy_id ) ;
@@ -420,7 +428,7 @@ impl<R: DdsRuntime> DomainParticipantListener<R> for Listener {
420428
421429 async fn on_subscription_matched (
422430 & mut self ,
423- the_reader : dust_dds:: dds_async:: data_reader:: DataReaderAsync < R , ( ) > ,
431+ the_reader : dust_dds:: dds_async:: data_reader:: DataReaderAsync < ( ) > ,
424432 status : dust_dds:: infrastructure:: status:: SubscriptionMatchedStatus ,
425433 ) {
426434 if !the_reader
@@ -440,7 +448,7 @@ impl<R: DdsRuntime> DomainParticipantListener<R> for Listener {
440448
441449 async fn on_requested_deadline_missed (
442450 & mut self ,
443- the_reader : dust_dds:: dds_async:: data_reader:: DataReaderAsync < R , ( ) > ,
451+ the_reader : dust_dds:: dds_async:: data_reader:: DataReaderAsync < ( ) > ,
444452 status : dust_dds:: infrastructure:: status:: RequestedDeadlineMissedStatus ,
445453 ) {
446454 println ! (
@@ -454,7 +462,7 @@ impl<R: DdsRuntime> DomainParticipantListener<R> for Listener {
454462
455463 async fn on_liveliness_changed (
456464 & mut self ,
457- the_reader : dust_dds:: dds_async:: data_reader:: DataReaderAsync < R , ( ) > ,
465+ the_reader : dust_dds:: dds_async:: data_reader:: DataReaderAsync < ( ) > ,
458466 status : dust_dds:: infrastructure:: status:: LivelinessChangedStatus ,
459467 ) {
460468 println ! (
@@ -495,9 +503,9 @@ fn move_shape(
495503}
496504
497505fn init_publisher (
498- participant : & DomainParticipant < StdRuntime > ,
506+ participant : & DomainParticipant ,
499507 options : Options ,
500- ) -> Result < DataWriter < StdRuntime , ShapeType > , InitializeError > {
508+ ) -> Result < DataWriter < ShapeType > , InitializeError > {
501509 let topic = participant
502510 . lookup_topicdescription ( & options. topic_name )
503511 . expect ( "lookup_topicdescription succeeds" )
@@ -510,7 +518,7 @@ fn init_publisher(
510518 println ! (
511519 "Create writer for topic: {} color: {}" ,
512520 options. topic_name,
513- options. color_for_publisher ( )
521+ options. interpret_color ( )
514522 ) ;
515523
516524 let mut data_writer_qos = DataWriterQos {
@@ -541,7 +549,7 @@ fn init_publisher(
541549}
542550
543551fn run_publisher (
544- data_writer : & DataWriter < StdRuntime , ShapeType > ,
552+ data_writer : & DataWriter < ShapeType > ,
545553 options : Options ,
546554 all_done : Receiver < ( ) > ,
547555) -> Result < ( ) , RunningError > {
@@ -550,7 +558,7 @@ fn run_publisher(
550558 let da_width = 240 ;
551559 let da_height = 270 ;
552560 let mut shape = ShapeType {
553- color : options. color_for_publisher ( ) ,
561+ color : options. interpret_color ( ) ,
554562 x : random :: < i32 > ( ) % da_width,
555563 y : random :: < i32 > ( ) % da_height,
556564 shapesize : options. shapesize ,
@@ -571,7 +579,12 @@ fn run_publisher(
571579
572580 while all_done. try_recv ( ) . is_err ( ) {
573581 if options. shapesize == 0 {
574- shape. shapesize += 1 ;
582+ if let Some ( size_modulo) = options. size_modulo {
583+ // Size cannot be 0, so increase it after modulo operation
584+ shape. shapesize = ( shape. shapesize % size_modulo) + 1 ;
585+ } else {
586+ shape. shapesize += 1 ;
587+ }
575588 }
576589
577590 move_shape ( & mut shape, & mut x_vel, & mut y_vel, da_width, da_height) ;
@@ -594,9 +607,9 @@ fn run_publisher(
594607}
595608
596609fn init_subscriber (
597- participant : & DomainParticipant < StdRuntime > ,
610+ participant : & DomainParticipant ,
598611 options : Options ,
599- ) -> Result < DataReader < StdRuntime , ShapeType > , InitializeError > {
612+ ) -> Result < DataReader < ShapeType > , InitializeError > {
600613 let topic = participant
601614 . lookup_topicdescription ( & options. topic_name )
602615 . expect ( "lookup_topicdescription succeeds" )
@@ -621,18 +634,21 @@ fn init_subscriber(
621634 ) ;
622635 }
623636
624- let data_reader = match options. color {
625- // filter on specified color
626- Some ( color) => {
637+ let data_reader = match options. cft_expression {
638+ Some ( cft_expression) => {
627639 let filtered_topic_name = options. topic_name + "_filtered" ;
628- println ! (
629- "Create reader for topic: {} color: {}" ,
630- filtered_topic_name, & color
631- ) ;
640+ println ! ( "ContentFilterTopic = \" {cft_expression}\" " ) ;
641+ println ! ( "Create reader for topic: {} " , filtered_topic_name) ;
642+ let color = cft_expression
643+ . split ( "=" )
644+ . nth ( 1 )
645+ . unwrap ( )
646+ . trim_matches ( & [ ' ' , '\'' ] )
647+ . to_string ( ) ;
632648 let content_filtered_topic = participant. create_contentfilteredtopic (
633649 & filtered_topic_name,
634650 & topic,
635- String :: from ( "color = %0" ) ,
651+ cft_expression ,
636652 vec ! [ color] ,
637653 ) ?;
638654
@@ -643,7 +659,6 @@ fn init_subscriber(
643659 NO_STATUS ,
644660 ) ?
645661 }
646- // No filter on specified color
647662 None => {
648663 println ! ( "Create reader for topic: {} " , options. topic_name) ;
649664 subscriber. create_datareader :: < ShapeType > (
@@ -659,7 +674,7 @@ fn init_subscriber(
659674}
660675
661676fn run_subscriber (
662- data_reader : & DataReader < StdRuntime , ShapeType > ,
677+ data_reader : & DataReader < ShapeType > ,
663678 options : Options ,
664679 all_done : Receiver < ( ) > ,
665680) -> Result < ( ) , RunningError > {
@@ -713,7 +728,7 @@ fn run_subscriber(
713728 Ok ( ( ) )
714729}
715730
716- fn initialize ( options : & Options ) -> Result < DomainParticipant < StdRuntime > , InitializeError > {
731+ fn initialize ( options : & Options ) -> Result < DomainParticipant , InitializeError > {
717732 let participant_factory = DomainParticipantFactory :: get_instance ( ) ;
718733 let participant = participant_factory. create_participant (
719734 options. domain_id ,
0 commit comments