Skip to content

[rocketmq-spark] ask a master for help, repeated consumption occurs when the program restarts #901

@kai23333

Description

@kai23333

Reference code : https://github.com/apache/rocketmq-externals/blob/master/rocketmq-spark/src/test/java/org/apache/rocketmq/spark/streaming/RocketMqUtilsTest.java

code

My code :

image

Consumer Strategy Have chosen lastest, This setting doesn't seem to work, Program restart consumes historical data,

How to solve this problem?

The complete code is as follows:

        try {

            Map<String, String> optionParams = new HashMap<>();
            optionParams.put(RocketMQConfig.NAME_SERVER_ADDR, nameSrvAddr);
            SparkConf sparkConf = new SparkConf().setAppName("JavaCustomReceiver").setMaster("local[*]");
            JavaStreamingContext sc = new JavaStreamingContext(sparkConf, new Duration(duration));

            List<String> topics = new ArrayList<>();
            if (StringUtils.hasText(topic)) {
                for (String s : topic.split(";")) {
                    topics.add(s);
                }
            }

            LocationStrategy locationStrategy = LocationStrategy.PreferConsistent();

            JavaInputDStream<MessageExt> stream = RocketMqUtils.createJavaMQPullStream(sc, groupId,
                    topics, ConsumerStrategy.lastest(), false, false, false, locationStrategy, optionParams);

            stream.foreachRDD(new VoidFunction<JavaRDD<MessageExt>>() {

                private static final long serialVersionUID = 1L;

                @Override
                public void call(JavaRDD<MessageExt> messageExtJavaRDD) throws Exception {

                    JavaRDD<GPSRDD> GPSRDDJavaRDD = messageExtJavaRDD.map(new Function<MessageExt, GPSRDD>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public GPSRDD call(MessageExt messageExt) throws Exception {

                            GPSRDD gps = new GPSRDD();
                            String xxx = new String(messageExt.getBody());
                            System.out.println(xxx);
                            return gps;
                        }
                    });

                }
            });

            sc.start();

        } catch (Exception e) {
            e.printStackTrace();
        }

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions