"""Admin configuration for the mailing models.

Registers admin classes for mailing lists, email templates, subscribers,
campaigns and email logs, plus custom admin views for importing subscribers
and for sending a campaign.
"""
from django.contrib import admin
from django.shortcuts import render, redirect
from django.contrib import messages
from django.urls import path, reverse
from django.utils.html import format_html
from django.contrib.auth import get_user_model
from django.http import HttpResponseRedirect
from django.core.exceptions import ValidationError
from django.core.mail import send_mail
from django.conf import settings
from django.utils import timezone

from .models import MailingList, Subscriber, EmailTemplate, Campaign, EmailLog
from biz.models import Prospect
from sync.admin import SyncedObjectAdmin

User = get_user_model()


@admin.register(MailingList)
class MailingListAdmin(SyncedObjectAdmin):
    """Admin for mailing lists; the subscriber count is shown read-only."""

    list_display = ('name', 'subscriber_count', 'creation_date')
    search_fields = ('name', 'description')
    readonly_fields = ('subscriber_count',)


@admin.register(EmailTemplate)
class EmailTemplateAdmin(SyncedObjectAdmin):
    """Admin for reusable email templates (subject plus HTML/text bodies)."""

    list_display = ('name', 'subject', 'creation_date')
    search_fields = ('name', 'subject')

    fieldsets = (
        (None, {
            'fields': ('name', 'subject')
        }),
        ('Content', {
            'fields': ('html_content', 'text_content'),
            'description': 'HTML content supports images and rich formatting. Text content is a fallback for plain text email clients.'
        }),
    )


@admin.register(Subscriber)
class SubscriberAdmin(SyncedObjectAdmin):
    """Admin for subscribers, with bulk import from users and prospects."""

    list_display = ('email', 'full_name', 'is_active', 'creation_date', 'get_mailing_lists')
    list_filter = ('is_active', 'mailing_lists', 'creation_date')
    search_fields = ('email', 'first_name', 'last_name')
    filter_horizontal = ('mailing_lists',)
    readonly_fields = ('unsubscribe_token',)

    fieldsets = (
        (None, {
            'fields': ('email', 'first_name', 'last_name', 'is_active')
        }),
        ('Mailing Lists', {
            'fields': ('mailing_lists',)
        }),
        ('System', {
            'fields': ('unsubscribe_token', 'user', 'prospect'),
            'classes': ('collapse',)
        }),
    )

    actions = ['import_from_users', 'import_from_prospects', 'deactivate_subscribers']

    def get_mailing_lists(self, obj):
        """Return the subscriber's mailing list names as a comma-separated string."""
        return ", ".join(ml.name for ml in obj.mailing_lists.all())
    get_mailing_lists.short_description = 'Mailing Lists'

    def get_urls(self):
        """Prepend the two import views to the default admin URLs."""
        urls = super().get_urls()
        custom_urls = [
            path('import-users/', self.admin_site.admin_view(self.import_users_view), name='mailing_subscriber_import_users'),
            path('import-prospects/', self.admin_site.admin_view(self.import_prospects_view), name='mailing_subscriber_import_prospects'),
        ]
        # Custom routes go first so they are matched before the default ones.
        return custom_urls + urls

    def _import_view(self, request, *, records, link_field, noun, title, template_name):
        """Shared implementation of the "import into mailing list" views.

        On GET (or after a failed POST) renders ``template_name`` with the
        available mailing lists. On POST, looks up the list named by the
        ``mailing_list`` form field, gets or creates a subscriber for every
        record that has an email, adds each one to the list and redirects to
        the subscriber changelist.

        Args:
            request: The current admin request.
            records: Iterable of objects exposing ``email``, ``first_name``
                and ``last_name``.
            link_field: Subscriber field that receives the source record
                when a new subscriber is created (``'user'`` or ``'prospect'``).
            noun: Plural word used in the success message.
            title: Page title passed to the template.
            template_name: Template rendering the selection form.
        """
        if request.method == 'POST':
            mailing_list_id = request.POST.get('mailing_list')
            try:
                # Only the lookup is guarded so errors raised during the
                # import itself are not misreported as a missing list.
                mailing_list = MailingList.objects.get(id=mailing_list_id)
            except (MailingList.DoesNotExist, ValueError, ValidationError):
                # ValueError / ValidationError cover a malformed id in the
                # submitted form (non-numeric or non-UUID value).
                messages.error(request, 'Selected mailing list does not exist')
            else:
                created_count = 0
                for record in records:
                    # Skip NULL and empty emails alike; an empty address
                    # would otherwise become a single bogus subscriber.
                    if not record.email:
                        continue
                    subscriber, created = Subscriber.objects.get_or_create(
                        email=record.email,
                        defaults={
                            'first_name': record.first_name,
                            'last_name': record.last_name,
                            link_field: record,
                        },
                    )
                    subscriber.mailing_lists.add(mailing_list)
                    if created:
                        created_count += 1

                messages.success(request, f'Imported {created_count} {noun} into mailing list "{mailing_list.name}"')
                return redirect('admin:mailing_subscriber_changelist')

        context = {
            'title': title,
            'mailing_lists': MailingList.objects.all(),
        }
        return render(request, template_name, context)

    def import_users_view(self, request):
        """Import every user that has an email address into a chosen mailing list."""
        return self._import_view(
            request,
            records=User.objects.all(),
            link_field='user',
            noun='users',
            title='Import Users to Mailing List',
            template_name='admin/mailing/subscriber/import_users.html',
        )

    def import_prospects_view(self, request):
        """Import every prospect that has an email address into a chosen mailing list."""
        return self._import_view(
            request,
            records=Prospect.objects.filter(email__isnull=False),
            link_field='prospect',
            noun='prospects',
            title='Import Prospects to Mailing List',
            template_name='admin/mailing/subscriber/import_prospects.html',
        )

    def import_from_users(self, request, queryset):
        """Admin action: redirect to the user import view (selection is ignored)."""
        return redirect('admin:mailing_subscriber_import_users')
    import_from_users.short_description = "Import from Users"

    def import_from_prospects(self, request, queryset):
        """Admin action: redirect to the prospect import view (selection is ignored)."""
        return redirect('admin:mailing_subscriber_import_prospects')
    import_from_prospects.short_description = "Import from Prospects"

    def deactivate_subscribers(self, request, queryset):
        """Admin action: set ``is_active`` to False on the selected subscribers."""
        count = queryset.update(is_active=False)
        messages.success(request, f'Deactivated {count} subscribers')
    deactivate_subscribers.short_description = "Deactivate selected subscribers"


@admin.register(Campaign)
class CampaignAdmin(SyncedObjectAdmin):
    """Admin for campaigns, including the confirm-and-send view."""

    list_display = ('name', 'subject', 'is_sent', 'sent_at', 'total_sent', 'total_opened', 'total_clicked')
    list_filter = ('is_sent', 'sent_at', 'mailing_lists')
    search_fields = ('name', 'subject')
    filter_horizontal = ('mailing_lists',)
    readonly_fields = ('is_sent', 'sent_at', 'total_sent', 'total_delivered', 'total_opened', 'total_clicked', 'total_bounced')

    fieldsets = (
        (None, {
            'fields': ('name', 'subject', 'mailing_lists')
        }),
        ('Content', {
            'fields': ('template', 'html_content', 'text_content'),
            'description': 'Use a template OR fill in content directly'
        }),
        ('Statistics', {
            'fields': ('is_sent', 'sent_at', 'total_sent', 'total_delivered', 'total_opened', 'total_clicked', 'total_bounced'),
            'classes': ('collapse',)
        }),
    )

    actions = ['send_campaign']

    def send_campaign(self, request, queryset):
        """Admin action: redirect to the send view for exactly one unsent campaign."""
        if queryset.count() != 1:
            messages.error(request, "Please select exactly one campaign to send.")
            return

        campaign = queryset.first()
        if campaign.is_sent:
            messages.error(request, "This campaign has already been sent.")
            return

        return redirect('admin:mailing_campaign_send', campaign.id)
    send_campaign.short_description = "Send campaign"

    def get_urls(self):
        """Prepend the campaign send view to the default admin URLs."""
        urls = super().get_urls()
        custom_urls = [
            # The route must capture ``campaign_id``: the view requires it and
            # the ``send_campaign`` action reverses this name with an id.
            # ``str`` is used so the route works for any primary-key type.
            path('<str:campaign_id>/send/', self.admin_site.admin_view(self.send_campaign_view), name='mailing_campaign_send'),
        ]
        return custom_urls + urls

    @staticmethod
    def _active_recipients(campaign):
        """Return the distinct active subscribers of the campaign's mailing lists."""
        return Subscriber.objects.filter(
            mailing_lists__in=campaign.mailing_lists.all(),
            is_active=True
        ).distinct()

    @staticmethod
    def _add_footer(content, unsubscribe_url, tracking_pixel_url):
        """Return ``(html, text)`` bodies with tracking pixel and unsubscribe link.

        Empty bodies are returned unchanged. URLs are inserted through
        ``format_html`` so they are escaped for use inside HTML attributes.
        """
        html_content = content['html']
        if html_content:
            # Add tracking pixel (invisible 1x1 image)
            html_content += format_html(
                '<img src="{}" width="1" height="1" alt="" style="display:none;">',
                tracking_pixel_url,
            )
            html_content += format_html(
                '<br><br><a href="{}">Unsubscribe</a>',
                unsubscribe_url,
            )

        text_content = content['text']
        if text_content:
            text_content += f'\n\nUnsubscribe: {unsubscribe_url}'

        return html_content, text_content

    def send_campaign_view(self, request, campaign_id):
        """Confirm (GET) or perform (POST) the sending of a campaign.

        On POST, emails every active subscriber of the campaign's mailing
        lists, records an ``EmailLog`` per recipient, then marks the campaign
        as sent and redirects to the campaign changelist. Failures for a
        single recipient are recorded and counted without aborting the run.
        """
        try:
            campaign = Campaign.objects.get(id=campaign_id)
        except (Campaign.DoesNotExist, ValueError, ValidationError):
            # ValueError / ValidationError: the URL carried a malformed id.
            messages.error(request, "Campaign not found.")
            return redirect('admin:mailing_campaign_changelist')

        if campaign.is_sent:
            messages.error(request, "This campaign has already been sent.")
            return redirect('admin:mailing_campaign_changelist')

        if request.method == 'POST':
            # Send the campaign
            content = campaign.get_content()
            subject = campaign.get_subject()

            sent_count = 0
            failed_count = 0

            for subscriber in self._active_recipients(campaign):
                # Reset per recipient so the error handler never touches an
                # unbound name or the previous recipient's log entry.
                email_log = None
                try:
                    # Create email log entry
                    # NOTE(review): sent_at is set at creation, before the
                    # send is attempted — confirm this matches how
                    # EmailLog.is_sent is derived.
                    email_log, created = EmailLog.objects.get_or_create(
                        campaign=campaign,
                        subscriber=subscriber,
                        defaults={'sent_at': timezone.now()}
                    )

                    if not created and email_log.is_sent:
                        continue  # Skip if already sent

                    # Prepare email content with tracking and unsubscribe link
                    unsubscribe_url = request.build_absolute_uri(
                        reverse('mailing:unsubscribe', kwargs={'token': subscriber.unsubscribe_token})
                    )

                    # Add tracking pixel for email opens
                    tracking_pixel_url = request.build_absolute_uri(
                        reverse('mailing:track_open', kwargs={'tracking_id': email_log.tracking_id})
                    )

                    html_content, text_content = self._add_footer(
                        content, unsubscribe_url, tracking_pixel_url
                    )

                    # Send email
                    send_mail(
                        subject=subject,
                        message=text_content or '',
                        from_email=settings.DEFAULT_FROM_EMAIL,
                        recipient_list=[subscriber.email],
                        html_message=html_content,
                        fail_silently=False,
                    )

                    email_log.sent_at = timezone.now()
                    email_log.save()
                    sent_count += 1

                except Exception as e:
                    # Deliberately broad: one bad recipient must not abort
                    # the whole campaign. The failure is stored on the log
                    # entry when one exists and always counted.
                    if email_log is not None:
                        email_log.error_message = str(e)
                        email_log.save()
                    failed_count += 1

            # Update campaign statistics
            campaign.total_sent = sent_count
            campaign.sent_at = timezone.now()
            campaign.is_sent = True
            campaign.save()

            if failed_count > 0:
                messages.warning(request, f'Campaign sent to {sent_count} subscribers. {failed_count} emails failed.')
            else:
                messages.success(request, f'Campaign sent successfully to {sent_count} subscribers.')

            return redirect('admin:mailing_campaign_changelist')

        # Calculate how many subscribers would receive this email
        subscriber_count = self._active_recipients(campaign).count()

        context = {
            'title': f'Send Campaign: {campaign.name}',
            'campaign': campaign,
            'subscriber_count': subscriber_count,
        }
        return render(request, 'admin/mailing/campaign/send_campaign.html', context)


@admin.register(EmailLog)
class EmailLogAdmin(SyncedObjectAdmin):
    """Admin for per-recipient email logs; tracking timestamps are read-only."""

    list_display = ('campaign', 'subscriber', 'is_sent', 'is_opened', 'is_clicked', 'sent_at')
    list_filter = ('campaign', 'sent_at', 'opened_at', 'clicked_at', 'bounced_at')
    search_fields = ('subscriber__email', 'campaign__name')
    readonly_fields = ('tracking_id', 'sent_at', 'delivered_at', 'opened_at', 'clicked_at', 'bounced_at')

    fieldsets = (
        (None, {
            'fields': ('campaign', 'subscriber', 'tracking_id')
        }),
        ('Tracking', {
            'fields': ('sent_at', 'delivered_at', 'opened_at', 'clicked_at', 'bounced_at', 'error_message')
        }),
    )