commit
b2e1c74aa4
|
@ -0,0 +1,9 @@
|
|||
|
||||
install:
|
||||
python3 setup.py install
|
||||
|
||||
build:
|
||||
python3 setup.py sdist
|
||||
|
||||
upload:
|
||||
twine upload dist/*
|
38
README.md
38
README.md
|
@ -0,0 +1,38 @@
|
|||
# Welcome to the unofficial intellivoid-spam-protection wiki!
|
||||
|
||||
This library handles All Requests done to https://api.intellivoid.net/spamprotection/v1/lookup. To understand how this is meant to be used, please see read the following [Documentation](https://github.com/pokurt/intellivoid-spam-protection/wiki)
|
||||
|
||||
## Getting Started
|
||||
- Installing the library:
|
||||
|
||||
`pip install git+https://github.com/pokurt/intellivoid-spam-protection`
|
||||
|
||||
- For those who wants to try out Development Builds:
|
||||
|
||||
`pip install git+https://github.com/pokurt/intellivoid-spam-protection@dev`
|
||||
|
||||
## Usage
|
||||
|
||||
```
|
||||
from spamprotection import SPBClient
|
||||
import asyncio
|
||||
|
||||
client = SPBClient()
|
||||
|
||||
async def main():
|
||||
user = input("Enter a Username or UserID to check Spam Prediction on SPB: ")
|
||||
status = await client.check_blacklist(user)
|
||||
if status.success:
|
||||
print((await text_parser(status.private_telegram_id)))
|
||||
else:
|
||||
print("Polish Cow did not Approve this!")
|
||||
|
||||
if __name__ == "__main__":
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
loop.close()
|
||||
```
|
||||
|
||||
## Examples
|
||||
|
||||
All Examples are in [Examples Directory](https://github.com/pokurt/intellivoid-spam-protection/tree/master/examples)
|
1
TODO
1
TODO
|
@ -1,4 +1,3 @@
|
|||
# Fix types into it's path
|
||||
# raise a valid error at HostDownError
|
||||
# Damn Docstrings
|
||||
# understand why the heck it freazes on wrong user id or username on python3.7
|
||||
|
|
49
example.py
49
example.py
|
@ -1,49 +0,0 @@
|
|||
from spamprotection import SPBClient
|
||||
from spamprotection.types import Blacklist
|
||||
import asyncio
|
||||
|
||||
# initializing the client
|
||||
client = SPBClient()
|
||||
|
||||
|
||||
async def main():
|
||||
# calling for status
|
||||
user = input("Enter a Username or UserID to check Spam Prediction on SPB: ")
|
||||
status = await client.check_blacklist(user)
|
||||
# check if status got a successful response
|
||||
if status.success:
|
||||
print((await text_parser(status)))
|
||||
else:
|
||||
print("Polish Cow did not Approve this!")
|
||||
|
||||
|
||||
async def text_parser(status: Blacklist):
|
||||
text = "Private TelegramID: {}\n".format(status.private_telegram_id)
|
||||
text += "Entity Type: {}\n".format(status.entity_type)
|
||||
if status.is_blacklisted:
|
||||
text += "Blacklist Flag: {}\n".format(status.blacklist_flag)
|
||||
text += "Blacklist Reason: {}\n".format(status.blacklist_reason)
|
||||
text += "Original PrivateID: {}\n".format(status.original_private_id)
|
||||
if status.is_potential_spammer:
|
||||
text += "This user is a Spammer\n"
|
||||
if status.is_operator:
|
||||
text += "This user is an Operator\n"
|
||||
if status.is_agent:
|
||||
text += "This user is an Agent\n"
|
||||
if status.is_whitelisted:
|
||||
text += "This user is Whitelisted\n"
|
||||
if status.intellivoid_accounts_verified:
|
||||
text += "This user is an Intellivoid Verified Account\n"
|
||||
if status.is_official:
|
||||
text += "This is an Official Account\n"
|
||||
text += "Language: {}\n".format(status.language)
|
||||
text += "Language Probability: {}\n".format(status.probability)
|
||||
text += "Ham Prediction: {}\n".format(status.ham_prediction)
|
||||
text += "Spam Prediction: {}\n".format(status.spam_prediction)
|
||||
text += "Last Updated On: {}\n".format(status.last_updated)
|
||||
return text
|
||||
|
||||
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
loop.close()
|
|
@ -0,0 +1 @@
|
|||
"""Examples Directory."""
|
|
@ -0,0 +1,68 @@
|
|||
from spamprotection import SPBClient
|
||||
from spamprotection.types import Blacklist
|
||||
import asyncio
|
||||
|
||||
# initializing the client
|
||||
client = SPBClient()
|
||||
|
||||
|
||||
async def main():
|
||||
# calling for status
|
||||
user = input("Enter a Username or UserID: ")
|
||||
status = await client.check_blacklist(user)
|
||||
# check if status got a successful response
|
||||
if status.success:
|
||||
print((await text_parser(status)))
|
||||
else:
|
||||
print("Polish Cow did not Approve this!")
|
||||
|
||||
|
||||
async def text_parser(status: Blacklist):
|
||||
text = "Private TelegramID: {}\n".format(
|
||||
status.private_telegram_id
|
||||
)
|
||||
text += "Entity Type: {}\n".format(status.entity_type)
|
||||
if status.attributes.is_blacklisted:
|
||||
text += "Blacklist Flag: {}\n".format(
|
||||
status.attributes.blacklist_flag
|
||||
)
|
||||
text += "Blacklist Reason: {}\n".format(
|
||||
status.attributes.blacklist_reason
|
||||
)
|
||||
text += "Original PrivateID: {}\n".format(
|
||||
status.attributes.original_private_id
|
||||
)
|
||||
if status.attributes.is_potential_spammer:
|
||||
text += "This user is a Spammer\n"
|
||||
if status.attributes.is_operator:
|
||||
text += "This user is an Operator\n"
|
||||
if status.attributes.is_agent:
|
||||
text += "This user is an Agent\n"
|
||||
if status.attributes.is_whitelisted:
|
||||
text += "This user is Whitelisted\n"
|
||||
if status.attributes.intellivoid_accounts_verified:
|
||||
text += "This user is an Intellivoid Verified Account\n"
|
||||
if status.attributes.is_official:
|
||||
text += "This is an Official Account\n"
|
||||
text += "Language: {}\n".format(
|
||||
status.language_prediction.language
|
||||
)
|
||||
text += "Language Probability: {}\n".format(
|
||||
status.language_prediction.probability
|
||||
)
|
||||
text += "Ham Prediction: {}\n".format(
|
||||
status.spam_prediction.ham_prediction
|
||||
)
|
||||
text += "Spam Prediction: {}\n".format(
|
||||
status.spam_prediction.spam_prediction
|
||||
)
|
||||
text += "Last Updated On: {}\n".format(
|
||||
status.last_updated
|
||||
)
|
||||
return text
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
loop = asyncio.get_event_loop()
|
||||
loop.run_until_complete(main())
|
||||
loop.close()
|
|
@ -0,0 +1,67 @@
|
|||
from spamprotection.sync import SPBClient
|
||||
from spamprotection.types import Blacklist
|
||||
|
||||
# initializing the client
|
||||
client = SPBClient()
|
||||
|
||||
|
||||
def main():
|
||||
# calling for status
|
||||
user = input("Enter a Username or UserID: ")
|
||||
status = client.check_blacklist(user)
|
||||
# check if status got a successful response
|
||||
if status.success:
|
||||
print((text_parser(status)))
|
||||
else:
|
||||
print("Polish Cow did not Approve this!")
|
||||
|
||||
|
||||
def text_parser(status: Blacklist):
|
||||
text = "Private TelegramID: {}\n".format(
|
||||
status.private_telegram_id
|
||||
)
|
||||
text += "Entity Type: {}\n".format(
|
||||
status.entity_type
|
||||
)
|
||||
if status.attributes.is_blacklisted:
|
||||
text += "Blacklist Flag: {}\n".format(
|
||||
status.attributes.blacklist_flag
|
||||
)
|
||||
text += "Blacklist Reason: {}\n".format(
|
||||
status.attributes.blacklist_reason
|
||||
)
|
||||
text += "Original PrivateID: {}\n".format(
|
||||
status.attributes.original_private_id
|
||||
)
|
||||
if status.attributes.is_potential_spammer:
|
||||
text += "This user is a Spammer\n"
|
||||
if status.attributes.is_operator:
|
||||
text += "This user is an Operator\n"
|
||||
if status.attributes.is_agent:
|
||||
text += "This user is an Agent\n"
|
||||
if status.attributes.is_whitelisted:
|
||||
text += "This user is Whitelisted\n"
|
||||
if status.attributes.intellivoid_accounts_verified:
|
||||
text += "This user is an Intellivoid Verified Account\n"
|
||||
if status.attributes.is_official:
|
||||
text += "This is an Official Account\n"
|
||||
text += "Language: {}\n".format(
|
||||
status.language_prediction.language
|
||||
)
|
||||
text += "Language Probability: {}\n".format(
|
||||
status.language_prediction.probability
|
||||
)
|
||||
text += "Ham Prediction: {}\n".format(
|
||||
status.spam_prediction.ham_prediction
|
||||
)
|
||||
text += "Spam Prediction: {}\n".format(
|
||||
status.spam_prediction.spam_prediction
|
||||
)
|
||||
text += "Last Updated On: {}\n".format(
|
||||
status.last_updated
|
||||
)
|
||||
return text
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -1,2 +1,3 @@
|
|||
aiohttp==3.7.3
|
||||
asyncio==3.4.3
|
||||
requests==2.20
|
||||
|
|
|
@ -0,0 +1,4 @@
|
|||
aiohttp==3.7.3
|
||||
asyncio==3.4.3
|
||||
requests==2.20
|
||||
twine==3.3.0
|
4
setup.py
4
setup.py
|
@ -1,5 +1,6 @@
|
|||
#!/usr/bin/env python
|
||||
import setuptools
|
||||
from spamprotection import __version__
|
||||
|
||||
with open("README.md", "r") as fh:
|
||||
long_description = fh.read()
|
||||
|
@ -10,7 +11,7 @@ with open("requirements.txt") as f:
|
|||
|
||||
setuptools.setup(
|
||||
name="Intellivoid SPB",
|
||||
version="0.0.1",
|
||||
version=__version__,
|
||||
description="Unofficial SPB API Wrapper",
|
||||
long_description=long_description,
|
||||
long_description_content_type="text/markdown",
|
||||
|
@ -20,6 +21,7 @@ setuptools.setup(
|
|||
url="https://github.com/pokurt/intellivoid-spam-protection",
|
||||
packages=setuptools.find_packages(),
|
||||
classifiers=[
|
||||
"Development Status :: 3 - Alpha",
|
||||
"Programming Language :: Python :: 3.8",
|
||||
],
|
||||
install_requires=dependencies,
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
"""Initial Directory."""
|
||||
from .client import SPBClient # noqa: F401
|
||||
|
||||
__version__ = "0.0.1"
|
||||
__version__ = "0.0.4"
|
||||
|
|
|
@ -8,11 +8,24 @@ from .types import Blacklist
|
|||
|
||||
class SPBClient:
|
||||
def __init__(
|
||||
self, *, host: str = "https://api.intellivoid.net/spamprotection/v1/lookup"
|
||||
self,
|
||||
*,
|
||||
host: str = "https://api.intellivoid.net/spamprotection/v1/lookup"
|
||||
) -> None:
|
||||
self._host = host
|
||||
|
||||
async def do_request(self, user_id: str, method: str = "get"):
|
||||
async def do_request(
|
||||
self,
|
||||
user_id: str,
|
||||
):
|
||||
"""[Requests to the url]
|
||||
|
||||
Args:
|
||||
user_id (str): [username or user_id can be passed into the arg]
|
||||
|
||||
Returns:
|
||||
[json]: [json response of the output]
|
||||
"""
|
||||
async with aiohttp.ClientSession() as ses:
|
||||
request = await ses.get(f"{self._host}?query={user_id}")
|
||||
try:
|
||||
|
@ -20,18 +33,40 @@ class SPBClient:
|
|||
except JSONDecodeError:
|
||||
return await request.text(), request
|
||||
|
||||
async def raw_output(self, user_id: Union[int, str]) -> Union[Blacklist, bool]:
|
||||
async def raw_output(
|
||||
self,
|
||||
user_id: Union[int, str]
|
||||
):
|
||||
"""[raw json output]
|
||||
|
||||
Args:
|
||||
user_id (Union[int, str]): [can pass user_id or username]
|
||||
|
||||
Returns:
|
||||
[json]: [returns json response]
|
||||
"""
|
||||
try:
|
||||
data, _ = await self.do_request(user_id)
|
||||
return data
|
||||
except UnknownError:
|
||||
return False
|
||||
|
||||
async def check_blacklist(self, user_id: Union[int, str]) -> Union[Blacklist, bool]:
|
||||
async def check_blacklist(
|
||||
self,
|
||||
user_id: Union[int, str]
|
||||
) -> Union[Blacklist, bool]:
|
||||
"""[checks spb for blacklist]
|
||||
|
||||
Args:
|
||||
user_id (Union[int, str]): [can pass user_id or username]
|
||||
|
||||
Returns:
|
||||
Union[Blacklist, bool]: [Blacklist type]
|
||||
"""
|
||||
try:
|
||||
data, _ = await self.do_request(user_id)
|
||||
return Blacklist(**data)
|
||||
except UnknownError:
|
||||
return False
|
||||
except aiohttp.client_exceptions.ClientConnectorError:
|
||||
return "Api is down at the moment"
|
||||
return "Api is down at the moment"
|
||||
|
|
|
@ -0,0 +1,70 @@
|
|||
import requests
|
||||
|
||||
from .errors import UnknownError
|
||||
from .types import Blacklist
|
||||
|
||||
from json import JSONDecodeError
|
||||
from typing import Union
|
||||
|
||||
|
||||
class SPBClient:
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
host: str = "https://api.intellivoid.net/spamprotection/v1/lookup"
|
||||
) -> None:
|
||||
self._host = host
|
||||
|
||||
def do_request(
|
||||
self,
|
||||
user_id: str,
|
||||
):
|
||||
"""[Requests to the url]
|
||||
|
||||
Args:
|
||||
user_id (str): [username or user_id can be passed into the arg]
|
||||
|
||||
Returns:
|
||||
[json]: [json response of the output]
|
||||
"""
|
||||
request = requests.get(f"{self._host}?query={user_id}")
|
||||
try:
|
||||
return request.json(), request
|
||||
except JSONDecodeError:
|
||||
return request.text(), request
|
||||
|
||||
def raw_output(
|
||||
self,
|
||||
user_id: Union[int, str]
|
||||
):
|
||||
"""[raw json output]
|
||||
|
||||
Args:
|
||||
user_id (Union[int, str]): [can pass user_id or username]
|
||||
|
||||
Returns:
|
||||
[json]: [returns json response]
|
||||
"""
|
||||
try:
|
||||
data, _ = self.do_request(user_id)
|
||||
return data
|
||||
except UnknownError:
|
||||
return False
|
||||
|
||||
def check_blacklist(
|
||||
self,
|
||||
user_id: Union[int, str]
|
||||
) -> Union[Blacklist, bool]:
|
||||
"""[checks spb for blacklist]
|
||||
|
||||
Args:
|
||||
user_id (Union[int, str]): [can pass user_id or username]
|
||||
|
||||
Returns:
|
||||
Union[Blacklist, bool]: [Blacklist type]
|
||||
"""
|
||||
try:
|
||||
data, _ = self.do_request(user_id)
|
||||
return Blacklist(**data)
|
||||
except UnknownError:
|
||||
return False
|
|
@ -1,4 +1,5 @@
|
|||
from datetime import datetime
|
||||
from .utils import attrdict
|
||||
|
||||
|
||||
class SPB:
|
||||
|
@ -10,29 +11,21 @@ class SPB:
|
|||
|
||||
|
||||
class Blacklist(SPB):
|
||||
def __init__(self, success: bool, response_code: int, results=dict, **kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
success: bool,
|
||||
response_code: int,
|
||||
results=dict,
|
||||
**kwargs
|
||||
):
|
||||
self.success = success
|
||||
self.response_code = response_code
|
||||
self.private_telegram_id = results["private_telegram_id"]
|
||||
self.entity_type = results["entity_type"]
|
||||
# attributes
|
||||
self.is_blacklisted = results["attributes"]["is_blacklisted"]
|
||||
self.blacklist_flag = results["attributes"]["blacklist_flag"]
|
||||
self.blacklist_reason = results["attributes"]["blacklist_reason"]
|
||||
self.original_private_id = results["attributes"]["original_private_id"]
|
||||
self.is_potential_spammer = results["attributes"]["is_potential_spammer"]
|
||||
self.is_operator = results["attributes"]["is_operator"]
|
||||
self.is_agent = results["attributes"]["is_agent"]
|
||||
self.is_whitelisted = results["attributes"]["is_whitelisted"]
|
||||
self.intellivoid_accounts_verified = results["attributes"][
|
||||
"intellivoid_accounts_verified"
|
||||
]
|
||||
self.is_official = results["attributes"]["is_official"]
|
||||
# language_prediction
|
||||
self.language = results["language_prediction"]["language"]
|
||||
self.probability = results["language_prediction"]["probability"]
|
||||
# spam_prediction
|
||||
self.ham_prediction = results["spam_prediction"]["ham_prediction"]
|
||||
self.spam_prediction = results["spam_prediction"]["spam_prediction"]
|
||||
# last_updated
|
||||
self.last_updated = datetime.fromtimestamp(results["last_updated"])
|
||||
try:
|
||||
self.private_telegram_id = results["private_telegram_id"]
|
||||
self.entity_type = results["entity_type"]
|
||||
self.attributes = attrdict(results["attributes"])
|
||||
self.language_prediction = attrdict(results["language_prediction"])
|
||||
self.spam_prediction = attrdict(results["spam_prediction"])
|
||||
self.last_updated = datetime.fromtimestamp(results["last_updated"])
|
||||
except TypeError:
|
||||
pass
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
class attrdict(dict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
dict.__init__(self, *args, **kwargs)
|
||||
|
||||
def __getattr__(self, name):
|
||||
return self[name]
|
Loading…
Reference in New Issue