You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
padelclub_backend/mailing/admin.py

317 lines
13 KiB

from django.contrib import admin
from django.shortcuts import render, redirect
from django.contrib import messages
from django.urls import path, reverse
from django.utils.html import format_html
from django.contrib.auth import get_user_model
from django.http import HttpResponseRedirect
from django.core.mail import send_mail
from django.conf import settings
from django.utils import timezone
from .models import MailingList, Subscriber, EmailTemplate, Campaign, EmailLog
from biz.models import Prospect
from sync.admin import SyncedObjectAdmin
# Resolve the user model through Django's auth helper rather than importing
# a concrete class; the import views below read users through this name.
User = get_user_model()
@admin.register(MailingList)
class MailingListAdmin(SyncedObjectAdmin):
    """Admin configuration for ``MailingList`` records."""

    # Columns rendered on the changelist page.
    list_display = (
        'name',
        'subscriber_count',
        'creation_date',
    )

    # Fields matched by the changelist search box.
    search_fields = (
        'name',
        'description',
    )

    # Displayed on the change form but never editable.
    readonly_fields = (
        'subscriber_count',
    )
@admin.register(EmailTemplate)
class EmailTemplateAdmin(SyncedObjectAdmin):
    """Admin configuration for reusable ``EmailTemplate`` records."""

    # Columns rendered on the changelist page.
    list_display = (
        'name',
        'subject',
        'creation_date',
    )

    # Fields matched by the changelist search box.
    search_fields = (
        'name',
        'subject',
    )

    # Change-form layout: identifying fields first, then both body variants.
    fieldsets = (
        (
            None,
            {
                'fields': ('name', 'subject'),
            },
        ),
        (
            'Content',
            {
                'fields': ('html_content', 'text_content'),
                'description': 'HTML content supports images and rich formatting. Text content is a fallback for plain text email clients.',
            },
        ),
    )
@admin.register(Subscriber)
class SubscriberAdmin(SyncedObjectAdmin):
    """Admin for ``Subscriber`` records, with bulk-import views.

    Besides the standard changelist/change form, this admin registers two
    extra URLs (``import-users/`` and ``import-prospects/``) that create
    subscribers from existing users or prospects and attach them to a
    mailing list chosen in the submitted form.
    """

    list_display = ('email', 'full_name', 'is_active', 'creation_date', 'get_mailing_lists')
    list_filter = ('is_active', 'mailing_lists', 'creation_date')
    search_fields = ('email', 'first_name', 'last_name')
    filter_horizontal = ('mailing_lists',)
    readonly_fields = ('unsubscribe_token',)
    fieldsets = (
        (None, {
            'fields': ('email', 'first_name', 'last_name', 'is_active')
        }),
        ('Mailing Lists', {
            'fields': ('mailing_lists',)
        }),
        ('System', {
            'fields': ('unsubscribe_token', 'user', 'prospect'),
            'classes': ('collapse',)
        }),
    )
    actions = ['import_from_users', 'import_from_prospects', 'deactivate_subscribers']

    def get_queryset(self, request):
        """Return the admin queryset with mailing lists prefetched.

        ``get_mailing_lists`` is a changelist column; without the prefetch it
        issues one extra query per displayed row.
        """
        return super().get_queryset(request).prefetch_related('mailing_lists')

    def get_mailing_lists(self, obj):
        """Return the subscriber's mailing-list names as a comma-separated string."""
        return ", ".join(ml.name for ml in obj.mailing_lists.all())
    get_mailing_lists.short_description = 'Mailing Lists'

    def get_urls(self):
        """Prepend the two import views to the default admin URLs."""
        urls = super().get_urls()
        custom_urls = [
            path('import-users/', self.admin_site.admin_view(self.import_users_view), name='mailing_subscriber_import_users'),
            path('import-prospects/', self.admin_site.admin_view(self.import_prospects_view), name='mailing_subscriber_import_prospects'),
        ]
        # Custom routes go first so they are matched before the admin's
        # catch-all object-id pattern.
        return custom_urls + urls

    def _import_into_mailing_list(self, request, records, link_field, noun, title, template_name):
        """Shared implementation of the two bulk-import views.

        Args:
            request: The current admin request.
            records: Iterable of source objects exposing ``email``,
                ``first_name`` and ``last_name`` (a lazy queryset is fine; it
                is only evaluated on POST).
            link_field: Name of the ``Subscriber`` field that should point
                back at the source record (``'user'`` or ``'prospect'``).
            noun: Plural noun used in the success message.
            title: Page title for the selection form.
            template_name: Template rendered for GET requests and failed POSTs.

        Returns:
            A redirect to the subscriber changelist after a successful
            import, otherwise the rendered selection form.
        """
        # Local import keeps this block self-contained; the module does not
        # import ValidationError at file level.
        from django.core.exceptions import ValidationError

        if request.method == 'POST':
            mailing_list_id = request.POST.get('mailing_list')
            try:
                mailing_list = MailingList.objects.get(id=mailing_list_id)
            except (MailingList.DoesNotExist, ValueError, ValidationError):
                # A malformed id (e.g. an empty or non-UUID string) raises
                # ValueError/ValidationError rather than DoesNotExist; treat
                # it the same way instead of answering with a server error.
                messages.error(request, 'Selected mailing list does not exist')
            else:
                created_count = 0
                for record in records:
                    # Skip NULL *and* empty-string emails so no blank-email
                    # subscriber is ever created.
                    if not record.email:
                        continue
                    subscriber, created = Subscriber.objects.get_or_create(
                        email=record.email,
                        defaults={
                            'first_name': record.first_name,
                            'last_name': record.last_name,
                            link_field: record,
                        }
                    )
                    # Existing subscribers are still attached to the list;
                    # only newly created ones are counted in the message.
                    subscriber.mailing_lists.add(mailing_list)
                    if created:
                        created_count += 1
                messages.success(request, f'Imported {created_count} {noun} into mailing list "{mailing_list.name}"')
                return redirect('admin:mailing_subscriber_changelist')

        context = {
            # Site-wide admin context (site header, permissions, sidebar)
            # so templates extending the admin base render correctly.
            **self.admin_site.each_context(request),
            'title': title,
            'mailing_lists': MailingList.objects.all(),
        }
        return render(request, template_name, context)

    def import_users_view(self, request):
        """Import every user that has an email address into a mailing list."""
        return self._import_into_mailing_list(
            request,
            records=User.objects.all(),
            link_field='user',
            noun='users',
            title='Import Users to Mailing List',
            template_name='admin/mailing/subscriber/import_users.html',
        )

    def import_prospects_view(self, request):
        """Import every prospect that has an email address into a mailing list."""
        return self._import_into_mailing_list(
            request,
            records=Prospect.objects.filter(email__isnull=False),
            link_field='prospect',
            noun='prospects',
            title='Import Prospects to Mailing List',
            template_name='admin/mailing/subscriber/import_prospects.html',
        )

    def import_from_users(self, request, queryset):
        """Admin action: jump to the user-import page (the selection is ignored)."""
        return redirect('admin:mailing_subscriber_import_users')
    import_from_users.short_description = "Import from Users"

    def import_from_prospects(self, request, queryset):
        """Admin action: jump to the prospect-import page (the selection is ignored)."""
        return redirect('admin:mailing_subscriber_import_prospects')
    import_from_prospects.short_description = "Import from Prospects"

    def deactivate_subscribers(self, request, queryset):
        """Admin action: mark every selected subscriber as inactive."""
        count = queryset.update(is_active=False)
        messages.success(request, f'Deactivated {count} subscribers')
    deactivate_subscribers.short_description = "Deactivate selected subscribers"
@admin.register(Campaign)
class CampaignAdmin(SyncedObjectAdmin):
    """Admin for ``Campaign`` records, including the send-confirmation view.

    A campaign is sent from a dedicated ``<id>/send/`` page: GET shows how
    many active subscribers would receive it, POST performs the delivery and
    marks the campaign as sent.
    """

    list_display = ('name', 'subject', 'is_sent', 'sent_at', 'total_sent', 'total_opened', 'total_clicked')
    list_filter = ('is_sent', 'sent_at', 'mailing_lists')
    search_fields = ('name', 'subject')
    filter_horizontal = ('mailing_lists',)
    readonly_fields = ('is_sent', 'sent_at', 'total_sent', 'total_delivered', 'total_opened', 'total_clicked', 'total_bounced')
    fieldsets = (
        (None, {
            'fields': ('name', 'subject', 'mailing_lists')
        }),
        ('Content', {
            'fields': ('template', 'html_content', 'text_content'),
            'description': 'Use a template OR fill in content directly'
        }),
        ('Statistics', {
            'fields': ('is_sent', 'sent_at', 'total_sent', 'total_delivered', 'total_opened', 'total_clicked', 'total_bounced'),
            'classes': ('collapse',)
        }),
    )
    actions = ['send_campaign']

    def send_campaign(self, request, queryset):
        """Admin action: open the send-confirmation page for one unsent campaign."""
        if queryset.count() != 1:
            messages.error(request, "Please select exactly one campaign to send.")
            return
        campaign = queryset.first()
        if campaign.is_sent:
            messages.error(request, "This campaign has already been sent.")
            return
        return redirect('admin:mailing_campaign_send', campaign.id)
    send_campaign.short_description = "Send campaign"

    def get_urls(self):
        """Prepend the send-confirmation route to the default admin URLs."""
        urls = super().get_urls()
        custom_urls = [
            path('<uuid:campaign_id>/send/', self.admin_site.admin_view(self.send_campaign_view), name='mailing_campaign_send'),
        ]
        return custom_urls + urls

    @staticmethod
    def _active_recipients(campaign):
        """Return the distinct active subscribers of the campaign's mailing lists."""
        return Subscriber.objects.filter(
            mailing_lists__in=campaign.mailing_lists.all(),
            is_active=True
        ).distinct()

    @staticmethod
    def _deliver(request, email_log, subscriber, subject, content):
        """Build and send one campaign email to ``subscriber``.

        Appends the open-tracking pixel and the unsubscribe link to the HTML
        body, and the unsubscribe link to the text body, when those bodies
        are non-empty. Raises whatever ``send_mail`` raises on failure.
        """
        unsubscribe_url = request.build_absolute_uri(
            reverse('mailing:unsubscribe', kwargs={'token': subscriber.unsubscribe_token})
        )
        tracking_pixel_url = request.build_absolute_uri(
            reverse('mailing:track_open', kwargs={'tracking_id': email_log.tracking_id})
        )

        html_content = content['html']
        if html_content:
            # Invisible 1x1 image used to record opens.
            html_content += f'<img src="{tracking_pixel_url}" width="1" height="1" style="display:none;" />'
            html_content += f'<br><br><a href="{unsubscribe_url}">Unsubscribe</a>'

        text_content = content['text']
        if text_content:
            text_content += f'\n\nUnsubscribe: {unsubscribe_url}'

        send_mail(
            subject=subject,
            message=text_content or '',
            from_email=settings.DEFAULT_FROM_EMAIL,
            recipient_list=[subscriber.email],
            html_message=html_content,
            fail_silently=False,
        )

    def send_campaign_view(self, request, campaign_id):
        """Confirm (GET) or perform (POST) the delivery of a campaign.

        Args:
            request: The current admin request.
            campaign_id: Primary key of the campaign, taken from the URL.

        Returns:
            A redirect to the campaign changelist when the campaign is
            missing, already sent, or has just been sent; otherwise the
            rendered confirmation page.
        """
        try:
            campaign = Campaign.objects.get(id=campaign_id)
        except Campaign.DoesNotExist:
            messages.error(request, "Campaign not found.")
            return redirect('admin:mailing_campaign_changelist')

        if campaign.is_sent:
            messages.error(request, "This campaign has already been sent.")
            return redirect('admin:mailing_campaign_changelist')

        if request.method == 'POST':
            content = campaign.get_content()
            subject = campaign.get_subject()
            sent_count = 0
            failed_count = 0

            for subscriber in self._active_recipients(campaign):
                # Reset per iteration: if get_or_create itself fails, the
                # handler below must not touch an unbound name or, worse,
                # the previous subscriber's log entry.
                email_log = None
                try:
                    # NOTE(review): sent_at is populated before delivery is
                    # attempted; if EmailLog.is_sent is derived from sent_at,
                    # failed logs will look sent - confirm against the model.
                    email_log, created = EmailLog.objects.get_or_create(
                        campaign=campaign,
                        subscriber=subscriber,
                        defaults={'sent_at': timezone.now()}
                    )
                    if not created and email_log.is_sent:
                        continue  # Skip if already sent

                    self._deliver(request, email_log, subscriber, subject, content)

                    email_log.sent_at = timezone.now()
                    email_log.save()
                    sent_count += 1
                except Exception as exc:
                    # Deliberately broad: one failing recipient must not
                    # abort delivery to the remaining subscribers.
                    failed_count += 1
                    if email_log is not None:
                        email_log.error_message = str(exc)
                        email_log.save()

            # Update campaign statistics
            campaign.total_sent = sent_count
            campaign.sent_at = timezone.now()
            campaign.is_sent = True
            campaign.save()

            if failed_count > 0:
                messages.warning(request, f'Campaign sent to {sent_count} subscribers. {failed_count} emails failed.')
            else:
                messages.success(request, f'Campaign sent successfully to {sent_count} subscribers.')
            return redirect('admin:mailing_campaign_changelist')

        # GET: show how many subscribers would receive this email.
        context = {
            # Site-wide admin context (site header, permissions, sidebar)
            # so templates extending the admin base render correctly.
            **self.admin_site.each_context(request),
            'title': f'Send Campaign: {campaign.name}',
            'campaign': campaign,
            'subscriber_count': self._active_recipients(campaign).count(),
        }
        return render(request, 'admin/mailing/campaign/send_campaign.html', context)
@admin.register(EmailLog)
class EmailLogAdmin(SyncedObjectAdmin):
    """Admin configuration for per-recipient ``EmailLog`` entries."""

    # Columns rendered on the changelist page.
    list_display = (
        'campaign',
        'subscriber',
        'is_sent',
        'is_opened',
        'is_clicked',
        'sent_at',
    )

    # Sidebar filters on the changelist page.
    list_filter = (
        'campaign',
        'sent_at',
        'opened_at',
        'clicked_at',
        'bounced_at',
    )

    # Search spans the related subscriber's email and the campaign's name.
    search_fields = (
        'subscriber__email',
        'campaign__name',
    )

    # Tracking id and timestamps are shown on the form but not editable.
    readonly_fields = (
        'tracking_id',
        'sent_at',
        'delivered_at',
        'opened_at',
        'clicked_at',
        'bounced_at',
    )

    # Change-form layout: identity first, then the tracking timeline.
    fieldsets = (
        (
            None,
            {
                'fields': ('campaign', 'subscriber', 'tracking_id'),
            },
        ),
        (
            'Tracking',
            {
                'fields': ('sent_at', 'delivered_at', 'opened_at', 'clicked_at', 'bounced_at', 'error_message'),
            },
        ),
    )