Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix RecursionError because of repeated channel reconnections. #380

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from

Conversation

liuyaqiu
Copy link

@liuyaqiu liuyaqiu commented Dec 16, 2021

Fix RecursionError because of repeated channel reconnections.

@liuyaqiu
Copy link
Author

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/celery/app/trace.py", line 441, in trace_task
    task.backend.store_result(
  File "/usr/local/lib/python3.8/dist-packages/celery/backends/rpc.py", line 202, in store_result
    producer.publish(
  File "/usr/local/lib/python3.8/dist-packages/kombu/messaging.py", line 177, in publish
    return _publish(
  File "/usr/local/lib/python3.8/dist-packages/kombu/connection.py", line 524, in _ensured
    return fun(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/kombu/messaging.py", line 199, in _publish
    return channel.basic_publish(
  File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 1775, in _basic_publish
    self.connection.drain_events(timeout=0)
  File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 523, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 529, in blocking_read
    return self.on_inbound_frame(frame)
  File "/usr/local/lib/python3.8/dist-packages/amqp/method_framing.py", line 53, in on_frame
    callback(channel, method_sig, buf, None)
  File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 535, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
  File "/usr/local/lib/python3.8/dist-packages/amqp/abstract_channel.py", line 143, in dispatch_method
    listener(*args)
  File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 276, in _on_close
    self._do_revive()
  File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 161, in _do_revive
    self.open()
  File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 432, in open
    return self.send_method(
  File "/usr/local/lib/python3.8/dist-packages/amqp/abstract_channel.py", line 66, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "/usr/local/lib/python3.8/dist-packages/amqp/abstract_channel.py", line 86, in wait
    self.connection.drain_events(timeout=timeout)
  File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 523, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 529, in blocking_read
    return self.on_inbound_frame(frame)
  File "/usr/local/lib/python3.8/dist-packages/amqp/method_framing.py", line 53, in on_frame
    callback(channel, method_sig, buf, None)
  File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 535, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
  File "/usr/local/lib/python3.8/dist-packages/amqp/abstract_channel.py", line 143, in dispatch_method
    listener(*args)
  File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 276, in _on_close
    self._do_revive()
  File "/usr/local/lib/python3.8/dist-packages/amqp/channel.py", line 161, in _do_revive
    self.open()

Finally raise RecursionError:

File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 529, in blocking_read     return self.on_inbound_frame(frame)   File "/usr/local/lib/python3.8/dist-packages/amqp/method_framing.py", line 53, in on_frame     callback(channel, method_sig, buf, None)   File "/usr/local/lib/python3.8/dist-packages/amqp/connection.py", line 535, in on_inbound_method     return self.channels[channel_id].dispatch_method(   File "/usr/local/lib/python3.8/dist-packages/amqp/abstract_channel.py", line 131, in dispatch_method     one_shot = self._pending.pop(method_sig) RecursionError: maximum recursion depth exceeded while calling a Python object

Copy link
Member

@auvipy auvipy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you please add unit test for the changes? so that we can reproduce the issue

@auvipy
Copy link
Member

auvipy commented Dec 16, 2021

@pawl @michael-lazar can you guys please try this patch?

@liuyaqiu
Copy link
Author

liuyaqiu commented Dec 16, 2021

@auvipy
Thanks. But I think this may lead some other problem. Because when I use it in my project, it makes the celery worker can't consume tasks. I think my solution is wrong and lead some other problems.

I don't know much about this project. I think it is a problem because it repeats in my production environment. But I don't know how to reproduce it and really solve it.

@liuyaqiu liuyaqiu marked this pull request as draft December 16, 2021 12:48
@michael-lazar
Copy link
Contributor

@liuyaqiu Is this a recent issue for you or has this been happening for a while?

@liuyaqiu
Copy link
Author

@liuyaqiu Is this a recent issue for you or has this been happening for a while?

This has been happening for a while. But it now always repeats on my production environment. I think it is because of:

  1. The celery worker(client) wanted to publish a message.
  2. the client found that the channel is closing, but its connection is not closed. It found a S:CLOSE frame.
  3. the client call on_close(), sent a spec.Channel.CloseOk. Then because the connection is not closed, it wants to revive the channel. call _do_revive()
  4. During _do_revive(), calling open() and send spec.Channel.Open and wait for spec.Channel.OpenOk
  5. but the next frame is still a S:CLOSE frame. so go to the setp 3.

I think the rabbitmq may be in wrong status, so the client received too much S:CLOSE frame, and leads that too many on_close() is called.

Then, the RecurssionError is not captured by the celery framework, celery think it is a task's runtime error, so it reported the task failed. In fact, the task didn't event start. (The task failed when it publish task's status to rabbitmq backend).

I think now my solution is a quick fix for this problem. When the client found too much on_close, it should stop channel reviving and raise ChannelError, rather than repeated to cause a RecurssionError which can't be catched by downstream application.

A better idea may be:
When a channel is reviving, ignore all frame other than S:OPEN-OK. Then the channel should not auto reviving after too much open operation during a period, and then exit to avoid infinity loop.

@auvipy auvipy requested a review from matusvalo December 17, 2021 16:07
Copy link
Member

@matusvalo matusvalo left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I agree with following statement:

I think now my solution is a quick fix for this problem. When the client found too much on_close, it should stop channel reviving and raise ChannelError, rather than repeated to cause a RecurssionError which can't be catched by downstream application.

A better idea may be:
When a channel is reviving, ignore all frame other than S:OPEN-OK. Then the channel should not auto reviving after too much open operation during a period, and then exit to avoid infinity loop.

@matusvalo
Copy link
Member

Me personally, I prefer to have final fix. this PR is honestly just dirty fix which can lead to other hidden problems.

@liuyaqiu
Copy link
Author

Me personally, I prefer to have final fix. this PR is honestly just dirty fix which can lead to other hidden problems.

Thanks. I will try to solve it in a better way.

@auvipy
Copy link
Member

auvipy commented Jan 12, 2022

@pawl if you have time in coming days

@roni-cye
Copy link

roni-cye commented Apr 7, 2022

@auvipy @liuyaqiu @matusvalo Hello guys, I had this issue too, any updates about it ?

@liuyaqiu
Copy link
Author

liuyaqiu commented Apr 14, 2022

@matusvalo Hello guys, I had this issue too, any updates about it ?

I don't know your problem context. Previously I call a subtask synchronously in a parent task and use the rpc result backend to store task state and result, I try to get subtask's state and result in the parent task. But now I don't use rpc result backend and use the mongodb result backend. Previously my error is encountered when I get the subtask's state and result from rpc result backend. And now I just use RabbitMQ as broker, there is no such error.

And you should not use the rpc result backend in production environment because the rpc result backend will create a unique queue for every task to store its state and result. Then this leads to too many result queues in RabbitMQ, which will waste resource of RabbitMQ and harm RabbitMQ's performance.

@liuyaqiu
Copy link
Author

@liuyaqiu What you're describing is the old AMQP backend. The RPC backend uses RabbitMQ's Pub/Sub capabilities.

What I am describing remains in the version v5.2.1. Is this changed in the master?

@auvipy
Copy link
Member

auvipy commented Nov 12, 2022

@liuyaqiu What you're describing is the old AMQP backend. The RPC backend uses RabbitMQ's Pub/Sub capabilities.

What I am describing remains in the version v5.2.1. Is this changed in the master?

no you are right, that didn't changed

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants