This article was contributed by a verified Alibaba Cloud user. Copyright belongs to the original author; the Alibaba Cloud developer community does not own it.

I recently wrote a web crawler in Python. For convenience, I built a control end in Java and connected the two with RabbitMQ.
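The setup uses two queues, one per direction. `javapython` carries commands from the Java control end to the Python crawler, and `pythonjava` carries messages back. The sketch below simulates that round trip in memory, using `queue.Queue` as a stand-in for the broker, so it runs without RabbitMQ. The queue names and the `taskid`/`method` fields come from the code later in this article. The reply text is purely hypothetical.

```python
import json
import queue

# Queue names from the article: commands go Java -> Python, notices go Python -> Java.
JAVA_TO_PYTHON = "javapython"
PYTHON_TO_JAVA = "pythonjava"

# In-memory stand-ins for the two RabbitMQ queues (no broker involved).
broker = {JAVA_TO_PYTHON: queue.Queue(), PYTHON_TO_JAVA: queue.Queue()}


def java_send_command(taskid, method):
    """Plays the Java producer: publish a UTF-8 JSON command."""
    body = json.dumps({"taskid": taskid, "method": method}).encode("utf-8")
    broker[JAVA_TO_PYTHON].put(body)


def python_handle_one():
    """Plays the Python consumer: read one command and report back (hypothetical text)."""
    js = json.loads(broker[JAVA_TO_PYTHON].get().decode("utf-8"))
    notice = f"task {js['taskid']}: received {js['method']}"
    broker[PYTHON_TO_JAVA].put(notice.encode("utf-8"))


def java_receive_notice():
    """Plays the Java consumer: read one notice."""
    return broker[PYTHON_TO_JAVA].get().decode("utf-8")


java_send_command("42", "start")
python_handle_one()
print(java_receive_notice())  # task 42: received start
```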

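First, the Java side. Both the producer and the consumer use the singleton pattern, and the consumer starts consuming automatically when Tomcat starts. Enough talk, here is the code: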

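// Consumer: receives the messages the Python crawler publishes to "pythonjava"
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import javax.swing.JOptionPane;

public class ScrapyRabbitCon {
    // Queue name (Python -> Java)
    private static final String QUEUE_NAME = "pythonjava";
    private static ScrapyRabbitCon rabbitmq;

    // Lazily create the single instance; synchronized so two callers cannot open two connections
    public static synchronized ScrapyRabbitCon getRabbit() {
        if (rabbitmq == null) {
            try {
                rabbitmq = new ScrapyRabbitCon();
            } catch (IOException | TimeoutException e) {
                // Connecting to the broker failed; getRabbit() returns null in this case
                e.printStackTrace();
            }
        }
        return rabbitmq;
    }

    private ScrapyRabbitCon() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
//        factory.setConnectionTimeout(2);
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        // Non-durable, non-exclusive, non-auto-delete: must match the Python side's declaration
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                String message = new String(body, StandardCharsets.UTF_8);
                // Show the received message in a modal Swing dialog (blocks this thread until closed)
                JOptionPane.showMessageDialog(null, message, "ERROR", JOptionPane.ERROR_MESSAGE);
                System.out.println(message);
            }
        };
        // autoAck = true: each message is acknowledged as soon as it is delivered
        channel.basicConsume(QUEUE_NAME, true, consumer);
    }
}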
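// Producer: sends JSON commands to the Python crawler on "javapython"
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
import org.json.JSONObject; // assumption: the original does not say which JSON library provides JSONObject

public class ScrapyRabbitPro {
    // Queue name (Java -> Python)
    private static final String QUEUE_NAME = "javapython";
    private Channel channel;
    private static ScrapyRabbitPro sendRabbit;

    // Lazily create the single instance; synchronized so concurrent callers share one connection
    public static synchronized ScrapyRabbitPro getSendRabbit() {
        if (sendRabbit == null) {
            try {
                sendRabbit = new ScrapyRabbitPro();
            } catch (IOException | TimeoutException e) {
                // Connecting to the broker failed; getSendRabbit() returns null in this case
                e.printStackTrace();
            }
        }
        return sendRabbit;
    }

    private ScrapyRabbitPro() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setPort(5672);
//        factory.setConnectionTimeout(2);
        Connection connection = factory.newConnection();
        channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    }

    // Publish to the default exchange, routed by queue name. A channel must not be used
    // for concurrent publishing, so synchronized serializes calls to send().
    public synchronized void send(JSONObject message) {
        try {
            channel.basicPublish("", QUEUE_NAME, null, message.toString().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("Producer sent '" + message + "'");
    }
}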

That is the Java RabbitMQ code. The producer wraps a `send` method: calling it publishes a JSON message, which the consumer on the Python side then consumes.
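Because the message is plain UTF-8 JSON text, the Python side can easily validate its format. The following is a minimal sketch that makes three assumptions. First, commands carry the two fields the Python callback reads, `taskid` and `method`. Second, `method` is either `start` or `stop`. Third, `taskid` is a string. `build_command` and `parse_command` are hypothetical helpers and are not part of the original project. Returning `None` for malformed input matters here. With pika's `BlockingConnection`, an exception raised inside the message callback propagates out of `start_consuming()` and stops the consumer script.

```python
import json

# Commands the Python callback reacts to.
VALID_METHODS = {"start", "stop"}


def build_command(taskid, method):
    """Encode a command the way the Java side sends it: JSON text as UTF-8 bytes."""
    if method not in VALID_METHODS:
        raise ValueError(f"unknown method: {method!r}")
    return json.dumps({"taskid": taskid, "method": method}).encode("utf-8")


def parse_command(body):
    """Decode and validate a command; return (taskid, method), or None if malformed.

    Returning None instead of raising keeps one bad message from escaping the
    pika callback and stopping the consumer.
    """
    try:
        js = json.loads(body.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None
    if not isinstance(js, dict):
        return None
    taskid, method = js.get("taskid"), js.get("method")
    if taskid is None or method not in VALID_METHODS:
        return None
    return taskid, method


print(parse_command(build_command("42", "start")))  # ('42', 'start')
print(parse_command(b"not json"))                    # None
```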

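import json
import threading

import pika

# Defined elsewhere in the crawler project: TASKINFO, TASKSTATUS, get_taskinfo,
# get_taskstatus, writeconf, go, ki (get_or_save_mq is shown further down).

def callback(ch, method, properties, body):  # called for every message from the Java producer
    global TASKINFO, TASKSTATUS
    body = body.decode('utf-8')
    js = json.loads(body)
    taskid = js.get("taskid")
    TASKINFO = get_taskinfo(taskid)
    TASKSTATUS = get_taskstatus(taskid)
    mq = get_or_save_mq("pythonjava")  # producer for messages back to the Java side
    if js.get("method") == 'start':
        writeconf(taskid)
        t1 = threading.Thread(target=go, args=(mq,))
        t1.start()
    elif js.get("method") == 'stop':
        t2 = threading.Thread(target=ki, args=(mq,))
        t2.start()

credentials = pika.PlainCredentials('guest', 'guest')
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
channel = connection.channel()
channel.queue_declare(queue='javapython')
# pika >= 1.0 signature; pika 0.x used basic_consume(callback, queue='javapython', no_ack=True)
channel.basic_consume(queue='javapython',
                      on_message_callback=callback,
                      auto_ack=True)
print('[consumer] waiting for messages...')
channel.start_consuming()  # loop forever, receiving messages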

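That is the Python consumer. For now it is written as a plain script and serves as the entry point of the whole crawler. It listens for commands from the Java control end and uses them to start and stop the crawler.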
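The `if` chain in the callback can also be written as a dispatch table. This keeps the pika callback small and lets the command handling run without a broker. The sketch below makes a few assumptions. `make_callback` is a hypothetical helper. Its handlers receive the `taskid`, whereas in the original `go` and `ki` receive the `mq` producer. With pika 1.x, the resulting callback would be registered via `channel.basic_consume(queue='javapython', on_message_callback=..., auto_ack=True)`.

```python
import json
import threading


def make_callback(handlers):
    """Return a pika-style callback (ch, method, properties, body).

    It reads the "method" field of the JSON command, looks up a handler and runs
    it in a new thread, like the start/stop branches of the original callback.
    It returns the thread (pika ignores return values) so tests can join it.
    """
    def callback(ch, method, properties, body):
        js = json.loads(body.decode("utf-8"))
        command = js.get("method")
        handler = handlers.get(command)
        if handler is None:
            print("ignored unknown command:", command)
            return None
        worker = threading.Thread(target=handler, args=(js.get("taskid"),))
        worker.start()
        return worker
    return callback


# Hypothetical handlers standing in for writeconf() + go() and for ki().
results = []
handlers = {
    "start": lambda taskid: results.append(("start", taskid)),
    "stop": lambda taskid: results.append(("stop", taskid)),
}
cb = make_callback(handlers)
cb(None, None, None, b'{"taskid": "42", "method": "start"}').join()
print(results)  # [('start', '42')]
```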

import pika

MQ_DICT = {}  # cache: queue name -> InitMq, so each queue gets a single connection

def get_or_save_mq(queue_name):
    mq = MQ_DICT.get(queue_name)
    if mq:
        return mq
    mq = InitMq(queue_name)
    MQ_DICT[queue_name] = mq
    return mq

class InitMq:
    def __init__(self, uuid):
        queue = uuid  # the queue name doubles as the routing key
        print("*********** Initializing MQ driver *************")
        credentials = pika.PlainCredentials('guest', 'guest')
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/', credentials))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue=queue)
        self.routing_key = queue

    def send_data(self, body):
        # Default exchange (''): the message goes to the queue named by routing_key
        self.channel.basic_publish(exchange='', routing_key=self.routing_key, body=body.encode('utf-8'))

That is the Python producer. It sends the error messages and informational notices produced by the crawler to the Java control end.
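One caveat applies to the code above. The producer's connection is created in the consumer thread but then passed to the `go`/`ki` worker threads. pika's documentation states that connections are not thread-safe and should only be used from the thread that created them. One possible workaround is sketched below. Worker threads put messages on a `queue.Queue`, and a single relay thread owns the connection and does all the publishing. `NotificationRelay` is a hypothetical name. In the real project, `make_sender` might be something like `lambda: InitMq("pythonjava").send_data`, but that wiring is an untested assumption. The sketch also ignores reconnection and heartbeats.

```python
import queue
import threading

_STOP = object()  # sentinel that tells the relay thread to exit


class NotificationRelay:
    """Funnel messages from many crawler threads through one sender thread.

    make_sender() is called inside the relay thread, so a connection it
    creates is only ever used by the thread that created it.
    """

    def __init__(self, make_sender):
        self._queue = queue.Queue()
        self._thread = threading.Thread(target=self._run, args=(make_sender,), daemon=True)
        self._thread.start()

    def notify(self, text):
        """Safe to call from any thread: only enqueues the message."""
        self._queue.put(text)

    def close(self):
        """Send everything queued so far, then stop the relay thread."""
        self._queue.put(_STOP)
        self._thread.join()

    def _run(self, make_sender):
        send = make_sender()
        while True:
            item = self._queue.get()
            if item is _STOP:
                break
            send(item)


sent = []
relay = NotificationRelay(lambda: sent.append)  # stand-in for a real publisher
workers = [threading.Thread(target=relay.notify, args=(f"worker {i} done",)) for i in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()
relay.close()
print(sorted(sent))  # ['worker 0 done', 'worker 1 done', 'worker 2 done']
```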